coreutils_rs/tac/core.rs

use std::io::{self, IoSlice, Write};

/// Max IoSlice entries per write_vectored batch.
/// Linux UIO_MAXIOV is 1024; we use that as our batch limit.
const MAX_IOV: usize = 1024;

/// Chunk size for the forward-scan-within-backward-chunks strategy.
/// 4MB gives better SIMD throughput per memchr_iter call (fewer calls needed,
/// each processing a larger contiguous region with full AVX2 pipeline).
/// For a 10MB file with 50-byte lines, 4MB chunks = 3 calls vs 5 at 2MB.
const CHUNK: usize = 4 * 1024 * 1024;

/// Flush a batch of IoSlice entries using write_vectored.
/// Batches in chunks of MAX_IOV to respect Linux UIO_MAXIOV.
#[inline]
fn flush_iov(out: &mut impl Write, slices: &[IoSlice]) -> io::Result<()> {
    for batch in slices.chunks(MAX_IOV) {
        let total: usize = batch.iter().map(|s| s.len()).sum();
        let written = match out.write_vectored(batch) {
            Ok(n) if n >= total => continue,
            Ok(n) => n,
            Err(e) => return Err(e),
        };
        // Slow path: partial write — fall back to write_all per remaining slice
        let mut skip = written;
        for slice in batch {
            let slen = slice.len();
            if skip >= slen {
                skip -= slen;
                continue;
            }
            if skip > 0 {
                out.write_all(&slice[skip..])?;
                skip = 0;
            } else {
                out.write_all(slice)?;
            }
        }
    }
    Ok(())
}
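
// A minimal sanity sketch for flush_iov. A Vec<u8> sink accepts every write,
// so this only exercises the MAX_IOV batching and the fast path, not the
// partial-write fallback.
#[cfg(test)]
mod flush_iov_sketch {
    use super::*;

    #[test]
    fn writes_every_slice_even_beyond_max_iov() {
        // More slices than a single write_vectored batch is allowed to take.
        let records: Vec<Vec<u8>> = (0..MAX_IOV + 10)
            .map(|i| format!("{i}\n").into_bytes())
            .collect();
        let slices: Vec<IoSlice> = records.iter().map(|r| IoSlice::new(r)).collect();

        let mut sink: Vec<u8> = Vec::new();
        flush_iov(&mut sink, &slices).unwrap();

        // Every byte of every slice must come out, in order.
        assert_eq!(sink, records.concat());
    }
}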

/// Reverse records separated by a single byte.
/// Uses a single forward SIMD pass to find separators, then builds reversed output.
///
/// For data up to CONTIG_LIMIT, builds a contiguous reversed output buffer
/// and writes it in a single write_all(). This is much faster for pipe output
/// than writev with many small slices (a 10MB file with 50-byte lines would
/// need ~200 writev calls of 1024 entries each; a single 10MB write is faster).
///
/// For larger data, uses batched writev to avoid doubling memory usage.
pub fn tac_bytes(data: &[u8], separator: u8, before: bool, out: &mut impl Write) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }
    if data.len() <= CONTIG_LIMIT {
        if !before {
            tac_bytes_contig_after(data, separator, out)
        } else {
            tac_bytes_contig_before(data, separator, out)
        }
    } else if !before {
        tac_bytes_backward_after(data, separator, out)
    } else {
        tac_bytes_backward_before(data, separator, out)
    }
}
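
// A minimal usage sketch for tac_bytes. The expected outputs follow the record
// semantics implemented above (matching GNU tac): with before = false the
// separator stays attached to the end of its record; with before = true it is
// attached to the start of the record that follows it.
#[cfg(test)]
mod tac_bytes_sketch {
    use super::*;

    #[test]
    fn after_mode_reverses_newline_records() {
        let mut out = Vec::new();
        tac_bytes(b"a\nb\nc\n", b'\n', false, &mut out).unwrap();
        assert_eq!(out, b"c\nb\na\n");
    }

    #[test]
    fn before_mode_attaches_separator_to_following_record() {
        let mut out = Vec::new();
        tac_bytes(b"a\nb\nc", b'\n', true, &mut out).unwrap();
        assert_eq!(out, b"\nc\nba");
    }
}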

/// Threshold for contiguous-buffer strategy.
/// Up to 128MB, build a single reversed output buffer and write_all in one call.
/// For data > 128MB, falls back to chunked backward scan with batched writev.
const CONTIG_LIMIT: usize = 128 * 1024 * 1024;

/// Contiguous-buffer after-separator mode for data <= CONTIG_LIMIT.
/// Collects ALL separator positions in a single forward SIMD memchr pass,
/// then copies records in reverse order into a contiguous output buffer.
/// One write_all() of the full buffer is vastly faster than writev with
/// thousands of small IoSlice entries (eliminates per-syscall overhead).
fn tac_bytes_contig_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let positions: Vec<usize> = memchr::memchr_iter(sep, data).collect();

    let mut buf: Vec<u8> = Vec::with_capacity(data.len());

    let mut end = data.len();
    for &pos in positions.iter().rev() {
        let rec_start = pos + 1;
        if rec_start < end {
            buf.extend_from_slice(&data[rec_start..end]);
        }
        end = rec_start;
    }
    if end > 0 {
        buf.extend_from_slice(&data[0..end]);
    }

    out.write_all(&buf)
}

/// Contiguous-buffer before-separator mode for data <= CONTIG_LIMIT.
fn tac_bytes_contig_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let positions: Vec<usize> = memchr::memchr_iter(sep, data).collect();

    let mut buf: Vec<u8> = Vec::with_capacity(data.len());

    let mut end = data.len();
    for &pos in positions.iter().rev() {
        if pos < end {
            buf.extend_from_slice(&data[pos..end]);
        }
        end = pos;
    }
    if end > 0 {
        buf.extend_from_slice(&data[0..end]);
    }

    out.write_all(&buf)
}
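
// Sketch of the contiguous-buffer helpers on input without a trailing
// separator, to illustrate the leftover handling (`if end > 0`): the bytes
// before the first separator form the last record emitted.
#[cfg(test)]
mod contig_sketch {
    use super::*;

    #[test]
    fn after_mode_keeps_unterminated_last_record_first() {
        let mut out = Vec::new();
        tac_bytes_contig_after(b"one\ntwo\nthree", b'\n', &mut out).unwrap();
        // "three" has no trailing separator, but still comes out first.
        assert_eq!(out, b"threetwo\none\n");
    }

    #[test]
    fn before_mode_emits_leading_bytes_last() {
        let mut out = Vec::new();
        tac_bytes_contig_before(b"one\ntwo\nthree", b'\n', &mut out).unwrap();
        assert_eq!(out, b"\nthree\ntwoone");
    }
}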

/// After-separator mode: chunk-based forward SIMD scan, emitted in reverse.
///
/// Instead of calling memrchr once per line (high per-call overhead for short
/// lines), we process the file backward in large chunks. Within each chunk a
/// single memchr_iter forward pass finds ALL separator positions with full SIMD
/// pipeline utilisation, then we emit the records (slices) in reverse order.
///
/// This converts ~200K memrchr calls (for a 10MB / 50-byte-line file) into
/// just a few memchr_iter calls (three 4MB chunks for that file), each scanning
/// a large contiguous region.
///
/// Optimization: separator positions are collected into a single buffer that is
/// allocated once and reused for every chunk, so there is no per-chunk heap
/// allocation.
fn tac_bytes_backward_after(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let mut iov: Vec<IoSlice> = Vec::with_capacity(MAX_IOV);

    // `global_end` tracks where the current (rightmost unseen) record ends.
    let mut global_end = data.len();

    // Positions buffer, allocated once up front and reused across all chunks.
    // Worst case is one separator per byte, so a capacity of CHUNK covers a
    // full chunk without reallocation.
    let mut positions_buf: Vec<usize> = Vec::with_capacity(CHUNK);

    let mut chunk_start = data.len().saturating_sub(CHUNK);

    loop {
        let chunk_end = global_end.min(data.len());
        if chunk_start >= chunk_end {
            if chunk_start == 0 {
                break;
            }
            chunk_start = chunk_start.saturating_sub(CHUNK);
            continue;
        }
        let chunk = &data[chunk_start..chunk_end];

        // Reuse positions buffer: clear and refill without reallocation.
        positions_buf.clear();
        positions_buf.extend(memchr::memchr_iter(sep, chunk).map(|p| p + chunk_start));

        // Emit records in reverse (rightmost first).
        for &pos in positions_buf.iter().rev() {
            if pos + 1 < global_end {
                iov.push(IoSlice::new(&data[pos + 1..global_end]));
            }
            global_end = pos + 1;

            if iov.len() >= MAX_IOV {
                flush_iov(out, &iov)?;
                iov.clear();
            }
        }

        if chunk_start == 0 {
            break;
        }
        chunk_start = chunk_start.saturating_sub(CHUNK);
    }

    // First record
    if global_end > 0 {
        iov.push(IoSlice::new(&data[0..global_end]));
    }
    flush_iov(out, &iov)?;
    Ok(())
}

/// Before-separator mode: chunk-based forward SIMD scan, emitted in reverse.
///
/// Same chunked strategy as after-mode, but each record STARTS with its
/// separator byte instead of ending with it.
fn tac_bytes_backward_before(data: &[u8], sep: u8, out: &mut impl Write) -> io::Result<()> {
    let mut iov: Vec<IoSlice> = Vec::with_capacity(MAX_IOV);

    let mut global_end = data.len();
    let mut chunk_start = data.len().saturating_sub(CHUNK);
    let mut positions_buf: Vec<usize> = Vec::with_capacity(CHUNK);

    loop {
        let chunk_end = global_end.min(data.len());
        if chunk_start >= chunk_end {
            if chunk_start == 0 {
                break;
            }
            chunk_start = chunk_start.saturating_sub(CHUNK);
            continue;
        }
        let chunk = &data[chunk_start..chunk_end];

        positions_buf.clear();
        positions_buf.extend(memchr::memchr_iter(sep, chunk).map(|p| p + chunk_start));

        for &pos in positions_buf.iter().rev() {
            iov.push(IoSlice::new(&data[pos..global_end]));
            global_end = pos;

            if iov.len() >= MAX_IOV {
                flush_iov(out, &iov)?;
                iov.clear();
            }
        }

        if chunk_start == 0 {
            break;
        }
        chunk_start = chunk_start.saturating_sub(CHUNK);
    }

    if global_end > 0 {
        iov.push(IoSlice::new(&data[0..global_end]));
    }
    flush_iov(out, &iov)?;
    Ok(())
}
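
// Sketch of the backward-chunk variants. They are only selected for inputs
// larger than CONTIG_LIMIT, but calling them directly on a small input (a
// single chunk) illustrates that they produce the same records as the
// contiguous helpers.
#[cfg(test)]
mod backward_sketch {
    use super::*;

    #[test]
    fn matches_contig_after_output() {
        let data = b"alpha\nbeta\ngamma\n";
        let (mut a, mut b) = (Vec::new(), Vec::new());
        tac_bytes_backward_after(data, b'\n', &mut a).unwrap();
        tac_bytes_contig_after(data, b'\n', &mut b).unwrap();
        assert_eq!(a, b);
        assert_eq!(a, b"gamma\nbeta\nalpha\n");
    }

    #[test]
    fn matches_contig_before_output() {
        let data = b"alpha\nbeta\ngamma";
        let (mut a, mut b) = (Vec::new(), Vec::new());
        tac_bytes_backward_before(data, b'\n', &mut a).unwrap();
        tac_bytes_contig_before(data, b'\n', &mut b).unwrap();
        assert_eq!(a, b);
        assert_eq!(a, b"\ngamma\nbetaalpha");
    }
}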

/// Reverse records using a multi-byte string separator.
/// Uses chunk-based forward SIMD-accelerated memmem + IoSlice zero-copy output.
///
/// For single-byte separators, delegates to tac_bytes which uses memchr (faster).
pub fn tac_string_separator(
    data: &[u8],
    separator: &[u8],
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    if separator.len() == 1 {
        return tac_bytes(data, separator[0], before, out);
    }

    let sep_len = separator.len();

    // For multi-byte separators we use the same chunk-based strategy as the
    // single-byte path, but with memmem::find_iter instead of memchr_iter.
    //
    // We still need to handle the case where a separator straddles a chunk
    // boundary. We do this by extending each chunk's left edge by (sep_len - 1)
    // bytes and filtering out matches that start before the chunk proper, so
    // they are handled together with the next (earlier) chunk instead.

    if !before {
        tac_string_after(data, separator, sep_len, out)
    } else {
        tac_string_before(data, separator, sep_len, out)
    }
}

/// Multi-byte string separator, after mode (separator at end of record).
fn tac_string_after(
    data: &[u8],
    separator: &[u8],
    sep_len: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let mut iov: Vec<IoSlice> = Vec::with_capacity(MAX_IOV);
    let mut global_end = data.len();
    let mut chunk_start = data.len().saturating_sub(CHUNK);
    let mut positions_buf: Vec<usize> = Vec::with_capacity(CHUNK / 4);

    loop {
        let chunk_end = global_end.min(data.len());
        let scan_start = chunk_start.saturating_sub(sep_len - 1);
        if scan_start >= chunk_end {
            if chunk_start == 0 {
                break;
            }
            chunk_start = chunk_start.saturating_sub(CHUNK);
            continue;
        }
        let scan_region = &data[scan_start..chunk_end];

        positions_buf.clear();
        positions_buf.extend(
            memchr::memmem::find_iter(scan_region, separator)
                .map(|p| p + scan_start)
                .filter(|&p| p >= chunk_start || chunk_start == 0)
                .filter(|&p| p + sep_len <= global_end),
        );

        for &pos in positions_buf.iter().rev() {
            let rec_end_exclusive = pos + sep_len;
            if rec_end_exclusive < global_end {
                iov.push(IoSlice::new(&data[rec_end_exclusive..global_end]));
            }
            global_end = rec_end_exclusive;
            if iov.len() >= MAX_IOV {
                flush_iov(out, &iov)?;
                iov.clear();
            }
        }

        if chunk_start == 0 {
            break;
        }
        chunk_start = chunk_start.saturating_sub(CHUNK);
    }

    if global_end > 0 {
        iov.push(IoSlice::new(&data[0..global_end]));
    }
    flush_iov(out, &iov)?;
    Ok(())
}

/// Multi-byte string separator, before mode (separator at start of record).
fn tac_string_before(
    data: &[u8],
    separator: &[u8],
    sep_len: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let mut iov: Vec<IoSlice> = Vec::with_capacity(MAX_IOV);
    let mut global_end = data.len();
    let mut chunk_start = data.len().saturating_sub(CHUNK);
    let mut positions_buf: Vec<usize> = Vec::with_capacity(CHUNK / 4);

    loop {
        let chunk_end = global_end.min(data.len());
        let scan_start = chunk_start.saturating_sub(sep_len - 1);
        if scan_start >= chunk_end {
            if chunk_start == 0 {
                break;
            }
            chunk_start = chunk_start.saturating_sub(CHUNK);
            continue;
        }
        let scan_region = &data[scan_start..chunk_end];

        positions_buf.clear();
        positions_buf.extend(
            memchr::memmem::find_iter(scan_region, separator)
                .map(|p| p + scan_start)
                .filter(|&p| p >= chunk_start || chunk_start == 0)
                .filter(|&p| p < global_end),
        );

        for &pos in positions_buf.iter().rev() {
            iov.push(IoSlice::new(&data[pos..global_end]));
            global_end = pos;
            if iov.len() >= MAX_IOV {
                flush_iov(out, &iov)?;
                iov.clear();
            }
        }

        if chunk_start == 0 {
            break;
        }
        chunk_start = chunk_start.saturating_sub(CHUNK);
    }

    if global_end > 0 {
        iov.push(IoSlice::new(&data[0..global_end]));
    }
    flush_iov(out, &iov)?;
    Ok(())
}
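
// Usage sketch for the multi-byte string separator path. A two-byte separator
// ("--") routes past the single-byte delegation and through the memmem-based
// scan; record attachment mirrors the byte path.
#[cfg(test)]
mod string_separator_sketch {
    use super::*;

    #[test]
    fn after_mode_with_two_byte_separator() {
        let mut out = Vec::new();
        tac_string_separator(b"one--two--three", b"--", false, &mut out).unwrap();
        assert_eq!(out, b"threetwo--one--");
    }

    #[test]
    fn before_mode_with_two_byte_separator() {
        let mut out = Vec::new();
        tac_string_separator(b"one--two--three", b"--", true, &mut out).unwrap();
        assert_eq!(out, b"--three--twoone");
    }
}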

/// Find regex matches using backward scanning, matching GNU tac's re_search behavior.
fn find_regex_matches_backward(data: &[u8], re: &regex::bytes::Regex) -> Vec<(usize, usize)> {
    let mut matches = Vec::new();
    let mut past_end = data.len();

    while past_end > 0 {
        let buf = &data[..past_end];
        let mut found = false;

        let mut pos = past_end;
        while pos > 0 {
            pos -= 1;
            if let Some(m) = re.find_at(buf, pos) {
                if m.start() == pos {
                    matches.push((m.start(), m.end()));
                    past_end = m.start();
                    found = true;
                    break;
                }
            }
        }

        if !found {
            break;
        }
    }

    matches.reverse();
    matches
}

/// Reverse records using a regex separator.
pub fn tac_regex_separator(
    data: &[u8],
    pattern: &str,
    before: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    let re = match regex::bytes::Regex::new(pattern) {
        Ok(r) => r,
        Err(e) => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("invalid regex '{}': {}", pattern, e),
            ));
        }
    };

    let matches = find_regex_matches_backward(data, &re);

    if matches.is_empty() {
        out.write_all(data)?;
        return Ok(());
    }

    let mut iov: Vec<IoSlice> = Vec::with_capacity(matches.len().min(MAX_IOV) + 2);

    if !before {
        let last_end = matches.last().unwrap().1;

        if last_end < data.len() {
            iov.push(IoSlice::new(&data[last_end..]));
        }

        let mut i = matches.len();
        while i > 0 {
            i -= 1;
            let rec_start = if i == 0 { 0 } else { matches[i - 1].1 };
            iov.push(IoSlice::new(&data[rec_start..matches[i].1]));
            if iov.len() >= MAX_IOV {
                flush_iov(out, &iov)?;
                iov.clear();
            }
        }
    } else {
        let mut i = matches.len();
        while i > 0 {
            i -= 1;
            let start = matches[i].0;
            let end = if i + 1 < matches.len() {
                matches[i + 1].0
            } else {
                data.len()
            };
            iov.push(IoSlice::new(&data[start..end]));
            if iov.len() >= MAX_IOV {
                flush_iov(out, &iov)?;
                iov.clear();
            }
        }

        if matches[0].0 > 0 {
            iov.push(IoSlice::new(&data[..matches[0].0]));
        }
    }

    flush_iov(out, &iov)?;
    Ok(())
}
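
// Usage sketch for the regex separator path: the separator is whatever text
// the pattern matches, and (as with the byte and string paths) it stays
// attached to the end of its record in after mode, or to the start of the
// following record in before mode.
#[cfg(test)]
mod regex_separator_sketch {
    use super::*;

    #[test]
    fn after_mode_with_regex_separator() {
        let mut out = Vec::new();
        tac_regex_separator(b"one, two,  three", r",\s+", false, &mut out).unwrap();
        assert_eq!(out, b"threetwo,  one, ");
    }

    #[test]
    fn before_mode_with_regex_separator() {
        let mut out = Vec::new();
        tac_regex_separator(b"one, two,  three", r",\s+", true, &mut out).unwrap();
        assert_eq!(out, b",  three, twoone");
    }
}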