1use std::io::{self, IoSlice, Write};
2
3use rayon::prelude::*;
4
/// Input size (64 MiB) from which `tac_bytes` locates separators with
/// [`parallel_scan_positions`] on the rayon pool instead of a serial backward scan.
const PARALLEL_THRESHOLD: usize = 64 << 20;

/// Largest number of `IoSlice`s the record writers accumulate before issuing a
/// vectored write (presumably sized to the common `IOV_MAX` of 1024 — confirm).
const IOSLICE_BATCH_SIZE: usize = 1 << 10;
15
16pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
20 if data.is_empty() {
21 return Ok(());
22 }
23 if data.len() >= PARALLEL_THRESHOLD {
24 if !before {
25 tac_bytes_after_contiguous(data, separator, out)
26 } else {
27 tac_bytes_before_contiguous(data, separator, out)
28 }
29 } else if !before {
30 tac_bytes_after(data, separator, out)
31 } else {
32 tac_bytes_before(data, separator, out)
33 }
34}
35
36pub fn tac_bytes_owned(
38 data: &mut [u8],
39 separator: u8,
40 before: bool,
41 out: &mut impl Write,
42) -> io::Result<()> {
43 tac_bytes(data, separator, before, out)
44}
45
46#[inline]
48fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
49 let estimated = data.len() / 40 + 64;
50 let mut positions = Vec::with_capacity(estimated);
51 for pos in memchr::memmem::find_iter(data, separator) {
52 positions.push(pos);
53 }
54 positions
55}
56
57fn parallel_scan_positions(data: &[u8], sep: u8) -> Vec<Vec<usize>> {
61 let n_threads = rayon::current_num_threads().max(1);
62 let chunk_size = (data.len() + n_threads - 1) / n_threads;
63
64 (0..n_threads)
65 .into_par_iter()
66 .map(|i| {
67 let start = i * chunk_size;
68 if start >= data.len() {
69 return Vec::new();
70 }
71 let end = (start + chunk_size).min(data.len());
72 let chunk = &data[start..end];
73 let estimated = chunk.len() / 40 + 64;
74 let mut positions = Vec::with_capacity(estimated);
75 for pos in memchr::memchr_iter(sep, chunk) {
76 positions.push(start + pos);
77 }
78 positions
79 })
80 .collect()
81}
82
83fn tac_bytes_after_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
92 let chunk_positions = parallel_scan_positions(data, sep);
93
94 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
95 let mut end = data.len();
96
97 for positions in chunk_positions.iter().rev() {
98 for &pos in positions.iter().rev() {
99 let rec_start = pos + 1;
100 if rec_start < end {
101 slices.push(IoSlice::new(&data[rec_start..end]));
102 if slices.len() >= IOSLICE_BATCH_SIZE {
103 write_all_vectored(out, &slices)?;
104 slices.clear();
105 }
106 }
107 end = rec_start;
108 }
109 }
110
111 if end > 0 {
112 slices.push(IoSlice::new(&data[..end]));
113 }
114 if !slices.is_empty() {
115 write_all_vectored(out, &slices)?;
116 }
117 Ok(())
118}
119
120fn tac_bytes_before_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
122 let chunk_positions = parallel_scan_positions(data, sep);
123
124 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
125 let mut end = data.len();
126
127 for positions in chunk_positions.iter().rev() {
128 for &pos in positions.iter().rev() {
129 if pos < end {
130 slices.push(IoSlice::new(&data[pos..end]));
131 if slices.len() >= IOSLICE_BATCH_SIZE {
132 write_all_vectored(out, &slices)?;
133 slices.clear();
134 }
135 }
136 end = pos;
137 }
138 }
139
140 if end > 0 {
141 slices.push(IoSlice::new(&data[..end]));
142 }
143 if !slices.is_empty() {
144 write_all_vectored(out, &slices)?;
145 }
146 Ok(())
147}
148
149fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
152 if data.is_empty() {
153 return Ok(());
154 }
155
156 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
157 let mut end = data.len();
158
159 for pos in memchr::memrchr_iter(sep, data) {
160 let rec_start = pos + 1;
161 if rec_start < end {
162 slices.push(IoSlice::new(&data[rec_start..end]));
163 if slices.len() >= IOSLICE_BATCH_SIZE {
164 write_all_vectored(out, &slices)?;
165 slices.clear();
166 }
167 }
168 end = rec_start;
169 }
170
171 if end > 0 {
172 slices.push(IoSlice::new(&data[..end]));
173 }
174 if !slices.is_empty() {
175 write_all_vectored(out, &slices)?;
176 }
177 Ok(())
178}
179
180fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
182 if data.is_empty() {
183 return Ok(());
184 }
185
186 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
187 let mut end = data.len();
188
189 for pos in memchr::memrchr_iter(sep, data) {
190 if pos < end {
191 slices.push(IoSlice::new(&data[pos..end]));
192 if slices.len() >= IOSLICE_BATCH_SIZE {
193 write_all_vectored(out, &slices)?;
194 slices.clear();
195 }
196 }
197 end = pos;
198 }
199
200 if end > 0 {
201 slices.push(IoSlice::new(&data[..end]));
202 }
203 if !slices.is_empty() {
204 write_all_vectored(out, &slices)?;
205 }
206 Ok(())
207}
208
209pub fn tac_string_separator(
214 data: &[u8],
215 separator: &[u8],
216 before: bool,
217 out: &mut impl Write,
218) -> io::Result<()> {
219 if data.is_empty() {
220 return Ok(());
221 }
222
223 if separator.len() == 1 {
224 return tac_bytes(data, separator[0], before, out);
225 }
226
227 let sep_len = separator.len();
228
229 if !before {
230 tac_string_after(data, separator, sep_len, out)
231 } else {
232 tac_string_before(data, separator, sep_len, out)
233 }
234}
235
236fn tac_string_after(
238 data: &[u8],
239 separator: &[u8],
240 sep_len: usize,
241 out: &mut impl Write,
242) -> io::Result<()> {
243 let positions = collect_positions_str(data, separator);
244
245 if positions.is_empty() {
246 return out.write_all(data);
247 }
248
249 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
250 let mut end = data.len();
251
252 for &pos in positions.iter().rev() {
253 let rec_start = pos + sep_len;
254 if rec_start < end {
255 slices.push(IoSlice::new(&data[rec_start..end]));
256 if slices.len() >= IOSLICE_BATCH_SIZE {
257 write_all_vectored(out, &slices)?;
258 slices.clear();
259 }
260 }
261 end = rec_start;
262 }
263 if end > 0 {
264 slices.push(IoSlice::new(&data[..end]));
265 }
266 if !slices.is_empty() {
267 write_all_vectored(out, &slices)?;
268 }
269 Ok(())
270}
271
272fn tac_string_before(
274 data: &[u8],
275 separator: &[u8],
276 _sep_len: usize,
277 out: &mut impl Write,
278) -> io::Result<()> {
279 let positions = collect_positions_str(data, separator);
280
281 if positions.is_empty() {
282 return out.write_all(data);
283 }
284
285 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
286 let mut end = data.len();
287
288 for &pos in positions.iter().rev() {
289 if pos < end {
290 slices.push(IoSlice::new(&data[pos..end]));
291 if slices.len() >= IOSLICE_BATCH_SIZE {
292 write_all_vectored(out, &slices)?;
293 slices.clear();
294 }
295 }
296 end = pos;
297 }
298 if end > 0 {
299 slices.push(IoSlice::new(&data[..end]));
300 }
301 if !slices.is_empty() {
302 write_all_vectored(out, &slices)?;
303 }
304 Ok(())
305}
306
307fn find_regex_matches_backward(data: &[u8], re: ®ex::bytes::Regex) -> Vec<(usize, usize)> {
309 let mut matches = Vec::new();
310 let mut past_end = data.len();
311
312 while past_end > 0 {
313 let buf = &data[..past_end];
314 let mut found = false;
315
316 let mut pos = past_end;
317 while pos > 0 {
318 pos -= 1;
319 if let Some(m) = re.find_at(buf, pos) {
320 if m.start() == pos {
321 matches.push((m.start(), m.end()));
322 past_end = m.start();
323 found = true;
324 break;
325 }
326 }
327 }
328
329 if !found {
330 break;
331 }
332 }
333
334 matches.reverse();
335 matches
336}
337
338pub fn tac_regex_separator(
341 data: &[u8],
342 pattern: &str,
343 before: bool,
344 out: &mut impl Write,
345) -> io::Result<()> {
346 if data.is_empty() {
347 return Ok(());
348 }
349
350 let re = match regex::bytes::Regex::new(pattern) {
351 Ok(r) => r,
352 Err(e) => {
353 return Err(io::Error::new(
354 io::ErrorKind::InvalidInput,
355 format!("invalid regex '{}': {}", pattern, e),
356 ));
357 }
358 };
359
360 let matches = find_regex_matches_backward(data, &re);
361
362 if matches.is_empty() {
363 out.write_all(data)?;
364 return Ok(());
365 }
366
367 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
370
371 if !before {
372 let last_end = matches.last().unwrap().1;
373
374 if last_end < data.len() {
375 slices.push(IoSlice::new(&data[last_end..]));
376 }
377
378 let mut i = matches.len();
379 while i > 0 {
380 i -= 1;
381 let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
382 slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
383 }
384 } else {
385 let mut i = matches.len();
386 while i > 0 {
387 i -= 1;
388 let start = matches[i].0;
389 let end = if i + 1 < matches.len() {
390 matches[i + 1].0
391 } else {
392 data.len()
393 };
394 slices.push(IoSlice::new(&data[start..end]));
395 }
396
397 if matches[0].0 > 0 {
398 slices.push(IoSlice::new(&data[..matches[0].0]));
399 }
400 }
401
402 write_all_vectored(out, &slices)
403}
404
/// Writes every byte of `slices` to `out`, trying a single vectored write first.
///
/// The fast path issues one `write_vectored` call. If the writer accepts only
/// part of the data, the remainder is finished by `flush_vectored_slow`. This
/// covers both short writes and writers whose `write_vectored` handles just the
/// first buffer. `ErrorKind::Interrupted` from the vectored call is retried,
/// matching `Write::write_all`.
///
/// # Errors
/// Returns `ErrorKind::WriteZero` if the writer accepts no bytes while data
/// remains, and propagates any other I/O error from `out`.
#[inline(always)]
fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
    let total: usize = slices.iter().map(|s| s.len()).sum();
    let written = loop {
        match out.write_vectored(slices) {
            Ok(n) => break n,
            // A signal interrupted the call before any data was written; retry.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };
    if written >= total {
        return Ok(());
    }
    if written == 0 {
        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
    }
    flush_vectored_slow(out, slices, written)
}

/// Finishes a partially completed vectored write.
///
/// Skips the first `skip` bytes across `slices` (already written), then writes
/// the rest slice by slice with `write_all`. `write_all` itself retries
/// interruptions and reports `WriteZero`.
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        let bytes: &[u8] = slice;
        if skip >= bytes.len() {
            skip -= bytes.len();
            continue;
        }
        out.write_all(&bytes[skip..])?;
        skip = 0;
    }
    Ok(())
}
439}