//! coreutils_rs/tac/core.rs — core record-reversal routines for `tac`.
1use std::io::{self, IoSlice, Write};
2
/// Threshold for parallel processing (16MB).
/// Thread creation costs ~200µs for 4 threads. For 10MB files (0.2ms sequential
/// memchr scan), sequential is faster. At 16MB+, the parallel speedup (>0.2ms)
/// amortizes the thread creation cost.
/// Read by [`tac_bytes`] to pick the sequential vs. parallel path.
const PARALLEL_THRESHOLD: usize = 16 * 1024 * 1024;
8
9/// Reverse records separated by a single byte.
10/// Scans for separators with SIMD memchr, then outputs records in reverse
11/// order directly from the input buffer. Expects the caller to provide
12/// a buffered writer (e.g., BufWriter) for efficient syscall batching.
13pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
14    if data.is_empty() {
15        return Ok(());
16    }
17    if data.len() >= PARALLEL_THRESHOLD {
18        if !before {
19            tac_bytes_after_parallel(data, separator, out)
20        } else {
21            tac_bytes_before_parallel(data, separator, out)
22        }
23    } else if !before {
24        tac_bytes_after(data, separator, out)
25    } else {
26        tac_bytes_before(data, separator, out)
27    }
28}
29
/// Reverse records of an owned Vec. Delegates to tac_bytes.
///
/// NOTE(review): the buffer is never mutated here — the `&mut [u8]`
/// parameter is presumably kept for API symmetry with callers that own the
/// buffer (e.g. a drained Vec); confirm at call sites before tightening to `&[u8]`.
pub fn tac_bytes_owned(
    data: &mut [u8],
    separator: u8,
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    tac_bytes(data, separator, before, out)
}
39
40/// Collect multi-byte separator positions with pre-allocated Vec.
41#[inline]
42fn collect_positions_str(data: &[u8], separator: &[u8]) -> Vec<usize> {
43    let estimated = data.len() / 40 + 64;
44    let mut positions = Vec::with_capacity(estimated);
45    for pos in memchr::memmem::find_iter(data, separator) {
46        positions.push(pos);
47    }
48    positions
49}
50
51/// Split data into chunks at separator boundaries for parallel processing.
52/// Returns chunk boundary positions (indices into data).
53fn split_into_chunks(data: &[u8], sep: u8) -> Vec<usize> {
54    let num_threads = std::thread::available_parallelism()
55        .map(|n| n.get())
56        .unwrap_or(4)
57        .max(1);
58    let chunk_target = data.len() / num_threads;
59    let mut boundaries = vec![0usize];
60    for i in 1..num_threads {
61        let target = i * chunk_target;
62        if target >= data.len() {
63            break;
64        }
65        if let Some(p) = memchr::memchr(sep, &data[target..]) {
66            let b = target + p + 1;
67            if b > *boundaries.last().unwrap() && b <= data.len() {
68                boundaries.push(b);
69            }
70        }
71    }
72    boundaries.push(data.len());
73    boundaries
74}
75
/// Parallel after-separator mode: find separator positions in parallel,
/// then stream IoSlice entries pointing directly into the source data.
/// Zero-copy: no output buffer allocation or memcpy. IoSlice entries
/// reference the original mmap/owned data, and write_vectored (or vmsplice
/// on Linux) transfers them directly to the pipe/file.
///
/// Uses streaming 1024-entry batches instead of pre-allocating all IoSlice
/// entries at once. For 100MB with 2.5M lines, this avoids a ~40MB allocation
/// (2.5M * 16 bytes) and its associated page fault overhead (~10ms for 10K pages).
fn tac_bytes_after_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    // Chunk boundaries sit just past a separator (see split_into_chunks),
    // so chunks can be scanned independently without splitting any record.
    let boundaries = split_into_chunks(data, sep);
    let n_chunks = boundaries.len() - 1;
    if n_chunks == 0 {
        // Defensive: split_into_chunks always returns >= 2 boundaries,
        // so this branch should be unreachable.
        return out.write_all(data);
    }

    // Parallel: find separator positions within each chunk using scoped threads.
    // thread::scope lets the workers borrow `data` without 'static + Arc.
    let chunk_positions: Vec<Vec<usize>> = std::thread::scope(|s| {
        let handles: Vec<_> = (0..n_chunks)
            .map(|i| {
                let start = boundaries[i];
                let end = boundaries[i + 1];
                s.spawn(move || {
                    let chunk = &data[start..end];
                    // Pre-size assuming ~16-byte records.
                    let estimated = chunk.len() / 16 + 64;
                    let mut positions = Vec::with_capacity(estimated);
                    for p in memchr::memchr_iter(sep, chunk) {
                        // Store absolute offsets into `data`, not chunk-relative.
                        positions.push(start + p);
                    }
                    positions
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Stream IoSlice entries in 1024-entry batches. Chunks are processed in
    // reverse order, records within each chunk in reverse. Each IoSlice
    // points into the original data buffer (zero-copy).
    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);

    for i in (0..n_chunks).rev() {
        let chunk_start = boundaries[i];
        let chunk_end = boundaries[i + 1];
        let positions = &chunk_positions[i];

        // `end` is the exclusive end of the record currently being emitted;
        // in after-mode a record starts on the byte following its separator.
        let mut end = chunk_end;
        for &pos in positions.iter().rev() {
            let rec_start = pos + 1;
            if rec_start < end {
                slices.push(IoSlice::new(&data[rec_start..end]));
                if slices.len() >= BATCH {
                    write_all_vectored(out, &slices)?;
                    slices.clear();
                }
            }
            end = rec_start;
        }
        // Leading record of the chunk (bytes before its first separator).
        if end > chunk_start {
            slices.push(IoSlice::new(&data[chunk_start..end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
    }

    // Flush whatever remains in the final partial batch.
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }
    Ok(())
}
149
/// Parallel before-separator mode: same streaming zero-copy writev approach.
/// In before-mode each record begins at its separator byte, so slices start
/// at `pos` rather than `pos + 1`.
fn tac_bytes_before_parallel(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    // Chunk boundaries sit just past a separator (see split_into_chunks).
    let boundaries = split_into_chunks(data, sep);
    let n_chunks = boundaries.len() - 1;
    if n_chunks == 0 {
        // Defensive: split_into_chunks always returns >= 2 boundaries.
        return out.write_all(data);
    }

    // Parallel: find separator positions within each chunk using scoped threads.
    let chunk_positions: Vec<Vec<usize>> = std::thread::scope(|s| {
        let handles: Vec<_> = (0..n_chunks)
            .map(|i| {
                let start = boundaries[i];
                let end = boundaries[i + 1];
                s.spawn(move || {
                    let chunk = &data[start..end];
                    // Pre-size assuming ~16-byte records.
                    let estimated = chunk.len() / 16 + 64;
                    let mut positions = Vec::with_capacity(estimated);
                    for p in memchr::memchr_iter(sep, chunk) {
                        // Absolute offsets into `data`.
                        positions.push(start + p);
                    }
                    positions
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Stream IoSlice entries in 1024-entry batches (same as after mode).
    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);

    for i in (0..n_chunks).rev() {
        let chunk_start = boundaries[i];
        let chunk_end = boundaries[i + 1];
        let positions = &chunk_positions[i];

        // `end` is the exclusive end of the record currently being emitted.
        let mut end = chunk_end;
        for &pos in positions.iter().rev() {
            if pos < end {
                slices.push(IoSlice::new(&data[pos..end]));
                if slices.len() >= BATCH {
                    write_all_vectored(out, &slices)?;
                    slices.clear();
                }
            }
            end = pos;
        }
        // Leading record of the chunk (bytes before its first separator).
        if end > chunk_start {
            slices.push(IoSlice::new(&data[chunk_start..end]));
            if slices.len() >= BATCH {
                write_all_vectored(out, &slices)?;
                slices.clear();
            }
        }
    }

    // Flush whatever remains in the final partial batch.
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }
    Ok(())
}
212
/// After-separator mode: zero-copy writev from source data.
/// Uses chunked reverse processing to avoid allocating a huge positions Vec.
///
/// For repetitive 10MB files (~1M lines), collecting all positions into a single
/// Vec requires 8MB → ~2000 page faults (~4ms). Chunked processing keeps the
/// positions buffer at ~200KB (reused across chunks after first chunk's faults).
fn tac_bytes_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    if data.is_empty() {
        // Defensive: the public entry point tac_bytes already filters empty input.
        return Ok(());
    }

    const CHUNK: usize = 1024 * 1024; // 1MB chunks
    const BATCH: usize = 1024; // IoSlice entries buffered per writev
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);

    // Reusable positions buffer — allocated once, pages hot after first chunk.
    let est = CHUNK / 16 + 64;
    let mut positions: Vec<usize> = Vec::with_capacity(est);

    // `record_end` is the exclusive end of the record being assembled. It is
    // deliberately NOT reset per chunk: a record spanning a chunk boundary is
    // emitted intact once its separator is found in a later (left-ward) chunk.
    let mut record_end = data.len();
    let mut chunk_right = data.len();

    // Walk the buffer right-to-left, one chunk at a time, so records come
    // out in reverse order.
    while chunk_right > 0 {
        let chunk_left = chunk_right.saturating_sub(CHUNK);
        let chunk = &data[chunk_left..chunk_right];

        positions.clear();
        for p in memchr::memchr_iter(sep, chunk) {
            // Absolute offset into `data`.
            positions.push(chunk_left + p);
        }

        for &pos in positions.iter().rev() {
            // After-mode: the record starts on the byte following its separator.
            let rec_start = pos + 1;
            if rec_start < record_end {
                slices.push(IoSlice::new(&data[rec_start..record_end]));
                if slices.len() >= BATCH {
                    write_all_vectored(out, &slices)?;
                    slices.clear();
                }
            }
            record_end = rec_start;
        }

        chunk_right = chunk_left;
    }

    // Head of the input (bytes before the first separator), emitted last.
    if record_end > 0 {
        slices.push(IoSlice::new(&data[..record_end]));
    }
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }

    Ok(())
}
268
/// Before-separator mode: zero-copy writev with chunked reverse processing.
/// Same structure as tac_bytes_after, but each record begins at its
/// separator byte, so slices start at `pos` rather than `pos + 1`.
fn tac_bytes_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    if data.is_empty() {
        // Defensive: the public entry point tac_bytes already filters empty input.
        return Ok(());
    }

    const CHUNK: usize = 1024 * 1024;
    const BATCH: usize = 1024;
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);

    // Reusable positions buffer — allocated once, reused across chunks.
    let est = CHUNK / 16 + 64;
    let mut positions: Vec<usize> = Vec::with_capacity(est);

    // `record_end` is NOT reset per chunk, so records that straddle a chunk
    // boundary are emitted intact (see tac_bytes_after).
    let mut record_end = data.len();
    let mut chunk_right = data.len();

    while chunk_right > 0 {
        let chunk_left = chunk_right.saturating_sub(CHUNK);
        let chunk = &data[chunk_left..chunk_right];

        positions.clear();
        for p in memchr::memchr_iter(sep, chunk) {
            // Absolute offset into `data`.
            positions.push(chunk_left + p);
        }

        for &pos in positions.iter().rev() {
            if pos < record_end {
                slices.push(IoSlice::new(&data[pos..record_end]));
                if slices.len() >= BATCH {
                    write_all_vectored(out, &slices)?;
                    slices.clear();
                }
            }
            record_end = pos;
        }

        chunk_right = chunk_left;
    }

    // Head of the input (bytes before the first separator), emitted last.
    if record_end > 0 {
        slices.push(IoSlice::new(&data[..record_end]));
    }
    if !slices.is_empty() {
        write_all_vectored(out, &slices)?;
    }

    Ok(())
}
317
318/// Reverse records using a multi-byte string separator.
319/// Uses SIMD-accelerated memmem + write_all output.
320///
321/// For single-byte separators, delegates to tac_bytes which uses memchr (faster).
322pub fn tac_string_separator(
323    data: &[u8],
324    separator: &[u8],
325    before: bool,
326    out: &mut impl Write,
327) -> io::Result<()> {
328    if data.is_empty() {
329        return Ok(());
330    }
331
332    if separator.len() == 1 {
333        return tac_bytes(data, separator[0], before, out);
334    }
335
336    let sep_len = separator.len();
337
338    if !before {
339        tac_string_after(data, separator, sep_len, out)
340    } else {
341        tac_string_before(data, separator, sep_len, out)
342    }
343}
344
345/// Multi-byte string separator, after mode. Uses writev batching.
346fn tac_string_after(
347    data: &[u8],
348    separator: &[u8],
349    sep_len: usize,
350    out: &mut impl Write,
351) -> io::Result<()> {
352    let positions = collect_positions_str(data, separator);
353
354    if positions.is_empty() {
355        return out.write_all(data);
356    }
357
358    const BATCH: usize = 1024;
359    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
360    let mut end = data.len();
361
362    for &pos in positions.iter().rev() {
363        let rec_start = pos + sep_len;
364        if rec_start < end {
365            slices.push(IoSlice::new(&data[rec_start..end]));
366            if slices.len() >= BATCH {
367                write_all_vectored(out, &slices)?;
368                slices.clear();
369            }
370        }
371        end = rec_start;
372    }
373    if end > 0 {
374        slices.push(IoSlice::new(&data[..end]));
375    }
376    if !slices.is_empty() {
377        write_all_vectored(out, &slices)?;
378    }
379    Ok(())
380}
381
382/// Multi-byte string separator, before mode. Uses writev batching.
383fn tac_string_before(
384    data: &[u8],
385    separator: &[u8],
386    _sep_len: usize,
387    out: &mut impl Write,
388) -> io::Result<()> {
389    let positions = collect_positions_str(data, separator);
390
391    if positions.is_empty() {
392        return out.write_all(data);
393    }
394
395    const BATCH: usize = 1024;
396    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(BATCH);
397    let mut end = data.len();
398
399    for &pos in positions.iter().rev() {
400        if pos < end {
401            slices.push(IoSlice::new(&data[pos..end]));
402            if slices.len() >= BATCH {
403                write_all_vectored(out, &slices)?;
404                slices.clear();
405            }
406        }
407        end = pos;
408    }
409    if end > 0 {
410        slices.push(IoSlice::new(&data[..end]));
411    }
412    if !slices.is_empty() {
413        write_all_vectored(out, &slices)?;
414    }
415    Ok(())
416}
417
/// Find regex matches using backward scanning, matching GNU tac's re_search behavior.
///
/// Probes every byte offset right-to-left and accepts a match only when it
/// begins exactly at the probe position; the haystack is then truncated to
/// just before that match and the scan restarts. Returns (start, end) byte
/// ranges sorted ascending by start.
///
/// NOTE(review): worst case is O(n · match_cost) because `find_at` is retried
/// at every offset — presumably acceptable since regex separators imply few,
/// large records; confirm with profiling if this shows up hot.
fn find_regex_matches_backward(data: &[u8], re: &regex::bytes::Regex) -> Vec<(usize, usize)> {
    let mut matches = Vec::new();
    let mut past_end = data.len();

    while past_end > 0 {
        // Truncate the haystack so no match can extend beyond `past_end`.
        let buf = &data[..past_end];
        let mut found = false;

        let mut pos = past_end;
        while pos > 0 {
            pos -= 1;
            if let Some(m) = re.find_at(buf, pos) {
                // Only accept a match anchored at the probe position; a match
                // starting later will be re-found when `pos` reaches it.
                if m.start() == pos {
                    matches.push((m.start(), m.end()));
                    // Continue the outer scan strictly before this match.
                    past_end = m.start();
                    found = true;
                    break;
                }
            }
        }

        if !found {
            break;
        }
    }

    // Collected right-to-left; flip to ascending order for callers.
    matches.reverse();
    matches
}
448
/// Reverse records using a regex separator.
/// Uses write_vectored for regex path (typically few large records).
///
/// Returns `InvalidInput` if `pattern` fails to compile; otherwise
/// propagates I/O errors from the writer.
pub fn tac_regex_separator(
    data: &[u8],
    pattern: &str,
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    // Map a regex compile failure onto io::Error so the signature stays io::Result.
    let re = match regex::bytes::Regex::new(pattern) {
        Ok(r) => r,
        Err(e) => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("invalid regex '{}': {}", pattern, e),
            ));
        }
    };

    let matches = find_regex_matches_backward(data, &re);

    // No separator matched: the whole input is a single record.
    if matches.is_empty() {
        out.write_all(data)?;
        return Ok(());
    }

    // For regex separators, use write_vectored since there are typically
    // few large records. Build all IoSlices at once and flush.
    // (+2 leaves room for the head/tail fragment slices pushed below.)
    let mut slices: Vec<IoSlice<'_>> = Vec::with_capacity(matches.len() + 2);

    if !before {
        // After mode: record i spans from the end of match i-1 (or 0)
        // through the end of match i.
        let last_end = matches.last().unwrap().1;

        // Trailing bytes after the final separator are emitted first.
        if last_end < data.len() {
            slices.push(IoSlice::new(&data[last_end..]));
        }

        let mut i = matches.len();
        while i > 0 {
            i -= 1;
            let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
            slices.push(IoSlice::new(&data[rec_start..matches[i].1]));
        }
    } else {
        // Before mode: record i spans from the start of match i to the
        // start of match i+1 (or end of data).
        let mut i = matches.len();
        while i > 0 {
            i -= 1;
            let start = matches[i].0;
            let end = if i + 1 < matches.len() {
                matches[i + 1].0
            } else {
                data.len()
            };
            slices.push(IoSlice::new(&data[start..end]));
        }

        // Leading bytes before the first match are emitted last.
        if matches[0].0 > 0 {
            slices.push(IoSlice::new(&data[..matches[0].0]));
        }
    }

    write_all_vectored(out, &slices)
}
515
/// Write all IoSlice entries, handling partial writes.
/// Hot path: single write_vectored succeeds fully (common on Linux pipes/files).
/// Cold path: partial write handled out-of-line to keep hot path tight.
#[inline(always)]
fn write_all_vectored(out: &mut impl Write, slices: &[IoSlice<'_>]) -> io::Result<()> {
    let total: usize = slices.iter().map(|s| s.len()).sum();
    // Retry on EINTR, matching the contract of Write::write_all; a spurious
    // Interrupted error must not abort the whole output.
    let written = loop {
        match out.write_vectored(slices) {
            Ok(n) => break n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };
    if written >= total {
        return Ok(());
    }
    if written == 0 {
        // total > 0 here (the >= check above handles empty slices), so a
        // zero-length write means the writer can make no progress.
        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
    }
    flush_vectored_slow(out, slices, written)
}

/// Handle partial write (cold path, never inlined).
/// `skip` bytes were already consumed by the vectored write: whole slices
/// covered by `skip` are dropped, then the remainder is written with
/// write_all (which itself retries Interrupted and detects WriteZero).
#[cold]
#[inline(never)]
fn flush_vectored_slow(
    out: &mut impl Write,
    slices: &[IoSlice<'_>],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        let len = slice.len();
        if skip >= len {
            skip -= len;
            continue;
        }
        // IoSlice derefs to &[u8]; emit the unwritten tail of this slice.
        out.write_all(&slice[skip..])?;
        skip = 0;
    }
    Ok(())
}
550}