Skip to main content

coreutils_rs/paste/core.rs

1use std::io::Write;
2
/// Configuration for the paste command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PasteConfig {
    /// Delimiter bytes, cycled through columns (parallel mode) or through the
    /// joined lines of each file (serial mode). An empty list means no
    /// delimiter byte is written at all.
    pub delimiters: Vec<u8>,
    /// Serial mode: paste one file at a time.
    pub serial: bool,
    /// Use NUL as line terminator instead of newline.
    pub zero_terminated: bool,
}
12
13impl Default for PasteConfig {
14    fn default() -> Self {
15        Self {
16            delimiters: vec![b'\t'],
17            serial: false,
18            zero_terminated: false,
19        }
20    }
21}
22
/// Parse a `-d` delimiter list, expanding backslash escapes.
///
/// Recognized escapes: `\n` (newline), `\t` (tab), `\\` (backslash) and
/// `\0` (a NUL byte). Any other backslash, including a trailing one, is kept
/// as a literal `\` and the following character is processed normally.
/// An empty string yields an empty list (no delimiter).
///
/// NOTE(review): POSIX `paste` defines `\0` as the empty string rather than a
/// NUL byte; this function produces a NUL byte.
pub fn parse_delimiters(s: &str) -> Vec<u8> {
    let mut out = Vec::with_capacity(s.len());
    let mut rest = s.as_bytes();

    while let Some((&head, tail)) = rest.split_first() {
        // Only a backslash followed by a known escape letter is translated.
        let translated = if head == b'\\' {
            match tail.first() {
                Some(b'n') => Some(b'\n'),
                Some(b't') => Some(b'\t'),
                Some(b'\\') => Some(b'\\'),
                Some(b'0') => Some(0u8),
                _ => None,
            }
        } else {
            None
        };

        match translated {
            Some(byte) => {
                out.push(byte);
                // Consume both the backslash and the escape letter.
                rest = &tail[1..];
            }
            None => {
                out.push(head);
                rest = tail;
            }
        }
    }

    out
}
64
/// Capacity of the streaming output buffers: 1 MiB (2^20 bytes).
const BUF_SIZE: usize = 1 << 20;
67
68/// Raw write to stdout fd 1. Returns any error encountered.
69#[cfg(unix)]
70fn raw_write_all(data: &[u8]) -> std::io::Result<()> {
71    let mut written = 0;
72    while written < data.len() {
73        let ret = unsafe {
74            libc::write(
75                1,
76                data[written..].as_ptr() as *const libc::c_void,
77                (data.len() - written) as _,
78            )
79        };
80        if ret > 0 {
81            written += ret as usize;
82        } else if ret == 0 {
83            return Err(std::io::Error::new(
84                std::io::ErrorKind::WriteZero,
85                "write returned 0",
86            ));
87        } else {
88            let err = std::io::Error::last_os_error();
89            if err.kind() == std::io::ErrorKind::Interrupted {
90                continue;
91            }
92            return Err(err);
93        }
94    }
95    Ok(())
96}
97
/// Portable fallback: write `data` through the locked std stdout handle and
/// flush it immediately.
#[cfg(not(unix))]
fn raw_write_all(data: &[u8]) -> std::io::Result<()> {
    let handle = std::io::stdout();
    let mut locked = handle.lock();
    locked.write_all(data).and_then(|()| locked.flush())
}
105
106/// A streaming output writer that buffers into a fixed-size buffer
107/// and flushes via raw libc::write on Unix for minimal overhead.
108struct RawBufWriter {
109    buf: Vec<u8>,
110    error: Option<std::io::Error>,
111}
112
113impl RawBufWriter {
114    fn new() -> Self {
115        let mut buf = Vec::with_capacity(BUF_SIZE);
116        // Touch all pages upfront to avoid page faults during hot loop
117        unsafe {
118            std::ptr::write_bytes(buf.as_mut_ptr(), 0, BUF_SIZE);
119        }
120        Self { buf, error: None }
121    }
122
123    /// Append a single byte. Flushes if buffer is full.
124    #[inline(always)]
125    fn push(&mut self, b: u8) {
126        if self.buf.len() >= BUF_SIZE {
127            self.flush_buf();
128        }
129        self.buf.push(b);
130    }
131
132    /// Append a slice. Flushes as needed for large slices.
133    #[inline(always)]
134    fn extend(&mut self, data: &[u8]) {
135        let avail = BUF_SIZE - self.buf.len();
136        if data.len() <= avail {
137            self.buf.extend_from_slice(data);
138        } else {
139            self.extend_slow(data);
140        }
141    }
142
143    #[cold]
144    fn extend_slow(&mut self, data: &[u8]) {
145        if self.error.is_some() {
146            return;
147        }
148        let avail = BUF_SIZE - self.buf.len();
149        // Fill current buffer
150        self.buf.extend_from_slice(&data[..avail]);
151        self.flush_buf();
152        let mut remaining = &data[avail..];
153        // Write full BUF_SIZE chunks directly, bypassing the buffer
154        while remaining.len() >= BUF_SIZE {
155            if let Err(e) = raw_write_all(&remaining[..BUF_SIZE]) {
156                self.error = Some(e);
157                return;
158            }
159            remaining = &remaining[BUF_SIZE..];
160        }
161        // Buffer the tail
162        if !remaining.is_empty() {
163            self.buf.extend_from_slice(remaining);
164        }
165    }
166
167    /// Flush internal buffer.
168    fn flush_buf(&mut self) {
169        if !self.buf.is_empty() && self.error.is_none() {
170            if let Err(e) = raw_write_all(&self.buf) {
171                self.error = Some(e);
172            }
173            self.buf.clear();
174        }
175    }
176
177    /// Finish: flush remaining data and return any error.
178    fn finish(mut self) -> std::io::Result<()> {
179        self.flush_buf();
180        match self.error {
181            Some(e) => Err(e),
182            None => Ok(()),
183        }
184    }
185}
186
187/// Fast path for the common case: 2 files, single-byte delimiter (usually tab).
188/// Scans both files simultaneously with memchr, writing results into a 1MB buffer.
189/// Uses unsafe pointer writes to eliminate bounds checks in the hot loop.
190fn paste_two_files_fast(
191    data_a: &[u8],
192    data_b: &[u8],
193    delim: u8,
194    terminator: u8,
195) -> std::io::Result<()> {
196    let buf_cap = BUF_SIZE;
197    let mut buf: Vec<u8> = Vec::with_capacity(buf_cap);
198    // Pre-fault pages
199    unsafe {
200        std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf_cap);
201    }
202
203    let base = buf.as_mut_ptr();
204    let mut pos: usize = 0;
205    let mut cur_a: usize = 0;
206    let mut cur_b: usize = 0;
207    let done_a = data_a.is_empty();
208    let done_b = data_b.is_empty();
209
210    if done_a && done_b {
211        return Ok(());
212    }
213
214    loop {
215        let a_exhausted = cur_a >= data_a.len();
216        let b_exhausted = cur_b >= data_b.len();
217        if a_exhausted && b_exhausted {
218            break;
219        }
220
221        // Find line end in file A
222        let (line_a_ptr, line_a_len, new_cur_a) = if !a_exhausted {
223            match memchr::memchr(terminator, &data_a[cur_a..]) {
224                Some(off) => (data_a.as_ptr().wrapping_add(cur_a), off, cur_a + off + 1),
225                None => (
226                    data_a.as_ptr().wrapping_add(cur_a),
227                    data_a.len() - cur_a,
228                    data_a.len(),
229                ),
230            }
231        } else {
232            (std::ptr::null(), 0, cur_a)
233        };
234
235        // Find line end in file B
236        let (line_b_ptr, line_b_len, new_cur_b) = if !b_exhausted {
237            match memchr::memchr(terminator, &data_b[cur_b..]) {
238                Some(off) => (data_b.as_ptr().wrapping_add(cur_b), off, cur_b + off + 1),
239                None => (
240                    data_b.as_ptr().wrapping_add(cur_b),
241                    data_b.len() - cur_b,
242                    data_b.len(),
243                ),
244            }
245        } else {
246            (std::ptr::null(), 0, cur_b)
247        };
248
249        cur_a = new_cur_a;
250        cur_b = new_cur_b;
251
252        // Total bytes for this output line: line_a + delim + line_b + terminator
253        let out_len = line_a_len + 1 + line_b_len + 1;
254
255        // Flush if needed
256        if pos + out_len > buf_cap {
257            unsafe { buf.set_len(pos) };
258            raw_write_all(&buf)?;
259            buf.clear();
260            pos = 0;
261        }
262
263        // Write directly with unsafe pointer copies
264        unsafe {
265            if line_a_len > 0 {
266                std::ptr::copy_nonoverlapping(line_a_ptr, base.add(pos), line_a_len);
267                pos += line_a_len;
268            }
269            *base.add(pos) = delim;
270            pos += 1;
271            if line_b_len > 0 {
272                std::ptr::copy_nonoverlapping(line_b_ptr, base.add(pos), line_b_len);
273                pos += line_b_len;
274            }
275            *base.add(pos) = terminator;
276            pos += 1;
277        }
278    }
279
280    // Final flush
281    if pos > 0 {
282        unsafe { buf.set_len(pos) };
283        raw_write_all(&buf)?;
284    }
285
286    Ok(())
287}
288
289/// Streaming paste for the parallel (normal) mode.
290/// Uses memchr per-line scanning and a 1MB output buffer with raw fd writes.
291/// For the common 2-file case, dispatches to an optimized fast path.
292pub fn paste_parallel_stream(file_data: &[&[u8]], config: &PasteConfig) -> std::io::Result<()> {
293    let terminator = if config.zero_terminated { 0u8 } else { b'\n' };
294    let delims = &config.delimiters;
295    let has_delims = !delims.is_empty();
296    let nfiles = file_data.len();
297
298    if nfiles == 0 || file_data.iter().all(|d| d.is_empty()) {
299        return Ok(());
300    }
301
302    // Fast path: 2 files with single-byte delimiter (the common case: `paste file1 file2`)
303    if nfiles == 2 && delims.len() == 1 {
304        return paste_two_files_fast(file_data[0], file_data[1], delims[0], terminator);
305    }
306
307    let mut writer = RawBufWriter::new();
308
309    // Cursors: current position in each file.
310    // Initialize to usize::MAX for empty files (already exhausted).
311    let mut cursors: Vec<usize> = file_data
312        .iter()
313        .map(|d| if d.is_empty() { usize::MAX } else { 0 })
314        .collect();
315    let mut active_count = file_data.iter().filter(|d| !d.is_empty()).count();
316
317    loop {
318        if active_count == 0 {
319            break;
320        }
321
322        let mut newly_exhausted = 0;
323
324        for file_idx in 0..nfiles {
325            if file_idx > 0 && has_delims {
326                writer.push(delims[(file_idx - 1) % delims.len()]);
327            }
328
329            let cursor = cursors[file_idx];
330            if cursor == usize::MAX {
331                continue;
332            }
333
334            let data = file_data[file_idx];
335
336            match memchr::memchr(terminator, &data[cursor..]) {
337                Some(offset) => {
338                    writer.extend(&data[cursor..cursor + offset]);
339                    let new_cursor = cursor + offset + 1;
340                    if new_cursor >= data.len() {
341                        cursors[file_idx] = usize::MAX;
342                        newly_exhausted += 1;
343                    } else {
344                        cursors[file_idx] = new_cursor;
345                    }
346                }
347                None => {
348                    writer.extend(&data[cursor..]);
349                    cursors[file_idx] = usize::MAX;
350                    newly_exhausted += 1;
351                }
352            }
353        }
354
355        writer.push(terminator);
356        active_count -= newly_exhausted;
357    }
358
359    writer.finish()
360}
361
362/// Streaming paste for serial mode.
363pub fn paste_serial_stream(file_data: &[&[u8]], config: &PasteConfig) -> std::io::Result<()> {
364    let terminator = if config.zero_terminated { 0u8 } else { b'\n' };
365    let delims = &config.delimiters;
366    let has_delims = !delims.is_empty();
367
368    let mut writer = RawBufWriter::new();
369
370    for data in file_data {
371        if data.is_empty() {
372            writer.push(terminator);
373            continue;
374        }
375
376        let mut cursor = 0;
377        let mut first = true;
378        let mut delim_idx = 0;
379
380        while cursor < data.len() {
381            if !first && has_delims {
382                writer.push(delims[delim_idx % delims.len()]);
383                delim_idx += 1;
384            }
385            first = false;
386
387            match memchr::memchr(terminator, &data[cursor..]) {
388                Some(offset) => {
389                    writer.extend(&data[cursor..cursor + offset]);
390                    cursor += offset + 1;
391                }
392                None => {
393                    writer.extend(&data[cursor..]);
394                    cursor = data.len();
395                }
396            }
397        }
398
399        writer.push(terminator);
400    }
401
402    writer.finish()
403}
404
405/// Streaming paste entry point. Writes directly to stdout using raw fd writes.
406pub fn paste_stream(file_data: &[&[u8]], config: &PasteConfig) -> std::io::Result<()> {
407    if config.serial {
408        paste_serial_stream(file_data, config)
409    } else {
410        paste_parallel_stream(file_data, config)
411    }
412}
413
414/// Pre-split a file into line offset pairs using a single SIMD memchr_iter pass.
415/// Returns a Vec of (start, end) byte offsets — one per line.
416#[inline]
417fn presplit_lines(data: &[u8], terminator: u8) -> Vec<(u32, u32)> {
418    if data.is_empty() {
419        return Vec::new();
420    }
421    assert!(
422        data.len() <= u32::MAX as usize,
423        "presplit_lines: data exceeds 4 GiB"
424    );
425    // Heuristic: assume average line length ~40 bytes to avoid a count pre-scan.
426    let estimated_lines = data.len() / 40 + 1;
427    let mut offsets = Vec::with_capacity(estimated_lines);
428    let mut start = 0u32;
429    for pos in memchr::memchr_iter(terminator, data) {
430        offsets.push((start, pos as u32));
431        start = pos as u32 + 1;
432    }
433    if data.last() != Some(&terminator) && (start as usize) < data.len() {
434        offsets.push((start, data.len() as u32));
435    }
436    offsets
437}
438
439/// Paste files in normal (parallel) mode and return the output buffer.
440/// Pre-splits files into line offsets (one SIMD pass each), then the main
441/// loop uses O(1) array indexing instead of per-line memchr calls.
442/// Uses unsafe raw pointer writes to eliminate bounds-check overhead.
443pub fn paste_parallel_to_vec(file_data: &[&[u8]], config: &PasteConfig) -> Vec<u8> {
444    let terminator = if config.zero_terminated { 0u8 } else { b'\n' };
445    let delims = &config.delimiters;
446
447    if file_data.is_empty() || file_data.iter().all(|d| d.is_empty()) {
448        return Vec::new();
449    }
450
451    // Pre-split each file into line offsets — single SIMD pass per file.
452    let file_lines: Vec<Vec<(u32, u32)>> = file_data
453        .iter()
454        .map(|data| presplit_lines(data, terminator))
455        .collect();
456
457    let max_lines = file_lines.iter().map(|l| l.len()).max().unwrap_or(0);
458    if max_lines == 0 {
459        return Vec::new();
460    }
461
462    // Compute exact output size to avoid reallocation.
463    let nfiles = file_data.len();
464    let has_delims = !delims.is_empty();
465    let delims_per_line = if has_delims && nfiles > 1 {
466        nfiles - 1
467    } else {
468        0
469    };
470
471    let mut exact_size = max_lines * (delims_per_line + 1); // delimiters + terminators
472    for fl in &file_lines {
473        for &(s, e) in fl.iter() {
474            exact_size += (e - s) as usize;
475        }
476    }
477    // Empty-file lines contribute nothing but delimiter slots are already counted
478
479    let mut output = Vec::with_capacity(exact_size);
480
481    // SAFETY: We computed exact_size above. All writes go through raw pointers
482    // with total bytes written == exact_size. We set_len at the end.
483    unsafe {
484        let base: *mut u8 = output.as_mut_ptr();
485        let mut pos = 0usize;
486
487        for line_idx in 0..max_lines {
488            for file_idx in 0..nfiles {
489                if file_idx > 0 && has_delims {
490                    *base.add(pos) = delims[(file_idx - 1) % delims.len()];
491                    pos += 1;
492                }
493                let lines = &file_lines[file_idx];
494                if line_idx < lines.len() {
495                    let (s, e) = *lines.get_unchecked(line_idx);
496                    let len = (e - s) as usize;
497                    if len > 0 {
498                        std::ptr::copy_nonoverlapping(
499                            file_data.get_unchecked(file_idx).as_ptr().add(s as usize),
500                            base.add(pos),
501                            len,
502                        );
503                        pos += len;
504                    }
505                }
506            }
507            *base.add(pos) = terminator;
508            pos += 1;
509        }
510
511        assert_eq!(pos, exact_size, "exact_size miscalculated");
512        output.set_len(pos);
513    }
514
515    output
516}
517
518/// Paste files in serial mode and return the output buffer.
519/// For each file, join all lines with the delimiter list (cycling).
520/// Pre-splits lines using SIMD memchr, then iterates offset pairs.
521pub fn paste_serial_to_vec(file_data: &[&[u8]], config: &PasteConfig) -> Vec<u8> {
522    let terminator = if config.zero_terminated { 0u8 } else { b'\n' };
523    let delims = &config.delimiters;
524    let has_delims = !delims.is_empty();
525
526    // Estimate output size
527    let total_input: usize = file_data.iter().map(|d| d.len()).sum();
528    let mut output = Vec::with_capacity(total_input + file_data.len());
529
530    for data in file_data {
531        if data.is_empty() {
532            output.push(terminator);
533            continue;
534        }
535        let lines = presplit_lines(data, terminator);
536        if lines.is_empty() {
537            output.push(terminator);
538            continue;
539        }
540        // First line: no leading delimiter
541        let (s, e) = lines[0];
542        output.extend_from_slice(&data[s as usize..e as usize]);
543        // Subsequent lines: prepend cycling delimiter
544        for (i, &(s, e)) in lines[1..].iter().enumerate() {
545            if has_delims {
546                output.push(delims[i % delims.len()]);
547            }
548            output.extend_from_slice(&data[s as usize..e as usize]);
549        }
550        output.push(terminator);
551    }
552
553    output
554}
555
556/// Main paste entry point. Writes directly to the provided writer.
557pub fn paste(
558    file_data: &[&[u8]],
559    config: &PasteConfig,
560    out: &mut impl Write,
561) -> std::io::Result<()> {
562    let output = if config.serial {
563        paste_serial_to_vec(file_data, config)
564    } else {
565        paste_parallel_to_vec(file_data, config)
566    };
567    out.write_all(&output)
568}
569
570/// Build the paste output as a Vec, then return it for the caller to write.
571/// This allows the binary to use raw write() for maximum throughput.
572pub fn paste_to_vec(file_data: &[&[u8]], config: &PasteConfig) -> Vec<u8> {
573    if config.serial {
574        paste_serial_to_vec(file_data, config)
575    } else {
576        paste_parallel_to_vec(file_data, config)
577    }
578}