coreutils_rs/tac/core.rs

use std::io::{self, IoSlice, Write};

/// Threshold for parallel processing (4MB).
/// The parallel path uses zero-copy scatter-gather: threads scan in parallel
/// to find separator positions, then IoSlice entries point directly at the
/// original mmap data — no full-file copy. Thread creation (~200us for 4 threads)
/// is amortized at 4MB+. Below 4MB, sequential memrchr_iter + IoSlice is fast enough.
const PARALLEL_THRESHOLD: usize = 4 * 1024 * 1024;

/// Reverse records separated by a single byte.
/// Scans for separators with SIMD memchr, then outputs records in reverse
/// order directly from the input buffer. Expects the caller to provide
/// a buffered writer (e.g., BufWriter) for efficient syscall batching.
pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }
    if data.len() >= PARALLEL_THRESHOLD {
        if !before {
            tac_bytes_after_parallel(data, separator, out)
        } else {
            tac_bytes_before_parallel(data, separator, out)
        }
    } else if !before {
        tac_bytes_after(data, separator, out)
    } else {
        tac_bytes_before(data, separator, out)
    }
}
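
// Usage sketch (illustrative test, not part of the original file): shows the
// two separator modes on a small buffer. A Vec<u8> stands in for the buffered
// writer the doc comment above asks for.
#[cfg(test)]
mod tac_bytes_example {
    use super::*;

    #[test]
    fn after_and_before_modes() {
        let data = b"a\nb\nc\n";

        // After mode: each separator stays attached to the end of its record.
        let mut out: Vec<u8> = Vec::new();
        tac_bytes(data, b'\n', false, &mut out).unwrap();
        assert_eq!(&out[..], &b"c\nb\na\n"[..]);

        // Before mode: each separator attaches to the front of its record,
        // so the trailing separator becomes a leading lone record.
        let mut out: Vec<u8> = Vec::new();
        tac_bytes(data, b'\n', true, &mut out).unwrap();
        assert_eq!(&out[..], &b"\n\nc\nba"[..]);
    }
}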

/// Reverse records of an owned, mutable buffer. Currently just delegates
/// to tac_bytes.
pub fn tac_bytes_owned(
    data: &mut [u8],
    separator: u8,
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    tac_bytes(data, separator, before, out)
}

/// Collect multi-byte separator positions into a pre-allocated Vec.
#[inline]
fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
    // Heuristic pre-size: assume records average ~40 bytes, plus slack.
    let estimated = data.len() / 40 + 64;
    let mut positions = Vec::with_capacity(estimated);
    positions.extend(memchr::memmem::find_iter(data, separator));
    positions
}

/// Split data into chunks at separator boundaries for parallel processing.
/// Returns chunk boundary positions (indices into data).
fn split_into_chunks(data: &[u8], sep: u8) -> Vec<usize> {
    let num_threads = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4)
        .max(1);
    let chunk_target = data.len() / num_threads;
    let mut boundaries = vec![0usize];
    for i in 1..num_threads {
        let target = i * chunk_target;
        if target >= data.len() {
            break;
        }
        // Nudge each boundary forward to just past the next separator so
        // no record straddles two chunks.
        if let Some(p) = memchr::memchr(sep, &data[target..]) {
            let b = target + p + 1;
            if b > *boundaries.last().unwrap() && b <= data.len() {
                boundaries.push(b);
            }
        }
    }
    boundaries.push(data.len());
    boundaries
}
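
// Invariant sketch (illustrative test, not part of the original file): every
// interior boundary from split_into_chunks lands just past a separator, so no
// record straddles two chunks, whatever available_parallelism reports.
#[cfg(test)]
mod split_into_chunks_example {
    use super::*;

    #[test]
    fn boundaries_fall_just_past_separators() {
        let data: Vec<u8> = b"one\ntwo\nthree\nfour\nfive\n".repeat(8);
        let boundaries = split_into_chunks(&data, b'\n');
        assert_eq!(boundaries[0], 0);
        assert_eq!(*boundaries.last().unwrap(), data.len());
        for &b in &boundaries[1..boundaries.len() - 1] {
            assert_eq!(data[b - 1], b'\n');
        }
    }
}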

/// Parallel after-separator mode: zero-copy scatter-gather from mmap pages.
///
/// Phase 1 (parallel): Each thread scans its chunk backward with memrchr_iter,
/// collecting separator positions into a compact Vec<u32>. No data is copied.
///
/// Phase 2 (sequential): The main thread iterates chunks in reverse order,
/// building IoSlice entries that point directly at the original mmap data.
/// These are flushed via write_vectored (writev(2) on Linux) in batches.
///
/// This eliminates the full-file copy (~100MB for 100MB input) at the cost
/// of more write_vectored calls. The positions Vec uses ~10MB for 100MB input
/// (2.5M separators × 4 bytes) vs 100MB for the copy approach.
fn tac_bytes_after_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let boundaries = split_into_chunks(data, sep);
    let n_chunks = boundaries.len() - 1;
    if n_chunks == 0 {
        return out.write_all(data);
    }

    // Phase 1: Each thread scans its chunk backward, collecting separator
    // positions as chunk-relative u32 offsets. No data copy — just positions.
    // u32 supports chunks up to 4GB each (~16GB total with 4 threads).
    let chunk_seps: Vec<Vec<u32>> = std::thread::scope(|s| {
        let handles: Vec<_> = (0..n_chunks)
            .map(|i| {
                let start = boundaries[i];
                let end = boundaries[i + 1];
                s.spawn(move || {
                    let chunk = &data[start..end];
                    let est = chunk.len() / 40 + 64;
                    let mut seps = Vec::with_capacity(est);
                    // memrchr_iter returns positions in reverse order (end→start).
                    // Store chunk-relative positions to avoid u32 overflow for large files.
                    for pos in memchr::memrchr_iter(sep, chunk) {
                        seps.push(pos as u32);
                    }
                    seps
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Phase 2: Build IoSlice entries pointing at mmap data, chunks in reverse order.
    // Each IoSlice references the original data — zero copy.
    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);

    for i in (0..n_chunks).rev() {
        let chunk_start = boundaries[i];
        let chunk_end = boundaries[i + 1];
        let seps = &chunk_seps[i];

        // seps are chunk-relative, in reverse order (from memrchr_iter).
        // For after-separator mode: record = data[sep_pos+1 .. rec_end].
        let mut rec_end = chunk_end;
        for &rel_pos in seps.iter() {
            let sep_abs = chunk_start + rel_pos as usize;
            let rec_start = sep_abs + 1;
            if rec_start < rec_end {
                slices.push(IoSlice::new(&data[rec_start..rec_end]));
                if slices.len() >= BATCH {
                    write_all_vectored(out, &slices)?;
                    slices.clear();
                }
            }
            rec_end = rec_start;
        }
        // Content before first separator in this chunk
        if rec_end > chunk_start {
            slices.push(IoSlice::new(&data[chunk_start..rec_end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
    }

    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }
    Ok(())
}
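
// Equivalence sketch (illustrative test, not part of the original file): the
// parallel path must be byte-identical to the sequential scan, including an
// unterminated final record. Called directly here to exercise it below
// PARALLEL_THRESHOLD.
#[cfg(test)]
mod parallel_after_example {
    use super::*;

    #[test]
    fn matches_sequential_output() {
        let data = b"alpha\nbeta\ngamma\ndelta\nno-trailing-sep";
        let mut seq: Vec<u8> = Vec::new();
        let mut par: Vec<u8> = Vec::new();
        tac_bytes_after(data, b'\n', &mut seq).unwrap();
        tac_bytes_after_parallel(data, b'\n', &mut par).unwrap();
        assert_eq!(seq, par);
    }
}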

/// Parallel before-separator mode: zero-copy scatter-gather from mmap pages.
/// Same approach as tac_bytes_after_parallel but records include the separator
/// at their start rather than after.
fn tac_bytes_before_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let boundaries = split_into_chunks(data, sep);
    let n_chunks = boundaries.len() - 1;
    if n_chunks == 0 {
        return out.write_all(data);
    }

    // Phase 1: Each thread scans backward, collecting chunk-relative separator positions.
    let chunk_seps: Vec<Vec<u32>> = std::thread::scope(|s| {
        let handles: Vec<_> = (0..n_chunks)
            .map(|i| {
                let start = boundaries[i];
                let end = boundaries[i + 1];
                s.spawn(move || {
                    let chunk = &data[start..end];
                    let est = chunk.len() / 40 + 64;
                    let mut seps = Vec::with_capacity(est);
                    for pos in memchr::memrchr_iter(sep, chunk) {
                        seps.push(pos as u32);
                    }
                    seps
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Phase 2: Build IoSlice entries, chunks in reverse order.
    // Before mode: record = data[sep_pos .. rec_end]
    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);

    for i in (0..n_chunks).rev() {
        let chunk_start = boundaries[i];
        let chunk_end = boundaries[i + 1];
        let seps = &chunk_seps[i];

        let mut rec_end = chunk_end;
        for &rel_pos in seps.iter() {
            let pos = chunk_start + rel_pos as usize;
            if pos < rec_end {
                slices.push(IoSlice::new(&data[pos..rec_end]));
                if slices.len() >= BATCH {
                    write_all_vectored(out, &slices)?;
                    slices.clear();
                }
            }
            rec_end = pos;
        }
        if rec_end > chunk_start {
            slices.push(IoSlice::new(&data[chunk_start..rec_end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
    }

    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }
    Ok(())
}
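
// Equivalence sketch for before mode (illustrative test, not part of the
// original file): a record may be emitted as two adjacent IoSlices when it
// straddles a chunk boundary, but the bytes written must match the
// sequential path exactly.
#[cfg(test)]
mod parallel_before_example {
    use super::*;

    #[test]
    fn matches_sequential_output() {
        let data = b"x:y:z:tail";
        let mut seq: Vec<u8> = Vec::new();
        let mut par: Vec<u8> = Vec::new();
        tac_bytes_before(data, b':', &mut seq).unwrap();
        tac_bytes_before_parallel(data, b':', &mut par).unwrap();
        assert_eq!(seq, par);
    }
}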

/// After-separator mode: zero-allocation backward scan with writev output.
///
/// Uses memrchr_iter to scan from end to start, finding separators in reverse
/// order. This eliminates the positions Vec entirely — no allocation, no page
/// faults. memrchr uses the same SIMD (SSE2/AVX2) as memchr, just scanning
/// backwards. Records are output via writev batching as they're discovered.
fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
    let mut end = data.len();

    for pos in memchr::memrchr_iter(sep, data) {
        let rec_start = pos + 1;
        if rec_start < end {
            slices.push(IoSlice::new(&data[rec_start..end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
        end = rec_start;
    }

    if end > 0 {
        slices.push(IoSlice::new(&data[..end]));
    }
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }

    Ok(())
}
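
// Edge-case sketch (illustrative test, not part of the original file): with
// no trailing separator, the final unterminated record is emitted first,
// with nothing appended.
#[cfg(test)]
mod tac_bytes_after_example {
    use super::*;

    #[test]
    fn no_trailing_separator() {
        let mut out: Vec<u8> = Vec::new();
        tac_bytes_after(b"a\nb\nc", b'\n', &mut out).unwrap();
        assert_eq!(&out[..], &b"cb\na\n"[..]);
    }
}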

/// Before-separator mode: zero-allocation backward scan with writev output.
fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
    let mut end = data.len();

    for pos in memchr::memrchr_iter(sep, data) {
        if pos < end {
            slices.push(IoSlice::new(&data[pos..end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
        end = pos;
    }

    if end > 0 {
        slices.push(IoSlice::new(&data[..end]));
    }
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }

    Ok(())
}

/// Reverse records using a multi-byte string separator.
/// Uses SIMD-accelerated memmem search with writev-batched output.
///
/// For single-byte separators, delegates to tac_bytes which uses memchr (faster).
pub fn tac_string_separator(
    data: &[u8],
    separator: &[u8],
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    if separator.len() == 1 {
        return tac_bytes(data, separator[0], before, out);
    }

    let sep_len = separator.len();

    if !before {
        tac_string_after(data, separator, sep_len, out)
    } else {
        tac_string_before(data, separator, sep_len, out)
    }
}
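
// Usage sketch (illustrative test, not part of the original file): a
// multi-byte "--" separator in after mode; each "--" stays attached to the
// record it terminated.
#[cfg(test)]
mod tac_string_separator_example {
    use super::*;

    #[test]
    fn double_dash_separator() {
        let mut out: Vec<u8> = Vec::new();
        tac_string_separator(b"a--b--c", b"--", false, &mut out).unwrap();
        assert_eq!(&out[..], &b"cb--a--"[..]);
    }
}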

/// Multi-byte string separator, after mode. Uses writev batching.
fn tac_string_after(
    data: &[u8],
    separator: &[u8],
    sep_len: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let positions = collect_positions_str(data, separator);

    if positions.is_empty() {
        return out.write_all(data);
    }

    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
    let mut end = data.len();

    for &pos in positions.iter().rev() {
        let rec_start = pos + sep_len;
        if rec_start < end {
            slices.push(IoSlice::new(&data[rec_start..end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
        end = rec_start;
    }
    if end > 0 {
        slices.push(IoSlice::new(&data[..end]));
    }
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }
    Ok(())
}

/// Multi-byte string separator, before mode. Uses writev batching.
fn tac_string_before(
    data: &[u8],
    separator: &[u8],
    _sep_len: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let positions = collect_positions_str(data, separator);

    if positions.is_empty() {
        return out.write_all(data);
    }

    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
    let mut end = data.len();

    for &pos in positions.iter().rev() {
        if pos < end {
            slices.push(IoSlice::new(&data[pos..end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
        end = pos;
    }
    if end > 0 {
        slices.push(IoSlice::new(&data[..end]));
    }
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }
    Ok(())
}

/// Find regex matches using backward scanning, matching GNU tac's re_search behavior.
fn find_regex_matches_backward(data: &[u8], re: &regex::bytes::Regex) -> Vec<(usize, usize)> {
    let mut matches = Vec::new();
    let mut past_end = data.len();

    while past_end > 0 {
        let buf = &data[..past_end];
        let mut found = false;

        // Probe each start position from right to left; accept only a match
        // anchored exactly at `pos`, so the rightmost match start wins.
        let mut pos = past_end;
        while pos > 0 {
            pos -= 1;
            if let Some(m) = re.find_at(buf, pos) {
                if m.start() == pos {
                    matches.push((m.start(), m.end()));
                    past_end = m.start();
                    found = true;
                    break;
                }
            }
        }

        if !found {
            break;
        }
    }

    matches.reverse();
    matches
}
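
// Behavior sketch (illustrative test, not part of the original file): because
// the scan re-anchors at every candidate start, a greedy run like "--" is
// consumed as two single-byte matches from the right, the same quirk as GNU
// tac's backward re_search.
#[cfg(test)]
mod regex_backward_example {
    use super::*;

    #[test]
    fn greedy_run_splits_from_the_right() {
        let re = regex::bytes::Regex::new(r"-+").unwrap();
        let matches = find_regex_matches_backward(b"a-b--c", &re);
        assert_eq!(matches, vec![(1, 2), (3, 4), (4, 5)]);
    }
}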

/// Reverse records using a regex separator.
/// Uses write_vectored for the regex path (typically few large records).
pub fn tac_regex_separator(
    data: &[u8],
    pattern: &str,
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    let re = match regex::bytes::Regex::new(pattern) {
        Ok(r) => r,
        Err(e) => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("invalid regex '{}': {}", pattern, e),
            ));
        }
    };

    let matches = find_regex_matches_backward(data, &re);

    if matches.is_empty() {
        out.write_all(data)?;
        return Ok(());
    }

    // For regex separators, use write_vectored since there are typically
    // few large records. Build all IoSlices at once and flush.
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);

    if !before {
        let last_end = matches.last().unwrap().1;

        if last_end < data.len() {
            slices.push(IoSlice::new(&data[last_end..]));
        }

        let mut i = matches.len();
        while i > 0 {
            i -= 1;
            let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
            slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
        }
    } else {
        let mut i = matches.len();
        while i > 0 {
            i -= 1;
            let start = matches[i].0;
            let end = if i + 1 < matches.len() {
                matches[i + 1].0
            } else {
                data.len()
            };
            slices.push(IoSlice::new(&data[start..end]));
        }

        if matches[0].0 > 0 {
            slices.push(IoSlice::new(&data[..matches[0].0]));
        }
    }

    write_all_vectored(out, &slices)
}
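
// Usage sketch (illustrative test, not part of the original file): a literal
// "=" regex separator in after mode.
#[cfg(test)]
mod tac_regex_example {
    use super::*;

    #[test]
    fn literal_separator() {
        let mut out: Vec<u8> = Vec::new();
        tac_regex_separator(b"a=b=c", "=", false, &mut out).unwrap();
        assert_eq!(&out[..], &b"cb=a="[..]);
    }
}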

/// Write all IoSlice entries, handling partial writes.
/// Hot path: single write_vectored succeeds fully (common on Linux pipes/files).
/// Cold path: partial write handled out-of-line to keep hot path tight.
#[inline(always)]
fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
    let total: usize = slices.iter().map(|s| s.len()).sum();
    let written = out.write_vectored(slices)?;
    if written >= total {
        return Ok(());
    }
    if written == 0 {
        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
    }
    flush_vectored_slow(out, slices, written)
}

/// Handle partial write (cold path, never inlined).
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        let len = slice.len();
        if skip >= len {
            skip -= len;
            continue;
        }
        out.write_all(&slice[skip..])?;
        skip = 0;
    }
    Ok(())
}
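
// Partial-write sketch (illustrative test, not part of the original file):
// a writer that accepts at most 3 bytes per call relies on the default
// write_vectored, which forwards only the first non-empty buffer. That
// forces the cold flush_vectored_slow path, which must still deliver every
// byte in order.
#[cfg(test)]
mod partial_write_example {
    use super::*;

    struct Trickle(Vec<u8>);

    impl Write for Trickle {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let n = buf.len().min(3);
            self.0.extend_from_slice(&buf[..n]);
            Ok(n)
        }
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn survives_short_writes() {
        let mut w = Trickle(Vec::new());
        let bufs = [IoSlice::new(b"hello "), IoSlice::new(b"world")];
        write_all_vectored(&mut w, &bufs).unwrap();
        assert_eq!(&w.0[..], &b"hello world"[..]);
    }
}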