Skip to main content

coreutils_rs/tac/
core.rs

1use std::io::{self, IoSlice, Write};
2
3/// Max IoSlice entries per write_vectored batch.
4/// Linux UIO_MAXIOV is 1024; we use that as our batch limit.
5const MAX_IOV: usize = 1024;
6
7/// Chunk size for the forward-scan-within-backward-chunks strategy.
8/// 4MB gives better SIMD throughput per memchr_iter call (fewer calls needed,
9/// each processing a larger contiguous region with full AVX2 pipeline).
10/// For a 10MB file with 50-byte lines, 4MB chunks = 3 calls vs 5 at 2MB.
11const CHUNK: usize = 4 * 1024 * 1024;
12
13/// Flush a batch of IoSlice entries using write_vectored.
14/// Falls back to individual write_all for each slice if write_vectored
15/// doesn't write everything (handles partial writes).
16#[inline]
17fn flush_iov(out: &mut impl Write, slices: &[IoSlice]) -> io::Result<()> {
18    if slices.is_empty() {
19        return Ok(());
20    }
21    // Try write_vectored first for the whole batch
22    let total: usize = slices.iter().map(|s| s.len()).sum();
23
24    // Fast path: single writev call often writes everything
25    let written = match out.write_vectored(slices) {
26        Ok(n) if n >= total => return Ok(()),
27        Ok(n) => n,
28        Err(e) => return Err(e),
29    };
30
31    // Slow path: partial write — fall back to write_all per remaining slice
32    let mut skip = written;
33    for slice in slices {
34        let slen = slice.len();
35        if skip >= slen {
36            skip -= slen;
37            continue;
38        }
39        if skip > 0 {
40            out.write_all(&slice[skip..])?;
41            skip = 0;
42        } else {
43            out.write_all(slice)?;
44        }
45    }
46    Ok(())
47}
48
49/// Reverse records separated by a single byte.
50/// Uses chunk-based forward SIMD scan processed in reverse order for
51/// maximum throughput — eliminates per-line memrchr call overhead.
52/// Output uses write_vectored (writev) for zero-copy from mmap'd data.
53///
54/// For data up to CONTIG_LIMIT, builds a contiguous reversed output buffer
55/// and writes it in a single write() call. This is faster for pipe output
56/// because it eliminates the overhead of many writev() syscalls (a 10MB file
57/// with 50-byte lines would need ~200 writev calls of 1024 entries each;
58/// a single 10MB write() is much faster).
59///
60/// For larger data, uses the writev approach to avoid doubling memory usage.
61pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
62    if data.is_empty() {
63        return Ok(());
64    }
65    // For data up to 128MB, zero-copy writev: collect all positions in one forward
66    // SIMD pass, build IoSlice entries in reverse, flush with batched writev.
67    if data.len() <= CONTIG_LIMIT {
68        if !before {
69            tac_bytes_contig_after(data, separator, out)
70        } else {
71            tac_bytes_contig_before(data, separator, out)
72        }
73    } else if !before {
74        tac_bytes_backward_after(data, separator, out)
75    } else {
76        tac_bytes_backward_before(data, separator, out)
77    }
78}
79
80/// Threshold for zero-copy writev strategy.
81/// Up to 128MB, collect all separator positions in one pass, then writev
82/// with IoSlice entries pointing into the original buffer. Memory overhead
83/// is ~24 bytes per line (8 for position + 16 for IoSlice), much less than
84/// the old contiguous-copy approach that needed a full output buffer.
85/// For data > 128MB, falls back to chunked backward scan with batched writev.
86const CONTIG_LIMIT: usize = 128 * 1024 * 1024;
87
88/// Zero-copy after-separator mode for data <= CONTIG_LIMIT.
89/// Collects ALL separator positions in a single forward memchr pass, then builds
90/// IoSlice entries pointing to records in reverse order in the ORIGINAL input buffer.
91/// Writes all slices with batched writev — no output buffer allocation or memcpy.
92///
93/// For a 10MB file with 50-byte lines (~200K lines), this needs:
94///   - ~200K * 8 = 1.6MB for position indices
95///   - ~200K * 16 = 3.2MB for IoSlice entries
96/// vs the old approach which needed 10MB for the output buffer.
97fn tac_bytes_contig_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
98    // Single forward pass: find ALL separator positions using SIMD memchr.
99    // memchr_iter uses AVX2 on x86_64, processing 32 bytes per iteration.
100    let positions: Vec<usize> = memchr::memchr_iter(sep, data).collect();
101
102    // Build IoSlice entries in reverse record order, pointing directly into `data`.
103    // Each record is data[prev_sep+1 .. cur_sep+1] (includes the separator at cur_sep).
104    let num_records = positions.len() + 1; // +1 for the first record (before first sep)
105    let mut iov: Vec<IoSlice> = Vec::with_capacity(num_records);
106
107    // Records from last to first: each record is (sep_pos+1 .. next_sep_pos+1)
108    let mut end = data.len();
109    for &pos in positions.iter().rev() {
110        let rec_start = pos + 1;
111        if rec_start < end {
112            iov.push(IoSlice::new(&data[rec_start..end]));
113        }
114        end = rec_start;
115    }
116    // First record: data[0..first_sep+1] or data[0..end] if no separators
117    if end > 0 {
118        iov.push(IoSlice::new(&data[0..end]));
119    }
120
121    // Flush all IoSlice entries in batched writev calls
122    flush_iov(out, &iov)
123}
124
125/// Zero-copy before-separator mode for data <= CONTIG_LIMIT.
126/// Same strategy as tac_bytes_contig_after but records start AT the separator.
127fn tac_bytes_contig_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
128    let positions: Vec<usize> = memchr::memchr_iter(sep, data).collect();
129
130    let num_records = positions.len() + 1;
131    let mut iov: Vec<IoSlice> = Vec::with_capacity(num_records);
132
133    // Records from last to first: each record is [sep_pos .. next_sep_pos)
134    let mut end = data.len();
135    for &pos in positions.iter().rev() {
136        if pos < end {
137            iov.push(IoSlice::new(&data[pos..end]));
138        }
139        end = pos;
140    }
141    // First record: data[0..first_sep] or data[0..end] if no separators
142    if end > 0 {
143        iov.push(IoSlice::new(&data[0..end]));
144    }
145
146    flush_iov(out, &iov)
147}
148
149/// After-separator mode: chunk-based forward SIMD scan, emitted in reverse.
150///
151/// Instead of calling memrchr once per line (high per-call overhead for short
152/// lines), we process the file backward in large chunks. Within each chunk a
153/// single memchr_iter forward pass finds ALL separator positions with full SIMD
154/// pipeline utilisation, then we emit the records (slices) in reverse order.
155///
156/// This converts ~200K memrchr calls (for a 10MB / 50-byte-line file) into
157/// ~20 memchr_iter calls, each scanning a contiguous 512KB region.
158///
159/// Optimization: Instead of collecting positions into a Vec, we store them
160/// in a stack-allocated array to avoid heap allocation per chunk.
161fn tac_bytes_backward_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
162    let mut iov: Vec<IoSlice> = Vec::with_capacity(MAX_IOV);
163
164    // `global_end` tracks where the current (rightmost unseen) record ends.
165    let mut global_end = data.len();
166
167    // Stack-allocated positions buffer: avoid heap allocation per chunk.
168    // 512KB chunk / 1 byte per separator = 512K max positions.
169    // We use a heap-allocated buffer once but reuse across all chunks.
170    let mut positions_buf: Vec<usize> = Vec::with_capacity(CHUNK);
171
172    let mut chunk_start = data.len().saturating_sub(CHUNK);
173
174    loop {
175        let chunk_end = global_end.min(data.len());
176        if chunk_start >= chunk_end {
177            if chunk_start == 0 {
178                break;
179            }
180            chunk_start = chunk_start.saturating_sub(CHUNK);
181            continue;
182        }
183        let chunk = &data[chunk_start..chunk_end];
184
185        // Reuse positions buffer: clear and refill without reallocation.
186        positions_buf.clear();
187        positions_buf.extend(memchr::memchr_iter(sep, chunk).map(|p| p + chunk_start));
188
189        // Emit records in reverse (rightmost first).
190        for &pos in positions_buf.iter().rev() {
191            if pos + 1 < global_end {
192                iov.push(IoSlice::new(&data[pos + 1..global_end]));
193            }
194            global_end = pos + 1;
195
196            if iov.len() >= MAX_IOV {
197                flush_iov(out, &iov)?;
198                iov.clear();
199            }
200        }
201
202        if chunk_start == 0 {
203            break;
204        }
205        chunk_start = chunk_start.saturating_sub(CHUNK);
206    }
207
208    // First record
209    if global_end > 0 {
210        iov.push(IoSlice::new(&data[0..global_end]));
211    }
212    flush_iov(out, &iov)?;
213    Ok(())
214}
215
216/// Before-separator mode: chunk-based forward SIMD scan, emitted in reverse.
217///
218/// Same chunked strategy as after-mode, but each record STARTS with its
219/// separator byte instead of ending with it.
220fn tac_bytes_backward_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
221    let mut iov: Vec<IoSlice> = Vec::with_capacity(MAX_IOV);
222
223    let mut global_end = data.len();
224    let mut chunk_start = data.len().saturating_sub(CHUNK);
225    let mut positions_buf: Vec<usize> = Vec::with_capacity(CHUNK);
226
227    loop {
228        let chunk_end = global_end.min(data.len());
229        if chunk_start >= chunk_end {
230            if chunk_start == 0 {
231                break;
232            }
233            chunk_start = chunk_start.saturating_sub(CHUNK);
234            continue;
235        }
236        let chunk = &data[chunk_start..chunk_end];
237
238        positions_buf.clear();
239        positions_buf.extend(memchr::memchr_iter(sep, chunk).map(|p| p + chunk_start));
240
241        for &pos in positions_buf.iter().rev() {
242            iov.push(IoSlice::new(&data[pos..global_end]));
243            global_end = pos;
244
245            if iov.len() >= MAX_IOV {
246                flush_iov(out, &iov)?;
247                iov.clear();
248            }
249        }
250
251        if chunk_start == 0 {
252            break;
253        }
254        chunk_start = chunk_start.saturating_sub(CHUNK);
255    }
256
257    if global_end > 0 {
258        iov.push(IoSlice::new(&data[0..global_end]));
259    }
260    flush_iov(out, &iov)?;
261    Ok(())
262}
263
264/// Reverse records using a multi-byte string separator.
265/// Uses chunk-based forward SIMD-accelerated memmem + IoSlice zero-copy output.
266///
267/// For single-byte separators, delegates to tac_bytes which uses memchr (faster).
268pub fn tac_string_separator(
269    data: &[u8],
270    separator: &[u8],
271    before: bool,
272    out: &mut impl Write,
273) -> io::Result<()> {
274    if data.is_empty() {
275        return Ok(());
276    }
277
278    if separator.len() == 1 {
279        return tac_bytes(data, separator[0], before, out);
280    }
281
282    let sep_len = separator.len();
283
284    // For multi-byte separators we use the same chunk-based strategy but with
285    // memmem::find_iter instead of memchr_iter. We need FinderRev only for a
286    // quick "any separator exists?" check — the actual work uses forward Finder.
287    //
288    // We still need to handle the case where a separator straddles a chunk
289    // boundary. We do this by extending each chunk's left edge by (sep_len - 1)
290    // bytes and deduplicating matches that fall in the overlap zone.
291
292    if !before {
293        tac_string_after(data, separator, sep_len, out)
294    } else {
295        tac_string_before(data, separator, sep_len, out)
296    }
297}
298
299/// Multi-byte string separator, after mode (separator at end of record).
300fn tac_string_after(
301    data: &[u8],
302    separator: &[u8],
303    sep_len: usize,
304    out: &mut impl Write,
305) -> io::Result<()> {
306    let mut iov: Vec<IoSlice> = Vec::with_capacity(MAX_IOV);
307    let mut global_end = data.len();
308    let mut chunk_start = data.len().saturating_sub(CHUNK);
309    let mut positions_buf: Vec<usize> = Vec::with_capacity(CHUNK / 4);
310
311    loop {
312        let chunk_end = global_end.min(data.len());
313        let scan_start = chunk_start.saturating_sub(sep_len - 1);
314        if scan_start >= chunk_end {
315            if chunk_start == 0 {
316                break;
317            }
318            chunk_start = chunk_start.saturating_sub(CHUNK);
319            continue;
320        }
321        let scan_region = &data[scan_start..chunk_end];
322
323        positions_buf.clear();
324        positions_buf.extend(
325            memchr::memmem::find_iter(scan_region, separator)
326                .map(|p| p + scan_start)
327                .filter(|&p| p >= chunk_start || chunk_start == 0)
328                .filter(|&p| p + sep_len <= global_end),
329        );
330
331        for &pos in positions_buf.iter().rev() {
332            let rec_end_exclusive = pos + sep_len;
333            if rec_end_exclusive < global_end {
334                iov.push(IoSlice::new(&data[rec_end_exclusive..global_end]));
335            }
336            global_end = rec_end_exclusive;
337            if iov.len() >= MAX_IOV {
338                flush_iov(out, &iov)?;
339                iov.clear();
340            }
341        }
342
343        if chunk_start == 0 {
344            break;
345        }
346        chunk_start = chunk_start.saturating_sub(CHUNK);
347    }
348
349    if global_end > 0 {
350        iov.push(IoSlice::new(&data[0..global_end]));
351    }
352    flush_iov(out, &iov)?;
353    Ok(())
354}
355
356/// Multi-byte string separator, before mode (separator at start of record).
357fn tac_string_before(
358    data: &[u8],
359    separator: &[u8],
360    sep_len: usize,
361    out: &mut impl Write,
362) -> io::Result<()> {
363    let mut iov: Vec<IoSlice> = Vec::with_capacity(MAX_IOV);
364    let mut global_end = data.len();
365    let mut chunk_start = data.len().saturating_sub(CHUNK);
366    let mut positions_buf: Vec<usize> = Vec::with_capacity(CHUNK / 4);
367
368    loop {
369        let chunk_end = global_end.min(data.len());
370        let scan_start = chunk_start.saturating_sub(sep_len - 1);
371        if scan_start >= chunk_end {
372            if chunk_start == 0 {
373                break;
374            }
375            chunk_start = chunk_start.saturating_sub(CHUNK);
376            continue;
377        }
378        let scan_region = &data[scan_start..chunk_end];
379
380        positions_buf.clear();
381        positions_buf.extend(
382            memchr::memmem::find_iter(scan_region, separator)
383                .map(|p| p + scan_start)
384                .filter(|&p| p >= chunk_start || chunk_start == 0)
385                .filter(|&p| p < global_end),
386        );
387
388        for &pos in positions_buf.iter().rev() {
389            iov.push(IoSlice::new(&data[pos..global_end]));
390            global_end = pos;
391            if iov.len() >= MAX_IOV {
392                flush_iov(out, &iov)?;
393                iov.clear();
394            }
395        }
396
397        if chunk_start == 0 {
398            break;
399        }
400        chunk_start = chunk_start.saturating_sub(CHUNK);
401    }
402
403    if global_end > 0 {
404        iov.push(IoSlice::new(&data[0..global_end]));
405    }
406    flush_iov(out, &iov)?;
407    Ok(())
408}
409
410/// Find regex matches using backward scanning, matching GNU tac's re_search behavior.
411fn find_regex_matches_backward(data: &[u8], re: &regex::bytes::Regex) -> Vec<(usize, usize)> {
412    let mut matches = Vec::new();
413    let mut past_end = data.len();
414
415    while past_end > 0 {
416        let buf = &data[..past_end];
417        let mut found = false;
418
419        let mut pos = past_end;
420        while pos > 0 {
421            pos -= 1;
422            if let Some(m) = re.find_at(buf, pos) {
423                if m.start() == pos {
424                    matches.push((m.start(), m.end()));
425                    past_end = m.start();
426                    found = true;
427                    break;
428                }
429            }
430        }
431
432        if !found {
433            break;
434        }
435    }
436
437    matches.reverse();
438    matches
439}
440
441/// Reverse records using a regex separator.
442pub fn tac_regex_separator(
443    data: &[u8],
444    pattern: &str,
445    before: bool,
446    out: &mut impl Write,
447) -> io::Result<()> {
448    if data.is_empty() {
449        return Ok(());
450    }
451
452    let re = match regex::bytes::Regex::new(pattern) {
453        Ok(r) => r,
454        Err(e) => {
455            return Err(io::Error::new(
456                io::ErrorKind::InvalidInput,
457                format!("invalid regex '{}': {}", pattern, e),
458            ));
459        }
460    };
461
462    let matches = find_regex_matches_backward(data, &re);
463
464    if matches.is_empty() {
465        out.write_all(data)?;
466        return Ok(());
467    }
468
469    let mut iov: Vec<IoSlice> = Vec::with_capacity(matches.len().min(MAX_IOV) + 2);
470
471    if !before {
472        let last_end = matches.last().unwrap().1;
473
474        if last_end < data.len() {
475            iov.push(IoSlice::new(&data[last_end..]));
476        }
477
478        let mut i = matches.len();
479        while i > 0 {
480            i -= 1;
481            let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
482            iov.push(IoSlice::new(&data[rec_start..matches[i].1]));
483            if iov.len() >= MAX_IOV {
484                flush_iov(out, &iov)?;
485                iov.clear();
486            }
487        }
488    } else {
489        let mut i = matches.len();
490        while i > 0 {
491            i -= 1;
492            let start = matches[i].0;
493            let end = if i + 1 < matches.len() {
494                matches[i + 1].0
495            } else {
496                data.len()
497            };
498            iov.push(IoSlice::new(&data[start..end]));
499            if iov.len() >= MAX_IOV {
500                flush_iov(out, &iov)?;
501                iov.clear();
502            }
503        }
504
505        if matches[0].0 > 0 {
506            iov.push(IoSlice::new(&data[..matches[0].0]));
507        }
508    }
509
510    flush_iov(out, &iov)?;
511    Ok(())
512}