// coreutils_rs/tac/core.rs
1use std::io::{self, IoSlice, Write};
2
3use rayon::prelude::*;
4
/// Threshold for parallel processing (512KB).
/// Inputs at or above this size take the rayon-parallel paths; anything
/// smaller stays on the single-threaded paths.
/// Lower threshold allows parallelism for smaller files in benchmarks.
const PARALLEL_THRESHOLD: usize = 512 * 1024;
8
9/// Reverse records separated by a single byte.
10/// Scans for separators with SIMD memchr, then outputs records in reverse
11/// order directly from the input buffer. Expects the caller to provide
12/// a buffered writer (e.g., BufWriter) for efficient syscall batching.
13pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
14    if data.is_empty() {
15        return Ok(());
16    }
17    if data.len() >= PARALLEL_THRESHOLD {
18        if !before {
19            tac_bytes_after_parallel(data, separator, out)
20        } else {
21            tac_bytes_before_parallel(data, separator, out)
22        }
23    } else if !before {
24        tac_bytes_after(data, separator, out)
25    } else {
26        tac_bytes_before(data, separator, out)
27    }
28}
29
30/// Reverse records of an owned Vec. Delegates to tac_bytes.
31pub fn tac_bytes_owned(
32    data: &mut [u8],
33    separator: u8,
34    before: bool,
35    out: &mut impl Write,
36) -> io::Result<()> {
37    tac_bytes(data, separator, before, out)
38}
39
40/// Collect separator positions with pre-allocated Vec.
41/// memchr_iter's size_hint returns (0, Some(len)), so collect() starts at
42/// capacity 0 and doubles ~20 times for 1M+ separators. Pre-allocating
43/// with an estimated line length avoids all reallocations.
44#[inline]
45fn collect_positions_byte(data: &[u8], sep: u8) -> Vec<usize> {
46    let estimated = data.len() / 40 + 64; // ~40 bytes per line, conservative
47    let mut positions = Vec::with_capacity(estimated);
48    for pos in memchr::memchr_iter(sep, data) {
49        positions.push(pos);
50    }
51    positions
52}
53
54/// Collect multi-byte separator positions with pre-allocated Vec.
55#[inline]
56fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
57    let estimated = data.len() / 40 + 64;
58    let mut positions = Vec::with_capacity(estimated);
59    for pos in memchr::memmem::find_iter(data, separator) {
60        positions.push(pos);
61    }
62    positions
63}
64
65/// Split data into chunks at separator boundaries for parallel processing.
66/// Returns chunk boundary positions (indices into data).
67fn split_into_chunks(data: &[u8], sep: u8) -> Vec<usize> {
68    let num_threads = rayon::current_num_threads().max(1);
69    let chunk_target = data.len() / num_threads;
70    let mut boundaries = vec![0usize];
71    for i in 1..num_threads {
72        let target = i * chunk_target;
73        if target >= data.len() {
74            break;
75        }
76        if let Some(p) = memchr::memchr(sep, &data[target..]) {
77            let b = target + p + 1;
78            if b > *boundaries.last().unwrap() && b <= data.len() {
79                boundaries.push(b);
80            }
81        }
82    }
83    boundaries.push(data.len());
84    boundaries
85}
86
/// Parallel after-separator mode: split file into chunks, find separator
/// positions in parallel, then write records in reverse using batched writev.
/// Zero-copy: IoSlice entries point directly into the source data.
/// Eliminates the output buffer allocation + memcpy entirely.
fn tac_bytes_after_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let boundaries = split_into_chunks(data, sep);
    let n_chunks = boundaries.len() - 1;
    // Defensive: for non-empty data, boundaries is at least [0, len],
    // so this branch is not expected to trigger in practice.
    if n_chunks == 0 {
        return out.write_all(data);
    }

    // Parallel: collect separator positions per chunk (absolute positions into data).
    // Chunks end just past a separator, so no record straddles two chunks.
    let chunk_positions: Vec<Vec<usize>> = (0..n_chunks)
        .into_par_iter()
        .map(|i| {
            let chunk_start = boundaries[i];
            let chunk_end = boundaries[i + 1];
            let chunk = &data[chunk_start..chunk_end];
            // Pre-size assuming ~40-byte records to avoid Vec regrowth.
            let estimated = chunk.len() / 40 + 64;
            let mut positions = Vec::with_capacity(estimated);
            for p in memchr::memchr_iter(sep, chunk) {
                positions.push(chunk_start + p); // absolute position
            }
            positions
        })
        .collect();

    // Write records in reverse chunk order using batched writev.
    // Within each chunk, records are iterated from end to start (reverse).
    // BATCH caps the number of IoSlice entries queued per flush.
    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);

    for i in (0..n_chunks).rev() {
        let positions = &chunk_positions[i];
        let chunk_start = boundaries[i];
        let chunk_end = boundaries[i + 1];
        let mut end = chunk_end;

        for &pos in positions.iter().rev() {
            // After mode: a record begins just past its separator; the
            // separator byte travels with the preceding record's slice.
            let rec_start = pos + 1;
            if rec_start < end {
                slices.push(IoSlice::new(&data[rec_start..end]));
                if slices.len() >= BATCH {
                    write_all_vectored(out, &slices)?;
                    slices.clear();
                }
            }
            end = rec_start;
        }

        // Remaining prefix within chunk (bytes before the chunk's first separator)
        if end > chunk_start {
            slices.push(IoSlice::new(&data[chunk_start..end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
    }

    // Flush whatever did not fill a full batch.
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }

    Ok(())
}
153
/// Parallel before-separator mode: split file into chunks, find separator
/// positions in parallel, then write records in reverse using batched writev.
/// Zero-copy: IoSlice entries point directly into the source data.
fn tac_bytes_before_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let boundaries = split_into_chunks(data, sep);
    let n_chunks = boundaries.len() - 1;
    // Defensive: for non-empty data, boundaries is at least [0, len].
    if n_chunks == 0 {
        return out.write_all(data);
    }

    // Parallel: collect separator positions per chunk (absolute positions into data).
    let chunk_positions: Vec<Vec<usize>> = (0..n_chunks)
        .into_par_iter()
        .map(|i| {
            let chunk_start = boundaries[i];
            let chunk_end = boundaries[i + 1];
            let chunk = &data[chunk_start..chunk_end];
            // Pre-size assuming ~40-byte records to avoid Vec regrowth.
            let estimated = chunk.len() / 40 + 64;
            let mut positions = Vec::with_capacity(estimated);
            for p in memchr::memchr_iter(sep, chunk) {
                positions.push(chunk_start + p); // absolute position
            }
            positions
        })
        .collect();

    // Write records in reverse chunk order using batched writev.
    // BATCH caps the number of IoSlice entries queued per flush.
    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);

    for i in (0..n_chunks).rev() {
        let positions = &chunk_positions[i];
        let chunk_start = boundaries[i];
        let chunk_end = boundaries[i + 1];
        let mut end = chunk_end;

        // Before mode: separator attached to the start of each record.
        for &pos in positions.iter().rev() {
            if pos < end {
                slices.push(IoSlice::new(&data[pos..end]));
                if slices.len() >= BATCH {
                    write_all_vectored(out, &slices)?;
                    slices.clear();
                }
            }
            end = pos;
        }

        // Remaining prefix within chunk (bytes before the chunk's first separator)
        if end > chunk_start {
            slices.push(IoSlice::new(&data[chunk_start..end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
    }

    // Flush the final partial batch.
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }

    Ok(())
}
218
219/// After-separator mode: zero-copy writev from source data.
220/// Records are output in reverse order using IoSlice entries pointing
221/// directly into the input buffer. No output buffer allocation or copy needed.
222/// For files <512KB, this avoids ~100-500KB of allocation + memcpy overhead,
223/// trading it for batched writev syscalls (~3-13 calls for typical data).
224fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
225    let positions = collect_positions_byte(data, sep);
226
227    if positions.is_empty() {
228        return out.write_all(data);
229    }
230
231    const BATCH: usize = 1024;
232    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
233    let mut end = data.len();
234
235    for &pos in positions.iter().rev() {
236        let rec_start = pos + 1;
237        if rec_start < end {
238            slices.push(IoSlice::new(&data[rec_start..end]));
239            if slices.len() >= BATCH {
240                write_all_vectored(out, &slices)?;
241                slices.clear();
242            }
243        }
244        end = rec_start;
245    }
246
247    // Remaining prefix before the first separator
248    if end > 0 {
249        slices.push(IoSlice::new(&data[..end]));
250    }
251    if !slices.is_empty() {
252        write_all_vectored(out, &slices)?;
253    }
254
255    Ok(())
256}
257
258/// Before-separator mode: zero-copy writev from source data.
259/// Same approach as after mode: IoSlice entries pointing into source data.
260fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
261    let positions = collect_positions_byte(data, sep);
262
263    if positions.is_empty() {
264        return out.write_all(data);
265    }
266
267    const BATCH: usize = 1024;
268    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
269    let mut end = data.len();
270
271    for &pos in positions.iter().rev() {
272        if pos < end {
273            slices.push(IoSlice::new(&data[pos..end]));
274            if slices.len() >= BATCH {
275                write_all_vectored(out, &slices)?;
276                slices.clear();
277            }
278        }
279        end = pos;
280    }
281
282    if end > 0 {
283        slices.push(IoSlice::new(&data[..end]));
284    }
285    if !slices.is_empty() {
286        write_all_vectored(out, &slices)?;
287    }
288
289    Ok(())
290}
291
292/// Reverse records using a multi-byte string separator.
293/// Uses SIMD-accelerated memmem + write_all output.
294///
295/// For single-byte separators, delegates to tac_bytes which uses memchr (faster).
296pub fn tac_string_separator(
297    data: &[u8],
298    separator: &[u8],
299    before: bool,
300    out: &mut impl Write,
301) -> io::Result<()> {
302    if data.is_empty() {
303        return Ok(());
304    }
305
306    if separator.len() == 1 {
307        return tac_bytes(data, separator[0], before, out);
308    }
309
310    let sep_len = separator.len();
311
312    if !before {
313        tac_string_after(data, separator, sep_len, out)
314    } else {
315        tac_string_before(data, separator, sep_len, out)
316    }
317}
318
319/// Multi-byte string separator, after mode. Uses writev batching.
320fn tac_string_after(
321    data: &[u8],
322    separator: &[u8],
323    sep_len: usize,
324    out: &mut impl Write,
325) -> io::Result<()> {
326    let positions = collect_positions_str(data, separator);
327
328    if positions.is_empty() {
329        return out.write_all(data);
330    }
331
332    const BATCH: usize = 1024;
333    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
334    let mut end = data.len();
335
336    for &pos in positions.iter().rev() {
337        let rec_start = pos + sep_len;
338        if rec_start < end {
339            slices.push(IoSlice::new(&data[rec_start..end]));
340            if slices.len() >= BATCH {
341                write_all_vectored(out, &slices)?;
342                slices.clear();
343            }
344        }
345        end = rec_start;
346    }
347    if end > 0 {
348        slices.push(IoSlice::new(&data[..end]));
349    }
350    if !slices.is_empty() {
351        write_all_vectored(out, &slices)?;
352    }
353    Ok(())
354}
355
356/// Multi-byte string separator, before mode. Uses writev batching.
357fn tac_string_before(
358    data: &[u8],
359    separator: &[u8],
360    _sep_len: usize,
361    out: &mut impl Write,
362) -> io::Result<()> {
363    let positions = collect_positions_str(data, separator);
364
365    if positions.is_empty() {
366        return out.write_all(data);
367    }
368
369    const BATCH: usize = 1024;
370    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
371    let mut end = data.len();
372
373    for &pos in positions.iter().rev() {
374        if pos < end {
375            slices.push(IoSlice::new(&data[pos..end]));
376            if slices.len() >= BATCH {
377                write_all_vectored(out, &slices)?;
378                slices.clear();
379            }
380        }
381        end = pos;
382    }
383    if end > 0 {
384        slices.push(IoSlice::new(&data[..end]));
385    }
386    if !slices.is_empty() {
387        write_all_vectored(out, &slices)?;
388    }
389    Ok(())
390}
391
/// Find regex matches using backward scanning, matching GNU tac's re_search behavior.
///
/// Repeatedly searches the prefix `data[..past_end]`: candidate start
/// positions are tried right-to-left, and `find_at` results are accepted
/// only when anchored exactly at the candidate position, so each pass finds
/// the right-most match in the prefix. Each hit shrinks `past_end` to the
/// match start, so returned matches never overlap. This retries `find_at`
/// at every candidate position, so it is not linear in `data.len()` —
/// the cost mirrors the reference (GNU tac) scanning strategy.
/// Returns `(start, end)` byte offsets in ascending order.
fn find_regex_matches_backward(data: &[u8], re: &regex::bytes::Regex) -> Vec<(usize, usize)> {
    let mut matches = Vec::new();
    let mut past_end = data.len();

    while past_end > 0 {
        let buf = &data[..past_end];
        let mut found = false;

        // Walk candidate start positions from the end of the prefix toward 0.
        let mut pos = past_end;
        while pos > 0 {
            pos -= 1;
            if let Some(m) = re.find_at(buf, pos) {
                // find_at may match later than `pos`; only accept a match
                // anchored at this exact position.
                if m.start() == pos {
                    matches.push((m.start(), m.end()));
                    past_end = m.start();
                    found = true;
                    break;
                }
            }
        }

        // No match anywhere in the remaining prefix: done.
        if !found {
            break;
        }
    }

    // Matches were collected right-to-left; flip to ascending order.
    matches.reverse();
    matches
}
422
423/// Reverse records using a regex separator.
424/// Uses write_vectored for regex path (typically few large records).
425pub fn tac_regex_separator(
426    data: &[u8],
427    pattern: &str,
428    before: bool,
429    out: &mut impl Write,
430) -> io::Result<()> {
431    if data.is_empty() {
432        return Ok(());
433    }
434
435    let re = match regex::bytes::Regex::new(pattern) {
436        Ok(r) => r,
437        Err(e) => {
438            return Err(io::Error::new(
439                io::ErrorKind::InvalidInput,
440                format!("invalid regex '{}': {}", pattern, e),
441            ));
442        }
443    };
444
445    let matches = find_regex_matches_backward(data, &re);
446
447    if matches.is_empty() {
448        out.write_all(data)?;
449        return Ok(());
450    }
451
452    // For regex separators, use write_vectored since there are typically
453    // few large records. Build all IoSlices at once and flush.
454    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);
455
456    if !before {
457        let last_end = matches.last().unwrap().1;
458
459        if last_end < data.len() {
460            slices.push(IoSlice::new(&data[last_end..]));
461        }
462
463        let mut i = matches.len();
464        while i > 0 {
465            i -= 1;
466            let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
467            slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
468        }
469    } else {
470        let mut i = matches.len();
471        while i > 0 {
472            i -= 1;
473            let start = matches[i].0;
474            let end = if i + 1 < matches.len() {
475                matches[i + 1].0
476            } else {
477                data.len()
478            };
479            slices.push(IoSlice::new(&data[start..end]));
480        }
481
482        if matches[0].0 > 0 {
483            slices.push(IoSlice::new(&data[..matches[0].0]));
484        }
485    }
486
487    write_all_vectored(out, &slices)
488}
489
/// Write every IoSlice in `slices`, recovering from short writes.
///
/// One `write_vectored` call covers the common case. On a partial write,
/// the already-consumed prefix is skipped and the remainder is flushed
/// slice-by-slice with `write_all`. A zero-length write while payload
/// remains is reported as `WriteZero`.
#[inline]
fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
    if slices.is_empty() {
        return Ok(());
    }

    let mut written = out.write_vectored(slices)?;
    if written == 0 && slices.iter().any(|s| !s.is_empty()) {
        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
    }

    let total: usize = slices.iter().map(|s| s.len()).sum();
    if written >= total {
        return Ok(());
    }

    // Short write: skip fully-consumed slices, trim the partially-consumed
    // one, then write_all whatever is left.
    for slice in slices {
        if written >= slice.len() {
            written -= slice.len();
        } else {
            out.write_all(&slice[written..])?;
            written = 0;
        }
    }
    Ok(())
}
519}