1use std::io::{self, IoSlice, Write};
2
3use rayon::prelude::*;
4
/// Inputs at least this many bytes long (64 MiB) are reversed by the parallel,
/// chunked implementations selected in `tac_bytes`; smaller inputs use the
/// sequential ones.
const PARALLEL_THRESHOLD: usize = 64 * 1024 * 1024;

/// Number of `IoSlice`s the string-separator paths accumulate before issuing a
/// vectored write.
const IOSLICE_BATCH_SIZE: usize = 1024;
15
16pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
20 if data.is_empty() {
21 return Ok(());
22 }
23 if data.len() >= PARALLEL_THRESHOLD {
24 if !before {
25 tac_bytes_after_contiguous(data, separator, out)
26 } else {
27 tac_bytes_before_contiguous(data, separator, out)
28 }
29 } else if !before {
30 tac_bytes_after(data, separator, out)
31 } else {
32 tac_bytes_before(data, separator, out)
33 }
34}
35
36pub fn tac_bytes_owned(
38 data: &mut [u8],
39 separator: u8,
40 before: bool,
41 out: &mut impl Write,
42) -> io::Result<()> {
43 tac_bytes(data, separator, before, out)
44}
45
46#[inline]
48fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
49 let estimated = data.len() / 40 + 64;
50 let mut positions = Vec::with_capacity(estimated);
51 for pos in memchr::memmem::find_iter(data, separator) {
52 positions.push(pos);
53 }
54 positions
55}
56
57fn tac_bytes_after_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
63 let n_threads = rayon::current_num_threads().max(1);
64 let chunk_size = data.len() / n_threads;
65
66 let mut boundaries = Vec::with_capacity(n_threads + 1);
68 boundaries.push(0);
69 for i in 1..n_threads {
70 let target = i * chunk_size;
71 if target >= data.len() {
72 break;
73 }
74 let boundary = memchr::memchr(sep, &data[target..])
76 .map(|p| target + p + 1)
77 .unwrap_or(data.len());
78 if boundary < data.len() {
79 boundaries.push(boundary);
80 }
81 }
82 boundaries.push(data.len());
83 boundaries.dedup();
84 let n_chunks = boundaries.len() - 1;
85
86 let reversed_chunks: Vec<Vec<u8>> = (0..n_chunks)
88 .into_par_iter()
89 .map(|i| {
90 let start = boundaries[i];
91 let end = boundaries[i + 1];
92 let chunk = &data[start..end];
93 if chunk.is_empty() {
94 return Vec::new();
95 }
96
97 let mut positions: Vec<usize> = Vec::with_capacity(chunk.len() / 40 + 64);
99 for pos in memchr::memchr_iter(sep, chunk) {
100 positions.push(pos);
101 }
102
103 let mut buf = Vec::with_capacity(chunk.len());
105 let mut end_pos = chunk.len();
106 for &pos in positions.iter().rev() {
107 let rec_start = pos + 1;
108 if rec_start < end_pos {
109 buf.extend_from_slice(&chunk[rec_start..end_pos]);
110 }
111 end_pos = rec_start;
112 }
113 if end_pos > 0 {
114 buf.extend_from_slice(&chunk[..end_pos]);
115 }
116 buf
117 })
118 .collect();
119
120 for chunk in reversed_chunks.iter().rev() {
122 if !chunk.is_empty() {
123 out.write_all(chunk)?;
124 }
125 }
126 Ok(())
127}
128
129fn tac_bytes_before_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
132 let n_threads = rayon::current_num_threads().max(1);
133 let chunk_size = data.len() / n_threads;
134
135 let mut boundaries = Vec::with_capacity(n_threads + 1);
137 boundaries.push(0);
138 for i in 1..n_threads {
139 let target = i * chunk_size;
140 if target >= data.len() {
141 break;
142 }
143 let boundary = memchr::memchr(sep, &data[target..])
145 .map(|p| target + p)
146 .unwrap_or(data.len());
147 if boundary > 0 && boundary < data.len() {
148 boundaries.push(boundary);
149 }
150 }
151 boundaries.push(data.len());
152 boundaries.dedup();
153 let n_chunks = boundaries.len() - 1;
154
155 let reversed_chunks: Vec<Vec<u8>> = (0..n_chunks)
157 .into_par_iter()
158 .map(|i| {
159 let start = boundaries[i];
160 let end = boundaries[i + 1];
161 let chunk = &data[start..end];
162 if chunk.is_empty() {
163 return Vec::new();
164 }
165
166 let mut positions: Vec<usize> = Vec::with_capacity(chunk.len() / 40 + 64);
168 for pos in memchr::memchr_iter(sep, chunk) {
169 positions.push(pos);
170 }
171
172 let mut buf = Vec::with_capacity(chunk.len());
174 let mut end_pos = chunk.len();
175 for &pos in positions.iter().rev() {
176 if pos < end_pos {
177 buf.extend_from_slice(&chunk[pos..end_pos]);
178 }
179 end_pos = pos;
180 }
181 if end_pos > 0 {
182 buf.extend_from_slice(&chunk[..end_pos]);
183 }
184 buf
185 })
186 .collect();
187
188 for chunk in reversed_chunks.iter().rev() {
190 if !chunk.is_empty() {
191 out.write_all(chunk)?;
192 }
193 }
194 Ok(())
195}
196
197fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
200 if data.is_empty() {
201 return Ok(());
202 }
203
204 let mut prev_end = data.len();
205 let mut search_end = data.len();
206
207 loop {
208 match memchr::memrchr(sep, &data[..search_end]) {
209 Some(pos) => {
210 let rec_start = pos + 1;
211 if rec_start < prev_end {
212 out.write_all(&data[rec_start..prev_end])?;
213 }
214 prev_end = rec_start;
215 search_end = pos;
216 }
217 None => {
218 if prev_end > 0 {
219 out.write_all(&data[..prev_end])?;
220 }
221 break;
222 }
223 }
224 if search_end == 0 {
225 if prev_end > 0 {
226 out.write_all(&data[..prev_end])?;
227 }
228 break;
229 }
230 }
231
232 Ok(())
233}
234
235fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
238 if data.is_empty() {
239 return Ok(());
240 }
241
242 let mut prev_end = data.len();
243 let mut search_end = data.len();
244
245 loop {
246 match memchr::memrchr(sep, &data[..search_end]) {
247 Some(pos) => {
248 if pos < prev_end {
249 out.write_all(&data[pos..prev_end])?;
250 }
251 prev_end = pos;
252 if pos == 0 {
253 break;
254 }
255 search_end = pos;
256 }
257 None => {
258 if prev_end > 0 {
259 out.write_all(&data[..prev_end])?;
260 }
261 break;
262 }
263 }
264 }
265
266 Ok(())
267}
268
269pub fn tac_string_separator(
274 data: &[u8],
275 separator: &[u8],
276 before: bool,
277 out: &mut impl Write,
278) -> io::Result<()> {
279 if data.is_empty() {
280 return Ok(());
281 }
282
283 if separator.len() == 1 {
284 return tac_bytes(data, separator[0], before, out);
285 }
286
287 let sep_len = separator.len();
288
289 if !before {
290 tac_string_after(data, separator, sep_len, out)
291 } else {
292 tac_string_before(data, separator, sep_len, out)
293 }
294}
295
296fn tac_string_after(
298 data: &[u8],
299 separator: &[u8],
300 sep_len: usize,
301 out: &mut impl Write,
302) -> io::Result<()> {
303 let positions = collect_positions_str(data, separator);
304
305 if positions.is_empty() {
306 return out.write_all(data);
307 }
308
309 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
310 let mut end = data.len();
311
312 for &pos in positions.iter().rev() {
313 let rec_start = pos + sep_len;
314 if rec_start < end {
315 slices.push(IoSlice::new(&data[rec_start..end]));
316 if slices.len() >= IOSLICE_BATCH_SIZE {
317 write_all_vectored(out, &slices)?;
318 slices.clear();
319 }
320 }
321 end = rec_start;
322 }
323 if end > 0 {
324 slices.push(IoSlice::new(&data[..end]));
325 }
326 if !slices.is_empty() {
327 write_all_vectored(out, &slices)?;
328 }
329 Ok(())
330}
331
332fn tac_string_before(
334 data: &[u8],
335 separator: &[u8],
336 _sep_len: usize,
337 out: &mut impl Write,
338) -> io::Result<()> {
339 let positions = collect_positions_str(data, separator);
340
341 if positions.is_empty() {
342 return out.write_all(data);
343 }
344
345 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
346 let mut end = data.len();
347
348 for &pos in positions.iter().rev() {
349 if pos < end {
350 slices.push(IoSlice::new(&data[pos..end]));
351 if slices.len() >= IOSLICE_BATCH_SIZE {
352 write_all_vectored(out, &slices)?;
353 slices.clear();
354 }
355 }
356 end = pos;
357 }
358 if end > 0 {
359 slices.push(IoSlice::new(&data[..end]));
360 }
361 if !slices.is_empty() {
362 write_all_vectored(out, &slices)?;
363 }
364 Ok(())
365}
366
367fn find_regex_matches_backward(data: &[u8], re: ®ex::bytes::Regex) -> Vec<(usize, usize)> {
369 let mut matches = Vec::new();
370 let mut past_end = data.len();
371
372 while past_end > 0 {
373 let buf = &data[..past_end];
374 let mut found = false;
375
376 let mut pos = past_end;
377 while pos > 0 {
378 pos -= 1;
379 if let Some(m) = re.find_at(buf, pos) {
380 if m.start() == pos {
381 matches.push((m.start(), m.end()));
382 past_end = m.start();
383 found = true;
384 break;
385 }
386 }
387 }
388
389 if !found {
390 break;
391 }
392 }
393
394 matches.reverse();
395 matches
396}
397
398pub fn tac_regex_separator(
401 data: &[u8],
402 pattern: &str,
403 before: bool,
404 out: &mut impl Write,
405) -> io::Result<()> {
406 if data.is_empty() {
407 return Ok(());
408 }
409
410 let re = match regex::bytes::Regex::new(pattern) {
411 Ok(r) => r,
412 Err(e) => {
413 return Err(io::Error::new(
414 io::ErrorKind::InvalidInput,
415 format!("invalid regex '{}': {}", pattern, e),
416 ));
417 }
418 };
419
420 let matches = find_regex_matches_backward(data, &re);
421
422 if matches.is_empty() {
423 out.write_all(data)?;
424 return Ok(());
425 }
426
427 let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
430
431 if !before {
432 let last_end = matches.last().unwrap().1;
433
434 if last_end < data.len() {
435 slices.push(IoSlice::new(&data[last_end..]));
436 }
437
438 let mut i = matches.len();
439 while i > 0 {
440 i -= 1;
441 let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
442 slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
443 }
444 } else {
445 let mut i = matches.len();
446 while i > 0 {
447 i -= 1;
448 let start = matches[i].0;
449 let end = if i + 1 < matches.len() {
450 matches[i + 1].0
451 } else {
452 data.len()
453 };
454 slices.push(IoSlice::new(&data[start..end]));
455 }
456
457 if matches[0].0 > 0 {
458 slices.push(IoSlice::new(&data[..matches[0].0]));
459 }
460 }
461
462 write_all_vectored(out, &slices)
463}
464
/// Writes every byte of `slices` to `out`, trying a single vectored write first.
///
/// Mirrors `Write::write_all` for a list of buffers:
/// - an `ErrorKind::Interrupted` from the vectored attempt is retried;
/// - a zero-byte write while bytes remain is reported as `ErrorKind::WriteZero`;
/// - a partial write is finished by [`flush_vectored_slow`].
#[inline]
fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
    let total: usize = slices.iter().map(|s| s.len()).sum();
    let written = loop {
        match out.write_vectored(slices) {
            Ok(n) => break n,
            // `write_all` treats interruptions as retryable; do the same here.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };
    if written >= total {
        return Ok(());
    }
    if written == 0 {
        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
    }
    flush_vectored_slow(out, slices, written)
}

/// Finishes a partially completed vectored write.
///
/// Skips the first `skip` bytes across `slices` (the part already written) and
/// writes the remainder with `write_all`, one slice at a time.
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        let len = slice.len();
        if skip >= len {
            skip -= len;
            continue;
        }
        out.write_all(&slice[skip..])?;
        skip = 0;
    }
    Ok(())
}