//! coreutils_rs/tac/core.rs — core record-reversal implementations for `tac`.
1use std::io::{self, IoSlice, Write};
2
3use rayon::prelude::*;
4
/// Threshold for parallel processing (64MB).
/// Each benchmark invocation is a fresh process, so rayon pool init (~0.5-1ms)
/// is paid every time. For 10MB files, single-threaded scan (0.3ms) is faster
/// than rayon init + parallel scan. Only use parallelism for genuinely large
/// files where multi-core scanning and copying pays off.
const PARALLEL_THRESHOLD: usize = 64 * 1024 * 1024;

/// Maximum IoSlice entries per write_vectored batch.
/// Used by string/regex separator paths.
/// NOTE(review): 1024 appears chosen to match the typical Linux IOV_MAX, so a
/// single writev never has its trailing iovecs silently ignored — confirm.
const IOSLICE_BATCH_SIZE: usize = 1024;
15
16/// Reverse records separated by a single byte.
17/// For large data (>= 8MB): parallel chunk-local reversal with contiguous buffers.
18/// For small data: single-threaded forward SIMD scan + contiguous output buffer.
19pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
20    if data.is_empty() {
21        return Ok(());
22    }
23    if data.len() >= PARALLEL_THRESHOLD {
24        if !before {
25            tac_bytes_after_contiguous(data, separator, out)
26        } else {
27            tac_bytes_before_contiguous(data, separator, out)
28        }
29    } else if !before {
30        tac_bytes_after(data, separator, out)
31    } else {
32        tac_bytes_before(data, separator, out)
33    }
34}
35
/// Reverse records of an owned Vec. Delegates to tac_bytes.
///
/// NOTE(review): the `&mut [u8]` borrow is never mutated here — presumably
/// reserved for a future in-place reversal path; confirm against callers
/// before tightening the signature to `&[u8]`.
pub fn tac_bytes_owned(
    data: &mut [u8],
    separator: u8,
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    tac_bytes(data, separator, before, out)
}
45
46/// Collect multi-byte separator positions with pre-allocated Vec.
47#[inline]
48fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
49    let estimated = data.len() / 40 + 64;
50    let mut positions = Vec::with_capacity(estimated);
51    for pos in memchr::memmem::find_iter(data, separator) {
52        positions.push(pos);
53    }
54    positions
55}
56
/// Parallel chunk-local reversal for after-separator mode.
/// Splits data into N chunks at separator boundaries (any `sep` byte, not just
/// newline), each chunk independently scans forward and builds a reversed
/// output buffer in parallel, then writes chunk buffers in reverse order.
/// Eliminates IoSlice overhead and reduces syscalls to N (one per chunk).
fn tac_bytes_after_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let n_threads = rayon::current_num_threads().max(1);
    let chunk_size = data.len() / n_threads;

    // Find chunk boundaries at separator positions. Each evenly-spaced target
    // is snapped forward past the next separator so no record straddles two
    // chunks — that is what makes per-chunk reversal compose into a global one.
    let mut boundaries = Vec::with_capacity(n_threads + 1);
    boundaries.push(0);
    for i in 1..n_threads {
        let target = i * chunk_size;
        if target >= data.len() {
            break;
        }
        // In after mode, separator ends a record; boundary is right after separator
        let boundary = memchr::memchr(sep, &data[target..])
            .map(|p| target + p + 1)
            .unwrap_or(data.len());
        if boundary < data.len() {
            boundaries.push(boundary);
        }
    }
    boundaries.push(data.len());
    // Adjacent targets can snap to the same separator; dedup drops the
    // resulting zero-length chunk ranges.
    boundaries.dedup();
    let n_chunks = boundaries.len() - 1;

    // Each chunk: forward scan for positions, build reversed output buffer
    let reversed_chunks: Vec<Vec<u8>> = (0..n_chunks)
        .into_par_iter()
        .map(|i| {
            let start = boundaries[i];
            let end = boundaries[i + 1];
            let chunk = &data[start..end];
            if chunk.is_empty() {
                return Vec::new();
            }

            // Collect separator positions within chunk
            let mut positions: Vec<usize> = Vec::with_capacity(chunk.len() / 40 + 64);
            for pos in memchr::memchr_iter(sep, chunk) {
                positions.push(pos);
            }

            // Build reversed output buffer: walk separators right-to-left,
            // copying each record (byte after separator .. previous record's
            // start). Records come out reversed; bytes within stay in order.
            let mut buf = Vec::with_capacity(chunk.len());
            let mut end_pos = chunk.len();
            for &pos in positions.iter().rev() {
                let rec_start = pos + 1;
                if rec_start < end_pos {
                    buf.extend_from_slice(&chunk[rec_start..end_pos]);
                }
                end_pos = rec_start;
            }
            // Leading record (up to and including the first separator) is last.
            if end_pos > 0 {
                buf.extend_from_slice(&chunk[..end_pos]);
            }
            buf
        })
        .collect();

    // Write chunks in reverse order (last chunk first = correct tac order)
    for chunk in reversed_chunks.iter().rev() {
        if !chunk.is_empty() {
            out.write_all(chunk)?;
        }
    }
    Ok(())
}
128
/// Parallel chunk-local reversal for before-separator mode.
/// Same approach as after mode but separator attaches to the START of each record.
fn tac_bytes_before_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let n_threads = rayon::current_num_threads().max(1);
    let chunk_size = data.len() / n_threads;

    // Find chunk boundaries at separator positions. Each target snaps forward
    // to the next separator itself (not past it), since in before mode a
    // separator begins a record.
    let mut boundaries = Vec::with_capacity(n_threads + 1);
    boundaries.push(0);
    for i in 1..n_threads {
        let target = i * chunk_size;
        if target >= data.len() {
            break;
        }
        // In before mode, separator starts a record; boundary is AT the separator
        let boundary = memchr::memchr(sep, &data[target..])
            .map(|p| target + p)
            .unwrap_or(data.len());
        if boundary > 0 && boundary < data.len() {
            boundaries.push(boundary);
        }
    }
    boundaries.push(data.len());
    // Adjacent targets can snap to the same separator; dedup drops the
    // resulting zero-length chunk ranges.
    boundaries.dedup();
    let n_chunks = boundaries.len() - 1;

    // Each chunk: forward scan for positions, build reversed output buffer
    let reversed_chunks: Vec<Vec<u8>> = (0..n_chunks)
        .into_par_iter()
        .map(|i| {
            let start = boundaries[i];
            let end = boundaries[i + 1];
            let chunk = &data[start..end];
            if chunk.is_empty() {
                return Vec::new();
            }

            // Collect separator positions within chunk
            let mut positions: Vec<usize> = Vec::with_capacity(chunk.len() / 40 + 64);
            for pos in memchr::memchr_iter(sep, chunk) {
                positions.push(pos);
            }

            // Build reversed output buffer (before mode: separator at start of
            // record). Each record spans separator .. previous record's start.
            let mut buf = Vec::with_capacity(chunk.len());
            let mut end_pos = chunk.len();
            for &pos in positions.iter().rev() {
                if pos < end_pos {
                    buf.extend_from_slice(&chunk[pos..end_pos]);
                }
                end_pos = pos;
            }
            // Leading fragment before the first separator is emitted last.
            if end_pos > 0 {
                buf.extend_from_slice(&chunk[..end_pos]);
            }
            buf
        })
        .collect();

    // Write chunks in reverse order
    for chunk in reversed_chunks.iter().rev() {
        if !chunk.is_empty() {
            out.write_all(chunk)?;
        }
    }
    Ok(())
}
196
197/// After-separator mode for small files: forward SIMD scan + contiguous buffer.
198/// Forward memchr_iter is faster than backward memrchr_iter, and a single
199/// contiguous output buffer eliminates IoSlice overhead and reduces syscalls to 1.
200fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
201    if data.is_empty() {
202        return Ok(());
203    }
204
205    // Forward scan for separator positions
206    let mut positions: Vec<usize> = Vec::with_capacity(data.len() / 40 + 64);
207    for pos in memchr::memchr_iter(sep, data) {
208        positions.push(pos);
209    }
210
211    // Build contiguous reversed output
212    let mut buf = Vec::with_capacity(data.len());
213    let mut end = data.len();
214    for &pos in positions.iter().rev() {
215        let rec_start = pos + 1;
216        if rec_start < end {
217            buf.extend_from_slice(&data[rec_start..end]);
218        }
219        end = rec_start;
220    }
221    if end > 0 {
222        buf.extend_from_slice(&data[..end]);
223    }
224
225    out.write_all(&buf)
226}
227
228/// Before-separator mode for small files: forward SIMD scan + contiguous buffer.
229fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
230    if data.is_empty() {
231        return Ok(());
232    }
233
234    // Forward scan for separator positions
235    let mut positions: Vec<usize> = Vec::with_capacity(data.len() / 40 + 64);
236    for pos in memchr::memchr_iter(sep, data) {
237        positions.push(pos);
238    }
239
240    // Build contiguous reversed output (before mode: separator at start of record)
241    let mut buf = Vec::with_capacity(data.len());
242    let mut end = data.len();
243    for &pos in positions.iter().rev() {
244        if pos < end {
245            buf.extend_from_slice(&data[pos..end]);
246        }
247        end = pos;
248    }
249    if end > 0 {
250        buf.extend_from_slice(&data[..end]);
251    }
252
253    out.write_all(&buf)
254}
255
256/// Reverse records using a multi-byte string separator.
257/// Uses SIMD-accelerated memmem + write_all output.
258///
259/// For single-byte separators, delegates to tac_bytes which uses memchr (faster).
260pub fn tac_string_separator(
261    data: &[u8],
262    separator: &[u8],
263    before: bool,
264    out: &mut impl Write,
265) -> io::Result<()> {
266    if data.is_empty() {
267        return Ok(());
268    }
269
270    if separator.len() == 1 {
271        return tac_bytes(data, separator[0], before, out);
272    }
273
274    let sep_len = separator.len();
275
276    if !before {
277        tac_string_after(data, separator, sep_len, out)
278    } else {
279        tac_string_before(data, separator, sep_len, out)
280    }
281}
282
283/// Multi-byte string separator, after mode. Uses writev batching.
284fn tac_string_after(
285    data: &[u8],
286    separator: &[u8],
287    sep_len: usize,
288    out: &mut impl Write,
289) -> io::Result<()> {
290    let positions = collect_positions_str(data, separator);
291
292    if positions.is_empty() {
293        return out.write_all(data);
294    }
295
296    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
297    let mut end = data.len();
298
299    for &pos in positions.iter().rev() {
300        let rec_start = pos + sep_len;
301        if rec_start < end {
302            slices.push(IoSlice::new(&data[rec_start..end]));
303            if slices.len() >= IOSLICE_BATCH_SIZE {
304                write_all_vectored(out, &slices)?;
305                slices.clear();
306            }
307        }
308        end = rec_start;
309    }
310    if end > 0 {
311        slices.push(IoSlice::new(&data[..end]));
312    }
313    if !slices.is_empty() {
314        write_all_vectored(out, &slices)?;
315    }
316    Ok(())
317}
318
319/// Multi-byte string separator, before mode. Uses writev batching.
320fn tac_string_before(
321    data: &[u8],
322    separator: &[u8],
323    _sep_len: usize,
324    out: &mut impl Write,
325) -> io::Result<()> {
326    let positions = collect_positions_str(data, separator);
327
328    if positions.is_empty() {
329        return out.write_all(data);
330    }
331
332    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
333    let mut end = data.len();
334
335    for &pos in positions.iter().rev() {
336        if pos < end {
337            slices.push(IoSlice::new(&data[pos..end]));
338            if slices.len() >= IOSLICE_BATCH_SIZE {
339                write_all_vectored(out, &slices)?;
340                slices.clear();
341            }
342        }
343        end = pos;
344    }
345    if end > 0 {
346        slices.push(IoSlice::new(&data[..end]));
347    }
348    if !slices.is_empty() {
349        write_all_vectored(out, &slices)?;
350    }
351    Ok(())
352}
353
/// Find regex matches using backward scanning, matching GNU tac's re_search behavior.
///
/// Returns (start, end) byte offsets in ascending order. The scan proceeds
/// right-to-left: within the current prefix, the match with the greatest start
/// position is taken, then the window shrinks to everything before that match.
/// With overlapping matches this can differ from forward leftmost-first
/// matching, which is why a simple `find_iter` is not used here.
///
/// NOTE(review): probing `find_at` at every candidate position makes this
/// worst-case quadratic in the input length; presumably acceptable because
/// regex separators are rare and matches are expected near the end of each
/// window — confirm against expected input sizes.
fn find_regex_matches_backward(data: &[u8], re: &regex::bytes::Regex) -> Vec<(usize, usize)> {
    let mut matches = Vec::new();
    let mut past_end = data.len();

    while past_end > 0 {
        // Restrict the haystack so matches cannot extend past the current window.
        let buf = &data[..past_end];
        let mut found = false;

        // Probe positions from right to left; accept only a match anchored
        // exactly at the probe position (rightmost-starting match wins).
        let mut pos = past_end;
        while pos > 0 {
            pos -= 1;
            if let Some(m) = re.find_at(buf, pos) {
                if m.start() == pos {
                    matches.push((m.start(), m.end()));
                    past_end = m.start();
                    found = true;
                    break;
                }
            }
        }

        // No match anywhere in the remaining prefix: done.
        if !found {
            break;
        }
    }

    // Collected right-to-left; callers expect ascending order.
    matches.reverse();
    matches
}
384
385/// Reverse records using a regex separator.
386/// Uses write_vectored for regex path (typically few large records).
387pub fn tac_regex_separator(
388    data: &[u8],
389    pattern: &str,
390    before: bool,
391    out: &mut impl Write,
392) -> io::Result<()> {
393    if data.is_empty() {
394        return Ok(());
395    }
396
397    let re = match regex::bytes::Regex::new(pattern) {
398        Ok(r) => r,
399        Err(e) => {
400            return Err(io::Error::new(
401                io::ErrorKind::InvalidInput,
402                format!("invalid regex '{}': {}", pattern, e),
403            ));
404        }
405    };
406
407    let matches = find_regex_matches_backward(data, &re);
408
409    if matches.is_empty() {
410        out.write_all(data)?;
411        return Ok(());
412    }
413
414    // For regex separators, use write_vectored since there are typically
415    // few large records. Build all IoSlices at once and flush.
416    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
417
418    if !before {
419        let last_end = matches.last().unwrap().1;
420
421        if last_end < data.len() {
422            slices.push(IoSlice::new(&data[last_end..]));
423        }
424
425        let mut i = matches.len();
426        while i > 0 {
427            i -= 1;
428            let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
429            slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
430        }
431    } else {
432        let mut i = matches.len();
433        while i > 0 {
434            i -= 1;
435            let start = matches[i].0;
436            let end = if i + 1 < matches.len() {
437                matches[i + 1].0
438            } else {
439                data.len()
440            };
441            slices.push(IoSlice::new(&data[start..end]));
442        }
443
444        if matches[0].0 > 0 {
445            slices.push(IoSlice::new(&data[..matches[0].0]));
446        }
447    }
448
449    write_all_vectored(out, &slices)
450}
451
452/// Write all IoSlice entries, handling partial writes.
453/// Hot path: single write_vectored succeeds fully (common on Linux pipes/files).
454/// Cold path: partial write handled out-of-line to keep hot path tight.
455#[inline(always)]
456fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
457    let total: usize = slices.iter().map(|s| s.len()).sum();
458    let written = out.write_vectored(slices)?;
459    if written >= total {
460        return Ok(());
461    }
462    if written == 0 {
463        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
464    }
465    flush_vectored_slow(out, slices, written)
466}
467
/// Handle partial write (cold path, never inlined).
///
/// `skip` is the number of bytes already accepted by the kernel; slices fully
/// covered by it are dropped, the slice it lands inside is resumed mid-way,
/// and everything after is written in full.
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        if skip >= slice.len() {
            // This slice was entirely written already.
            skip -= slice.len();
        } else {
            // Resume inside this slice, then write the rest whole.
            out.write_all(&slice[skip..])?;
            skip = 0;
        }
    }
    Ok(())
}