Skip to main content

coreutils_rs/base64/
core.rs

1use std::io::{self, Read, Write};
2
3use base64_simd::AsOut;
4
/// Shared SIMD engine: `base64_simd::STANDARD` (standard base64 alphabet).
const BASE64_ENGINE: &base64_simd::Base64 = &base64_simd::STANDARD;
6
/// Number of CPUs usable for parallel chunk splitting.
///
/// Queries `std::thread::available_parallelism()` directly instead of asking
/// rayon, so merely reading the count never triggers the rayon pool's lazy
/// initialization (~300-500µs); the pool inits on the first `scope()` call.
/// Falls back to 1 when the count cannot be determined.
#[inline]
fn num_cpus() -> usize {
    match std::thread::available_parallelism() {
        Ok(n) => n.get(),
        Err(_) => 1,
    }
}
16
/// Chunk size for sequential no-wrap encoding: the largest multiple of 3
/// that fits in 8MB. Big chunks amortize per-call overhead while keeping
/// the transient output buffer (~10.7MB encoded) reasonable.
const NOWRAP_CHUNK: usize = (8 * 1024 * 1024 / 3) * 3;

/// Minimum input size for parallel no-wrap encoding (16MB).
/// For a single-file CLI invocation (the typical benchmark) the rayon pool
/// is cold on first use (~200-500µs init); below this size sequential
/// encoding wins because pool init + dispatch overhead exceeds the parallel
/// benefit. Multi-file callers pay pool init only once and would profit
/// from a lower threshold (~2MB); this value is tuned for single-file use.
const PARALLEL_NOWRAP_THRESHOLD: usize = 16 * 1024 * 1024;

/// Minimum input size for parallel wrapped encoding (12MB) — same
/// cold-pool reasoning as PARALLEL_NOWRAP_THRESHOLD. The sequential
/// encode_wrapped_expand path (backward expansion) avoids the per-group
/// overhead of L1-scatter chunking, so it stays competitive below this.
const PARALLEL_WRAPPED_THRESHOLD: usize = 12 * 1024 * 1024;

/// Minimum data size for parallel decoding (1MB of base64 text).
/// Decoding is more compute-intensive than encoding, so parallelism pays
/// off at smaller sizes; once the rayon pool is warm (~10µs dispatch),
/// 1MB is a good crossover point.
const PARALLEL_DECODE_THRESHOLD: usize = 1024 * 1024;
41
42/// Hint HUGEPAGE for large output buffers on Linux.
43/// MADV_HUGEPAGE tells kernel to use 2MB pages, reducing TLB misses
44/// and minor fault count for large allocations (~25,600 → ~50 for 100MB).
45#[cfg(target_os = "linux")]
46fn hint_hugepage(buf: &mut Vec<u8>) {
47    if buf.capacity() >= 2 * 1024 * 1024 {
48        unsafe {
49            libc::madvise(
50                buf.as_mut_ptr() as *mut libc::c_void,
51                buf.capacity(),
52                libc::MADV_HUGEPAGE,
53            );
54        }
55    }
56}
57
58/// Encode data and write to output with line wrapping.
59/// Uses SIMD encoding with fused encode+wrap for maximum throughput.
60pub fn encode_to_writer(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
61    if data.is_empty() {
62        return Ok(());
63    }
64
65    if wrap_col == 0 {
66        return encode_no_wrap(data, out);
67    }
68
69    encode_wrapped(data, wrap_col, out)
70}
71
72/// Encode without wrapping — parallel SIMD encoding for large data, sequential for small.
73fn encode_no_wrap(data: &[u8], out: &mut impl Write) -> io::Result<()> {
74    if data.len() >= PARALLEL_NOWRAP_THRESHOLD && num_cpus() > 1 {
75        return encode_no_wrap_parallel(data, out);
76    }
77
78    // Single-buffer encode: for data that fits in one chunk, encode directly
79    // and write once. For larger data, reuse the buffer across chunks.
80    let enc_len = BASE64_ENGINE.encoded_length(data.len().min(NOWRAP_CHUNK));
81    let mut buf: Vec<u8> = Vec::with_capacity(enc_len);
82    #[allow(clippy::uninit_vec)]
83    unsafe {
84        buf.set_len(enc_len);
85    }
86
87    for chunk in data.chunks(NOWRAP_CHUNK) {
88        let clen = BASE64_ENGINE.encoded_length(chunk.len());
89        let encoded = BASE64_ENGINE.encode(chunk, buf[..clen].as_out());
90        out.write_all(encoded)?;
91    }
92    Ok(())
93}
94
/// Parallel no-wrap encoding into a single shared output buffer.
/// Split at 3-byte boundaries, pre-calculate output offsets, encode in parallel.
/// Each chunk except possibly the last is 3-byte aligned, so no padding in intermediate chunks.
/// Single allocation + single write_all instead of N allocations + writev.
fn encode_no_wrap_parallel(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    let num_threads = num_cpus().max(1);
    let raw_chunk = data.len() / num_threads;
    // Align to 3 bytes so each chunk encodes without padding (except the last)
    let chunk_size = ((raw_chunk + 2) / 3) * 3;

    // Split input into 3-byte-aligned chunks; `.max(3)` guards against a
    // zero chunk size when data.len() < num_threads.
    let chunks: Vec<&[u8]> = data.chunks(chunk_size.max(3)).collect();

    // Pre-calculate output offsets (prefix sums of encoded chunk lengths)
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    let mut total_out = 0usize;
    for chunk in &chunks {
        offsets.push(total_out);
        total_out += BASE64_ENGINE.encoded_length(chunk.len());
    }

    // Single allocation for all threads.
    // SAFETY: every byte of output[..total_out] is written by exactly one
    // spawned task below before write_all reads it.
    let mut output: Vec<u8> = Vec::with_capacity(total_out);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(total_out);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output);

    // Parallel encode: each thread writes into its pre-assigned region.
    // The base pointer is smuggled as usize so the spawned closures are Send.
    let output_base = output.as_mut_ptr() as usize;
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let out_off = offsets[i];
            let enc_len = BASE64_ENGINE.encoded_length(chunk.len());
            let base = output_base;
            s.spawn(move |_| {
                // SAFETY: [out_off, out_off + enc_len) regions are disjoint
                // across tasks (consecutive prefix sums), and `output`
                // outlives the rayon scope.
                let dest =
                    unsafe { std::slice::from_raw_parts_mut((base + out_off) as *mut u8, enc_len) };
                let _ = BASE64_ENGINE.encode(chunk, dest.as_out());
            });
        }
    });

    out.write_all(&output[..total_out])
}
142
143/// Encode with line wrapping using forward scatter from L1-cached temp buffer.
144/// Encodes groups of lines into a small temp buffer (fits in L1 cache), then
145/// scatter-copies wrap_col-byte chunks from temp to output with newlines.
146///
147/// This is faster than bulk encode + backward expansion because:
148/// - Temp buffer reads hit L1 cache (essentially free bandwidth)
149/// - Output buffer is written once (no double-write from backward memmove)
150/// - Forward access pattern is prefetcher-friendly
151fn encode_wrapped(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
152    let bytes_per_line = wrap_col * 3 / 4;
153    if bytes_per_line == 0 {
154        return encode_wrapped_small(data, wrap_col, out);
155    }
156
157    if data.len() >= PARALLEL_WRAPPED_THRESHOLD && bytes_per_line.is_multiple_of(3) {
158        return encode_wrapped_parallel(data, wrap_col, bytes_per_line, out);
159    }
160
161    if bytes_per_line.is_multiple_of(3) {
162        return encode_wrapped_expand(data, wrap_col, bytes_per_line, out);
163    }
164
165    // Fallback for non-3-aligned bytes_per_line: use fuse_wrap approach
166    let enc_max = BASE64_ENGINE.encoded_length(data.len());
167    let num_full = enc_max / wrap_col;
168    let rem = enc_max % wrap_col;
169    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };
170
171    // Encode full data, then fuse with newlines
172    let mut enc_buf: Vec<u8> = Vec::with_capacity(enc_max);
173    #[allow(clippy::uninit_vec)]
174    unsafe {
175        enc_buf.set_len(enc_max);
176    }
177    let _ = BASE64_ENGINE.encode(data, enc_buf[..enc_max].as_out());
178
179    let mut out_buf: Vec<u8> = Vec::with_capacity(out_len);
180    #[allow(clippy::uninit_vec)]
181    unsafe {
182        out_buf.set_len(out_len);
183    }
184    let n = fuse_wrap(&enc_buf, wrap_col, &mut out_buf);
185    out.write_all(&out_buf[..n])
186}
187
/// Encode with backward expansion: single contiguous SIMD encode, then expand
/// in-place to insert newlines. The encode is done in one call (no chunking),
/// which eliminates per-group function call overhead from L1-scatter.
/// The backward expansion only shifts data by ~1.3% (1 byte per 76 for wrap_col=76),
/// and for most lines the shift exceeds wrap_col so memmove uses the fast memcpy path.
///
/// Precondition: `bytes_per_line` (input bytes per output line) is a multiple
/// of 3, so every line boundary falls on a whole encoded quantum.
fn encode_wrapped_expand(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    debug_assert!(bytes_per_line.is_multiple_of(3));
    let enc_len = BASE64_ENGINE.encoded_length(data.len());
    if enc_len == 0 {
        return Ok(());
    }

    // Wrapped output size: one '\n' per full line, plus the partial last
    // line (if any) with its trailing '\n'.
    let num_full = enc_len / wrap_col;
    let rem = enc_len % wrap_col;
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    // Single allocation: encode into first enc_len bytes, expand backward to out_len.
    // SAFETY: buf[..enc_len] is initialized by BASE64_ENGINE.encode below.
    // buf[enc_len..out_len] is written by expand_backward before write_all reads it.
    let mut buf: Vec<u8> = Vec::with_capacity(out_len);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(out_len);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut buf);

    // One SIMD encode call for the entire input (no chunking overhead)
    let encoded = BASE64_ENGINE.encode(data, buf[..enc_len].as_out());
    debug_assert_eq!(encoded.len(), enc_len, "encode wrote unexpected length");

    // Expand backward to insert newlines — shifts only ~1.3% of data.
    // buf's allocation spans out_len bytes, as expand_backward requires.
    expand_backward(buf.as_mut_ptr(), enc_len, out_len, wrap_col);

    out.write_all(&buf[..out_len])
}
229
/// L1-scatter encode: encode groups of lines into a small L1-cached temp buffer,
/// then scatter-copy each line to its final position in the output buffer with
/// newline insertion. Each output byte is written exactly once — no read-back
/// from main memory, halving memory traffic vs backward expansion.
///
/// Temp buffer (~20KB for 256 lines × 76 chars) stays hot in L1 cache, so
/// reads during scatter are essentially free. Output buffer is streamed out
/// with sequential writes that the prefetcher can handle efficiently.
///
/// Uses a full output buffer for vmsplice safety: vmsplice maps user pages
/// into the pipe buffer, so the buffer must stay valid until the reader consumes.
///
/// NOTE(review): currently unused (`encode_wrapped` dispatches to the expand
/// and parallel paths instead); kept for benchmarking wrap strategies.
#[allow(dead_code)]
fn encode_wrapped_scatter(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let enc_len = BASE64_ENGINE.encoded_length(data.len());
    if enc_len == 0 {
        return Ok(());
    }

    // Exact wrapped output size: one '\n' per full line, plus the partial
    // last line (if any) with its trailing '\n'.
    let num_full = enc_len / wrap_col;
    let rem = enc_len % wrap_col;
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    // Output buffer — written once via scatter, then write_all to output.
    // SAFETY: every byte of buf[..wp] is written by the scatter loops below
    // before write_all reads it.
    let mut buf: Vec<u8> = Vec::with_capacity(out_len);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(out_len);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut buf);

    // L1-cached temp buffer for encoding groups of lines.
    // 256 lines × 76 chars = 19,456 bytes — fits comfortably in L1 (32-64KB).
    const GROUP_LINES: usize = 256;
    let group_input = GROUP_LINES * bytes_per_line;
    let temp_size = GROUP_LINES * wrap_col;
    // SAFETY: temp[..clen] is fully written by encode() before being read.
    let mut temp: Vec<u8> = Vec::with_capacity(temp_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        temp.set_len(temp_size);
    }

    let line_out = wrap_col + 1;
    let mut wp = 0usize; // write position in output buffer

    for chunk in data.chunks(group_input) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let _ = BASE64_ENGINE.encode(chunk, temp[..clen].as_out());

        // Scatter-copy full lines from temp to output with newlines
        let lines = clen / wrap_col;
        let chunk_rem = clen % wrap_col;

        // 8-line unrolled scatter for ILP.
        // SAFETY (all pointer arithmetic below): i + 8 <= lines keeps every
        // source line inside temp[..clen], and wp advances by line_out per
        // emitted line, staying within buf[..out_len] by the out_len
        // computation above.
        let mut i = 0;
        while i + 8 <= lines {
            unsafe {
                let src = temp.as_ptr().add(i * wrap_col);
                let dst = buf.as_mut_ptr().add(wp);
                std::ptr::copy_nonoverlapping(src, dst, wrap_col);
                *dst.add(wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(src.add(wrap_col), dst.add(line_out), wrap_col);
                *dst.add(line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(2 * wrap_col),
                    dst.add(2 * line_out),
                    wrap_col,
                );
                *dst.add(2 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(3 * wrap_col),
                    dst.add(3 * line_out),
                    wrap_col,
                );
                *dst.add(3 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(4 * wrap_col),
                    dst.add(4 * line_out),
                    wrap_col,
                );
                *dst.add(4 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(5 * wrap_col),
                    dst.add(5 * line_out),
                    wrap_col,
                );
                *dst.add(5 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(6 * wrap_col),
                    dst.add(6 * line_out),
                    wrap_col,
                );
                *dst.add(6 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(7 * wrap_col),
                    dst.add(7 * line_out),
                    wrap_col,
                );
                *dst.add(7 * line_out + wrap_col) = b'\n';
            }
            wp += 8 * line_out;
            i += 8;
        }
        // Remaining full lines (0-7 after the unrolled loop)
        while i < lines {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(i * wrap_col),
                    buf.as_mut_ptr().add(wp),
                    wrap_col,
                );
                *buf.as_mut_ptr().add(wp + wrap_col) = b'\n';
            }
            wp += line_out;
            i += 1;
        }
        // Partial last line (only on final chunk)
        if chunk_rem > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(lines * wrap_col),
                    buf.as_mut_ptr().add(wp),
                    chunk_rem,
                );
                *buf.as_mut_ptr().add(wp + chunk_rem) = b'\n';
            }
            wp += chunk_rem + 1;
        }
    }

    out.write_all(&buf[..wp])
}
367
/// Scatter-copy `count` encoded lines of `wrap_col` bytes from the start of
/// `temp` into `buf`, appending a `\n` after each line. Destination lines
/// start at line index `line_start` and are `line_out` bytes apart.
#[inline]
#[allow(dead_code)]
fn scatter_lines(
    temp: &[u8],
    buf: &mut [u8],
    line_start: usize,
    count: usize,
    wrap_col: usize,
    line_out: usize,
) {
    for line in 0..count {
        let src_at = line * wrap_col;
        let dst_at = (line_start + line) * line_out;
        buf[dst_at..dst_at + wrap_col].copy_from_slice(&temp[src_at..src_at + wrap_col]);
        buf[dst_at + wrap_col] = b'\n';
    }
}
391
/// Expand encoded data in place by inserting `\n` at every `wrap_col`
/// boundary. On entry `ptr[..enc_len]` holds contiguous encoded bytes and
/// the allocation is at least `out_len` bytes; on return `ptr[..out_len]`
/// holds the wrapped output.
///
/// Runs back-to-front so a line's destination never overwrites bytes that
/// still need to be read. The per-line shift is small (1 byte per wrap_col,
/// ~1.3% at wrap_col=76), so most copies do not overlap and memmove takes
/// its memcpy fast path.
#[inline]
fn expand_backward(ptr: *mut u8, enc_len: usize, out_len: usize, wrap_col: usize) {
    /// Emit one full line: place a newline at `*wp - 1`, then move the
    /// `wrap_col` source bytes ending at `*rp` down to end at the newline.
    #[inline(always)]
    unsafe fn shift_line(ptr: *mut u8, rp: &mut usize, wp: &mut usize, wrap_col: usize) {
        *wp -= 1;
        *ptr.add(*wp) = b'\n';
        *rp -= wrap_col;
        *wp -= wrap_col;
        if *rp != *wp {
            std::ptr::copy(ptr.add(*rp), ptr.add(*wp), wrap_col);
        }
    }

    let full_lines = enc_len / wrap_col;
    let tail = enc_len % wrap_col;

    unsafe {
        let mut rp = enc_len;
        let mut wp = out_len;

        // The partial last line (if any) sits at the very end of the output.
        if tail > 0 {
            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= tail;
            wp -= tail;
            if rp != wp {
                std::ptr::copy(ptr.add(rp), ptr.add(wp), tail);
            }
        }

        // Full lines: eight per iteration for ILP, then the stragglers.
        let mut remaining = full_lines;
        while remaining >= 8 {
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            remaining -= 8;
        }
        while remaining > 0 {
            shift_line(ptr, &mut rp, &mut wp, wrap_col);
            remaining -= 1;
        }
    }
}
487
/// Single newline byte with 'static lifetime so IoSlice entries referencing
/// it can be batched freely across writev calls.
static NEWLINE: [u8; 1] = [b'\n'];
490
/// Write encoded base64 data with line wrapping.
///
/// Small outputs (≤ MAX_IOV entries) go zero-copy through a vectored write:
/// IoSlice entries point at wrap_col-sized segments of `encoded`,
/// interleaved with newline slices. Larger outputs fuse batches of lines
/// into a small reusable buffer and issue one contiguous write per batch —
/// see the comment on the large path below for why that beats writev here.
#[inline]
#[allow(dead_code)]
fn write_wrapped_iov(encoded: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
    // Max IoSlice entries per writev batch. Linux UIO_MAXIOV is 1024.
    // Each line needs 2 entries (data + newline), so 512 lines per batch.
    const MAX_IOV: usize = 1024;

    let num_full_lines = encoded.len() / wrap_col;
    let remainder = encoded.len() % wrap_col;
    let total_iov = num_full_lines * 2 + if remainder > 0 { 2 } else { 0 };

    // Small output: build all IoSlices and write in one call
    if total_iov <= MAX_IOV {
        let mut iov: Vec<io::IoSlice> = Vec::with_capacity(total_iov);
        let mut pos = 0;
        for _ in 0..num_full_lines {
            iov.push(io::IoSlice::new(&encoded[pos..pos + wrap_col]));
            iov.push(io::IoSlice::new(&NEWLINE));
            pos += wrap_col;
        }
        if remainder > 0 {
            iov.push(io::IoSlice::new(&encoded[pos..pos + remainder]));
            iov.push(io::IoSlice::new(&NEWLINE));
        }
        return write_all_vectored(out, &iov);
    }

    // Large output: fuse batches of lines into a reusable L1-cached buffer.
    // Each batch copies ~39KB (512 lines × 77 bytes) from the encoded buffer
    // with newlines inserted, then writes as a single contiguous write(2).
    // This is faster than writev with 1024 IoSlice entries because:
    // - One kernel memcpy per batch vs 1024 separate copies
    // - Fused buffer (39KB) stays hot in L1 cache across batches
    let line_out = wrap_col + 1;
    const BATCH_LINES: usize = 512;
    let batch_fused_size = BATCH_LINES * line_out;
    // SAFETY: fused[..n] is fully written by fuse_wrap before each read.
    let mut fused: Vec<u8> = Vec::with_capacity(batch_fused_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        fused.set_len(batch_fused_size);
    }

    let mut rp = 0; // read position into `encoded`
    let mut lines_done = 0;

    // Process full batches using 8-line unrolled fuse_wrap
    while lines_done + BATCH_LINES <= num_full_lines {
        let n = fuse_wrap(
            &encoded[rp..rp + BATCH_LINES * wrap_col],
            wrap_col,
            &mut fused,
        );
        out.write_all(&fused[..n])?;
        rp += BATCH_LINES * wrap_col;
        lines_done += BATCH_LINES;
    }

    // Remaining full lines (partial batch)
    let remaining_lines = num_full_lines - lines_done;
    if remaining_lines > 0 {
        let n = fuse_wrap(
            &encoded[rp..rp + remaining_lines * wrap_col],
            wrap_col,
            &mut fused,
        );
        out.write_all(&fused[..n])?;
        rp += remaining_lines * wrap_col;
    }

    // Partial last line, newline-terminated like all others
    if remainder > 0 {
        out.write_all(&encoded[rp..rp + remainder])?;
        out.write_all(b"\n")?;
    }
    Ok(())
}
571
572/// Write encoded base64 data with line wrapping using writev, tracking column state
573/// across calls. Used by encode_stream for piped input where chunks don't align
574/// to line boundaries.
575#[inline]
576fn write_wrapped_iov_streaming(
577    encoded: &[u8],
578    wrap_col: usize,
579    col: &mut usize,
580    out: &mut impl Write,
581) -> io::Result<()> {
582    const MAX_IOV: usize = 1024;
583    let mut iov: Vec<io::IoSlice> = Vec::with_capacity(MAX_IOV);
584    let mut rp = 0;
585
586    while rp < encoded.len() {
587        let space = wrap_col - *col;
588        let avail = encoded.len() - rp;
589
590        if avail <= space {
591            // Remaining data fits in current line
592            iov.push(io::IoSlice::new(&encoded[rp..rp + avail]));
593            *col += avail;
594            if *col == wrap_col {
595                iov.push(io::IoSlice::new(&NEWLINE));
596                *col = 0;
597            }
598            break;
599        } else {
600            // Fill current line and add newline
601            iov.push(io::IoSlice::new(&encoded[rp..rp + space]));
602            iov.push(io::IoSlice::new(&NEWLINE));
603            rp += space;
604            *col = 0;
605        }
606
607        if iov.len() >= MAX_IOV - 1 {
608            write_all_vectored(out, &iov)?;
609            iov.clear();
610        }
611    }
612
613    if !iov.is_empty() {
614        write_all_vectored(out, &iov)?;
615    }
616    Ok(())
617}
618
/// Parallel wrapped encoding with L1-scatter into a single shared output buffer.
/// Pre-calculates each thread's output offset, allocates one buffer for all threads,
/// and has each thread encode directly into its pre-assigned non-overlapping region.
/// This saves N-1 buffer allocations and corresponding page faults vs per-thread Vecs,
/// and uses a single write_all instead of writev.
fn encode_wrapped_parallel(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let num_threads = num_cpus().max(1);
    // Whole lines per chunk, so every chunk starts on a line boundary.
    let lines_per_chunk = ((data.len() / bytes_per_line) / num_threads).max(1);
    let chunk_input = lines_per_chunk * bytes_per_line;

    // Split input at bytes_per_line boundaries (last chunk may have remainder)
    let chunks: Vec<&[u8]> = data.chunks(chunk_input.max(bytes_per_line)).collect();

    // Pre-calculate output offsets for each chunk (prefix sums of each
    // chunk's wrapped output size: encoded bytes + one '\n' per line)
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    let mut total_out = 0usize;
    for chunk in &chunks {
        offsets.push(total_out);
        let enc_len = BASE64_ENGINE.encoded_length(chunk.len());
        let full_lines = enc_len / wrap_col;
        let remainder = enc_len % wrap_col;
        total_out += full_lines * (wrap_col + 1) + if remainder > 0 { remainder + 1 } else { 0 };
    }

    // Single allocation for all threads.
    // SAFETY: output[..total_out] is fully written by the scatter tasks
    // below before write_all reads it.
    let mut output: Vec<u8> = Vec::with_capacity(total_out);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(total_out);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output);

    // Parallel encode: each thread writes into its pre-assigned region.
    // The base pointer is smuggled as usize so the spawned closures are Send.
    let output_base = output.as_mut_ptr() as usize;
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let out_off = offsets[i];
            let out_end = if i + 1 < offsets.len() {
                offsets[i + 1]
            } else {
                total_out
            };
            let out_size = out_end - out_off;
            let base = output_base;
            s.spawn(move |_| {
                // SAFETY: [out_off, out_end) regions are disjoint across
                // tasks (consecutive prefix sums), and `output` outlives
                // the rayon scope.
                let out_slice = unsafe {
                    std::slice::from_raw_parts_mut((base + out_off) as *mut u8, out_size)
                };
                encode_chunk_l1_scatter_into(chunk, out_slice, wrap_col, bytes_per_line);
            });
        }
    });

    out.write_all(&output[..total_out])
}
680
/// Encode a chunk using L1-scatter, writing into a pre-allocated output slice.
/// Encodes groups of 256 lines into an L1-cached temp buffer, then
/// scatter-copies each line to its final position in `output` with a trailing
/// newline. The output slice must be large enough to hold the encoded+wrapped
/// output (the caller sizes it from the same formula used for the offsets).
fn encode_chunk_l1_scatter_into(
    data: &[u8],
    output: &mut [u8],
    wrap_col: usize,
    bytes_per_line: usize,
) {
    // 256 lines × 76 chars ≈ 19KB — fits comfortably in L1 cache.
    const GROUP_LINES: usize = 256;
    let group_input = GROUP_LINES * bytes_per_line;
    let temp_size = GROUP_LINES * wrap_col;
    // SAFETY: temp[..clen] is fully written by encode() before being read.
    let mut temp: Vec<u8> = Vec::with_capacity(temp_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        temp.set_len(temp_size);
    }

    let line_out = wrap_col + 1;
    let mut wp = 0usize; // write position in `output`

    for chunk in data.chunks(group_input) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let _ = BASE64_ENGINE.encode(chunk, temp[..clen].as_out());

        let lines = clen / wrap_col;
        let chunk_rem = clen % wrap_col;

        // 8-line unrolled scatter for ILP.
        // SAFETY (pointer arithmetic below): i + 8 <= lines keeps sources
        // inside temp[..clen]; wp advances line_out per emitted line and
        // stays within `output` by the caller's sizing.
        let mut i = 0;
        while i + 8 <= lines {
            unsafe {
                let src = temp.as_ptr().add(i * wrap_col);
                let dst = output.as_mut_ptr().add(wp);
                std::ptr::copy_nonoverlapping(src, dst, wrap_col);
                *dst.add(wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(src.add(wrap_col), dst.add(line_out), wrap_col);
                *dst.add(line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(2 * wrap_col),
                    dst.add(2 * line_out),
                    wrap_col,
                );
                *dst.add(2 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(3 * wrap_col),
                    dst.add(3 * line_out),
                    wrap_col,
                );
                *dst.add(3 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(4 * wrap_col),
                    dst.add(4 * line_out),
                    wrap_col,
                );
                *dst.add(4 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(5 * wrap_col),
                    dst.add(5 * line_out),
                    wrap_col,
                );
                *dst.add(5 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(6 * wrap_col),
                    dst.add(6 * line_out),
                    wrap_col,
                );
                *dst.add(6 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(7 * wrap_col),
                    dst.add(7 * line_out),
                    wrap_col,
                );
                *dst.add(7 * line_out + wrap_col) = b'\n';
            }
            wp += 8 * line_out;
            i += 8;
        }
        // Remaining full lines (0-7 after the unrolled loop)
        while i < lines {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(i * wrap_col),
                    output.as_mut_ptr().add(wp),
                    wrap_col,
                );
                *output.as_mut_ptr().add(wp + wrap_col) = b'\n';
            }
            wp += line_out;
            i += 1;
        }
        // Partial last line (only on the final chunk)
        if chunk_rem > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(lines * wrap_col),
                    output.as_mut_ptr().add(wp),
                    chunk_rem,
                );
                *output.as_mut_ptr().add(wp + chunk_rem) = b'\n';
            }
            wp += chunk_rem + 1;
        }
    }
}
784
/// Interleave already-encoded base64 text with newline separators in one pass.
///
/// Emits `wrap_col`-byte lines into `out_buf`, each terminated by `\n`; a
/// shorter final line (if any) also receives a trailing newline. The caller
/// must size `out_buf` for the complete wrapped output. Returns the number of
/// bytes written.
///
/// Hot path: lines are copied in batches of 8, then 4, then singly. The inner
/// constant-count loops unroll at compile time, giving the optimizer the same
/// independent-memcpy schedule (and identical copy order) as a fully manual
/// unroll.
#[inline]
fn fuse_wrap(encoded: &[u8], wrap_col: usize, out_buf: &mut [u8]) -> usize {
    let line_out = wrap_col + 1; // wrap_col data bytes + 1 newline
    let total = encoded.len();
    let mut rp = 0; // read position in `encoded`
    let mut wp = 0; // write position in `out_buf`

    // Batched copies: 8 lines per iteration first, then 4.
    // SAFETY: the loop condition guarantees `rp + batch * wrap_col <= total`,
    // and the caller guarantees `out_buf` can hold the wrapped output, so
    // every `src`/`dst` offset is in bounds; the two buffers are distinct
    // slices, so copy_nonoverlapping's no-overlap requirement holds.
    for &batch in &[8usize, 4] {
        while rp + batch * wrap_col <= total {
            unsafe {
                let src = encoded.as_ptr().add(rp);
                let dst = out_buf.as_mut_ptr().add(wp);
                for k in 0..batch {
                    std::ptr::copy_nonoverlapping(
                        src.add(k * wrap_col),
                        dst.add(k * line_out),
                        wrap_col,
                    );
                    *dst.add(k * line_out + wrap_col) = b'\n';
                }
            }
            rp += batch * wrap_col;
            wp += batch * line_out;
        }
    }

    // Leftover full lines, one at a time.
    while rp + wrap_col <= total {
        // SAFETY: `rp + wrap_col <= total` and the caller sized `out_buf`
        // for the full wrapped output, so both accesses are in bounds.
        unsafe {
            std::ptr::copy_nonoverlapping(
                encoded.as_ptr().add(rp),
                out_buf.as_mut_ptr().add(wp),
                wrap_col,
            );
            *out_buf.as_mut_ptr().add(wp + wrap_col) = b'\n';
        }
        rp += wrap_col;
        wp += line_out;
    }

    // Short final line plus its newline.
    if rp < total {
        let tail = total - rp;
        // SAFETY: `tail` bytes remain in `encoded` starting at `rp`, and the
        // destination range fits in the caller-sized `out_buf`.
        unsafe {
            std::ptr::copy_nonoverlapping(
                encoded.as_ptr().add(rp),
                out_buf.as_mut_ptr().add(wp),
                tail,
            );
        }
        wp += tail;
        out_buf[wp] = b'\n';
        wp += 1;
    }

    wp
}
881
882/// Fallback for very small wrap columns (< 4 chars).
883fn encode_wrapped_small(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
884    let enc_max = BASE64_ENGINE.encoded_length(data.len());
885    let mut buf: Vec<u8> = Vec::with_capacity(enc_max);
886    #[allow(clippy::uninit_vec)]
887    unsafe {
888        buf.set_len(enc_max);
889    }
890    let encoded = BASE64_ENGINE.encode(data, buf[..enc_max].as_out());
891
892    let wc = wrap_col.max(1);
893    for line in encoded.chunks(wc) {
894        out.write_all(line)?;
895        out.write_all(b"\n")?;
896    }
897    Ok(())
898}
899
900/// Decode base64 data and write to output (borrows data, allocates clean buffer).
901/// When `ignore_garbage` is true, strip all non-base64 characters.
902/// When false, only strip whitespace (standard behavior).
903pub fn decode_to_writer(data: &[u8], ignore_garbage: bool, out: &mut impl Write) -> io::Result<()> {
904    if data.is_empty() {
905        return Ok(());
906    }
907
908    if ignore_garbage {
909        let mut cleaned = strip_non_base64(data);
910        return decode_clean_slice(&mut cleaned, out);
911    }
912
913    // For large data (>= 512KB): use bulk strip + single-shot decode.
914    // try_line_decode decodes per-line (~25ns overhead per 76-byte line call),
915    // while strip+decode uses SIMD gap-copy + single-shot SIMD decode at ~6.5 GB/s.
916    // For 10MB decode benchmark: ~2ms (bulk) vs ~4ms (per-line) = 2x faster.
917    // For small data (< 512KB): per-line decode avoids allocation overhead.
918    if data.len() < 512 * 1024 && data.len() >= 77 {
919        if let Some(result) = try_line_decode(data, out) {
920            return result;
921        }
922    }
923
924    // Fast path: single-pass SIMD strip + decode
925    decode_stripping_whitespace(data, out)
926}
927
/// Decode base64 from a mutable buffer (MAP_PRIVATE mmap or owned Vec).
/// Strips whitespace in-place using SIMD memchr2 gap-copy, then decodes
/// in-place with base64_simd::decode_inplace. Zero additional allocations.
///
/// For MAP_PRIVATE mmap: the kernel uses COW semantics, so only pages
/// containing whitespace (newlines) get physically copied (~1.3% for
/// 76-char line base64). The decode writes to the same buffer, but decoded
/// data is always shorter than encoded (3/4 ratio), so it fits in-place.
///
/// When `ignore_garbage` is true, every byte outside the base64 alphabet is
/// dropped before decoding; otherwise only whitespace is stripped and any
/// other invalid byte makes the decode fail.
///
/// # Errors
/// Returns an error if the cleaned data is not valid base64, or if writing
/// to `out` fails.
pub fn decode_mmap_inplace(
    data: &mut [u8],
    ignore_garbage: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    // For small data: try line-by-line decode (avoids COW page faults).
    // For large data (>= 512KB): bulk strip+decode is faster than per-line decode.
    // The 77-byte floor is one standard 76-char line plus its newline.
    if !ignore_garbage && data.len() >= 77 && data.len() < 512 * 1024 {
        if let Some(result) = try_line_decode(data, out) {
            return result;
        }
    }

    if ignore_garbage {
        // Strip non-base64 chars in-place: scalar two-pointer compaction.
        // `rp` scans every byte; `wp` tracks the end of the kept prefix.
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut wp = 0;
        for rp in 0..len {
            // SAFETY: rp < len and wp <= rp, so both accesses are in bounds,
            // and writes to wp never clobber a byte not yet read.
            let b = unsafe { *ptr.add(rp) };
            if is_base64_char(b) {
                unsafe { *ptr.add(wp) = b };
                wp += 1;
            }
        }
        match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
            Ok(decoded) => return out.write_all(decoded),
            Err(_) => return decode_error(),
        }
    }

    // Fast path: uniform-line fused strip+decode (no intermediate buffer).
    if data.len() >= 77 {
        if let Some(result) = try_decode_uniform_lines(data, out) {
            return result;
        }
    }

    // Fallback: strip whitespace in-place using SIMD memchr2 gap-copy.

    // Quick check: no newlines at all — maybe already clean
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        // Check for rare whitespace
        if !data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            // Perfectly clean — decode in-place directly
            match BASE64_ENGINE.decode_inplace(data) {
                Ok(decoded) => return out.write_all(decoded),
                Err(_) => return decode_error(),
            }
        }
        // Rare whitespace only — strip in-place (same two-pointer compaction
        // as the ignore_garbage branch, keyed on the NOT_WHITESPACE table).
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut wp = 0;
        for rp in 0..len {
            // SAFETY: rp < len, wp <= rp — in bounds, reads precede writes.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(wp) = b };
                wp += 1;
            }
        }
        match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
            Ok(decoded) => return out.write_all(decoded),
            Err(_) => return decode_error(),
        }
    }

    // SIMD gap-copy: strip \n and \r in-place using memchr2
    let ptr = data.as_mut_ptr();
    let len = data.len();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    // SAFETY: memchr2_iter reads from the original data. We write to positions
    // [0..wp] which are always <= gap_start, so we never overwrite unread data.
    // NOTE(review): these raw-pointer writes alias the buffer the iterator
    // borrows; soundness relies on the disjointness argument above — worth
    // confirming under Miri.
    for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                // Check for rare whitespace during the gap-copy
                // SAFETY: [gap_start, gap_start+gap_len) is within the
                // allocation and not yet overwritten (writes stay below wp).
                has_rare_ws = unsafe {
                    std::slice::from_raw_parts(ptr.add(gap_start), gap_len)
                        .iter()
                        .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
                };
            }
            if wp != gap_start {
                // SAFETY: std::ptr::copy is memmove — overlap between the
                // source gap and the destination prefix is allowed.
                unsafe { std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len) };
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Final gap (bytes after the last \n/\r)
    let tail_len = len - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            // SAFETY: same bounds/ordering argument as the loop above.
            has_rare_ws = unsafe {
                std::slice::from_raw_parts(ptr.add(gap_start), tail_len)
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
            };
        }
        if wp != gap_start {
            // SAFETY: memmove semantics tolerate the overlapping ranges.
            unsafe { std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len) };
        }
        wp += tail_len;
    }

    // Second pass for rare whitespace if needed (tab/space/VT/FF detected
    // during the gap-copy; skipped entirely for typical newline-only input).
    if has_rare_ws {
        let mut rp = 0;
        let mut cwp = 0;
        while rp < wp {
            // SAFETY: rp < wp <= len and cwp <= rp — in bounds.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        wp = cwp;
    }

    // Decode in-place: decoded data is always shorter than encoded (3/4 ratio)
    if wp >= PARALLEL_DECODE_THRESHOLD {
        // For large data, use parallel decode from the cleaned slice
        return decode_borrowed_clean_parallel(out, &data[..wp]);
    }
    match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
        Ok(decoded) => out.write_all(decoded),
        Err(_) => decode_error(),
    }
}
1078
1079/// Decode base64 from an owned Vec (in-place whitespace strip + decode).
1080pub fn decode_owned(
1081    data: &mut Vec<u8>,
1082    ignore_garbage: bool,
1083    out: &mut impl Write,
1084) -> io::Result<()> {
1085    if data.is_empty() {
1086        return Ok(());
1087    }
1088
1089    if ignore_garbage {
1090        data.retain(|&b| is_base64_char(b));
1091    } else {
1092        strip_whitespace_inplace(data);
1093    }
1094
1095    decode_clean_slice(data, out)
1096}
1097
/// Strip all whitespace from a Vec in-place using SIMD memchr2 gap-copy.
/// For typical base64 (76-char lines with \n), newlines are ~1/77 of the data,
/// so SIMD memchr2 skips ~76 bytes per hit instead of checking every byte.
/// Falls back to scalar compaction only for rare whitespace (tab, space, VT, FF).
/// The Vec is truncated to the compacted length; contents past it are dropped.
fn strip_whitespace_inplace(data: &mut Vec<u8>) {
    // Quick check: skip stripping if no \n or \r in the data.
    // Uses SIMD memchr2 for fast scanning (~10 GB/s) instead of per-byte check.
    // For typical base64 (76-char lines), we'll find \n immediately and skip this.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        // No newlines/CR — check for rare whitespace only
        if data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            data.retain(|&b| NOT_WHITESPACE[b as usize]);
        }
        return;
    }

    // SIMD gap-copy: find \n and \r positions with memchr2, then memmove the
    // gaps between them to compact the data in-place. For typical base64 streams,
    // newlines are the only whitespace, so this handles >99% of cases.
    // Invariant: wp <= gap_start at all times, so writes through `ptr` only
    // touch bytes the iterator has already moved past.
    let ptr = data.as_mut_ptr();
    let len = data.len();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    // NOTE(review): raw-pointer writes below alias the slice the memchr2
    // iterator borrows; soundness rests on the wp <= gap_start invariant —
    // worth confirming under Miri.
    for pos in memchr::memchr2_iter(b'\n', b'\r', data.as_slice()) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                // Check for rare whitespace during copy (amortized ~1 branch per 77 bytes)
                has_rare_ws = data[gap_start..pos]
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
            }
            if wp != gap_start {
                // SAFETY: std::ptr::copy is memmove, so the (possibly
                // overlapping) shift of the gap down to wp is well-defined;
                // both ranges lie within the Vec's initialized length.
                unsafe {
                    std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len);
                }
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Copy the final gap (bytes after the last \n/\r)
    let tail_len = len - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = data[gap_start..]
                .iter()
                .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
        }
        if wp != gap_start {
            // SAFETY: same memmove argument as above.
            unsafe {
                std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len);
            }
        }
        wp += tail_len;
    }

    data.truncate(wp);

    // Second pass for rare whitespace (tab, space, VT, FF) — only if detected.
    // In typical base64 streams (76-char lines with \n), this is skipped entirely.
    if has_rare_ws {
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut rp = 0;
        let mut cwp = 0;
        while rp < len {
            // SAFETY: rp < len and cwp <= rp — in bounds; reads precede writes.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        data.truncate(cwp);
    }
}
1180
/// 256-byte lookup table: true for non-whitespace bytes.
/// Used for single-pass whitespace stripping in decode.
/// The six false entries are space, tab, LF, CR, vertical tab, and form feed.
static NOT_WHITESPACE: [bool; 256] = {
    let mut table = [true; 256];
    // Const-evaluated loop over the whitespace set (const fns can't use
    // `for`, so index with `while`).
    let ws = [b' ', b'\t', b'\n', b'\r', 0x0b, 0x0c];
    let mut i = 0;
    while i < ws.len() {
        table[ws[i] as usize] = false;
        i += 1;
    }
    table
};
1193
/// Fused strip+decode for uniform-line base64 data.
/// Detects consistent line length, then processes in sub-chunks: each sub-chunk
/// copies lines to a small local buffer (L2-hot) and decodes immediately.
/// Eliminates the large intermediate clean buffer (~12MB for 10MB decode).
/// Returns None if the data doesn't have uniform line structure.
fn try_decode_uniform_lines(data: &[u8], out: &mut impl Write) -> Option<io::Result<()>> {
    // Line length comes from the first newline; lines must be whole 4-char
    // base64 groups (no mid-stream padding) for blockwise decode to work.
    let first_nl = memchr::memchr(b'\n', data)?;
    let line_len = first_nl;
    if line_len == 0 || line_len % 4 != 0 {
        return None;
    }

    let stride = line_len + 1; // line bytes plus the terminating newline

    // Verify the data has consistent line structure (first + last lines)
    let check_lines = 4.min(data.len() / stride);
    for i in 1..check_lines {
        let expected_nl = i * stride - 1;
        if expected_nl >= data.len() || data[expected_nl] != b'\n' {
            return None;
        }
    }

    // Count whole stride-sized lines; the last one must really end in '\n'
    // or the file is not uniformly line-structured after all.
    let full_lines = if data.len() >= stride {
        let candidate = data.len() / stride;
        if candidate > 0 && data[candidate * stride - 1] != b'\n' {
            return None;
        }
        candidate
    } else {
        0
    };

    // Whatever trails the last full line, minus an optional final newline.
    let remainder_start = full_lines * stride;
    let remainder = &data[remainder_start..];
    let rem_clean = if remainder.last() == Some(&b'\n') {
        &remainder[..remainder.len() - 1]
    } else {
        remainder
    };

    // Compute exact decoded sizes
    let decoded_per_line = line_len * 3 / 4;
    let rem_decoded_size = if rem_clean.is_empty() {
        0
    } else {
        // '=' padding can only occupy the last two positions.
        let pad = rem_clean
            .iter()
            .rev()
            .take(2)
            .filter(|&&b| b == b'=')
            .count();
        rem_clean.len() * 3 / 4 - pad
    };
    let total_decoded = full_lines * decoded_per_line + rem_decoded_size;
    let clean_len = full_lines * line_len;

    // Parallel path: fused strip+decode with 128KB sub-chunks per thread.
    // Each thread copies lines to a thread-local buffer (L2-hot) and decodes immediately,
    // eliminating the 12MB+ intermediate clean buffer entirely.
    if clean_len >= PARALLEL_DECODE_THRESHOLD && num_cpus() > 1 {
        let mut output: Vec<u8> = Vec::with_capacity(total_decoded);
        // SAFETY: output is uninitialized here; every byte of
        // output[..total_decoded] is written before being read (full lines by
        // the worker threads below, the remainder by the tail decode).
        #[allow(clippy::uninit_vec)]
        unsafe {
            output.set_len(total_decoded);
        }
        #[cfg(target_os = "linux")]
        hint_hugepage(&mut output);

        // Pointers are passed as usize so the spawned closures are Send;
        // each thread reconstructs them and writes only to its own
        // non-overlapping output region.
        let out_ptr = output.as_mut_ptr() as usize;
        let src_ptr = data.as_ptr() as usize;
        let num_threads = num_cpus().max(1);
        let lines_per_thread = (full_lines + num_threads - 1) / num_threads;
        // 512KB sub-chunks: larger chunks give SIMD decode more contiguous data,
        // reducing per-call overhead. 512KB fits in L2 cache (256KB-1MB typical).
        let lines_per_sub = (512 * 1024 / line_len).max(1);

        // Decode errors are reported via a shared flag; other threads bail
        // out early when they observe it.
        let err_flag = std::sync::atomic::AtomicBool::new(false);
        rayon::scope(|s| {
            for t in 0..num_threads {
                let err_flag = &err_flag;
                s.spawn(move |_| {
                    let start_line = t * lines_per_thread;
                    if start_line >= full_lines {
                        return;
                    }
                    let end_line = (start_line + lines_per_thread).min(full_lines);
                    let chunk_lines = end_line - start_line;

                    // Thread-local staging buffer for stripped line data.
                    let sub_buf_size = lines_per_sub.min(chunk_lines) * line_len;
                    let mut local_buf: Vec<u8> = Vec::with_capacity(sub_buf_size);
                    // SAFETY: every byte of local_buf[..sub_clean] is written
                    // by the line copies before the decode reads it.
                    #[allow(clippy::uninit_vec)]
                    unsafe {
                        local_buf.set_len(sub_buf_size);
                    }

                    let src = src_ptr as *const u8;
                    let out_base = out_ptr as *mut u8;
                    let local_dst = local_buf.as_mut_ptr();

                    let mut sub_start = 0usize;
                    while sub_start < chunk_lines {
                        if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let sub_count = (chunk_lines - sub_start).min(lines_per_sub);
                        let sub_clean = sub_count * line_len;

                        // Strip: copy each line's payload (without its '\n')
                        // into the contiguous local buffer.
                        for i in 0..sub_count {
                            // SAFETY: the line index is < full_lines, so the
                            // source range lies inside `data`; the destination
                            // lies inside local_buf (capacity sub_buf_size).
                            unsafe {
                                std::ptr::copy_nonoverlapping(
                                    src.add((start_line + sub_start + i) * stride),
                                    local_dst.add(i * line_len),
                                    line_len,
                                );
                            }
                        }

                        let out_offset = (start_line + sub_start) * decoded_per_line;
                        let out_size = sub_count * decoded_per_line;
                        // SAFETY: [out_offset, out_offset+out_size) is this
                        // thread's slice of `output`; thread line ranges are
                        // disjoint, so no two threads alias an output byte.
                        let out_slice = unsafe {
                            std::slice::from_raw_parts_mut(out_base.add(out_offset), out_size)
                        };
                        if BASE64_ENGINE
                            .decode(&local_buf[..sub_clean], out_slice.as_out())
                            .is_err()
                        {
                            err_flag.store(true, std::sync::atomic::Ordering::Relaxed);
                            return;
                        }

                        sub_start += sub_count;
                    }
                });
            }
        });
        let result: Result<(), io::Error> = if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
            Err(io::Error::new(io::ErrorKind::InvalidData, "invalid input"))
        } else {
            Ok(())
        };

        if let Err(e) = result {
            return Some(Err(e));
        }

        // Decode the trailing partial chunk (if any) into the tail of output.
        if !rem_clean.is_empty() {
            let rem_out = &mut output[full_lines * decoded_per_line..total_decoded];
            match BASE64_ENGINE.decode(rem_clean, rem_out.as_out()) {
                Ok(_) => {}
                Err(_) => return Some(decode_error()),
            }
        }

        return Some(out.write_all(&output[..total_decoded]));
    }

    // Sequential path: fused strip+decode in 256KB sub-chunks.
    // Larger sub-chunks give SIMD decode more data per call, improving throughput.
    // Uses decode_inplace on a small reusable buffer — no large allocations at all.
    let lines_per_sub = (256 * 1024 / line_len).max(1);
    let sub_buf_size = lines_per_sub * line_len;
    let mut local_buf: Vec<u8> = Vec::with_capacity(sub_buf_size);
    // SAFETY: local_buf[..sub_clean] is fully overwritten by the line copies
    // below before decode_inplace reads it.
    #[allow(clippy::uninit_vec)]
    unsafe {
        local_buf.set_len(sub_buf_size);
    }

    let src = data.as_ptr();
    let local_dst = local_buf.as_mut_ptr();

    let mut line_idx = 0usize;
    while line_idx < full_lines {
        let sub_count = (full_lines - line_idx).min(lines_per_sub);
        let sub_clean = sub_count * line_len;

        // Strip: gather line payloads (newlines skipped by the stride).
        for i in 0..sub_count {
            // SAFETY: line_idx + i < full_lines keeps the source range inside
            // `data`; i < lines_per_sub keeps the destination in local_buf.
            unsafe {
                std::ptr::copy_nonoverlapping(
                    src.add((line_idx + i) * stride),
                    local_dst.add(i * line_len),
                    line_len,
                );
            }
        }

        match BASE64_ENGINE.decode_inplace(&mut local_buf[..sub_clean]) {
            Ok(decoded) => {
                if let Err(e) = out.write_all(decoded) {
                    return Some(Err(e));
                }
            }
            Err(_) => return Some(decode_error()),
        }

        line_idx += sub_count;
    }

    // Trailing partial chunk: small, so a plain to_vec copy is fine here.
    if !rem_clean.is_empty() {
        let mut rem_buf = rem_clean.to_vec();
        match BASE64_ENGINE.decode_inplace(&mut rem_buf) {
            Ok(decoded) => {
                if let Err(e) = out.write_all(decoded) {
                    return Some(Err(e));
                }
            }
            Err(_) => return Some(decode_error()),
        }
    }

    Some(Ok(()))
}
1406
/// Decode by stripping whitespace and decoding in a single fused pass.
/// For data with no whitespace, decodes directly without any copy.
/// Detects uniform line structure for fast structured-copy (no search needed),
/// falls back to SIMD memchr2 gap-copy for irregular data.
///
/// # Errors
/// Returns an error when the stripped data is not valid base64, or when
/// writing to `out` fails.
fn decode_stripping_whitespace(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    // Fast path for uniform-line base64 (e.g., standard 76-char lines + newline).
    // Copies at known offsets, avoiding the memchr2 search entirely.
    // For 13MB base64: saves ~1ms vs memchr2 gap-copy (just structured memcpy).
    if data.len() >= 77 {
        if let Some(result) = try_decode_uniform_lines(data, out) {
            return result;
        }
    }

    // Quick check: skip stripping if no \n or \r in the data.
    // Uses SIMD memchr2 for fast scanning (~10 GB/s) instead of per-byte check.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        // No newlines/CR — check for rare whitespace only
        if !data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            return decode_borrowed_clean(out, data);
        }
        // Has rare whitespace only — strip and decode
        let mut cleaned: Vec<u8> = Vec::with_capacity(data.len());
        for &b in data {
            if NOT_WHITESPACE[b as usize] {
                cleaned.push(b);
            }
        }
        return decode_clean_slice(&mut cleaned, out);
    }

    // SIMD gap-copy: use memchr2 to find \n and \r positions, then copy the
    // gaps between them. For typical base64 (76-char lines), newlines are ~1/77
    // of the data, so we process ~76 bytes per memchr hit instead of 1 per scalar.
    // The copies write into `clean`'s spare (uninitialized) capacity through a
    // raw pointer; set_len(wp) afterwards declares exactly the written prefix.
    let mut clean: Vec<u8> = Vec::with_capacity(data.len());
    let dst = clean.as_mut_ptr();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    // Track whether any rare whitespace (tab, space, VT, FF) exists in gap regions.
    // This avoids the second full-scan pass when only \n/\r are present.
    let mut has_rare_ws = false;

    for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            // Check gap region for rare whitespace during copy.
            // This adds ~1 branch per gap but eliminates the second full scan.
            if !has_rare_ws {
                has_rare_ws = data[gap_start..pos]
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
            }
            // SAFETY: wp + gap_len <= data.len() == clean's capacity, and the
            // capacity never changes after `dst` is taken (no reallocation),
            // so the destination range is valid; src and dst are distinct
            // allocations, satisfying copy_nonoverlapping.
            unsafe {
                std::ptr::copy_nonoverlapping(data.as_ptr().add(gap_start), dst.add(wp), gap_len);
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Copy the final gap after the last \n/\r
    let tail_len = data.len() - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = data[gap_start..]
                .iter()
                .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
        }
        // SAFETY: same capacity/no-overlap argument as the loop above.
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr().add(gap_start), dst.add(wp), tail_len);
        }
        wp += tail_len;
    }
    // SAFETY: exactly clean[..wp] was initialized by the copies above.
    unsafe {
        clean.set_len(wp);
    }

    // Second pass for rare whitespace (tab, space, VT, FF) — only runs when needed.
    // In typical base64 streams (76-char lines with \n), this is skipped entirely.
    if has_rare_ws {
        let ptr = clean.as_mut_ptr();
        let len = clean.len();
        let mut rp = 0;
        let mut cwp = 0;
        while rp < len {
            // SAFETY: rp < len and cwp <= rp — in bounds; reads precede writes.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        clean.truncate(cwp);
    }

    // For large data (>= threshold), use parallel decode for multi-core speedup.
    // For small data, use in-place decode to avoid extra allocation.
    if clean.len() >= PARALLEL_DECODE_THRESHOLD {
        decode_borrowed_clean_parallel(out, &clean)
    } else {
        decode_clean_slice(&mut clean, out)
    }
}
1512
1513/// Try to decode base64 data line-by-line, avoiding whitespace stripping.
1514/// Returns Some(result) if the data has uniform line lengths suitable for
1515/// per-line decode, or None if the data doesn't fit this pattern.
1516///
1517/// For standard 76-char-line base64 (wrap=76): each line is 76 encoded chars
1518/// + newline = 77 bytes. 76 chars = 19 groups of 4 = 57 decoded bytes per line.
1519/// We decode each line directly into its position in the output buffer.
1520fn try_line_decode(data: &[u8], out: &mut impl Write) -> Option<io::Result<()>> {
1521    // Find the first newline to determine line length
1522    let first_nl = memchr::memchr(b'\n', data)?;
1523    let line_len = first_nl; // encoded chars per line (without newline)
1524
1525    // Line length must be a multiple of 4 (complete base64 groups, no padding mid-stream)
1526    if line_len == 0 || line_len % 4 != 0 {
1527        return None;
1528    }
1529
1530    let line_stride = line_len + 1; // line_len chars + 1 newline byte
1531    let decoded_per_line = line_len * 3 / 4;
1532
1533    // Verify the data has a consistent line structure by checking the next few lines
1534    let check_lines = 4.min(data.len() / line_stride);
1535    for i in 1..check_lines {
1536        let expected_nl = i * line_stride - 1;
1537        if expected_nl >= data.len() {
1538            break;
1539        }
1540        if data[expected_nl] != b'\n' {
1541            return None; // Inconsistent line length
1542        }
1543    }
1544
1545    // Calculate full lines and remainder
1546    let full_lines = if data.len() >= line_stride {
1547        // Check how many complete lines fit
1548        let candidate = data.len() / line_stride;
1549        // Verify the last full line's newline
1550        if candidate > 0 && data[candidate * line_stride - 1] != b'\n' {
1551            return None; // Not a clean line-structured file
1552        }
1553        candidate
1554    } else {
1555        0
1556    };
1557
1558    let remainder_start = full_lines * line_stride;
1559    let remainder = &data[remainder_start..];
1560
1561    // Calculate exact output size
1562    let remainder_clean_len = if remainder.is_empty() {
1563        0
1564    } else {
1565        // Remainder might end with newline, strip it
1566        let rem = if remainder.last() == Some(&b'\n') {
1567            &remainder[..remainder.len() - 1]
1568        } else {
1569            remainder
1570        };
1571        if rem.is_empty() {
1572            0
1573        } else {
1574            // Check for padding
1575            let pad = rem.iter().rev().take(2).filter(|&&b| b == b'=').count();
1576            if rem.len() % 4 != 0 {
1577                return None; // Invalid remainder
1578            }
1579            rem.len() * 3 / 4 - pad
1580        }
1581    };
1582
1583    // Single-allocation decode: allocate full decoded output, decode all lines
1584    // directly into it, then write_all in one syscall. For 10MB base64 (7.5MB decoded),
1585    // this does 1 write() instead of ~30 chunked writes. The 7.5MB allocation is trivial
1586    // compared to the mmap'd input. SIMD decode at ~8 GB/s finishes in <1ms.
1587    let total_decoded = full_lines * decoded_per_line + remainder_clean_len;
1588    let mut out_buf: Vec<u8> = Vec::with_capacity(total_decoded);
1589    #[allow(clippy::uninit_vec)]
1590    unsafe {
1591        out_buf.set_len(total_decoded);
1592    }
1593
1594    let dst = out_buf.as_mut_ptr();
1595
1596    // Parallel line decode for large inputs (>= 4MB): split lines across threads.
1597    // Each thread decodes a contiguous block of lines directly to its final position
1598    // in the shared output buffer. SAFETY: non-overlapping output regions per thread.
1599    if data.len() >= PARALLEL_DECODE_THRESHOLD && full_lines >= 64 {
1600        let out_addr = dst as usize;
1601        let num_threads = num_cpus().max(1);
1602        let lines_per_chunk = (full_lines / num_threads).max(1);
1603
1604        // Build per-thread task ranges: (start_line, end_line)
1605        let mut tasks: Vec<(usize, usize)> = Vec::new();
1606        let mut line_off = 0;
1607        while line_off < full_lines {
1608            let end = (line_off + lines_per_chunk).min(full_lines);
1609            tasks.push((line_off, end));
1610            line_off = end;
1611        }
1612
1613        let decode_err = std::sync::atomic::AtomicBool::new(false);
1614        rayon::scope(|s| {
1615            for &(start_line, end_line) in &tasks {
1616                let decode_err = &decode_err;
1617                s.spawn(move |_| {
1618                    let out_ptr = out_addr as *mut u8;
1619                    let mut i = start_line;
1620
1621                    while i + 4 <= end_line {
1622                        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
1623                            return;
1624                        }
1625                        let in_base = i * line_stride;
1626                        let ob = i * decoded_per_line;
1627                        unsafe {
1628                            let s0 =
1629                                std::slice::from_raw_parts_mut(out_ptr.add(ob), decoded_per_line);
1630                            if BASE64_ENGINE
1631                                .decode(&data[in_base..in_base + line_len], s0.as_out())
1632                                .is_err()
1633                            {
1634                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
1635                                return;
1636                            }
1637                            let s1 = std::slice::from_raw_parts_mut(
1638                                out_ptr.add(ob + decoded_per_line),
1639                                decoded_per_line,
1640                            );
1641                            if BASE64_ENGINE
1642                                .decode(
1643                                    &data[in_base + line_stride..in_base + line_stride + line_len],
1644                                    s1.as_out(),
1645                                )
1646                                .is_err()
1647                            {
1648                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
1649                                return;
1650                            }
1651                            let s2 = std::slice::from_raw_parts_mut(
1652                                out_ptr.add(ob + 2 * decoded_per_line),
1653                                decoded_per_line,
1654                            );
1655                            if BASE64_ENGINE
1656                                .decode(
1657                                    &data[in_base + 2 * line_stride
1658                                        ..in_base + 2 * line_stride + line_len],
1659                                    s2.as_out(),
1660                                )
1661                                .is_err()
1662                            {
1663                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
1664                                return;
1665                            }
1666                            let s3 = std::slice::from_raw_parts_mut(
1667                                out_ptr.add(ob + 3 * decoded_per_line),
1668                                decoded_per_line,
1669                            );
1670                            if BASE64_ENGINE
1671                                .decode(
1672                                    &data[in_base + 3 * line_stride
1673                                        ..in_base + 3 * line_stride + line_len],
1674                                    s3.as_out(),
1675                                )
1676                                .is_err()
1677                            {
1678                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
1679                                return;
1680                            }
1681                        }
1682                        i += 4;
1683                    }
1684
1685                    while i < end_line {
1686                        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
1687                            return;
1688                        }
1689                        let in_start = i * line_stride;
1690                        let out_off = i * decoded_per_line;
1691                        let out_slice = unsafe {
1692                            std::slice::from_raw_parts_mut(out_ptr.add(out_off), decoded_per_line)
1693                        };
1694                        if BASE64_ENGINE
1695                            .decode(&data[in_start..in_start + line_len], out_slice.as_out())
1696                            .is_err()
1697                        {
1698                            decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
1699                            return;
1700                        }
1701                        i += 1;
1702                    }
1703                });
1704            }
1705        });
1706
1707        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
1708            return Some(decode_error());
1709        }
1710    } else {
1711        // Sequential decode with 4x unrolling for smaller inputs
1712        let mut i = 0;
1713
1714        while i + 4 <= full_lines {
1715            let in_base = i * line_stride;
1716            let out_base = i * decoded_per_line;
1717            unsafe {
1718                let s0 = std::slice::from_raw_parts_mut(dst.add(out_base), decoded_per_line);
1719                if BASE64_ENGINE
1720                    .decode(&data[in_base..in_base + line_len], s0.as_out())
1721                    .is_err()
1722                {
1723                    return Some(decode_error());
1724                }
1725
1726                let s1 = std::slice::from_raw_parts_mut(
1727                    dst.add(out_base + decoded_per_line),
1728                    decoded_per_line,
1729                );
1730                if BASE64_ENGINE
1731                    .decode(
1732                        &data[in_base + line_stride..in_base + line_stride + line_len],
1733                        s1.as_out(),
1734                    )
1735                    .is_err()
1736                {
1737                    return Some(decode_error());
1738                }
1739
1740                let s2 = std::slice::from_raw_parts_mut(
1741                    dst.add(out_base + 2 * decoded_per_line),
1742                    decoded_per_line,
1743                );
1744                if BASE64_ENGINE
1745                    .decode(
1746                        &data[in_base + 2 * line_stride..in_base + 2 * line_stride + line_len],
1747                        s2.as_out(),
1748                    )
1749                    .is_err()
1750                {
1751                    return Some(decode_error());
1752                }
1753
1754                let s3 = std::slice::from_raw_parts_mut(
1755                    dst.add(out_base + 3 * decoded_per_line),
1756                    decoded_per_line,
1757                );
1758                if BASE64_ENGINE
1759                    .decode(
1760                        &data[in_base + 3 * line_stride..in_base + 3 * line_stride + line_len],
1761                        s3.as_out(),
1762                    )
1763                    .is_err()
1764                {
1765                    return Some(decode_error());
1766                }
1767            }
1768            i += 4;
1769        }
1770
1771        while i < full_lines {
1772            let in_start = i * line_stride;
1773            let in_end = in_start + line_len;
1774            let out_off = i * decoded_per_line;
1775            let out_slice =
1776                unsafe { std::slice::from_raw_parts_mut(dst.add(out_off), decoded_per_line) };
1777            match BASE64_ENGINE.decode(&data[in_start..in_end], out_slice.as_out()) {
1778                Ok(_) => {}
1779                Err(_) => return Some(decode_error()),
1780            }
1781            i += 1;
1782        }
1783    }
1784
1785    // Decode remainder
1786    if remainder_clean_len > 0 {
1787        let rem = if remainder.last() == Some(&b'\n') {
1788            &remainder[..remainder.len() - 1]
1789        } else {
1790            remainder
1791        };
1792        let out_off = full_lines * decoded_per_line;
1793        let out_slice =
1794            unsafe { std::slice::from_raw_parts_mut(dst.add(out_off), remainder_clean_len) };
1795        match BASE64_ENGINE.decode(rem, out_slice.as_out()) {
1796            Ok(_) => {}
1797            Err(_) => return Some(decode_error()),
1798        }
1799    }
1800
1801    // Single write_all for the entire decoded output
1802    Some(out.write_all(&out_buf[..total_decoded]))
1803}
1804
1805/// Decode a clean (no whitespace) buffer in-place with SIMD.
1806fn decode_clean_slice(data: &mut [u8], out: &mut impl Write) -> io::Result<()> {
1807    if data.is_empty() {
1808        return Ok(());
1809    }
1810    match BASE64_ENGINE.decode_inplace(data) {
1811        Ok(decoded) => out.write_all(decoded),
1812        Err(_) => decode_error(),
1813    }
1814}
1815
/// Cold error path — keeps hot decode loops tight by constructing the
/// `InvalidData` error out of line.
#[cold]
#[inline(never)]
fn decode_error() -> io::Result<()> {
    let err = io::Error::new(io::ErrorKind::InvalidData, "invalid input");
    Err(err)
}
1822
1823/// Decode clean base64 data (no whitespace) from a borrowed slice.
1824fn decode_borrowed_clean(out: &mut impl Write, data: &[u8]) -> io::Result<()> {
1825    if data.is_empty() {
1826        return Ok(());
1827    }
1828    // Parallel decode for large data: split at 4-byte boundaries,
1829    // decode each chunk independently (base64 is context-free per 4-char group).
1830    if data.len() >= PARALLEL_DECODE_THRESHOLD {
1831        return decode_borrowed_clean_parallel(out, data);
1832    }
1833    // Pre-allocate exact output size to avoid decode_to_vec's reallocation.
1834    // Decoded size = data.len() * 3 / 4 minus padding.
1835    let pad = data.iter().rev().take(2).filter(|&&b| b == b'=').count();
1836    let decoded_size = data.len() * 3 / 4 - pad;
1837    let mut buf: Vec<u8> = Vec::with_capacity(decoded_size);
1838    #[allow(clippy::uninit_vec)]
1839    unsafe {
1840        buf.set_len(decoded_size);
1841    }
1842    match BASE64_ENGINE.decode(data, buf[..decoded_size].as_out()) {
1843        Ok(decoded) => {
1844            out.write_all(decoded)?;
1845            Ok(())
1846        }
1847        Err(_) => decode_error(),
1848    }
1849}
1850
1851/// Parallel decode: split at 4-byte boundaries, decode chunks in parallel.
1852/// Pre-allocates a single contiguous output buffer with exact decoded offsets computed
1853/// upfront, so each thread decodes directly to its final position. No compaction needed.
1854fn decode_borrowed_clean_parallel(out: &mut impl Write, data: &[u8]) -> io::Result<()> {
1855    let num_threads = num_cpus().max(1);
1856    let raw_chunk = data.len() / num_threads;
1857    // Align to 4 bytes (each 4 base64 chars = 3 decoded bytes, context-free)
1858    let chunk_size = ((raw_chunk + 3) / 4) * 4;
1859
1860    let chunks: Vec<&[u8]> = data.chunks(chunk_size.max(4)).collect();
1861
1862    // Compute exact decoded sizes per chunk upfront to eliminate the compaction pass.
1863    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
1864    offsets.push(0);
1865    let mut total_decoded = 0usize;
1866    for (i, chunk) in chunks.iter().enumerate() {
1867        let decoded_size = if i == chunks.len() - 1 {
1868            let pad = chunk.iter().rev().take(2).filter(|&&b| b == b'=').count();
1869            chunk.len() * 3 / 4 - pad
1870        } else {
1871            chunk.len() * 3 / 4
1872        };
1873        total_decoded += decoded_size;
1874        offsets.push(total_decoded);
1875    }
1876
1877    let mut output_buf: Vec<u8> = Vec::with_capacity(total_decoded);
1878    #[allow(clippy::uninit_vec)]
1879    unsafe {
1880        output_buf.set_len(total_decoded);
1881    }
1882    #[cfg(target_os = "linux")]
1883    hint_hugepage(&mut output_buf);
1884
1885    // Parallel decode: each thread decodes directly into its exact final position.
1886    // SAFETY: each thread writes to a non-overlapping region of the output buffer.
1887    let out_addr = output_buf.as_mut_ptr() as usize;
1888    let err_flag = std::sync::atomic::AtomicBool::new(false);
1889    rayon::scope(|s| {
1890        for (i, chunk) in chunks.iter().enumerate() {
1891            let offset = offsets[i];
1892            let expected_size = offsets[i + 1] - offset;
1893            let err_flag = &err_flag;
1894            s.spawn(move |_| {
1895                if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
1896                    return;
1897                }
1898                // SAFETY: each thread writes to non-overlapping region
1899                let out_slice = unsafe {
1900                    std::slice::from_raw_parts_mut((out_addr as *mut u8).add(offset), expected_size)
1901                };
1902                if BASE64_ENGINE.decode(chunk, out_slice.as_out()).is_err() {
1903                    err_flag.store(true, std::sync::atomic::Ordering::Relaxed);
1904                }
1905            });
1906        }
1907    });
1908
1909    if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
1910        return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid input"));
1911    }
1912
1913    out.write_all(&output_buf[..total_decoded])
1914}
1915
/// Strip non-base64 characters (for -i / --ignore-garbage).
/// Keeps only the standard alphabet [A-Za-z0-9+/] and '=' padding.
fn strip_non_base64(data: &[u8]) -> Vec<u8> {
    let mut kept = Vec::with_capacity(data.len());
    for &byte in data {
        if matches!(byte, b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'+' | b'/' | b'=') {
            kept.push(byte);
        }
    }
    kept
}
1923
/// Check if a byte is a valid base64 alphabet character or padding.
#[inline]
fn is_base64_char(b: u8) -> bool {
    matches!(b, b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'+' | b'/' | b'=')
}
1929
1930/// Stream-encode from a reader to a writer. Used for stdin processing.
1931/// Dispatches to specialized paths for wrap_col=0 (no wrap) and wrap_col>0 (wrapping).
1932pub fn encode_stream(
1933    reader: &mut impl Read,
1934    wrap_col: usize,
1935    writer: &mut impl Write,
1936) -> io::Result<()> {
1937    if wrap_col == 0 {
1938        return encode_stream_nowrap(reader, writer);
1939    }
1940    encode_stream_wrapped(reader, wrap_col, writer)
1941}
1942
1943/// Streaming encode with NO line wrapping — optimized fast path.
1944/// Read size is 24MB (divisible by 3): encoded output = 24MB * 4/3 = 32MB.
1945/// 24MB reads mean 10-18MB input is consumed in a single read() call,
1946/// and the encoded output writes in 1-2 write() calls.
1947fn encode_stream_nowrap(reader: &mut impl Read, writer: &mut impl Write) -> io::Result<()> {
1948    // 24MB aligned to 3 bytes: 24MB reads handle up to 24MB input in one pass.
1949    const NOWRAP_READ: usize = 24 * 1024 * 1024; // exactly divisible by 3
1950
1951    // SAFETY: buf bytes are written by read_full before being processed.
1952    // encode_buf bytes are written by encode before being read.
1953    let mut buf: Vec<u8> = Vec::with_capacity(NOWRAP_READ);
1954    #[allow(clippy::uninit_vec)]
1955    unsafe {
1956        buf.set_len(NOWRAP_READ);
1957    }
1958    let encode_buf_size = BASE64_ENGINE.encoded_length(NOWRAP_READ);
1959    let mut encode_buf: Vec<u8> = Vec::with_capacity(encode_buf_size);
1960    #[allow(clippy::uninit_vec)]
1961    unsafe {
1962        encode_buf.set_len(encode_buf_size);
1963    }
1964
1965    loop {
1966        let n = read_full(reader, &mut buf)?;
1967        if n == 0 {
1968            break;
1969        }
1970        let enc_len = BASE64_ENGINE.encoded_length(n);
1971        let encoded = BASE64_ENGINE.encode(&buf[..n], encode_buf[..enc_len].as_out());
1972        writer.write_all(encoded)?;
1973    }
1974    Ok(())
1975}
1976
1977/// Streaming encode WITH line wrapping.
1978/// For the common case (wrap_col divides evenly into 3-byte input groups),
1979/// uses fuse_wrap to build a contiguous output buffer with newlines interleaved,
1980/// then writes it in a single write() call. This eliminates the overhead of
1981/// many writev() syscalls (one per ~512 lines via IoSlice).
1982///
1983/// For non-aligned wrap columns, falls back to the IoSlice/writev approach.
1984fn encode_stream_wrapped(
1985    reader: &mut impl Read,
1986    wrap_col: usize,
1987    writer: &mut impl Write,
1988) -> io::Result<()> {
1989    let bytes_per_line = wrap_col * 3 / 4;
1990    // For the common case (76-col wrapping, bytes_per_line=57 which is divisible by 3),
1991    // align the read buffer to bytes_per_line boundaries so each chunk produces
1992    // complete lines with no column carry-over between chunks.
1993    if bytes_per_line > 0 && bytes_per_line.is_multiple_of(3) {
1994        return encode_stream_wrapped_fused(reader, wrap_col, bytes_per_line, writer);
1995    }
1996
1997    // Fallback: non-aligned wrap columns use IoSlice/writev with column tracking
1998    const STREAM_READ: usize = 12 * 1024 * 1024;
1999    let mut buf: Vec<u8> = Vec::with_capacity(STREAM_READ);
2000    #[allow(clippy::uninit_vec)]
2001    unsafe {
2002        buf.set_len(STREAM_READ);
2003    }
2004    let encode_buf_size = BASE64_ENGINE.encoded_length(STREAM_READ);
2005    let mut encode_buf: Vec<u8> = Vec::with_capacity(encode_buf_size);
2006    #[allow(clippy::uninit_vec)]
2007    unsafe {
2008        encode_buf.set_len(encode_buf_size);
2009    }
2010
2011    let mut col = 0usize;
2012
2013    loop {
2014        let n = read_full(reader, &mut buf)?;
2015        if n == 0 {
2016            break;
2017        }
2018        let enc_len = BASE64_ENGINE.encoded_length(n);
2019        let encoded = BASE64_ENGINE.encode(&buf[..n], encode_buf[..enc_len].as_out());
2020
2021        write_wrapped_iov_streaming(encoded, wrap_col, &mut col, writer)?;
2022    }
2023
2024    if col > 0 {
2025        writer.write_all(b"\n")?;
2026    }
2027
2028    Ok(())
2029}
2030
/// Direct-to-position encode+wrap streaming: align reads to bytes_per_line boundaries,
/// encode each line directly into its final position with newline appended.
/// Eliminates the two-pass encode-then-fuse_wrap approach.
/// For 76-col wrapping (bytes_per_line=57): 12MB / 57 = ~210K complete lines per chunk.
/// Output = 210K * 77 bytes = ~16MB, one write() syscall per chunk.
///
/// Precondition (enforced by the caller, encode_stream_wrapped): bytes_per_line > 0
/// and bytes_per_line % 3 == 0, so every full input line encodes to exactly
/// wrap_col bytes with no '=' padding and no column carry-over between chunks.
fn encode_stream_wrapped_fused(
    reader: &mut impl Read,
    wrap_col: usize,
    bytes_per_line: usize,
    writer: &mut impl Write,
) -> io::Result<()> {
    // Align read size to bytes_per_line for complete output lines per chunk.
    // ~420K lines * 57 bytes = ~24MB input, ~32MB output.
    let lines_per_chunk = (24 * 1024 * 1024) / bytes_per_line;
    let read_size = lines_per_chunk * bytes_per_line;
    let line_out = wrap_col + 1; // wrap_col encoded bytes + 1 newline

    // SAFETY: buf bytes are written by read_full before being processed.
    // out_buf bytes are written by encode before being read.
    let mut buf: Vec<u8> = Vec::with_capacity(read_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(read_size);
    }
    // Output buffer: enough for all lines + remainder (an encoded partial
    // line of at most encoded_length(bytes_per_line) bytes, its newline,
    // and one spare byte).
    let max_output = lines_per_chunk * line_out + BASE64_ENGINE.encoded_length(bytes_per_line) + 2;
    let mut out_buf: Vec<u8> = Vec::with_capacity(max_output);
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(max_output);
    }

    loop {
        // read_full retries short reads, so n < read_size happens only at EOF.
        let n = read_full(reader, &mut buf)?;
        if n == 0 {
            break;
        }

        let full_lines = n / bytes_per_line;
        let remainder = n % bytes_per_line;

        // Encode each input line directly into its final output position.
        // Each 57-byte input line -> 76 encoded bytes + '\n' = 77 bytes at offset line_idx * 77.
        // This eliminates the separate encode + fuse_wrap copy entirely.
        let dst = out_buf.as_mut_ptr();
        let mut line_idx = 0;

        // 4-line unrolled loop for better ILP
        while line_idx + 4 <= full_lines {
            let in_base = line_idx * bytes_per_line;
            let out_base = line_idx * line_out;
            // SAFETY: out_base + 4 * line_out <= full_lines * line_out <= max_output,
            // so all four line slices (and their newline bytes) are in bounds
            // and non-overlapping.
            unsafe {
                let s0 = std::slice::from_raw_parts_mut(dst.add(out_base), wrap_col);
                let _ = BASE64_ENGINE.encode(&buf[in_base..in_base + bytes_per_line], s0.as_out());
                *dst.add(out_base + wrap_col) = b'\n';

                let s1 = std::slice::from_raw_parts_mut(dst.add(out_base + line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + bytes_per_line..in_base + 2 * bytes_per_line],
                    s1.as_out(),
                );
                *dst.add(out_base + line_out + wrap_col) = b'\n';

                let s2 = std::slice::from_raw_parts_mut(dst.add(out_base + 2 * line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + 2 * bytes_per_line..in_base + 3 * bytes_per_line],
                    s2.as_out(),
                );
                *dst.add(out_base + 2 * line_out + wrap_col) = b'\n';

                let s3 = std::slice::from_raw_parts_mut(dst.add(out_base + 3 * line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + 3 * bytes_per_line..in_base + 4 * bytes_per_line],
                    s3.as_out(),
                );
                *dst.add(out_base + 3 * line_out + wrap_col) = b'\n';
            }
            line_idx += 4;
        }

        // Remaining full lines (0-3 after the unrolled loop)
        while line_idx < full_lines {
            let in_base = line_idx * bytes_per_line;
            let out_base = line_idx * line_out;
            // SAFETY: same bounds argument as above for a single line.
            unsafe {
                let s = std::slice::from_raw_parts_mut(dst.add(out_base), wrap_col);
                let _ = BASE64_ENGINE.encode(&buf[in_base..in_base + bytes_per_line], s.as_out());
                *dst.add(out_base + wrap_col) = b'\n';
            }
            line_idx += 1;
        }

        // Write position after all full lines.
        let mut wp = full_lines * line_out;

        // Handle remainder (partial last line of this chunk). Because read_full
        // only returns short at EOF, a remainder can occur only on the final chunk.
        if remainder > 0 {
            let enc_len = BASE64_ENGINE.encoded_length(remainder);
            let line_input = &buf[full_lines * bytes_per_line..n];
            // SAFETY: wp + enc_len + 1 <= max_output by construction of max_output.
            unsafe {
                let s = std::slice::from_raw_parts_mut(dst.add(wp), enc_len);
                let _ = BASE64_ENGINE.encode(line_input, s.as_out());
                *dst.add(wp + enc_len) = b'\n';
            }
            wp += enc_len + 1;
        }

        writer.write_all(&out_buf[..wp])?;
    }

    Ok(())
}
2142
/// Stream-decode from a reader to a writer. Used for stdin processing.
/// In-place strip + decode: read chunk -> strip whitespace in-place in read buffer
/// -> decode in-place -> write. Eliminates separate clean buffer allocation (saves 32MB).
/// Uses 32MB read buffer for maximum pipe throughput — read_full retries to
/// fill the entire buffer from the pipe, and 32MB means even large inputs
/// (up to ~24MB after base64 encoding of 18MB raw) are read in a single syscall batch.
pub fn decode_stream(
    reader: &mut impl Read,
    ignore_garbage: bool,
    writer: &mut impl Write,
) -> io::Result<()> {
    const READ_CHUNK: usize = 32 * 1024 * 1024;
    // SAFETY: buf bytes are written by read_full before being processed.
    // The extra 4 bytes accommodate carry-over from previous chunk.
    let mut buf: Vec<u8> = Vec::with_capacity(READ_CHUNK + 4);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(READ_CHUNK + 4);
    }
    // carry holds 0-3 leftover chars of an incomplete base64 quadruplet
    // from the previous chunk; they are prepended to the next chunk.
    let mut carry = [0u8; 4];
    let mut carry_len = 0usize;

    loop {
        // Copy carry bytes to start of buffer, read new data after them
        if carry_len > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(carry.as_ptr(), buf.as_mut_ptr(), carry_len);
            }
        }
        let n = read_full(reader, &mut buf[carry_len..carry_len + READ_CHUNK])?;
        if n == 0 {
            break;
        }
        let total_raw = carry_len + n;

        // Strip whitespace in-place in the buffer itself.
        // This eliminates the separate clean buffer allocation (saves 16MB).
        let clean_len = if ignore_garbage {
            // Scalar filter for ignore_garbage mode (rare path): keep only
            // base64 alphabet chars and '=' padding, compacting left in place.
            let ptr = buf.as_mut_ptr();
            let mut wp = 0usize;
            for i in 0..total_raw {
                let b = unsafe { *ptr.add(i) };
                if is_base64_char(b) {
                    unsafe { *ptr.add(wp) = b };
                    wp += 1;
                }
            }
            wp
        } else {
            // In-place SIMD gap-copy using memchr2 to find \n and \r positions.
            // For typical base64 (76-char lines), newlines are ~1/77 of the data,
            // so we process ~76 bytes per memchr hit.
            let ptr = buf.as_mut_ptr();
            let data = &buf[..total_raw];
            let mut wp = 0usize; // write position (compacted length so far)
            let mut gap_start = 0usize; // start of the current newline-free run
            let mut has_rare_ws = false; // whether tab/space/VT/FF were seen

            for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
                let gap_len = pos - gap_start;
                if gap_len > 0 {
                    // Scan each gap once for rarer whitespace so the second
                    // pass below runs only when actually needed.
                    if !has_rare_ws {
                        has_rare_ws = data[gap_start..pos]
                            .iter()
                            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
                    }
                    // Shift the gap left over the removed newlines.
                    // SAFETY: wp <= gap_start, both ranges lie within buf;
                    // std::ptr::copy permits the overlap.
                    if wp != gap_start {
                        unsafe {
                            std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len);
                        }
                    }
                    wp += gap_len;
                }
                gap_start = pos + 1;
            }
            // Tail after the last newline.
            let tail_len = total_raw - gap_start;
            if tail_len > 0 {
                if !has_rare_ws {
                    has_rare_ws = data[gap_start..total_raw]
                        .iter()
                        .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
                }
                if wp != gap_start {
                    unsafe {
                        std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len);
                    }
                }
                wp += tail_len;
            }

            // Second pass for rare whitespace (tab, space, VT, FF) — only when detected.
            // NOT_WHITESPACE is a byte-indexed lookup table defined elsewhere in
            // this file; by its name and usage it is true for bytes to keep
            // (non-whitespace) — verify against its definition.
            if has_rare_ws {
                let mut rp = 0;
                let mut cwp = 0;
                while rp < wp {
                    let b = unsafe { *ptr.add(rp) };
                    if NOT_WHITESPACE[b as usize] {
                        unsafe { *ptr.add(cwp) = b };
                        cwp += 1;
                    }
                    rp += 1;
                }
                cwp
            } else {
                wp
            }
        };

        carry_len = 0;
        // read_full only returns short at EOF, so a partial read marks the
        // final chunk of the stream.
        let is_last = n < READ_CHUNK;

        if is_last {
            // Last chunk: decode everything (including padding)
            decode_clean_slice(&mut buf[..clean_len], writer)?;
        } else {
            // Save incomplete base64 quadruplet for next iteration
            let decode_len = (clean_len / 4) * 4;
            let leftover = clean_len - decode_len;
            if leftover > 0 {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        buf.as_ptr().add(decode_len),
                        carry.as_mut_ptr(),
                        leftover,
                    );
                }
                carry_len = leftover;
            }
            if decode_len > 0 {
                decode_clean_slice(&mut buf[..decode_len], writer)?;
            }
        }
    }

    // Handle any remaining carry-over bytes (EOF landed exactly on a chunk
    // boundary with an incomplete quadruplet pending; decode reports it).
    if carry_len > 0 {
        let mut carry_buf = carry[..carry_len].to_vec();
        decode_clean_slice(&mut carry_buf, writer)?;
    }

    Ok(())
}
2286
2287/// Write all IoSlice entries using write_vectored (writev syscall).
2288/// Hot path: single write_vectored succeeds fully (common on Linux pipes/files).
2289/// Cold path: partial write handled out-of-line to keep hot path tight.
2290#[inline(always)]
2291fn write_all_vectored(out: &mut impl Write, slices: &[io::IoSlice]) -> io::Result<()> {
2292    if slices.is_empty() {
2293        return Ok(());
2294    }
2295    let total: usize = slices.iter().map(|s| s.len()).sum();
2296    let written = out.write_vectored(slices)?;
2297    if written >= total {
2298        return Ok(());
2299    }
2300    if written == 0 {
2301        return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
2302    }
2303    write_all_vectored_slow(out, slices, written)
2304}
2305
/// Handle a partial vectored write (cold path, never inlined).
/// `skip` is the number of bytes already written; slices fully covered by
/// it are skipped, the partially-covered slice is resumed mid-way, and all
/// later slices are written whole.
#[cold]
#[inline(never)]
fn write_all_vectored_slow(
    out: &mut impl Write,
    slices: &[io::IoSlice],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        if skip >= slice.len() {
            // This slice was entirely consumed by the vectored write.
            skip -= slice.len();
        } else {
            // Write the unwritten tail; subsequent slices start at offset 0.
            out.write_all(&slice[skip..])?;
            skip = 0;
        }
    }
    Ok(())
}
2325
/// Read as many bytes as possible into buf, retrying on partial reads.
/// Fast path: regular file reads usually return the full buffer on the first
/// call, avoiding the retry loop entirely. Returns the total bytes read,
/// which is less than buf.len() only at end of stream.
#[inline]
fn read_full(reader: &mut impl Read, buf: &mut [u8]) -> io::Result<usize> {
    // Fast path: a single read() that fills the buffer, or immediate EOF.
    let first = reader.read(buf)?;
    if first == 0 || first == buf.len() {
        return Ok(first);
    }
    // Slow path (pipes, slow devices): keep reading until the buffer is
    // full or EOF. Interrupted reads are retried; other errors propagate.
    let mut filled = first;
    loop {
        if filled == buf.len() {
            return Ok(filled);
        }
        match reader.read(&mut buf[filled..]) {
            Ok(0) => return Ok(filled),
            Ok(n) => filled += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
}