Skip to main content

coreutils_rs/paste/
core.rs

1use std::io::Write;
2
/// Configuration for the paste command.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so a configuration can be
/// logged, duplicated and compared (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PasteConfig {
    /// Delimiter bytes placed between columns, cycled in order.
    /// An empty list means columns are joined with nothing in between.
    pub delimiters: Vec<u8>,
    /// Serial mode: paste one file at a time.
    pub serial: bool,
    /// Use NUL as line terminator instead of newline.
    pub zero_terminated: bool,
}
12
13impl Default for PasteConfig {
14    fn default() -> Self {
15        Self {
16            delimiters: vec![b'\t'],
17            serial: false,
18            zero_terminated: false,
19        }
20    }
21}
22
/// Turn a delimiter specification into the list of delimiter bytes.
///
/// Recognised escapes: `\n` (newline), `\t` (tab), `\\` (backslash) and
/// `\0` (NUL byte). A backslash followed by any other byte, or a backslash at
/// the very end of the string, is kept as a literal backslash and the byte
/// after it (if any) is then taken literally as well. An empty string yields an
/// empty list (no delimiter).
pub fn parse_delimiters(s: &str) -> Vec<u8> {
    let mut out = Vec::with_capacity(s.len());
    let mut input = s.bytes().peekable();

    while let Some(byte) = input.next() {
        if byte != b'\\' {
            out.push(byte);
            continue;
        }

        // Look at the byte after the backslash without consuming it yet.
        let escaped = match input.peek().copied() {
            Some(b'n') => Some(b'\n'),
            Some(b't') => Some(b'\t'),
            Some(b'\\') => Some(b'\\'),
            Some(b'0') => Some(0u8),
            _ => None,
        };

        match escaped {
            Some(value) => {
                out.push(value);
                // Consume the escape letter that was just translated.
                input.next();
            }
            // Unknown escape or trailing backslash: keep the backslash itself;
            // the following byte (if any) is handled by the next iteration.
            None => out.push(b'\\'),
        }
    }

    out
}
64
/// Size of the staging buffer used by the streaming paste paths: 1 MiB (2^20 bytes).
const BUF_SIZE: usize = 1 << 20;
67
68/// Raw write to stdout fd 1. Returns any error encountered.
69#[cfg(unix)]
70pub fn raw_write_all(data: &[u8]) -> std::io::Result<()> {
71    let mut written = 0;
72    while written < data.len() {
73        let ret = unsafe {
74            libc::write(
75                1,
76                data[written..].as_ptr() as *const libc::c_void,
77                (data.len() - written) as _,
78            )
79        };
80        if ret > 0 {
81            written += ret as usize;
82        } else if ret == 0 {
83            return Err(std::io::Error::new(
84                std::io::ErrorKind::WriteZero,
85                "write returned 0",
86            ));
87        } else {
88            let err = std::io::Error::last_os_error();
89            if err.kind() == std::io::ErrorKind::Interrupted {
90                continue;
91            }
92            return Err(err);
93        }
94    }
95    Ok(())
96}
97
/// Fallback for non-Unix targets: writes `data` through the locked standard
/// output handle and flushes it, returning the first error encountered.
#[cfg(not(unix))]
pub fn raw_write_all(data: &[u8]) -> std::io::Result<()> {
    let handle = std::io::stdout();
    let mut guard = handle.lock();
    guard.write_all(data).and_then(|()| guard.flush())
}
105
106/// Streaming paste for the parallel (normal) mode.
107/// Scans each file line-by-line with memchr on-the-fly — no pre-split offset arrays.
108/// Uses a single 1MB output buffer with raw fd writes.
109pub fn paste_parallel_stream(file_data: &[&[u8]], config: &PasteConfig) -> std::io::Result<()> {
110    let terminator = if config.zero_terminated { 0u8 } else { b'\n' };
111    let delims = &config.delimiters;
112    let has_delims = !delims.is_empty();
113    let nfiles = file_data.len();
114
115    if nfiles == 0 || file_data.iter().all(|d| d.is_empty()) {
116        return Ok(());
117    }
118
119    // Fast path: single file is a passthrough (output == input)
120    if nfiles == 1 {
121        let data = file_data[0];
122        if data.is_empty() {
123            return Ok(());
124        }
125        if *data.last().unwrap() == terminator {
126            return raw_write_all(data);
127        }
128        raw_write_all(data)?;
129        return raw_write_all(&[terminator]);
130    }
131
132    // Fast path: 2 files with single-byte delimiter (the most common case)
133    if nfiles == 2 && delims.len() == 1 {
134        return paste_two_files_streaming(file_data[0], file_data[1], delims[0], terminator);
135    }
136
137    // General N-file streaming paste
138    paste_n_files_streaming(file_data, delims, has_delims, nfiles, terminator)
139}
140
141/// Fast path for 2-file paste: uses unsafe pointer writes and direct memchr scanning.
142/// Avoids pre-split offset arrays entirely — scans both files in lockstep.
143fn paste_two_files_streaming(
144    data_a: &[u8],
145    data_b: &[u8],
146    delim: u8,
147    terminator: u8,
148) -> std::io::Result<()> {
149    if data_a.is_empty() && data_b.is_empty() {
150        return Ok(());
151    }
152
153    let buf_cap = BUF_SIZE;
154    let mut buf: Vec<u8> = Vec::with_capacity(buf_cap + 65536);
155    let mut pos: usize = 0;
156
157    let mut cur_a: usize = 0;
158    let mut cur_b: usize = 0;
159
160    let ptr_a = data_a.as_ptr();
161    let ptr_b = data_b.as_ptr();
162    let len_a = data_a.len();
163    let len_b = data_b.len();
164
165    while cur_a < len_a || cur_b < len_b {
166        // Find line boundaries in both files
167        let (a_start, a_len, next_a) = if cur_a < len_a {
168            if let Some(nl) = memchr::memchr(terminator, &data_a[cur_a..]) {
169                (cur_a, nl, cur_a + nl + 1)
170            } else {
171                (cur_a, len_a - cur_a, len_a)
172            }
173        } else {
174            (0, 0, cur_a)
175        };
176
177        let (b_start, b_len, next_b) = if cur_b < len_b {
178            if let Some(nl) = memchr::memchr(terminator, &data_b[cur_b..]) {
179                (cur_b, nl, cur_b + nl + 1)
180            } else {
181                (cur_b, len_b - cur_b, len_b)
182            }
183        } else {
184            (0, 0, cur_b)
185        };
186
187        let out_len = a_len + 1 + b_len + 1;
188
189        // Ensure buffer has space
190        if pos + out_len > buf.capacity() {
191            unsafe { buf.set_len(pos) };
192            raw_write_all(&buf)?;
193            buf.clear();
194            pos = 0;
195            if out_len > buf.capacity() {
196                buf.reserve(out_len);
197            }
198        }
199
200        // Write line with unsafe pointer copies
201        unsafe {
202            let base = buf.as_mut_ptr();
203            if a_len > 0 {
204                std::ptr::copy_nonoverlapping(ptr_a.add(a_start), base.add(pos), a_len);
205                pos += a_len;
206            }
207            *base.add(pos) = delim;
208            pos += 1;
209            if b_len > 0 {
210                std::ptr::copy_nonoverlapping(ptr_b.add(b_start), base.add(pos), b_len);
211                pos += b_len;
212            }
213            *base.add(pos) = terminator;
214            pos += 1;
215        }
216
217        cur_a = next_a;
218        cur_b = next_b;
219
220        // Flush when buffer is full
221        if pos >= buf_cap {
222            unsafe { buf.set_len(pos) };
223            raw_write_all(&buf)?;
224            buf.clear();
225            pos = 0;
226        }
227    }
228
229    // Final flush
230    if pos > 0 {
231        unsafe { buf.set_len(pos) };
232        raw_write_all(&buf)?;
233    }
234
235    Ok(())
236}
237
238/// General N-file streaming paste with memchr cursors and unsafe pointer writes.
239fn paste_n_files_streaming(
240    file_data: &[&[u8]],
241    delims: &[u8],
242    has_delims: bool,
243    nfiles: usize,
244    terminator: u8,
245) -> std::io::Result<()> {
246    let mut cursors: Vec<usize> = vec![0; nfiles];
247    let buf_cap = BUF_SIZE;
248    let mut buf: Vec<u8> = Vec::with_capacity(buf_cap + 65536);
249    let mut pos: usize = 0;
250
251    loop {
252        // Check if any file still has data before writing anything
253        let mut any_data = false;
254        for i in 0..nfiles {
255            if cursors[i] < file_data[i].len() {
256                any_data = true;
257                break;
258            }
259        }
260        if !any_data {
261            break;
262        }
263
264        for file_idx in 0..nfiles {
265            let data = file_data[file_idx];
266            let cursor = cursors[file_idx];
267
268            // Delimiter before columns 1..N
269            if file_idx > 0 && has_delims {
270                let d = unsafe { *delims.get_unchecked((file_idx - 1) % delims.len()) };
271                if pos >= buf.capacity() {
272                    unsafe { buf.set_len(pos) };
273                    raw_write_all(&buf)?;
274                    buf.clear();
275                    pos = 0;
276                }
277                unsafe { *buf.as_mut_ptr().add(pos) = d };
278                pos += 1;
279            }
280
281            if cursor < data.len() {
282                let remaining = &data[cursor..];
283                let (line_len, next) = if let Some(nl_pos) = memchr::memchr(terminator, remaining) {
284                    (nl_pos, cursor + nl_pos + 1)
285                } else {
286                    (remaining.len(), data.len())
287                };
288
289                if line_len > 0 {
290                    if pos + line_len > buf.capacity() {
291                        unsafe { buf.set_len(pos) };
292                        raw_write_all(&buf)?;
293                        buf.clear();
294                        pos = 0;
295                        if line_len > buf.capacity() {
296                            buf.reserve(line_len + 4096);
297                        }
298                    }
299                    unsafe {
300                        std::ptr::copy_nonoverlapping(
301                            data.as_ptr().add(cursor),
302                            buf.as_mut_ptr().add(pos),
303                            line_len,
304                        );
305                    }
306                    pos += line_len;
307                }
308                cursors[file_idx] = next;
309            }
310        }
311
312        // Terminator
313        if pos >= buf.capacity() {
314            unsafe { buf.set_len(pos) };
315            raw_write_all(&buf)?;
316            buf.clear();
317            pos = 0;
318        }
319        unsafe { *buf.as_mut_ptr().add(pos) = terminator };
320        pos += 1;
321
322        // Flush when buffer is full
323        if pos >= buf_cap {
324            unsafe { buf.set_len(pos) };
325            raw_write_all(&buf)?;
326            buf.clear();
327            pos = 0;
328        }
329    }
330
331    // Final flush
332    if pos > 0 {
333        unsafe { buf.set_len(pos) };
334        raw_write_all(&buf)?;
335    }
336
337    Ok(())
338}
339
340/// Streaming paste for serial mode.
341/// For each file, join all lines with the delimiter list (cycling).
342pub fn paste_serial_stream(file_data: &[&[u8]], config: &PasteConfig) -> std::io::Result<()> {
343    let terminator = if config.zero_terminated { 0u8 } else { b'\n' };
344    let delims = &config.delimiters;
345    let has_delims = !delims.is_empty();
346
347    let mut buf: Vec<u8> = Vec::with_capacity(BUF_SIZE + 4096);
348
349    for data in file_data {
350        if data.is_empty() {
351            buf.push(terminator);
352            if buf.len() >= BUF_SIZE {
353                raw_write_all(&buf)?;
354                buf.clear();
355            }
356            continue;
357        }
358
359        let mut cursor = 0usize;
360        let mut line_idx = 0usize;
361
362        while cursor < data.len() {
363            // Delimiter before lines 1..N
364            if line_idx > 0 && has_delims {
365                buf.push(delims[(line_idx - 1) % delims.len()]);
366            }
367
368            let remaining = &data[cursor..];
369            if let Some(nl_pos) = memchr::memchr(terminator, remaining) {
370                buf.extend_from_slice(&remaining[..nl_pos]);
371                cursor += nl_pos + 1;
372            } else {
373                buf.extend_from_slice(remaining);
374                cursor = data.len();
375            }
376
377            line_idx += 1;
378
379            if buf.len() >= BUF_SIZE {
380                raw_write_all(&buf)?;
381                buf.clear();
382            }
383        }
384
385        buf.push(terminator);
386        if buf.len() >= BUF_SIZE {
387            raw_write_all(&buf)?;
388            buf.clear();
389        }
390    }
391
392    // Final flush
393    if !buf.is_empty() {
394        raw_write_all(&buf)?;
395    }
396
397    Ok(())
398}
399
400/// Streaming paste entry point. Writes directly to stdout using raw fd writes.
401pub fn paste_stream(file_data: &[&[u8]], config: &PasteConfig) -> std::io::Result<()> {
402    if config.serial {
403        paste_serial_stream(file_data, config)
404    } else {
405        paste_parallel_stream(file_data, config)
406    }
407}
408
409/// Pre-split a file into line offset pairs using a single SIMD memchr_iter pass.
410/// Returns a Vec of (start, end) byte offsets — one per line.
411#[inline]
412fn presplit_lines(data: &[u8], terminator: u8) -> Vec<(u32, u32)> {
413    if data.is_empty() {
414        return Vec::new();
415    }
416    assert!(
417        data.len() <= u32::MAX as usize,
418        "presplit_lines: data exceeds 4 GiB"
419    );
420    // Heuristic: assume average line length ~40 bytes to avoid a count pre-scan.
421    let estimated_lines = data.len() / 40 + 1;
422    let mut offsets = Vec::with_capacity(estimated_lines);
423    let mut start = 0u32;
424    for pos in memchr::memchr_iter(terminator, data) {
425        offsets.push((start, pos as u32));
426        start = pos as u32 + 1;
427    }
428    if data.last() != Some(&terminator) && (start as usize) < data.len() {
429        offsets.push((start, data.len() as u32));
430    }
431    offsets
432}
433
434/// Paste files in normal (parallel) mode and return the output buffer.
435/// Pre-splits files into line offsets (one SIMD pass each), then the main
436/// loop uses O(1) array indexing instead of per-line memchr calls.
437/// Uses unsafe raw pointer writes to eliminate bounds-check overhead.
438pub fn paste_parallel_to_vec(file_data: &[&[u8]], config: &PasteConfig) -> Vec<u8> {
439    let terminator = if config.zero_terminated { 0u8 } else { b'\n' };
440    let delims = &config.delimiters;
441
442    if file_data.is_empty() || file_data.iter().all(|d| d.is_empty()) {
443        return Vec::new();
444    }
445
446    // Pre-split each file into line offsets — single SIMD pass per file.
447    let file_lines: Vec<Vec<(u32, u32)>> = file_data
448        .iter()
449        .map(|data| presplit_lines(data, terminator))
450        .collect();
451
452    let max_lines = file_lines.iter().map(|l| l.len()).max().unwrap_or(0);
453    if max_lines == 0 {
454        return Vec::new();
455    }
456
457    // Compute exact output size to avoid reallocation.
458    let nfiles = file_data.len();
459    let has_delims = !delims.is_empty();
460    let delims_per_line = if has_delims && nfiles > 1 {
461        nfiles - 1
462    } else {
463        0
464    };
465
466    let mut exact_size = max_lines * (delims_per_line + 1); // delimiters + terminators
467    for fl in &file_lines {
468        for &(s, e) in fl.iter() {
469            exact_size += (e - s) as usize;
470        }
471    }
472    // Empty-file lines contribute nothing but delimiter slots are already counted
473
474    let mut output = Vec::with_capacity(exact_size);
475
476    // SAFETY: We computed exact_size above. All writes go through raw pointers
477    // with total bytes written == exact_size. We set_len at the end.
478    unsafe {
479        let base: *mut u8 = output.as_mut_ptr();
480        let mut pos = 0usize;
481
482        for line_idx in 0..max_lines {
483            for file_idx in 0..nfiles {
484                if file_idx > 0 && has_delims {
485                    *base.add(pos) = delims[(file_idx - 1) % delims.len()];
486                    pos += 1;
487                }
488                let lines = &file_lines[file_idx];
489                if line_idx < lines.len() {
490                    let (s, e) = *lines.get_unchecked(line_idx);
491                    let len = (e - s) as usize;
492                    if len > 0 {
493                        std::ptr::copy_nonoverlapping(
494                            file_data.get_unchecked(file_idx).as_ptr().add(s as usize),
495                            base.add(pos),
496                            len,
497                        );
498                        pos += len;
499                    }
500                }
501            }
502            *base.add(pos) = terminator;
503            pos += 1;
504        }
505
506        assert_eq!(pos, exact_size, "exact_size miscalculated");
507        output.set_len(pos);
508    }
509
510    output
511}
512
513/// Paste files in serial mode and return the output buffer.
514/// For each file, join all lines with the delimiter list (cycling).
515/// Pre-splits lines using SIMD memchr, then iterates offset pairs.
516pub fn paste_serial_to_vec(file_data: &[&[u8]], config: &PasteConfig) -> Vec<u8> {
517    let terminator = if config.zero_terminated { 0u8 } else { b'\n' };
518    let delims = &config.delimiters;
519    let has_delims = !delims.is_empty();
520
521    // Estimate output size
522    let total_input: usize = file_data.iter().map(|d| d.len()).sum();
523    let mut output = Vec::with_capacity(total_input + file_data.len());
524
525    for data in file_data {
526        if data.is_empty() {
527            output.push(terminator);
528            continue;
529        }
530        let lines = presplit_lines(data, terminator);
531        if lines.is_empty() {
532            output.push(terminator);
533            continue;
534        }
535        // First line: no leading delimiter
536        let (s, e) = lines[0];
537        output.extend_from_slice(&data[s as usize..e as usize]);
538        // Subsequent lines: prepend cycling delimiter
539        for (i, &(s, e)) in lines[1..].iter().enumerate() {
540            if has_delims {
541                output.push(delims[i % delims.len()]);
542            }
543            output.extend_from_slice(&data[s as usize..e as usize]);
544        }
545        output.push(terminator);
546    }
547
548    output
549}
550
551/// Main paste entry point. Writes directly to the provided writer.
552pub fn paste(
553    file_data: &[&[u8]],
554    config: &PasteConfig,
555    out: &mut impl Write,
556) -> std::io::Result<()> {
557    let output = if config.serial {
558        paste_serial_to_vec(file_data, config)
559    } else {
560        paste_parallel_to_vec(file_data, config)
561    };
562    out.write_all(&output)
563}
564
565/// Build the paste output as a Vec, then return it for the caller to write.
566/// This allows the binary to use raw write() for maximum throughput.
567pub fn paste_to_vec(file_data: &[&[u8]], config: &PasteConfig) -> Vec<u8> {
568    if config.serial {
569        paste_serial_to_vec(file_data, config)
570    } else {
571        paste_parallel_to_vec(file_data, config)
572    }
573}