// coreutils_rs/tac/core.rs

1use std::io::{self, IoSlice, Write};
2
3use rayon::prelude::*;
4
/// Minimum input size, in bytes, at which the parallel code paths are used (64MB).
///
/// Every benchmark invocation is a fresh process, so the rayon pool start-up
/// cost (~0.5-1ms) is paid on each run. For a 10MB file a single-threaded scan
/// (0.3ms) beats pool start-up plus a parallel scan, so parallelism is reserved
/// for genuinely large inputs where multi-core scanning and copying pays off.
const PARALLEL_THRESHOLD: usize = 64 << 20;
11
/// Upper bound on the number of `IoSlice` entries handed to one vectored write.
/// The multi-byte string separator paths flush their pending slices as soon as
/// this many have accumulated.
const IOSLICE_BATCH_SIZE: usize = 1 << 10;
15
16/// Reverse records separated by a single byte.
17/// For large data (>= 8MB): parallel chunk-local reversal with contiguous buffers.
18/// For small data: single-threaded forward SIMD scan + contiguous output buffer.
19pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
20    if data.is_empty() {
21        return Ok(());
22    }
23    if data.len() >= PARALLEL_THRESHOLD {
24        if !before {
25            tac_bytes_after_contiguous(data, separator, out)
26        } else {
27            tac_bytes_before_contiguous(data, separator, out)
28        }
29    } else if !before {
30        tac_bytes_after(data, separator, out)
31    } else {
32        tac_bytes_before(data, separator, out)
33    }
34}
35
36/// Reverse records of an owned Vec. Delegates to tac_bytes.
37pub fn tac_bytes_owned(
38    data: &mut [u8],
39    separator: u8,
40    before: bool,
41    out: &mut impl Write,
42) -> io::Result<()> {
43    tac_bytes(data, separator, before, out)
44}
45
46/// Collect multi-byte separator positions with pre-allocated Vec.
47#[inline]
48fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
49    let estimated = data.len() / 40 + 64;
50    let mut positions = Vec::with_capacity(estimated);
51    for pos in memchr::memmem::find_iter(data, separator) {
52        positions.push(pos);
53    }
54    positions
55}
56
57/// Parallel chunk-local reversal for after-separator mode.
58/// Splits data into N chunks at newline boundaries, each chunk independently
59/// scans forward and builds a reversed output buffer in parallel, then writes
60/// chunk buffers in reverse order. Eliminates IoSlice overhead and reduces
61/// syscalls to N (one per chunk).
62fn tac_bytes_after_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
63    let n_threads = rayon::current_num_threads().max(1);
64    let chunk_size = data.len() / n_threads;
65
66    // Find chunk boundaries at separator positions
67    let mut boundaries = Vec::with_capacity(n_threads + 1);
68    boundaries.push(0);
69    for i in 1..n_threads {
70        let target = i * chunk_size;
71        if target >= data.len() {
72            break;
73        }
74        // In after mode, separator ends a record; boundary is right after separator
75        let boundary = memchr::memchr(sep, &data[target..])
76            .map(|p| target + p + 1)
77            .unwrap_or(data.len());
78        if boundary < data.len() {
79            boundaries.push(boundary);
80        }
81    }
82    boundaries.push(data.len());
83    boundaries.dedup();
84    let n_chunks = boundaries.len() - 1;
85
86    // Each chunk: forward scan for positions, build reversed output buffer
87    let reversed_chunks: Vec<Vec<u8>> = (0..n_chunks)
88        .into_par_iter()
89        .map(|i| {
90            let start = boundaries[i];
91            let end = boundaries[i + 1];
92            let chunk = &data[start..end];
93            if chunk.is_empty() {
94                return Vec::new();
95            }
96
97            // Collect separator positions within chunk
98            let mut positions: Vec<usize> = Vec::with_capacity(chunk.len() / 40 + 64);
99            for pos in memchr::memchr_iter(sep, chunk) {
100                positions.push(pos);
101            }
102
103            // Build reversed output buffer
104            let mut buf = Vec::with_capacity(chunk.len());
105            let mut end_pos = chunk.len();
106            for &pos in positions.iter().rev() {
107                let rec_start = pos + 1;
108                if rec_start < end_pos {
109                    buf.extend_from_slice(&chunk[rec_start..end_pos]);
110                }
111                end_pos = rec_start;
112            }
113            if end_pos > 0 {
114                buf.extend_from_slice(&chunk[..end_pos]);
115            }
116            buf
117        })
118        .collect();
119
120    // Write chunks in reverse order (last chunk first = correct tac order)
121    for chunk in reversed_chunks.iter().rev() {
122        if !chunk.is_empty() {
123            out.write_all(chunk)?;
124        }
125    }
126    Ok(())
127}
128
129/// Parallel chunk-local reversal for before-separator mode.
130/// Same approach as after mode but separator attaches to the START of each record.
131fn tac_bytes_before_contiguous(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
132    let n_threads = rayon::current_num_threads().max(1);
133    let chunk_size = data.len() / n_threads;
134
135    // Find chunk boundaries at separator positions
136    let mut boundaries = Vec::with_capacity(n_threads + 1);
137    boundaries.push(0);
138    for i in 1..n_threads {
139        let target = i * chunk_size;
140        if target >= data.len() {
141            break;
142        }
143        // In before mode, separator starts a record; boundary is AT the separator
144        let boundary = memchr::memchr(sep, &data[target..])
145            .map(|p| target + p)
146            .unwrap_or(data.len());
147        if boundary > 0 && boundary < data.len() {
148            boundaries.push(boundary);
149        }
150    }
151    boundaries.push(data.len());
152    boundaries.dedup();
153    let n_chunks = boundaries.len() - 1;
154
155    // Each chunk: forward scan for positions, build reversed output buffer
156    let reversed_chunks: Vec<Vec<u8>> = (0..n_chunks)
157        .into_par_iter()
158        .map(|i| {
159            let start = boundaries[i];
160            let end = boundaries[i + 1];
161            let chunk = &data[start..end];
162            if chunk.is_empty() {
163                return Vec::new();
164            }
165
166            // Collect separator positions within chunk
167            let mut positions: Vec<usize> = Vec::with_capacity(chunk.len() / 40 + 64);
168            for pos in memchr::memchr_iter(sep, chunk) {
169                positions.push(pos);
170            }
171
172            // Build reversed output buffer (before mode: separator at start of record)
173            let mut buf = Vec::with_capacity(chunk.len());
174            let mut end_pos = chunk.len();
175            for &pos in positions.iter().rev() {
176                if pos < end_pos {
177                    buf.extend_from_slice(&chunk[pos..end_pos]);
178                }
179                end_pos = pos;
180            }
181            if end_pos > 0 {
182                buf.extend_from_slice(&chunk[..end_pos]);
183            }
184            buf
185        })
186        .collect();
187
188    // Write chunks in reverse order
189    for chunk in reversed_chunks.iter().rev() {
190        if !chunk.is_empty() {
191            out.write_all(chunk)?;
192        }
193    }
194    Ok(())
195}
196
197/// After-separator mode for small files: backward memrchr scan + streaming output.
198/// Zero extra allocation: scans backward with memrchr, writes records directly.
199fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
200    if data.is_empty() {
201        return Ok(());
202    }
203
204    let mut prev_end = data.len();
205    let mut search_end = data.len();
206
207    loop {
208        match memchr::memrchr(sep, &data[..search_end]) {
209            Some(pos) => {
210                let rec_start = pos + 1;
211                if rec_start < prev_end {
212                    out.write_all(&data[rec_start..prev_end])?;
213                }
214                prev_end = rec_start;
215                search_end = pos;
216            }
217            None => {
218                if prev_end > 0 {
219                    out.write_all(&data[..prev_end])?;
220                }
221                break;
222            }
223        }
224        if search_end == 0 {
225            if prev_end > 0 {
226                out.write_all(&data[..prev_end])?;
227            }
228            break;
229        }
230    }
231
232    Ok(())
233}
234
235/// Before-separator mode for small files: backward memrchr scan + streaming output.
236/// Zero extra allocation: scans backward with memrchr, writes records directly.
237fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
238    if data.is_empty() {
239        return Ok(());
240    }
241
242    let mut prev_end = data.len();
243    let mut search_end = data.len();
244
245    loop {
246        match memchr::memrchr(sep, &data[..search_end]) {
247            Some(pos) => {
248                if pos < prev_end {
249                    out.write_all(&data[pos..prev_end])?;
250                }
251                prev_end = pos;
252                if pos == 0 {
253                    break;
254                }
255                search_end = pos;
256            }
257            None => {
258                if prev_end > 0 {
259                    out.write_all(&data[..prev_end])?;
260                }
261                break;
262            }
263        }
264    }
265
266    Ok(())
267}
268
269/// Reverse records using a multi-byte string separator.
270/// Uses SIMD-accelerated memmem + write_all output.
271///
272/// For single-byte separators, delegates to tac_bytes which uses memchr (faster).
273pub fn tac_string_separator(
274    data: &[u8],
275    separator: &[u8],
276    before: bool,
277    out: &mut impl Write,
278) -> io::Result<()> {
279    if data.is_empty() {
280        return Ok(());
281    }
282
283    if separator.len() == 1 {
284        return tac_bytes(data, separator[0], before, out);
285    }
286
287    let sep_len = separator.len();
288
289    if !before {
290        tac_string_after(data, separator, sep_len, out)
291    } else {
292        tac_string_before(data, separator, sep_len, out)
293    }
294}
295
296/// Multi-byte string separator, after mode. Uses writev batching.
297fn tac_string_after(
298    data: &[u8],
299    separator: &[u8],
300    sep_len: usize,
301    out: &mut impl Write,
302) -> io::Result<()> {
303    let positions = collect_positions_str(data, separator);
304
305    if positions.is_empty() {
306        return out.write_all(data);
307    }
308
309    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
310    let mut end = data.len();
311
312    for &pos in positions.iter().rev() {
313        let rec_start = pos + sep_len;
314        if rec_start < end {
315            slices.push(IoSlice::new(&data[rec_start..end]));
316            if slices.len() >= IOSLICE_BATCH_SIZE {
317                write_all_vectored(out, &slices)?;
318                slices.clear();
319            }
320        }
321        end = rec_start;
322    }
323    if end > 0 {
324        slices.push(IoSlice::new(&data[..end]));
325    }
326    if !slices.is_empty() {
327        write_all_vectored(out, &slices)?;
328    }
329    Ok(())
330}
331
332/// Multi-byte string separator, before mode. Uses writev batching.
333fn tac_string_before(
334    data: &[u8],
335    separator: &[u8],
336    _sep_len: usize,
337    out: &mut impl Write,
338) -> io::Result<()> {
339    let positions = collect_positions_str(data, separator);
340
341    if positions.is_empty() {
342        return out.write_all(data);
343    }
344
345    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(IOSLICE_BATCH_SIZE);
346    let mut end = data.len();
347
348    for &pos in positions.iter().rev() {
349        if pos < end {
350            slices.push(IoSlice::new(&data[pos..end]));
351            if slices.len() >= IOSLICE_BATCH_SIZE {
352                write_all_vectored(out, &slices)?;
353                slices.clear();
354            }
355        }
356        end = pos;
357    }
358    if end > 0 {
359        slices.push(IoSlice::new(&data[..end]));
360    }
361    if !slices.is_empty() {
362        write_all_vectored(out, &slices)?;
363    }
364    Ok(())
365}
366
367/// Find regex matches using backward scanning, matching GNU tac's re_search behavior.
368fn find_regex_matches_backward(data: &[u8], re: &regex::bytes::Regex) -> Vec<(usize, usize)> {
369    let mut matches = Vec::new();
370    let mut past_end = data.len();
371
372    while past_end > 0 {
373        let buf = &data[..past_end];
374        let mut found = false;
375
376        let mut pos = past_end;
377        while pos > 0 {
378            pos -= 1;
379            if let Some(m) = re.find_at(buf, pos) {
380                if m.start() == pos {
381                    matches.push((m.start(), m.end()));
382                    past_end = m.start();
383                    found = true;
384                    break;
385                }
386            }
387        }
388
389        if !found {
390            break;
391        }
392    }
393
394    matches.reverse();
395    matches
396}
397
398/// Reverse records using a regex separator.
399/// Uses write_vectored for regex path (typically few large records).
400pub fn tac_regex_separator(
401    data: &[u8],
402    pattern: &str,
403    before: bool,
404    out: &mut impl Write,
405) -> io::Result<()> {
406    if data.is_empty() {
407        return Ok(());
408    }
409
410    let re = match regex::bytes::Regex::new(pattern) {
411        Ok(r) => r,
412        Err(e) => {
413            return Err(io::Error::new(
414                io::ErrorKind::InvalidInput,
415                format!("invalid regex '{}': {}", pattern, e),
416            ));
417        }
418    };
419
420    let matches = find_regex_matches_backward(data, &re);
421
422    if matches.is_empty() {
423        out.write_all(data)?;
424        return Ok(());
425    }
426
427    // For regex separators, use write_vectored since there are typically
428    // few large records. Build all IoSlices at once and flush.
429    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
430
431    if !before {
432        let last_end = matches.last().unwrap().1;
433
434        if last_end < data.len() {
435            slices.push(IoSlice::new(&data[last_end..]));
436        }
437
438        let mut i = matches.len();
439        while i > 0 {
440            i -= 1;
441            let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
442            slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
443        }
444    } else {
445        let mut i = matches.len();
446        while i > 0 {
447            i -= 1;
448            let start = matches[i].0;
449            let end = if i + 1 < matches.len() {
450                matches[i + 1].0
451            } else {
452                data.len()
453            };
454            slices.push(IoSlice::new(&data[start..end]));
455        }
456
457        if matches[0].0 > 0 {
458            slices.push(IoSlice::new(&data[..matches[0].0]));
459        }
460    }
461
462    write_all_vectored(out, &slices)
463}
464
/// Write every byte of `slices` to `out`, handling partial writes.
///
/// Hot path: one `write_vectored` call accepts everything.
/// Cold path: a short write is completed out-of-line by `flush_vectored_slow`.
///
/// A `write_vectored` call that fails with `ErrorKind::Interrupted` is
/// retried, matching the behavior of `Write::write_all`.
///
/// # Errors
/// Returns `ErrorKind::WriteZero` if the writer accepts zero bytes while data
/// remains, and propagates any other I/O error from `out`.
#[inline(always)]
fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
    let total: usize = slices.iter().map(|s| s.len()).sum();

    let written = loop {
        match out.write_vectored(slices) {
            Ok(n) => break n,
            // An interrupted call wrote nothing; retry rather than fail.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };

    if written >= total {
        return Ok(());
    }
    if written == 0 {
        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
    }
    flush_vectored_slow(out, slices, written)
}

/// Finish a vectored write after the first call accepted only `skip` bytes
/// (cold path, never inlined).
///
/// Slices that were fully written are skipped, the partially written slice is
/// resumed at the right offset, and every later slice is written whole, each
/// with `write_all`.
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        let len = slice.len();
        if skip >= len {
            // Entirely covered by the bytes already written.
            skip -= len;
            continue;
        }
        out.write_all(&slice[skip..])?;
        skip = 0;
    }
    Ok(())
}