1use std::io::{self, IoSlice, Write};
2
/// Inputs of at least this many bytes (16 MiB) take the multi-threaded separator scan in
/// `tac_bytes`; smaller inputs use the single-threaded path.
const PARALLEL_THRESHOLD: usize = 16 << 20;
8
9pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
14 if data.is_empty() {
15 return Ok(());
16 }
17 if data.len() >= PARALLEL_THRESHOLD {
18 if !before {
19 tac_bytes_after_parallel(data, separator, out)
20 } else {
21 tac_bytes_before_parallel(data, separator, out)
22 }
23 } else if !before {
24 tac_bytes_after(data, separator, out)
25 } else {
26 tac_bytes_before(data, separator, out)
27 }
28}
29
30pub fn tac_bytes_owned(
32 data: &mut [u8],
33 separator: u8,
34 before: bool,
35 out: &mut impl Write,
36) -> io::Result<()> {
37 tac_bytes(data, separator, before, out)
38}
39
40#[inline]
42fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
43 let estimated = data.len() / 40 + 64;
44 let mut positions = Vec::with_capacity(estimated);
45 for pos in memchr::memmem::find_iter(data, separator) {
46 positions.push(pos);
47 }
48 positions
49}
50
51fn split_into_chunks(data: &[u8], sep: u8) -> Vec<usize> {
54 let num_threads = std::thread::available_parallelism()
55 .map(|n| n.get())
56 .unwrap_or(4)
57 .max(1);
58 let chunk_target = data.len() / num_threads;
59 let mut boundaries = vec![0usize];
60 for i in 1..num_threads {
61 let target = i * chunk_target;
62 if target >= data.len() {
63 break;
64 }
65 if let Some(p) = memchr::memchr(sep, &data[target..]) {
66 let b = target + p + 1;
67 if b > *boundaries.last().unwrap() && b <= data.len() {
68 boundaries.push(b);
69 }
70 }
71 }
72 boundaries.push(data.len());
73 boundaries
74}
75
76fn tac_bytes_after_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
86 let boundaries = split_into_chunks(data, sep);
87 let n_chunks = boundaries.len() - 1;
88 if n_chunks == 0 {
89 return out.write_all(data);
90 }
91
92 let chunk_positions: Vec<Vec<usize>> = std::thread::scope(|s| {
94 let handles: Vec<_> = (0..n_chunks)
95 .map(|i| {
96 let start = boundaries[i];
97 let end = boundaries[i + 1];
98 s.spawn(move || {
99 let chunk = &data[start..end];
100 let estimated = chunk.len() / 16 + 64;
101 let mut positions = Vec::with_capacity(estimated);
102 for p in memchr::memchr_iter(sep, chunk) {
103 positions.push(start + p);
104 }
105 positions
106 })
107 })
108 .collect();
109 handles.into_iter().map(|h| h.join().unwrap()).collect()
110 });
111
112 const BATCH: usize = 1024;
116 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
117
118 for i in (0..n_chunks).rev() {
119 let chunk_start = boundaries[i];
120 let chunk_end = boundaries[i + 1];
121 let positions = &chunk_positions[i];
122
123 let mut end = chunk_end;
124 for &pos in positions.iter().rev() {
125 let rec_start = pos + 1;
126 if rec_start < end {
127 slices.push(IoSlice::new(&data[rec_start..end]));
128 if slices.len() >= BATCH {
129 write_all_vectored(out, &slices)?;
130 slices.clear();
131 }
132 }
133 end = rec_start;
134 }
135 if end > chunk_start {
136 slices.push(IoSlice::new(&data[chunk_start..end]));
137 if slices.len() >= BATCH {
138 write_all_vectored(out, &slices)?;
139 slices.clear();
140 }
141 }
142 }
143
144 if !slices.is_empty() {
145 write_all_vectored(out, &slices)?;
146 }
147 Ok(())
148}
149
150fn tac_bytes_before_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
152 let boundaries = split_into_chunks(data, sep);
153 let n_chunks = boundaries.len() - 1;
154 if n_chunks == 0 {
155 return out.write_all(data);
156 }
157
158 let chunk_positions: Vec<Vec<usize>> = std::thread::scope(|s| {
160 let handles: Vec<_> = (0..n_chunks)
161 .map(|i| {
162 let start = boundaries[i];
163 let end = boundaries[i + 1];
164 s.spawn(move || {
165 let chunk = &data[start..end];
166 let estimated = chunk.len() / 16 + 64;
167 let mut positions = Vec::with_capacity(estimated);
168 for p in memchr::memchr_iter(sep, chunk) {
169 positions.push(start + p);
170 }
171 positions
172 })
173 })
174 .collect();
175 handles.into_iter().map(|h| h.join().unwrap()).collect()
176 });
177
178 const BATCH: usize = 1024;
180 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
181
182 for i in (0..n_chunks).rev() {
183 let chunk_start = boundaries[i];
184 let chunk_end = boundaries[i + 1];
185 let positions = &chunk_positions[i];
186
187 let mut end = chunk_end;
188 for &pos in positions.iter().rev() {
189 if pos < end {
190 slices.push(IoSlice::new(&data[pos..end]));
191 if slices.len() >= BATCH {
192 write_all_vectored(out, &slices)?;
193 slices.clear();
194 }
195 }
196 end = pos;
197 }
198 if end > chunk_start {
199 slices.push(IoSlice::new(&data[chunk_start..end]));
200 if slices.len() >= BATCH {
201 write_all_vectored(out, &slices)?;
202 slices.clear();
203 }
204 }
205 }
206
207 if !slices.is_empty() {
208 write_all_vectored(out, &slices)?;
209 }
210 Ok(())
211}
212
213fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
220 if data.is_empty() {
221 return Ok(());
222 }
223
224 const CHUNK: usize = 1024 * 1024; const BATCH: usize = 1024;
226 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
227
228 let est = CHUNK / 16 + 64;
230 let mut positions: Vec<usize> = Vec::with_capacity(est);
231
232 let mut record_end = data.len();
233 let mut chunk_right = data.len();
234
235 while chunk_right > 0 {
236 let chunk_left = chunk_right.saturating_sub(CHUNK);
237 let chunk = &data[chunk_left..chunk_right];
238
239 positions.clear();
240 for p in memchr::memchr_iter(sep, chunk) {
241 positions.push(chunk_left + p);
242 }
243
244 for &pos in positions.iter().rev() {
245 let rec_start = pos + 1;
246 if rec_start < record_end {
247 slices.push(IoSlice::new(&data[rec_start..record_end]));
248 if slices.len() >= BATCH {
249 write_all_vectored(out, &slices)?;
250 slices.clear();
251 }
252 }
253 record_end = rec_start;
254 }
255
256 chunk_right = chunk_left;
257 }
258
259 if record_end > 0 {
260 slices.push(IoSlice::new(&data[..record_end]));
261 }
262 if !slices.is_empty() {
263 write_all_vectored(out, &slices)?;
264 }
265
266 Ok(())
267}
268
269fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
271 if data.is_empty() {
272 return Ok(());
273 }
274
275 const CHUNK: usize = 1024 * 1024;
276 const BATCH: usize = 1024;
277 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
278
279 let est = CHUNK / 16 + 64;
280 let mut positions: Vec<usize> = Vec::with_capacity(est);
281
282 let mut record_end = data.len();
283 let mut chunk_right = data.len();
284
285 while chunk_right > 0 {
286 let chunk_left = chunk_right.saturating_sub(CHUNK);
287 let chunk = &data[chunk_left..chunk_right];
288
289 positions.clear();
290 for p in memchr::memchr_iter(sep, chunk) {
291 positions.push(chunk_left + p);
292 }
293
294 for &pos in positions.iter().rev() {
295 if pos < record_end {
296 slices.push(IoSlice::new(&data[pos..record_end]));
297 if slices.len() >= BATCH {
298 write_all_vectored(out, &slices)?;
299 slices.clear();
300 }
301 }
302 record_end = pos;
303 }
304
305 chunk_right = chunk_left;
306 }
307
308 if record_end > 0 {
309 slices.push(IoSlice::new(&data[..record_end]));
310 }
311 if !slices.is_empty() {
312 write_all_vectored(out, &slices)?;
313 }
314
315 Ok(())
316}
317
318pub fn tac_string_separator(
323 data: &[u8],
324 separator: &[u8],
325 before: bool,
326 out: &mut impl Write,
327) -> io::Result<()> {
328 if data.is_empty() {
329 return Ok(());
330 }
331
332 if separator.len() == 1 {
333 return tac_bytes(data, separator[0], before, out);
334 }
335
336 let sep_len = separator.len();
337
338 if !before {
339 tac_string_after(data, separator, sep_len, out)
340 } else {
341 tac_string_before(data, separator, sep_len, out)
342 }
343}
344
345fn tac_string_after(
347 data: &[u8],
348 separator: &[u8],
349 sep_len: usize,
350 out: &mut impl Write,
351) -> io::Result<()> {
352 let positions = collect_positions_str(data, separator);
353
354 if positions.is_empty() {
355 return out.write_all(data);
356 }
357
358 const BATCH: usize = 1024;
359 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
360 let mut end = data.len();
361
362 for &pos in positions.iter().rev() {
363 let rec_start = pos + sep_len;
364 if rec_start < end {
365 slices.push(IoSlice::new(&data[rec_start..end]));
366 if slices.len() >= BATCH {
367 write_all_vectored(out, &slices)?;
368 slices.clear();
369 }
370 }
371 end = rec_start;
372 }
373 if end > 0 {
374 slices.push(IoSlice::new(&data[..end]));
375 }
376 if !slices.is_empty() {
377 write_all_vectored(out, &slices)?;
378 }
379 Ok(())
380}
381
382fn tac_string_before(
384 data: &[u8],
385 separator: &[u8],
386 _sep_len: usize,
387 out: &mut impl Write,
388) -> io::Result<()> {
389 let positions = collect_positions_str(data, separator);
390
391 if positions.is_empty() {
392 return out.write_all(data);
393 }
394
395 const BATCH: usize = 1024;
396 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
397 let mut end = data.len();
398
399 for &pos in positions.iter().rev() {
400 if pos < end {
401 slices.push(IoSlice::new(&data[pos..end]));
402 if slices.len() >= BATCH {
403 write_all_vectored(out, &slices)?;
404 slices.clear();
405 }
406 }
407 end = pos;
408 }
409 if end > 0 {
410 slices.push(IoSlice::new(&data[..end]));
411 }
412 if !slices.is_empty() {
413 write_all_vectored(out, &slices)?;
414 }
415 Ok(())
416}
417
418fn find_regex_matches_backward(data: &[u8], re: ®ex::bytes::Regex) -> Vec<(usize, usize)> {
420 let mut matches = Vec::new();
421 let mut past_end = data.len();
422
423 while past_end > 0 {
424 let buf = &data[..past_end];
425 let mut found = false;
426
427 let mut pos = past_end;
428 while pos > 0 {
429 pos -= 1;
430 if let Some(m) = re.find_at(buf, pos) {
431 if m.start() == pos {
432 matches.push((m.start(), m.end()));
433 past_end = m.start();
434 found = true;
435 break;
436 }
437 }
438 }
439
440 if !found {
441 break;
442 }
443 }
444
445 matches.reverse();
446 matches
447}
448
449pub fn tac_regex_separator(
452 data: &[u8],
453 pattern: &str,
454 before: bool,
455 out: &mut impl Write,
456) -> io::Result<()> {
457 if data.is_empty() {
458 return Ok(());
459 }
460
461 let re = match regex::bytes::Regex::new(pattern) {
462 Ok(r) => r,
463 Err(e) => {
464 return Err(io::Error::new(
465 io::ErrorKind::InvalidInput,
466 format!("invalid regex '{}': {}", pattern, e),
467 ));
468 }
469 };
470
471 let matches = find_regex_matches_backward(data, &re);
472
473 if matches.is_empty() {
474 out.write_all(data)?;
475 return Ok(());
476 }
477
478 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
481
482 if !before {
483 let last_end = matches.last().unwrap().1;
484
485 if last_end < data.len() {
486 slices.push(IoSlice::new(&data[last_end..]));
487 }
488
489 let mut i = matches.len();
490 while i > 0 {
491 i -= 1;
492 let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
493 slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
494 }
495 } else {
496 let mut i = matches.len();
497 while i > 0 {
498 i -= 1;
499 let start = matches[i].0;
500 let end = if i + 1 < matches.len() {
501 matches[i + 1].0
502 } else {
503 data.len()
504 };
505 slices.push(IoSlice::new(&data[start..end]));
506 }
507
508 if matches[0].0 > 0 {
509 slices.push(IoSlice::new(&data[..matches[0].0]));
510 }
511 }
512
513 write_all_vectored(out, &slices)
514}
515
516#[inline(always)]
520fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
521 let total: usize = slices.iter().map(|s| s.len()).sum();
522 let written = out.write_vectored(slices)?;
523 if written >= total {
524 return Ok(());
525 }
526 if written == 0 {
527 return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
528 }
529 flush_vectored_slow(out, slices, written)
530}
531
/// Finishes a vectored write that was only partly accepted.
///
/// The first `skip` bytes across `slices` (already written) are skipped, and everything
/// after them is written with `write_all`. The function is kept out of line and marked
/// cold, presumably because a short vectored write is the uncommon path.
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    skip: usize,
) -> io::Result<()> {
    let mut already = skip;
    for slice in slices {
        let bytes: &[u8] = slice;
        if already < bytes.len() {
            out.write_all(&bytes[already..])?;
            already = 0;
        } else {
            already -= bytes.len();
        }
    }
    Ok(())
}