Skip to main content

coreutils_rs/tac/
core.rs

1use std::io::{self, IoSlice, Write};
2
3use rayon::prelude::*;
4
/// Minimum input length in bytes (64 * 1024 * 1024) at which `tac_bytes`
/// switches from the single-threaded backward scan to the parallel scan.
///
/// Rationale recorded by the original author: each benchmark invocation is a
/// fresh process, so rayon pool init (~0.5-1ms) is paid every time. For 10MB
/// files, a single-threaded memrchr scan (0.3ms) is faster than rayon init +
/// parallel scan, so parallelism is reserved for genuinely large files where
/// multi-core scanning pays off.
const PARALLEL_THRESHOLD: usize = 64 * 1024 * 1024;
11
/// Maximum IoSlice entries per write_vectored batch.
/// Matches Linux UIO_MAXIOV limit (1024).
///
/// The byte- and string-separator output loops in this file flush their
/// pending slices as soon as this many have accumulated.
const IOSLICE_BATCH_SIZE: usize = 1024;
15
16/// Reverse records separated by a single byte.
17/// For large data (>= 64MB): parallel forward SIMD scan + zero-copy IoSlice output.
18/// For small data: single-threaded backward SIMD scan + IoSlice batches.
19pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
20    if data.is_empty() {
21        return Ok(());
22    }
23    if data.len() >= PARALLEL_THRESHOLD {
24        if !before {
25            tac_bytes_after_contiguous(data, separator, out)
26        } else {
27            tac_bytes_before_contiguous(data, separator, out)
28        }
29    } else if !before {
30        tac_bytes_after(data, separator, out)
31    } else {
32        tac_bytes_before(data, separator, out)
33    }
34}
35
/// Variant of `tac_bytes` for callers that hold a mutable byte buffer.
///
/// The buffer is not modified here: it is passed straight through to
/// `tac_bytes`, which takes it as a shared slice. `separator`, `before` and
/// `out` have the same meaning as in `tac_bytes`, and any I/O error from the
/// writer is returned unchanged.
pub fn tac_bytes_owned(
    data: &mut [u8],
    separator: u8,
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    // Implicit reborrow: `&mut [u8]` coerces to the `&[u8]` that tac_bytes expects.
    tac_bytes(data, separator, before, out)
}
45
46/// Collect multi-byte separator positions with pre-allocated Vec.
47#[inline]
48fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
49    let estimated = data.len() / 40 + 64;
50    let mut positions = Vec::with_capacity(estimated);
51    for pos in memchr::memmem::find_iter(data, separator) {
52        positions.push(pos);
53    }
54    positions
55}
56
57/// Parallel forward scan to collect all separator positions.
58/// Splits data into chunks, each processed by a rayon worker thread.
59/// Returns per-chunk position vectors in chunk order (positions are absolute).
60fn parallel_scan_positions(data: &[u8], sep: u8) -> Vec<Vec<usize>> {
61    let n_threads = rayon::current_num_threads().max(1);
62    let chunk_size = (data.len() + n_threads - 1) / n_threads;
63
64    (0..n_threads)
65        .into_par_iter()
66        .map(|i| {
67            let start = i * chunk_size;
68            if start >= data.len() {
69                return Vec::new();
70            }
71            let end = (start + chunk_size).min(data.len());
72            let chunk = &data[start..end];
73            let estimated = chunk.len() / 40 + 64;
74            let mut positions = Vec::with_capacity(estimated);
75            for pos in memchr::memchr_iter(sep, chunk) {
76                positions.push(start + pos);
77            }
78            positions
79        })
80        .collect()
81}
82
83/// Zero-copy parallel after-separator mode: parallel forward SIMD scan to collect
84/// separator positions, then zero-copy reverse output via IoSlice batches pointing
85/// directly into the input data. No output buffer allocation needed.
86///
87/// vs old contiguous-copy approach for 100MB:
88/// - Old: alloc 100MB + parallel scan+copy (200MB bandwidth) + 1 write_all
89/// - New: parallel scan only (100MB bandwidth) + ~2500 write_vectored (zero-copy)
90/// Eliminates 100MB allocation and 100MB of copy bandwidth.
91fn tac_bytes_after_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
92    let chunk_positions = parallel_scan_positions(data, sep);
93
94    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
95    let mut end = data.len();
96
97    for positions in chunk_positions.iter().rev() {
98        for &pos in positions.iter().rev() {
99            let rec_start = pos + 1;
100            if rec_start < end {
101                slices.push(IoSlice::new(&data[rec_start..end]));
102                if slices.len() >= IOSLICE_BATCH_SIZE {
103                    write_all_vectored(out, &slices)?;
104                    slices.clear();
105                }
106            }
107            end = rec_start;
108        }
109    }
110
111    if end > 0 {
112        slices.push(IoSlice::new(&data[..end]));
113    }
114    if !slices.is_empty() {
115        write_all_vectored(out, &slices)?;
116    }
117    Ok(())
118}
119
120/// Zero-copy parallel before-separator mode.
121fn tac_bytes_before_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
122    let chunk_positions = parallel_scan_positions(data, sep);
123
124    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
125    let mut end = data.len();
126
127    for positions in chunk_positions.iter().rev() {
128        for &pos in positions.iter().rev() {
129            if pos < end {
130                slices.push(IoSlice::new(&data[pos..end]));
131                if slices.len() >= IOSLICE_BATCH_SIZE {
132                    write_all_vectored(out, &slices)?;
133                    slices.clear();
134                }
135            }
136            end = pos;
137        }
138    }
139
140    if end > 0 {
141        slices.push(IoSlice::new(&data[..end]));
142    }
143    if !slices.is_empty() {
144        write_all_vectored(out, &slices)?;
145    }
146    Ok(())
147}
148
149/// Zero-copy after-separator mode: streaming IoSlice directly from input data.
150/// No buffer allocation — scans backward and emits IoSlice batches of 1024.
151fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
152    if data.is_empty() {
153        return Ok(());
154    }
155
156    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
157    let mut end = data.len();
158
159    for pos in memchr::memrchr_iter(sep, data) {
160        let rec_start = pos + 1;
161        if rec_start < end {
162            slices.push(IoSlice::new(&data[rec_start..end]));
163            if slices.len() >= IOSLICE_BATCH_SIZE {
164                write_all_vectored(out, &slices)?;
165                slices.clear();
166            }
167        }
168        end = rec_start;
169    }
170
171    if end > 0 {
172        slices.push(IoSlice::new(&data[..end]));
173    }
174    if !slices.is_empty() {
175        write_all_vectored(out, &slices)?;
176    }
177    Ok(())
178}
179
180/// Zero-copy before-separator mode: streaming IoSlice directly from input data.
181fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
182    if data.is_empty() {
183        return Ok(());
184    }
185
186    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
187    let mut end = data.len();
188
189    for pos in memchr::memrchr_iter(sep, data) {
190        if pos < end {
191            slices.push(IoSlice::new(&data[pos..end]));
192            if slices.len() >= IOSLICE_BATCH_SIZE {
193                write_all_vectored(out, &slices)?;
194                slices.clear();
195            }
196        }
197        end = pos;
198    }
199
200    if end > 0 {
201        slices.push(IoSlice::new(&data[..end]));
202    }
203    if !slices.is_empty() {
204        write_all_vectored(out, &slices)?;
205    }
206    Ok(())
207}
208
209/// Reverse records using a multi-byte string separator.
210/// Uses SIMD-accelerated memmem + write_all output.
211///
212/// For single-byte separators, delegates to tac_bytes which uses memchr (faster).
213pub fn tac_string_separator(
214    data: &[u8],
215    separator: &[u8],
216    before: bool,
217    out: &mut impl Write,
218) -> io::Result<()> {
219    if data.is_empty() {
220        return Ok(());
221    }
222
223    if separator.len() == 1 {
224        return tac_bytes(data, separator[0], before, out);
225    }
226
227    let sep_len = separator.len();
228
229    if !before {
230        tac_string_after(data, separator, sep_len, out)
231    } else {
232        tac_string_before(data, separator, sep_len, out)
233    }
234}
235
236/// Multi-byte string separator, after mode. Uses writev batching.
237fn tac_string_after(
238    data: &[u8],
239    separator: &[u8],
240    sep_len: usize,
241    out: &mut impl Write,
242) -> io::Result<()> {
243    let positions = collect_positions_str(data, separator);
244
245    if positions.is_empty() {
246        return out.write_all(data);
247    }
248
249    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
250    let mut end = data.len();
251
252    for &pos in positions.iter().rev() {
253        let rec_start = pos + sep_len;
254        if rec_start < end {
255            slices.push(IoSlice::new(&data[rec_start..end]));
256            if slices.len() >= IOSLICE_BATCH_SIZE {
257                write_all_vectored(out, &slices)?;
258                slices.clear();
259            }
260        }
261        end = rec_start;
262    }
263    if end > 0 {
264        slices.push(IoSlice::new(&data[..end]));
265    }
266    if !slices.is_empty() {
267        write_all_vectored(out, &slices)?;
268    }
269    Ok(())
270}
271
272/// Multi-byte string separator, before mode. Uses writev batching.
273fn tac_string_before(
274    data: &[u8],
275    separator: &[u8],
276    _sep_len: usize,
277    out: &mut impl Write,
278) -> io::Result<()> {
279    let positions = collect_positions_str(data, separator);
280
281    if positions.is_empty() {
282        return out.write_all(data);
283    }
284
285    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
286    let mut end = data.len();
287
288    for &pos in positions.iter().rev() {
289        if pos < end {
290            slices.push(IoSlice::new(&data[pos..end]));
291            if slices.len() >= IOSLICE_BATCH_SIZE {
292                write_all_vectored(out, &slices)?;
293                slices.clear();
294            }
295        }
296        end = pos;
297    }
298    if end > 0 {
299        slices.push(IoSlice::new(&data[..end]));
300    }
301    if !slices.is_empty() {
302        write_all_vectored(out, &slices)?;
303    }
304    Ok(())
305}
306
307/// Find regex matches using backward scanning, matching GNU tac's re_search behavior.
308fn find_regex_matches_backward(data: &[u8], re: &regex::bytes::Regex) -> Vec<(usize, usize)> {
309    let mut matches = Vec::new();
310    let mut past_end = data.len();
311
312    while past_end > 0 {
313        let buf = &data[..past_end];
314        let mut found = false;
315
316        let mut pos = past_end;
317        while pos > 0 {
318            pos -= 1;
319            if let Some(m) = re.find_at(buf, pos) {
320                if m.start() == pos {
321                    matches.push((m.start(), m.end()));
322                    past_end = m.start();
323                    found = true;
324                    break;
325                }
326            }
327        }
328
329        if !found {
330            break;
331        }
332    }
333
334    matches.reverse();
335    matches
336}
337
338/// Reverse records using a regex separator.
339/// Uses write_vectored for regex path (typically few large records).
340pub fn tac_regex_separator(
341    data: &[u8],
342    pattern: &str,
343    before: bool,
344    out: &mut impl Write,
345) -> io::Result<()> {
346    if data.is_empty() {
347        return Ok(());
348    }
349
350    let re = match regex::bytes::Regex::new(pattern) {
351        Ok(r) => r,
352        Err(e) => {
353            return Err(io::Error::new(
354                io::ErrorKind::InvalidInput,
355                format!("invalid regex '{}': {}", pattern, e),
356            ));
357        }
358    };
359
360    let matches = find_regex_matches_backward(data, &re);
361
362    if matches.is_empty() {
363        out.write_all(data)?;
364        return Ok(());
365    }
366
367    // For regex separators, use write_vectored since there are typically
368    // few large records. Build all IoSlices at once and flush.
369    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
370
371    if !before {
372        let last_end = matches.last().unwrap().1;
373
374        if last_end < data.len() {
375            slices.push(IoSlice::new(&data[last_end..]));
376        }
377
378        let mut i = matches.len();
379        while i > 0 {
380            i -= 1;
381            let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
382            slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
383        }
384    } else {
385        let mut i = matches.len();
386        while i > 0 {
387            i -= 1;
388            let start = matches[i].0;
389            let end = if i + 1 < matches.len() {
390                matches[i + 1].0
391            } else {
392                data.len()
393            };
394            slices.push(IoSlice::new(&data[start..end]));
395        }
396
397        if matches[0].0 > 0 {
398            slices.push(IoSlice::new(&data[..matches[0].0]));
399        }
400    }
401
402    write_all_vectored(out, &slices)
403}
404
/// Write every byte referenced by `slices` to `out`.
///
/// Hot path: a single `write_vectored` call accepts everything.
/// Cold path: after a short write, the remainder is finished out-of-line by
/// `flush_vectored_slow` so the hot path stays small.
///
/// An `ErrorKind::Interrupted` result from `write_vectored` is retried rather
/// than reported, matching what `Write::write_all` does on the cold path.
/// Without this, a signal arriving during the first write would abort the
/// whole output.
///
/// # Errors
/// * `io::ErrorKind::WriteZero` if the writer accepts 0 bytes while data
///   remains.
/// * Any other (non-`Interrupted`) error reported by the writer.
#[inline(always)]
fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
    let total: usize = slices.iter().map(|s| s.len()).sum();

    // Retry transparently when the call was interrupted before writing anything.
    let written = loop {
        match out.write_vectored(slices) {
            Ok(n) => break n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };

    if written >= total {
        return Ok(());
    }
    if written == 0 {
        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
    }
    flush_vectored_slow(out, slices, written)
}

/// Finish a partially completed vectored write (cold path, never inlined).
///
/// `skip` is the number of bytes, counted from the start of `slices`, that
/// have already been written. Fully written slices are skipped, the partly
/// written one is resumed at the right offset, and every later slice is
/// written whole. Each write uses `write_all`.
///
/// # Errors
/// Propagates any error returned by `write_all`.
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        let len = slice.len();
        if skip >= len {
            // This slice was already written in full.
            skip -= len;
            continue;
        }
        out.write_all(&slice[skip..])?;
        // Everything after the resume point is written from its beginning.
        skip = 0;
    }
    Ok(())
}
439}