1use std::io::{self, IoSlice, Write};
2
/// Inputs of at least this many bytes (4 MiB) take the multi-threaded
/// separator scan in `tac_bytes`; shorter inputs are handled on the calling
/// thread.
const PARALLEL_THRESHOLD: usize = 4 << 20;
9
10pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
15 if data.is_empty() {
16 return Ok(());
17 }
18 if data.len() >= PARALLEL_THRESHOLD {
19 if !before {
20 tac_bytes_after_parallel(data, separator, out)
21 } else {
22 tac_bytes_before_parallel(data, separator, out)
23 }
24 } else if !before {
25 tac_bytes_after(data, separator, out)
26 } else {
27 tac_bytes_before(data, separator, out)
28 }
29}
30
31pub fn tac_bytes_owned(
33 data: &mut [u8],
34 separator: u8,
35 before: bool,
36 out: &mut impl Write,
37) -> io::Result<()> {
38 tac_bytes(data, separator, before, out)
39}
40
41#[inline]
43fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
44 let estimated = data.len() / 40 + 64;
45 let mut positions = Vec::with_capacity(estimated);
46 for pos in memchr::memmem::find_iter(data, separator) {
47 positions.push(pos);
48 }
49 positions
50}
51
52fn split_into_chunks(data: &[u8], sep: u8) -> Vec<usize> {
55 let num_threads = std::thread::available_parallelism()
56 .map(|n| n.get())
57 .unwrap_or(4)
58 .max(1);
59 let chunk_target = data.len() / num_threads;
60 let mut boundaries = vec![0usize];
61 for i in 1..num_threads {
62 let target = i * chunk_target;
63 if target >= data.len() {
64 break;
65 }
66 if let Some(p) = memchr::memchr(sep, &data[target..]) {
67 let b = target + p + 1;
68 if b > *boundaries.last().unwrap() && b <= data.len() {
69 boundaries.push(b);
70 }
71 }
72 }
73 boundaries.push(data.len());
74 boundaries
75}
76
77fn tac_bytes_after_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
90 let boundaries = split_into_chunks(data, sep);
91 let n_chunks = boundaries.len() - 1;
92 if n_chunks == 0 {
93 return out.write_all(data);
94 }
95
96 let chunk_seps: Vec<Vec<u32>> = std::thread::scope(|s| {
100 let handles: Vec<_> = (0..n_chunks)
101 .map(|i| {
102 let start = boundaries[i];
103 let end = boundaries[i + 1];
104 s.spawn(move || {
105 let chunk = &data[start..end];
106 let est = chunk.len() / 40 + 64;
107 let mut seps = Vec::with_capacity(est);
108 for pos in memchr::memrchr_iter(sep, chunk) {
111 seps.push(pos as u32);
112 }
113 seps
114 })
115 })
116 .collect();
117 handles.into_iter().map(|h| h.join().unwrap()).collect()
118 });
119
120 const BATCH: usize = 1024;
123 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
124
125 for i in (0..n_chunks).rev() {
126 let chunk_start = boundaries[i];
127 let chunk_end = boundaries[i + 1];
128 let seps = &chunk_seps[i];
129
130 let mut rec_end = chunk_end;
133 for &rel_pos in seps.iter() {
134 let sep_abs = chunk_start + rel_pos as usize;
135 let rec_start = sep_abs + 1;
136 if rec_start < rec_end {
137 slices.push(IoSlice::new(&data[rec_start..rec_end]));
138 if slices.len() >= BATCH {
139 write_all_vectored(out, &slices)?;
140 slices.clear();
141 }
142 }
143 rec_end = rec_start;
144 }
145 if rec_end > chunk_start {
147 slices.push(IoSlice::new(&data[chunk_start..rec_end]));
148 if slices.len() >= BATCH {
149 write_all_vectored(out, &slices)?;
150 slices.clear();
151 }
152 }
153 }
154
155 if !slices.is_empty() {
156 write_all_vectored(out, &slices)?;
157 }
158 Ok(())
159}
160
161fn tac_bytes_before_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
165 let boundaries = split_into_chunks(data, sep);
166 let n_chunks = boundaries.len() - 1;
167 if n_chunks == 0 {
168 return out.write_all(data);
169 }
170
171 let chunk_seps: Vec<Vec<u32>> = std::thread::scope(|s| {
173 let handles: Vec<_> = (0..n_chunks)
174 .map(|i| {
175 let start = boundaries[i];
176 let end = boundaries[i + 1];
177 s.spawn(move || {
178 let chunk = &data[start..end];
179 let est = chunk.len() / 40 + 64;
180 let mut seps = Vec::with_capacity(est);
181 for pos in memchr::memrchr_iter(sep, chunk) {
182 seps.push(pos as u32);
183 }
184 seps
185 })
186 })
187 .collect();
188 handles.into_iter().map(|h| h.join().unwrap()).collect()
189 });
190
191 const BATCH: usize = 1024;
194 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
195
196 for i in (0..n_chunks).rev() {
197 let chunk_start = boundaries[i];
198 let chunk_end = boundaries[i + 1];
199 let seps = &chunk_seps[i];
200
201 let mut rec_end = chunk_end;
202 for &rel_pos in seps.iter() {
203 let pos = chunk_start + rel_pos as usize;
204 if pos < rec_end {
205 slices.push(IoSlice::new(&data[pos..rec_end]));
206 if slices.len() >= BATCH {
207 write_all_vectored(out, &slices)?;
208 slices.clear();
209 }
210 }
211 rec_end = pos;
212 }
213 if rec_end > chunk_start {
214 slices.push(IoSlice::new(&data[chunk_start..rec_end]));
215 if slices.len() >= BATCH {
216 write_all_vectored(out, &slices)?;
217 slices.clear();
218 }
219 }
220 }
221
222 if !slices.is_empty() {
223 write_all_vectored(out, &slices)?;
224 }
225 Ok(())
226}
227
228fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
235 if data.is_empty() {
236 return Ok(());
237 }
238
239 const BATCH: usize = 1024;
240 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
241 let mut end = data.len();
242
243 for pos in memchr::memrchr_iter(sep, data) {
244 let rec_start = pos + 1;
245 if rec_start < end {
246 slices.push(IoSlice::new(&data[rec_start..end]));
247 if slices.len() >= BATCH {
248 write_all_vectored(out, &slices)?;
249 slices.clear();
250 }
251 }
252 end = rec_start;
253 }
254
255 if end > 0 {
256 slices.push(IoSlice::new(&data[..end]));
257 }
258 if !slices.is_empty() {
259 write_all_vectored(out, &slices)?;
260 }
261
262 Ok(())
263}
264
265fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
267 if data.is_empty() {
268 return Ok(());
269 }
270
271 const BATCH: usize = 1024;
272 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
273 let mut end = data.len();
274
275 for pos in memchr::memrchr_iter(sep, data) {
276 if pos < end {
277 slices.push(IoSlice::new(&data[pos..end]));
278 if slices.len() >= BATCH {
279 write_all_vectored(out, &slices)?;
280 slices.clear();
281 }
282 }
283 end = pos;
284 }
285
286 if end > 0 {
287 slices.push(IoSlice::new(&data[..end]));
288 }
289 if !slices.is_empty() {
290 write_all_vectored(out, &slices)?;
291 }
292
293 Ok(())
294}
295
296pub fn tac_string_separator(
301 data: &[u8],
302 separator: &[u8],
303 before: bool,
304 out: &mut impl Write,
305) -> io::Result<()> {
306 if data.is_empty() {
307 return Ok(());
308 }
309
310 if separator.len() == 1 {
311 return tac_bytes(data, separator[0], before, out);
312 }
313
314 let sep_len = separator.len();
315
316 if !before {
317 tac_string_after(data, separator, sep_len, out)
318 } else {
319 tac_string_before(data, separator, sep_len, out)
320 }
321}
322
323fn tac_string_after(
325 data: &[u8],
326 separator: &[u8],
327 sep_len: usize,
328 out: &mut impl Write,
329) -> io::Result<()> {
330 let positions = collect_positions_str(data, separator);
331
332 if positions.is_empty() {
333 return out.write_all(data);
334 }
335
336 const BATCH: usize = 1024;
337 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
338 let mut end = data.len();
339
340 for &pos in positions.iter().rev() {
341 let rec_start = pos + sep_len;
342 if rec_start < end {
343 slices.push(IoSlice::new(&data[rec_start..end]));
344 if slices.len() >= BATCH {
345 write_all_vectored(out, &slices)?;
346 slices.clear();
347 }
348 }
349 end = rec_start;
350 }
351 if end > 0 {
352 slices.push(IoSlice::new(&data[..end]));
353 }
354 if !slices.is_empty() {
355 write_all_vectored(out, &slices)?;
356 }
357 Ok(())
358}
359
360fn tac_string_before(
362 data: &[u8],
363 separator: &[u8],
364 _sep_len: usize,
365 out: &mut impl Write,
366) -> io::Result<()> {
367 let positions = collect_positions_str(data, separator);
368
369 if positions.is_empty() {
370 return out.write_all(data);
371 }
372
373 const BATCH: usize = 1024;
374 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
375 let mut end = data.len();
376
377 for &pos in positions.iter().rev() {
378 if pos < end {
379 slices.push(IoSlice::new(&data[pos..end]));
380 if slices.len() >= BATCH {
381 write_all_vectored(out, &slices)?;
382 slices.clear();
383 }
384 }
385 end = pos;
386 }
387 if end > 0 {
388 slices.push(IoSlice::new(&data[..end]));
389 }
390 if !slices.is_empty() {
391 write_all_vectored(out, &slices)?;
392 }
393 Ok(())
394}
395
396fn find_regex_matches_backward(data: &[u8], re: ®ex::bytes::Regex) -> Vec<(usize, usize)> {
398 let mut matches = Vec::new();
399 let mut past_end = data.len();
400
401 while past_end > 0 {
402 let buf = &data[..past_end];
403 let mut found = false;
404
405 let mut pos = past_end;
406 while pos > 0 {
407 pos -= 1;
408 if let Some(m) = re.find_at(buf, pos) {
409 if m.start() == pos {
410 matches.push((m.start(), m.end()));
411 past_end = m.start();
412 found = true;
413 break;
414 }
415 }
416 }
417
418 if !found {
419 break;
420 }
421 }
422
423 matches.reverse();
424 matches
425}
426
427pub fn tac_regex_separator(
430 data: &[u8],
431 pattern: &str,
432 before: bool,
433 out: &mut impl Write,
434) -> io::Result<()> {
435 if data.is_empty() {
436 return Ok(());
437 }
438
439 let re = match regex::bytes::Regex::new(pattern) {
440 Ok(r) => r,
441 Err(e) => {
442 return Err(io::Error::new(
443 io::ErrorKind::InvalidInput,
444 format!("invalid regex '{}': {}", pattern, e),
445 ));
446 }
447 };
448
449 let matches = find_regex_matches_backward(data, &re);
450
451 if matches.is_empty() {
452 out.write_all(data)?;
453 return Ok(());
454 }
455
456 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
459
460 if !before {
461 let last_end = matches.last().unwrap().1;
462
463 if last_end < data.len() {
464 slices.push(IoSlice::new(&data[last_end..]));
465 }
466
467 let mut i = matches.len();
468 while i > 0 {
469 i -= 1;
470 let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
471 slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
472 }
473 } else {
474 let mut i = matches.len();
475 while i > 0 {
476 i -= 1;
477 let start = matches[i].0;
478 let end = if i + 1 < matches.len() {
479 matches[i + 1].0
480 } else {
481 data.len()
482 };
483 slices.push(IoSlice::new(&data[start..end]));
484 }
485
486 if matches[0].0 > 0 {
487 slices.push(IoSlice::new(&data[..matches[0].0]));
488 }
489 }
490
491 write_all_vectored(out, &slices)
492}
493
494#[inline(always)]
498fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
499 let total: usize = slices.iter().map(|s| s.len()).sum();
500 let written = out.write_vectored(slices)?;
501 if written >= total {
502 return Ok(());
503 }
504 if written == 0 {
505 return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
506 }
507 flush_vectored_slow(out, slices, written)
508}
509
/// Finishes a partially completed vectored write.
///
/// `skip` is the number of leading bytes of `slices` (taken in order) that
/// `out` already accepted. Everything after that point is written with
/// `write_all`, one slice at a time.
///
/// # Errors
///
/// Propagates the first I/O error from `out`.
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    // Locate the first slice with unwritten bytes, reducing `skip` to the
    // offset inside that slice along the way.
    let first = slices.iter().position(|s| {
        if skip < s.len() {
            true
        } else {
            skip -= s.len();
            false
        }
    });
    let first = match first {
        Some(index) => index,
        None => return Ok(()),
    };

    out.write_all(&slices[first][skip..])?;
    for rest in &slices[first + 1..] {
        out.write_all(rest)?;
    }
    Ok(())
}
528}