1use std::io::{self, IoSlice, Write};
2
3use rayon::prelude::*;
4
/// Input size in bytes (64 MiB) at or above which `tac_bytes` routes the work to the
/// rayon-based `*_contiguous` variants instead of the single-threaded ones.
const PARALLEL_THRESHOLD: usize = 64 * 1024 * 1024;

/// Maximum number of `IoSlice` entries the multi-byte separator paths accumulate before
/// issuing a vectored write.
const IOSLICE_BATCH_SIZE: usize = 1024;
15
16pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
20 if data.is_empty() {
21 return Ok(());
22 }
23 if data.len() >= PARALLEL_THRESHOLD {
24 if !before {
25 tac_bytes_after_contiguous(data, separator, out)
26 } else {
27 tac_bytes_before_contiguous(data, separator, out)
28 }
29 } else if !before {
30 tac_bytes_after(data, separator, out)
31 } else {
32 tac_bytes_before(data, separator, out)
33 }
34}
35
36pub fn tac_bytes_owned(
38 data: &mut [u8],
39 separator: u8,
40 before: bool,
41 out: &mut impl Write,
42) -> io::Result<()> {
43 tac_bytes(data, separator, before, out)
44}
45
46#[inline]
48fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
49 let estimated = data.len() / 40 + 64;
50 let mut positions = Vec::with_capacity(estimated);
51 for pos in memchr::memmem::find_iter(data, separator) {
52 positions.push(pos);
53 }
54 positions
55}
56
57fn tac_bytes_after_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
63 let n_threads = rayon::current_num_threads().max(1);
64 let chunk_size = data.len() / n_threads;
65
66 let mut boundaries = Vec::with_capacity(n_threads + 1);
68 boundaries.push(0);
69 for i in 1..n_threads {
70 let target = i * chunk_size;
71 if target >= data.len() {
72 break;
73 }
74 let boundary = memchr::memchr(sep, &data[target..])
76 .map(|p| target + p + 1)
77 .unwrap_or(data.len());
78 if boundary < data.len() {
79 boundaries.push(boundary);
80 }
81 }
82 boundaries.push(data.len());
83 boundaries.dedup();
84 let n_chunks = boundaries.len() - 1;
85
86 let reversed_chunks: Vec<Vec<u8>> = (0..n_chunks)
88 .into_par_iter()
89 .map(|i| {
90 let start = boundaries[i];
91 let end = boundaries[i + 1];
92 let chunk = &data[start..end];
93 if chunk.is_empty() {
94 return Vec::new();
95 }
96
97 let mut positions: Vec<usize> = Vec::with_capacity(chunk.len() / 40 + 64);
99 for pos in memchr::memchr_iter(sep, chunk) {
100 positions.push(pos);
101 }
102
103 let mut buf = Vec::with_capacity(chunk.len());
105 let mut end_pos = chunk.len();
106 for &pos in positions.iter().rev() {
107 let rec_start = pos + 1;
108 if rec_start < end_pos {
109 buf.extend_from_slice(&chunk[rec_start..end_pos]);
110 }
111 end_pos = rec_start;
112 }
113 if end_pos > 0 {
114 buf.extend_from_slice(&chunk[..end_pos]);
115 }
116 buf
117 })
118 .collect();
119
120 for chunk in reversed_chunks.iter().rev() {
122 if !chunk.is_empty() {
123 out.write_all(chunk)?;
124 }
125 }
126 Ok(())
127}
128
129fn tac_bytes_before_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
132 let n_threads = rayon::current_num_threads().max(1);
133 let chunk_size = data.len() / n_threads;
134
135 let mut boundaries = Vec::with_capacity(n_threads + 1);
137 boundaries.push(0);
138 for i in 1..n_threads {
139 let target = i * chunk_size;
140 if target >= data.len() {
141 break;
142 }
143 let boundary = memchr::memchr(sep, &data[target..])
145 .map(|p| target + p)
146 .unwrap_or(data.len());
147 if boundary > 0 && boundary < data.len() {
148 boundaries.push(boundary);
149 }
150 }
151 boundaries.push(data.len());
152 boundaries.dedup();
153 let n_chunks = boundaries.len() - 1;
154
155 let reversed_chunks: Vec<Vec<u8>> = (0..n_chunks)
157 .into_par_iter()
158 .map(|i| {
159 let start = boundaries[i];
160 let end = boundaries[i + 1];
161 let chunk = &data[start..end];
162 if chunk.is_empty() {
163 return Vec::new();
164 }
165
166 let mut positions: Vec<usize> = Vec::with_capacity(chunk.len() / 40 + 64);
168 for pos in memchr::memchr_iter(sep, chunk) {
169 positions.push(pos);
170 }
171
172 let mut buf = Vec::with_capacity(chunk.len());
174 let mut end_pos = chunk.len();
175 for &pos in positions.iter().rev() {
176 if pos < end_pos {
177 buf.extend_from_slice(&chunk[pos..end_pos]);
178 }
179 end_pos = pos;
180 }
181 if end_pos > 0 {
182 buf.extend_from_slice(&chunk[..end_pos]);
183 }
184 buf
185 })
186 .collect();
187
188 for chunk in reversed_chunks.iter().rev() {
190 if !chunk.is_empty() {
191 out.write_all(chunk)?;
192 }
193 }
194 Ok(())
195}
196
197fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
201 if data.is_empty() {
202 return Ok(());
203 }
204
205 let mut positions: Vec<usize> = Vec::with_capacity(data.len() / 40 + 64);
207 for pos in memchr::memchr_iter(sep, data) {
208 positions.push(pos);
209 }
210
211 let mut buf = Vec::with_capacity(data.len());
213 let mut end = data.len();
214 for &pos in positions.iter().rev() {
215 let rec_start = pos + 1;
216 if rec_start < end {
217 buf.extend_from_slice(&data[rec_start..end]);
218 }
219 end = rec_start;
220 }
221 if end > 0 {
222 buf.extend_from_slice(&data[..end]);
223 }
224
225 out.write_all(&buf)
226}
227
228fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
230 if data.is_empty() {
231 return Ok(());
232 }
233
234 let mut positions: Vec<usize> = Vec::with_capacity(data.len() / 40 + 64);
236 for pos in memchr::memchr_iter(sep, data) {
237 positions.push(pos);
238 }
239
240 let mut buf = Vec::with_capacity(data.len());
242 let mut end = data.len();
243 for &pos in positions.iter().rev() {
244 if pos < end {
245 buf.extend_from_slice(&data[pos..end]);
246 }
247 end = pos;
248 }
249 if end > 0 {
250 buf.extend_from_slice(&data[..end]);
251 }
252
253 out.write_all(&buf)
254}
255
256pub fn tac_string_separator(
261 data: &[u8],
262 separator: &[u8],
263 before: bool,
264 out: &mut impl Write,
265) -> io::Result<()> {
266 if data.is_empty() {
267 return Ok(());
268 }
269
270 if separator.len() == 1 {
271 return tac_bytes(data, separator[0], before, out);
272 }
273
274 let sep_len = separator.len();
275
276 if !before {
277 tac_string_after(data, separator, sep_len, out)
278 } else {
279 tac_string_before(data, separator, sep_len, out)
280 }
281}
282
283fn tac_string_after(
285 data: &[u8],
286 separator: &[u8],
287 sep_len: usize,
288 out: &mut impl Write,
289) -> io::Result<()> {
290 let positions = collect_positions_str(data, separator);
291
292 if positions.is_empty() {
293 return out.write_all(data);
294 }
295
296 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
297 let mut end = data.len();
298
299 for &pos in positions.iter().rev() {
300 let rec_start = pos + sep_len;
301 if rec_start < end {
302 slices.push(IoSlice::new(&data[rec_start..end]));
303 if slices.len() >= IOSLICE_BATCH_SIZE {
304 write_all_vectored(out, &slices)?;
305 slices.clear();
306 }
307 }
308 end = rec_start;
309 }
310 if end > 0 {
311 slices.push(IoSlice::new(&data[..end]));
312 }
313 if !slices.is_empty() {
314 write_all_vectored(out, &slices)?;
315 }
316 Ok(())
317}
318
319fn tac_string_before(
321 data: &[u8],
322 separator: &[u8],
323 _sep_len: usize,
324 out: &mut impl Write,
325) -> io::Result<()> {
326 let positions = collect_positions_str(data, separator);
327
328 if positions.is_empty() {
329 return out.write_all(data);
330 }
331
332 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
333 let mut end = data.len();
334
335 for &pos in positions.iter().rev() {
336 if pos < end {
337 slices.push(IoSlice::new(&data[pos..end]));
338 if slices.len() >= IOSLICE_BATCH_SIZE {
339 write_all_vectored(out, &slices)?;
340 slices.clear();
341 }
342 }
343 end = pos;
344 }
345 if end > 0 {
346 slices.push(IoSlice::new(&data[..end]));
347 }
348 if !slices.is_empty() {
349 write_all_vectored(out, &slices)?;
350 }
351 Ok(())
352}
353
354fn find_regex_matches_backward(data: &[u8], re: ®ex::bytes::Regex) -> Vec<(usize, usize)> {
356 let mut matches = Vec::new();
357 let mut past_end = data.len();
358
359 while past_end > 0 {
360 let buf = &data[..past_end];
361 let mut found = false;
362
363 let mut pos = past_end;
364 while pos > 0 {
365 pos -= 1;
366 if let Some(m) = re.find_at(buf, pos) {
367 if m.start() == pos {
368 matches.push((m.start(), m.end()));
369 past_end = m.start();
370 found = true;
371 break;
372 }
373 }
374 }
375
376 if !found {
377 break;
378 }
379 }
380
381 matches.reverse();
382 matches
383}
384
385pub fn tac_regex_separator(
388 data: &[u8],
389 pattern: &str,
390 before: bool,
391 out: &mut impl Write,
392) -> io::Result<()> {
393 if data.is_empty() {
394 return Ok(());
395 }
396
397 let re = match regex::bytes::Regex::new(pattern) {
398 Ok(r) => r,
399 Err(e) => {
400 return Err(io::Error::new(
401 io::ErrorKind::InvalidInput,
402 format!("invalid regex '{}': {}", pattern, e),
403 ));
404 }
405 };
406
407 let matches = find_regex_matches_backward(data, &re);
408
409 if matches.is_empty() {
410 out.write_all(data)?;
411 return Ok(());
412 }
413
414 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
417
418 if !before {
419 let last_end = matches.last().unwrap().1;
420
421 if last_end < data.len() {
422 slices.push(IoSlice::new(&data[last_end..]));
423 }
424
425 let mut i = matches.len();
426 while i > 0 {
427 i -= 1;
428 let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
429 slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
430 }
431 } else {
432 let mut i = matches.len();
433 while i > 0 {
434 i -= 1;
435 let start = matches[i].0;
436 let end = if i + 1 < matches.len() {
437 matches[i + 1].0
438 } else {
439 data.len()
440 };
441 slices.push(IoSlice::new(&data[start..end]));
442 }
443
444 if matches[0].0 > 0 {
445 slices.push(IoSlice::new(&data[..matches[0].0]));
446 }
447 }
448
449 write_all_vectored(out, &slices)
450}
451
/// Writes every byte of `slices` to `out`, in order.
///
/// Fast path: one `write_vectored` call that accepts everything. If the writer takes only
/// part of the data, the remainder is written by [`flush_vectored_slow`].
///
/// # Errors
/// Returns `ErrorKind::WriteZero` if the writer accepts zero bytes while data remains.
/// `ErrorKind::Interrupted` from `write_vectored` is retried, mirroring what
/// `Write::write_all` does on the slow path; every other error is propagated.
#[inline(always)]
fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
    let total: usize = slices.iter().map(|s| s.len()).sum();

    // Nothing has been written yet when `Interrupted` is reported, so retrying is safe.
    let written = loop {
        match out.write_vectored(slices) {
            Ok(n) => break n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };

    if written >= total {
        return Ok(());
    }
    if written == 0 {
        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
    }
    flush_vectored_slow(out, slices, written)
}

/// Finishes a partially completed vectored write.
///
/// `skip` is the number of bytes, counted from the start of `slices`, that were already
/// written. Fully written slices are skipped, the partially written one is resumed at the
/// right offset, and every later slice is written in full with `write_all`.
///
/// # Errors
/// Propagates the first error returned by `out.write_all`.
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        let len = slice.len();
        if skip >= len {
            // Already fully written (this also passes over empty slices).
            skip -= len;
            continue;
        }
        out.write_all(&slice[skip..])?;
        skip = 0;
    }
    Ok(())
}