1use std::io::{self, Read, Write};
2
3use base64_simd::AsOut;
4
// Standard-alphabet base64 codec (with `=` padding) shared by every
// encode/decode path in this module.
const BASE64_ENGINE: &base64_simd::Base64 = &base64_simd::STANDARD;
6
/// Best-effort logical CPU count; falls back to 1 when the platform
/// cannot report its available parallelism.
#[inline]
fn num_cpus() -> usize {
    match std::thread::available_parallelism() {
        Ok(n) => n.get(),
        Err(_) => 1,
    }
}
16
// Streaming chunk for unwrapped encoding: 8 MiB rounded down to a
// multiple of 3, so every non-final chunk encodes without `=` padding
// and the chunk outputs concatenate into one valid base64 stream.
const NOWRAP_CHUNK: usize = 8 * 1024 * 1024 - (8 * 1024 * 1024 % 3);

// Input size at which unwrapped encoding goes multi-threaded.
const PARALLEL_NOWRAP_THRESHOLD: usize = 16 * 1024 * 1024;

// Input size at which wrapped encoding goes multi-threaded.
const PARALLEL_WRAPPED_THRESHOLD: usize = 12 * 1024 * 1024;

// Cleaned-input size at which decoding goes multi-threaded.
const PARALLEL_DECODE_THRESHOLD: usize = 1024 * 1024;
41
42#[cfg(target_os = "linux")]
46fn hint_hugepage(buf: &mut Vec<u8>) {
47 if buf.capacity() >= 2 * 1024 * 1024 {
48 unsafe {
49 libc::madvise(
50 buf.as_mut_ptr() as *mut libc::c_void,
51 buf.capacity(),
52 libc::MADV_HUGEPAGE,
53 );
54 }
55 }
56}
57
58pub fn encode_to_writer(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
61 if data.is_empty() {
62 return Ok(());
63 }
64
65 if wrap_col == 0 {
66 return encode_no_wrap(data, out);
67 }
68
69 encode_wrapped(data, wrap_col, out)
70}
71
/// Encode `data` with no line wrapping, streaming the output to `out`
/// in `NOWRAP_CHUNK`-sized pieces to bound memory use. Large inputs are
/// handed off to the multi-threaded path when >1 CPU is available.
fn encode_no_wrap(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    if data.len() >= PARALLEL_NOWRAP_THRESHOLD && num_cpus() > 1 {
        return encode_no_wrap_parallel(data, out);
    }

    // One reusable buffer sized for the largest chunk we will encode.
    let enc_len = BASE64_ENGINE.encoded_length(data.len().min(NOWRAP_CHUNK));
    let mut buf: Vec<u8> = Vec::with_capacity(enc_len);
    #[allow(clippy::uninit_vec)]
    // SAFETY: only `buf[..clen]` is ever read, and `encode` fills that
    // prefix first; the uninitialized tail is never observed.
    unsafe {
        buf.set_len(enc_len);
    }

    // NOWRAP_CHUNK is a multiple of 3, so every chunk except the last
    // encodes without `=` padding and the pieces concatenate correctly.
    for chunk in data.chunks(NOWRAP_CHUNK) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let encoded = BASE64_ENGINE.encode(chunk, buf[..clen].as_out());
        out.write_all(encoded)?;
    }
    Ok(())
}
94
/// Multi-threaded unwrapped encode: split the input into per-thread
/// chunks whose sizes are multiples of 3 (so only the final chunk may
/// emit `=` padding), encode each into its own disjoint slice of a
/// single output buffer, then write that buffer once.
fn encode_no_wrap_parallel(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    let num_threads = num_cpus().max(1);
    let raw_chunk = data.len() / num_threads;
    // Round up to a multiple of 3 so chunk outputs concatenate cleanly.
    let chunk_size = ((raw_chunk + 2) / 3) * 3;

    let chunks: Vec<&[u8]> = data.chunks(chunk_size.max(3)).collect();

    // Prefix sums of encoded chunk lengths = each task's output offset.
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    let mut total_out = 0usize;
    for chunk in &chunks {
        offsets.push(total_out);
        total_out += BASE64_ENGINE.encoded_length(chunk.len());
    }

    let mut output: Vec<u8> = Vec::with_capacity(total_out);
    #[allow(clippy::uninit_vec)]
    // SAFETY: every byte of `output[..total_out]` is written by exactly
    // one spawned task before the buffer is read below.
    unsafe {
        output.set_len(total_out);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output);

    // The base pointer is smuggled as usize so the closures are Send;
    // each task reconstructs a slice over its own disjoint range.
    let output_base = output.as_mut_ptr() as usize;
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let out_off = offsets[i];
            let enc_len = BASE64_ENGINE.encoded_length(chunk.len());
            let base = output_base;
            s.spawn(move |_| {
                // SAFETY: [out_off, out_off + enc_len) ranges are
                // pairwise disjoint and lie within the allocation.
                let dest =
                    unsafe { std::slice::from_raw_parts_mut((base + out_off) as *mut u8, enc_len) };
                let _ = BASE64_ENGINE.encode(chunk, dest.as_out());
            });
        }
    });

    out.write_all(&output[..total_out])
}
142
/// Encode `data` with output lines wrapped at `wrap_col` characters,
/// dispatching to the fastest applicable strategy.
fn encode_wrapped(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
    // Input bytes consumed per full output line; zero means wrap_col < 4,
    // i.e. a line cannot hold even one 4-char base64 quantum.
    let bytes_per_line = wrap_col * 3 / 4;
    if bytes_per_line == 0 {
        return encode_wrapped_small(data, wrap_col, out);
    }

    // The scatter/expand fast paths need every line boundary to fall on
    // a whole 3-byte input group (no `=` padding mid-stream).
    if data.len() >= PARALLEL_WRAPPED_THRESHOLD && bytes_per_line.is_multiple_of(3) {
        return encode_wrapped_parallel(data, wrap_col, bytes_per_line, out);
    }

    if bytes_per_line.is_multiple_of(3) {
        return encode_wrapped_expand(data, wrap_col, bytes_per_line, out);
    }

    // General path: encode everything, then splice newlines into a
    // second buffer sized for the exact wrapped output.
    let enc_max = BASE64_ENGINE.encoded_length(data.len());
    let num_full = enc_max / wrap_col;
    let rem = enc_max % wrap_col;
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    let mut enc_buf: Vec<u8> = Vec::with_capacity(enc_max);
    #[allow(clippy::uninit_vec)]
    // SAFETY: `encode` fills all of `enc_buf[..enc_max]` before any read.
    unsafe {
        enc_buf.set_len(enc_max);
    }
    let _ = BASE64_ENGINE.encode(data, enc_buf[..enc_max].as_out());

    let mut out_buf: Vec<u8> = Vec::with_capacity(out_len);
    #[allow(clippy::uninit_vec)]
    // SAFETY: `fuse_wrap` writes every byte of `out_buf[..n]`.
    unsafe {
        out_buf.set_len(out_len);
    }
    let n = fuse_wrap(&enc_buf, wrap_col, &mut out_buf);
    out.write_all(&out_buf[..n])
}
187
/// Wrap-encode when `bytes_per_line` is a multiple of 3: encode into
/// the front of a single buffer sized for the final wrapped output,
/// then shift lines toward the end in place while inserting newlines
/// (`expand_backward`). Avoids a second allocation and full copy.
fn encode_wrapped_expand(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    debug_assert!(bytes_per_line.is_multiple_of(3));
    let enc_len = BASE64_ENGINE.encoded_length(data.len());
    if enc_len == 0 {
        return Ok(());
    }

    let num_full = enc_len / wrap_col;
    let rem = enc_len % wrap_col;
    // Exact wrapped size: one '\n' per full line plus one after the tail.
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    let mut buf: Vec<u8> = Vec::with_capacity(out_len);
    #[allow(clippy::uninit_vec)]
    // SAFETY: `encode` initializes buf[..enc_len]; `expand_backward`
    // then writes every byte of buf[..out_len] before it is read.
    unsafe {
        buf.set_len(out_len);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut buf);

    let encoded = BASE64_ENGINE.encode(data, buf[..enc_len].as_out());
    debug_assert_eq!(encoded.len(), enc_len, "encode wrote unexpected length");

    // Right-to-left in-place expansion: source and destination ranges
    // never collide destructively when moving backward.
    expand_backward(buf.as_mut_ptr(), enc_len, out_len, wrap_col);

    out.write_all(&buf[..out_len])
}
229
230#[allow(dead_code)]
242fn encode_wrapped_scatter(
243 data: &[u8],
244 wrap_col: usize,
245 bytes_per_line: usize,
246 out: &mut impl Write,
247) -> io::Result<()> {
248 let enc_len = BASE64_ENGINE.encoded_length(data.len());
249 if enc_len == 0 {
250 return Ok(());
251 }
252
253 let num_full = enc_len / wrap_col;
254 let rem = enc_len % wrap_col;
255 let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };
256
257 let mut buf: Vec<u8> = Vec::with_capacity(out_len);
259 #[allow(clippy::uninit_vec)]
260 unsafe {
261 buf.set_len(out_len);
262 }
263 #[cfg(target_os = "linux")]
264 hint_hugepage(&mut buf);
265
266 const GROUP_LINES: usize = 256;
269 let group_input = GROUP_LINES * bytes_per_line;
270 let temp_size = GROUP_LINES * wrap_col;
271 let mut temp: Vec<u8> = Vec::with_capacity(temp_size);
272 #[allow(clippy::uninit_vec)]
273 unsafe {
274 temp.set_len(temp_size);
275 }
276
277 let line_out = wrap_col + 1;
278 let mut wp = 0usize; for chunk in data.chunks(group_input) {
281 let clen = BASE64_ENGINE.encoded_length(chunk.len());
282 let _ = BASE64_ENGINE.encode(chunk, temp[..clen].as_out());
283
284 let lines = clen / wrap_col;
286 let chunk_rem = clen % wrap_col;
287
288 let mut i = 0;
290 while i + 8 <= lines {
291 unsafe {
292 let src = temp.as_ptr().add(i * wrap_col);
293 let dst = buf.as_mut_ptr().add(wp);
294 std::ptr::copy_nonoverlapping(src, dst, wrap_col);
295 *dst.add(wrap_col) = b'\n';
296 std::ptr::copy_nonoverlapping(src.add(wrap_col), dst.add(line_out), wrap_col);
297 *dst.add(line_out + wrap_col) = b'\n';
298 std::ptr::copy_nonoverlapping(
299 src.add(2 * wrap_col),
300 dst.add(2 * line_out),
301 wrap_col,
302 );
303 *dst.add(2 * line_out + wrap_col) = b'\n';
304 std::ptr::copy_nonoverlapping(
305 src.add(3 * wrap_col),
306 dst.add(3 * line_out),
307 wrap_col,
308 );
309 *dst.add(3 * line_out + wrap_col) = b'\n';
310 std::ptr::copy_nonoverlapping(
311 src.add(4 * wrap_col),
312 dst.add(4 * line_out),
313 wrap_col,
314 );
315 *dst.add(4 * line_out + wrap_col) = b'\n';
316 std::ptr::copy_nonoverlapping(
317 src.add(5 * wrap_col),
318 dst.add(5 * line_out),
319 wrap_col,
320 );
321 *dst.add(5 * line_out + wrap_col) = b'\n';
322 std::ptr::copy_nonoverlapping(
323 src.add(6 * wrap_col),
324 dst.add(6 * line_out),
325 wrap_col,
326 );
327 *dst.add(6 * line_out + wrap_col) = b'\n';
328 std::ptr::copy_nonoverlapping(
329 src.add(7 * wrap_col),
330 dst.add(7 * line_out),
331 wrap_col,
332 );
333 *dst.add(7 * line_out + wrap_col) = b'\n';
334 }
335 wp += 8 * line_out;
336 i += 8;
337 }
338 while i < lines {
340 unsafe {
341 std::ptr::copy_nonoverlapping(
342 temp.as_ptr().add(i * wrap_col),
343 buf.as_mut_ptr().add(wp),
344 wrap_col,
345 );
346 *buf.as_mut_ptr().add(wp + wrap_col) = b'\n';
347 }
348 wp += line_out;
349 i += 1;
350 }
351 if chunk_rem > 0 {
353 unsafe {
354 std::ptr::copy_nonoverlapping(
355 temp.as_ptr().add(lines * wrap_col),
356 buf.as_mut_ptr().add(wp),
357 chunk_rem,
358 );
359 *buf.as_mut_ptr().add(wp + chunk_rem) = b'\n';
360 }
361 wp += chunk_rem + 1;
362 }
363 }
364
365 out.write_all(&buf[..wp])
366}
367
/// Copy `count` consecutive `wrap_col`-byte lines from `temp` into
/// `buf`, placing line `i` at offset `(line_start + i) * line_out` and
/// terminating each with a `'\n'` (so `line_out` is the stride, i.e.
/// `wrap_col + 1` for newline-terminated lines).
#[inline]
#[allow(dead_code)]
fn scatter_lines(
    temp: &[u8],
    buf: &mut [u8],
    line_start: usize,
    count: usize,
    wrap_col: usize,
    line_out: usize,
) {
    for i in 0..count {
        let line = &temp[i * wrap_col..(i + 1) * wrap_col];
        let dst = (line_start + i) * line_out;
        buf[dst..dst + wrap_col].copy_from_slice(line);
        buf[dst + wrap_col] = b'\n';
    }
}
391
/// In-place wrap expansion: the buffer at `ptr` holds `enc_len` encoded
/// bytes at the front of an `out_len`-byte allocation. Walk backward,
/// relocating each `wrap_col`-byte line (and the trailing partial line)
/// toward the end with a `'\n'` appended, so the buffer ends up holding
/// the fully wrapped output of exactly `out_len` bytes.
///
/// Caller guarantees `out_len = full_lines * (wrap_col + 1) +
/// (tail > 0 ? tail + 1 : 0)` and that `[0, out_len)` is writable.
#[inline]
fn expand_backward(ptr: *mut u8, enc_len: usize, out_len: usize, wrap_col: usize) {
    let full_lines = enc_len / wrap_col;
    let tail = enc_len % wrap_col;

    // Moves the `len`-byte run ending at `*rp` so it ends just before a
    // freshly written '\n' at `*wp - 1`. Overlap is fine (`ptr::copy`),
    // and a zero-distance move is a harmless no-op.
    let shift = |len: usize, rp: &mut usize, wp: &mut usize| {
        // SAFETY: caller guarantees the destination range lies within
        // the allocation and the source run is initialized; going
        // backward, the destination never precedes the source.
        unsafe {
            *wp -= 1;
            *ptr.add(*wp) = b'\n';
            *rp -= len;
            *wp -= len;
            std::ptr::copy(ptr.add(*rp), ptr.add(*wp), len);
        }
    };

    let mut rp = enc_len;
    let mut wp = out_len;

    // Partial final line first (it sits at the very end of the output).
    if tail > 0 {
        shift(tail, &mut rp, &mut wp);
    }
    // Then every full line, right to left.
    for _ in 0..full_lines {
        shift(wrap_col, &mut rp, &mut wp);
    }
}
487
// Single shared newline byte, referenced as an IoSlice by the vectored
// writers below.
static NEWLINE: [u8; 1] = [b'\n'];
490
/// Write `encoded` wrapped at `wrap_col`, preferring vectored I/O
/// (IoSlice pairs of payload + shared newline) when the iovec count is
/// small; otherwise fuse batches of lines into a reused buffer via
/// `fuse_wrap` and write those contiguously.
#[inline]
#[allow(dead_code)]
fn write_wrapped_iov(encoded: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
    // Cap on slices per vectored write (typical kernel IOV_MAX is 1024).
    const MAX_IOV: usize = 1024;

    let num_full_lines = encoded.len() / wrap_col;
    let remainder = encoded.len() % wrap_col;
    // Two slices per line: the payload and the shared "\n".
    let total_iov = num_full_lines * 2 + if remainder > 0 { 2 } else { 0 };

    if total_iov <= MAX_IOV {
        let mut iov: Vec<io::IoSlice> = Vec::with_capacity(total_iov);
        let mut pos = 0;
        for _ in 0..num_full_lines {
            iov.push(io::IoSlice::new(&encoded[pos..pos + wrap_col]));
            iov.push(io::IoSlice::new(&NEWLINE));
            pos += wrap_col;
        }
        if remainder > 0 {
            iov.push(io::IoSlice::new(&encoded[pos..pos + remainder]));
            iov.push(io::IoSlice::new(&NEWLINE));
        }
        return write_all_vectored(out, &iov);
    }

    // Fused fallback: splice BATCH_LINES newline-terminated lines at a
    // time into one buffer so each write_all is a single syscall.
    let line_out = wrap_col + 1;
    const BATCH_LINES: usize = 512;
    let batch_fused_size = BATCH_LINES * line_out;
    let mut fused: Vec<u8> = Vec::with_capacity(batch_fused_size);
    #[allow(clippy::uninit_vec)]
    // SAFETY: `fuse_wrap` overwrites `fused[..n]` before each read.
    unsafe {
        fused.set_len(batch_fused_size);
    }

    let mut rp = 0;
    let mut lines_done = 0;

    while lines_done + BATCH_LINES <= num_full_lines {
        let n = fuse_wrap(
            &encoded[rp..rp + BATCH_LINES * wrap_col],
            wrap_col,
            &mut fused,
        );
        out.write_all(&fused[..n])?;
        rp += BATCH_LINES * wrap_col;
        lines_done += BATCH_LINES;
    }

    // Leftover full lines that didn't fill a whole batch.
    let remaining_lines = num_full_lines - lines_done;
    if remaining_lines > 0 {
        let n = fuse_wrap(
            &encoded[rp..rp + remaining_lines * wrap_col],
            wrap_col,
            &mut fused,
        );
        out.write_all(&fused[..n])?;
        rp += remaining_lines * wrap_col;
    }

    // Trailing partial line, newline-terminated like the rest.
    if remainder > 0 {
        out.write_all(&encoded[rp..rp + remainder])?;
        out.write_all(b"\n")?;
    }
    Ok(())
}
571
/// Streaming variant of the vectored wrapped writer: `col` carries the
/// current output column across calls, so `encoded` may arrive in
/// arbitrary-sized pieces. The iovec is flushed whenever it approaches
/// the per-write slice cap.
#[inline]
fn write_wrapped_iov_streaming(
    encoded: &[u8],
    wrap_col: usize,
    col: &mut usize,
    out: &mut impl Write,
) -> io::Result<()> {
    const MAX_IOV: usize = 1024;
    let mut iov: Vec<io::IoSlice> = Vec::with_capacity(MAX_IOV);
    let mut rp = 0;

    while rp < encoded.len() {
        let space = wrap_col - *col;
        let avail = encoded.len() - rp;

        if avail <= space {
            // Remainder fits on the current line; emit a newline only
            // if it lands exactly on the wrap column.
            iov.push(io::IoSlice::new(&encoded[rp..rp + avail]));
            *col += avail;
            if *col == wrap_col {
                iov.push(io::IoSlice::new(&NEWLINE));
                *col = 0;
            }
            break;
        } else {
            // Fill out the current line and start a fresh one.
            iov.push(io::IoSlice::new(&encoded[rp..rp + space]));
            iov.push(io::IoSlice::new(&NEWLINE));
            rp += space;
            *col = 0;
        }

        // Keep headroom: the next iteration may push two slices.
        if iov.len() >= MAX_IOV - 1 {
            write_all_vectored(out, &iov)?;
            iov.clear();
        }
    }

    if !iov.is_empty() {
        write_all_vectored(out, &iov)?;
    }
    Ok(())
}
618
/// Multi-threaded wrapped encode. Chunks are whole multiples of
/// `bytes_per_line` input bytes, so each chunk's wrapped output is
/// self-contained (every line boundary falls inside one chunk) and can
/// be scattered into a precomputed disjoint output range.
fn encode_wrapped_parallel(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let num_threads = num_cpus().max(1);
    let lines_per_chunk = ((data.len() / bytes_per_line) / num_threads).max(1);
    let chunk_input = lines_per_chunk * bytes_per_line;

    let chunks: Vec<&[u8]> = data.chunks(chunk_input.max(bytes_per_line)).collect();

    // Prefix sums of each chunk's wrapped output size.
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    let mut total_out = 0usize;
    for chunk in &chunks {
        offsets.push(total_out);
        let enc_len = BASE64_ENGINE.encoded_length(chunk.len());
        let full_lines = enc_len / wrap_col;
        let remainder = enc_len % wrap_col;
        total_out += full_lines * (wrap_col + 1) + if remainder > 0 { remainder + 1 } else { 0 };
    }

    let mut output: Vec<u8> = Vec::with_capacity(total_out);
    #[allow(clippy::uninit_vec)]
    // SAFETY: each spawned task fully initializes its own disjoint
    // output range before the buffer is read below.
    unsafe {
        output.set_len(total_out);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output);

    // Base pointer passed as usize so the closures are Send; the
    // [out_off, out_end) ranges are pairwise disjoint by construction.
    let output_base = output.as_mut_ptr() as usize;
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let out_off = offsets[i];
            let out_end = if i + 1 < offsets.len() {
                offsets[i + 1]
            } else {
                total_out
            };
            let out_size = out_end - out_off;
            let base = output_base;
            s.spawn(move |_| {
                // SAFETY: disjoint, in-bounds range computed above.
                let out_slice = unsafe {
                    std::slice::from_raw_parts_mut((base + out_off) as *mut u8, out_size)
                };
                encode_chunk_l1_scatter_into(chunk, out_slice, wrap_col, bytes_per_line);
            });
        }
    });

    out.write_all(&output[..total_out])
}
680
/// Encode `data` into `output` with newline-wrapped lines, working in
/// L1-cache-sized groups: encode up to GROUP_LINES lines into a small
/// temp buffer, then scatter each `wrap_col`-byte line plus a '\n'
/// into `output`. `output` must be exactly the wrapped size of `data`;
/// `bytes_per_line` must be a multiple of 3 so group boundaries fall
/// on whole output lines.
fn encode_chunk_l1_scatter_into(
    data: &[u8],
    output: &mut [u8],
    wrap_col: usize,
    bytes_per_line: usize,
) {
    const GROUP_LINES: usize = 256;
    let group_input = GROUP_LINES * bytes_per_line;
    let temp_size = GROUP_LINES * wrap_col;
    let mut temp: Vec<u8> = Vec::with_capacity(temp_size);
    #[allow(clippy::uninit_vec)]
    // SAFETY: only `temp[..clen]` is read, after `encode` fills it.
    unsafe {
        temp.set_len(temp_size);
    }

    let line_out = wrap_col + 1;
    let mut wp = 0usize;

    for chunk in data.chunks(group_input) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let _ = BASE64_ENGINE.encode(chunk, temp[..clen].as_out());

        let lines = clen / wrap_col;
        let chunk_rem = clen % wrap_col;

        // Manually unrolled x8: copy eight lines + newlines per pass.
        let mut i = 0;
        while i + 8 <= lines {
            // SAFETY: caller sized `output` for the full wrapped result,
            // so wp + 8 * line_out stays in bounds while i + 8 <= lines.
            unsafe {
                let src = temp.as_ptr().add(i * wrap_col);
                let dst = output.as_mut_ptr().add(wp);
                std::ptr::copy_nonoverlapping(src, dst, wrap_col);
                *dst.add(wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(src.add(wrap_col), dst.add(line_out), wrap_col);
                *dst.add(line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(2 * wrap_col),
                    dst.add(2 * line_out),
                    wrap_col,
                );
                *dst.add(2 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(3 * wrap_col),
                    dst.add(3 * line_out),
                    wrap_col,
                );
                *dst.add(3 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(4 * wrap_col),
                    dst.add(4 * line_out),
                    wrap_col,
                );
                *dst.add(4 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(5 * wrap_col),
                    dst.add(5 * line_out),
                    wrap_col,
                );
                *dst.add(5 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(6 * wrap_col),
                    dst.add(6 * line_out),
                    wrap_col,
                );
                *dst.add(6 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(7 * wrap_col),
                    dst.add(7 * line_out),
                    wrap_col,
                );
                *dst.add(7 * line_out + wrap_col) = b'\n';
            }
            wp += 8 * line_out;
            i += 8;
        }
        // Remaining full lines, one at a time.
        while i < lines {
            // SAFETY: same bound argument as above, one line at a time.
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(i * wrap_col),
                    output.as_mut_ptr().add(wp),
                    wrap_col,
                );
                *output.as_mut_ptr().add(wp + wrap_col) = b'\n';
            }
            wp += line_out;
            i += 1;
        }
        // Trailing partial line (only possible in the final group).
        if chunk_rem > 0 {
            // SAFETY: wp + chunk_rem + 1 is the final byte of `output`.
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(lines * wrap_col),
                    output.as_mut_ptr().add(wp),
                    chunk_rem,
                );
                *output.as_mut_ptr().add(wp + chunk_rem) = b'\n';
            }
            wp += chunk_rem + 1;
        }
    }
}
784
/// Copy `encoded` into `out_buf`, appending a `'\n'` after every
/// `wrap_col` bytes and after the trailing partial line (if any).
/// Returns the number of bytes written.
///
/// `out_buf` must be large enough for `encoded.len()` plus one newline
/// per (full or partial) line; it panics on overflow rather than
/// writing out of bounds. Empty input writes nothing and returns 0.
#[inline]
fn fuse_wrap(encoded: &[u8], wrap_col: usize, out_buf: &mut [u8]) -> usize {
    let mut wp = 0;
    // `chunks` yields every full wrap_col-byte line followed by the
    // partial tail; each one gets the same copy-then-newline treatment.
    for line in encoded.chunks(wrap_col) {
        let end = wp + line.len();
        out_buf[wp..end].copy_from_slice(line);
        out_buf[end] = b'\n';
        wp = end + 1;
    }
    wp
}
881
882fn encode_wrapped_small(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
884 let enc_max = BASE64_ENGINE.encoded_length(data.len());
885 let mut buf: Vec<u8> = Vec::with_capacity(enc_max);
886 #[allow(clippy::uninit_vec)]
887 unsafe {
888 buf.set_len(enc_max);
889 }
890 let encoded = BASE64_ENGINE.encode(data, buf[..enc_max].as_out());
891
892 let wc = wrap_col.max(1);
893 for line in encoded.chunks(wc) {
894 out.write_all(line)?;
895 out.write_all(b"\n")?;
896 }
897 Ok(())
898}
899
900pub fn decode_to_writer(data: &[u8], ignore_garbage: bool, out: &mut impl Write) -> io::Result<()> {
904 if data.is_empty() {
905 return Ok(());
906 }
907
908 if ignore_garbage {
909 let mut cleaned = strip_non_base64(data);
910 return decode_clean_slice(&mut cleaned, out);
911 }
912
913 if data.len() < 512 * 1024 && data.len() >= 77 {
919 if let Some(result) = try_line_decode(data, out) {
920 return result;
921 }
922 }
923
924 decode_stripping_whitespace(data, out)
926}
927
/// Decode a mutable (e.g. memory-mapped) buffer, compacting whitespace
/// or garbage in place so no second allocation is needed for cleanup.
///
/// Order of strategies: line-oriented probe for small inputs, in-place
/// garbage strip, uniform-line fast path, no-newline fast paths, and
/// finally gap compaction around CR/LF with an optional second pass for
/// the rarer whitespace bytes.
pub fn decode_mmap_inplace(
    data: &mut [u8],
    ignore_garbage: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    // Small-input probe (77 = minimum size the line path considers;
    // matches the same bound used in decode_to_writer).
    if !ignore_garbage && data.len() >= 77 && data.len() < 512 * 1024 {
        if let Some(result) = try_line_decode(data, out) {
            return result;
        }
    }

    if ignore_garbage {
        // Compact: keep only base64 alphabet bytes, shifting them left.
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut wp = 0;
        for rp in 0..len {
            // SAFETY: rp < len and wp <= rp, both in bounds.
            let b = unsafe { *ptr.add(rp) };
            if is_base64_char(b) {
                unsafe { *ptr.add(wp) = b };
                wp += 1;
            }
        }
        let r = decode_inplace_with_padding(&mut data[..wp], out);
        return r;
    }

    if data.len() >= 77 {
        if let Some(result) = try_decode_uniform_lines(data, out) {
            return result;
        }
    }

    // No CR/LF at all: either decode directly or do a single
    // compaction pass for the rarer whitespace bytes.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        if !data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            return decode_inplace_with_padding(data, out);
        }
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut wp = 0;
        for rp in 0..len {
            // SAFETY: rp < len and wp <= rp, both in bounds.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(wp) = b };
                wp += 1;
            }
        }
        return decode_inplace_with_padding(&mut data[..wp], out);
    }

    // General case: slide the gaps between CR/LF positions left with
    // overlapping copies, noting whether any rare whitespace was seen.
    let ptr = data.as_mut_ptr();
    let len = data.len();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                // SAFETY: [gap_start, gap_start + gap_len) lies in data.
                has_rare_ws = unsafe {
                    std::slice::from_raw_parts(ptr.add(gap_start), gap_len)
                        .iter()
                        .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
                };
            }
            if wp != gap_start {
                // SAFETY: wp < gap_start; ranges may overlap (ptr::copy).
                unsafe { std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len) };
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Tail after the final CR/LF.
    let tail_len = len - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            // SAFETY: [gap_start, gap_start + tail_len) lies in data.
            has_rare_ws = unsafe {
                std::slice::from_raw_parts(ptr.add(gap_start), tail_len)
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
            };
        }
        if wp != gap_start {
            // SAFETY: wp < gap_start; ranges may overlap (ptr::copy).
            unsafe { std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len) };
        }
        wp += tail_len;
    }

    // Second compaction pass only if rare whitespace actually occurred.
    if has_rare_ws {
        let mut rp = 0;
        let mut cwp = 0;
        while rp < wp {
            // SAFETY: rp < wp <= len and cwp <= rp.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        wp = cwp;
    }

    if wp >= PARALLEL_DECODE_THRESHOLD {
        return decode_borrowed_clean_parallel(out, &data[..wp]);
    }
    decode_inplace_with_padding(&mut data[..wp], out)
}
1067
1068pub fn decode_owned(
1070 data: &mut Vec<u8>,
1071 ignore_garbage: bool,
1072 out: &mut impl Write,
1073) -> io::Result<()> {
1074 if data.is_empty() {
1075 return Ok(());
1076 }
1077
1078 if ignore_garbage {
1079 data.retain(|&b| is_base64_char(b));
1080 } else {
1081 strip_whitespace_inplace(data);
1082 }
1083
1084 decode_clean_slice(data, out)
1085}
1086
/// Remove all ASCII whitespace from `data` in place.
///
/// Fast path: no CR/LF present — scan once for the rarer whitespace
/// bytes and `retain` only if needed. Otherwise slide the gaps between
/// CR/LF positions left with overlapping copies, tracking whether rare
/// whitespace was seen so a byte-by-byte second pass runs only then.
fn strip_whitespace_inplace(data: &mut Vec<u8>) {
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        if data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            data.retain(|&b| NOT_WHITESPACE[b as usize]);
        }
        return;
    }

    let ptr = data.as_mut_ptr();
    let len = data.len();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    for pos in memchr::memchr2_iter(b'\n', b'\r', data.as_slice()) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                has_rare_ws = data[gap_start..pos]
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
            }
            if wp != gap_start {
                // SAFETY: wp < gap_start, both in bounds; ranges may
                // overlap, which ptr::copy permits.
                unsafe {
                    std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len);
                }
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Tail after the final CR/LF.
    let tail_len = len - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = data[gap_start..]
                .iter()
                .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
        }
        if wp != gap_start {
            // SAFETY: wp < gap_start, both in bounds; overlap allowed.
            unsafe {
                std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len);
            }
        }
        wp += tail_len;
    }

    data.truncate(wp);

    // Second compaction only if space/tab/VT/FF actually occurred.
    if has_rare_ws {
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut rp = 0;
        let mut cwp = 0;
        while rp < len {
            // SAFETY: rp < len and cwp <= rp, both in bounds.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        data.truncate(cwp);
    }
}
1169
/// Lookup table: `true` for every byte that is NOT ASCII whitespace
/// (space, tab, LF, CR, vertical tab 0x0b, form feed 0x0c).
static NOT_WHITESPACE: [bool; 256] = {
    let mut table = [true; 256];
    let ws = [b' ', b'\t', b'\n', b'\r', 0x0b, 0x0c];
    let mut i = 0;
    while i < ws.len() {
        table[ws[i] as usize] = false;
        i += 1;
    }
    table
};
1182
/// Fast path for input made of uniform base64 lines: every line is
/// `line_len` characters (a multiple of 4) followed by '\n'. Returns
/// `None` when the layout probe fails so the caller can fall back.
///
/// Large inputs decode in parallel into one preallocated output
/// buffer; smaller ones stream through a reused bounce buffer.
fn try_decode_uniform_lines(data: &[u8], out: &mut impl Write) -> Option<io::Result<()>> {
    let first_nl = memchr::memchr(b'\n', data)?;
    let line_len = first_nl;
    // A line must hold whole 4-char quanta for per-line decode to work.
    if line_len == 0 || line_len % 4 != 0 {
        return None;
    }

    let stride = line_len + 1;

    // Cheap probe: verify the next few newlines land where uniform
    // spacing predicts before committing to this layout.
    let check_lines = 4.min(data.len() / stride);
    for i in 1..check_lines {
        let expected_nl = i * stride - 1;
        if expected_nl >= data.len() || data[expected_nl] != b'\n' {
            return None;
        }
    }

    // The last whole stride must also end in '\n', or lines aren't
    // actually uniform.
    let full_lines = if data.len() >= stride {
        let candidate = data.len() / stride;
        if candidate > 0 && data[candidate * stride - 1] != b'\n' {
            return None;
        }
        candidate
    } else {
        0
    };

    let remainder_start = full_lines * stride;
    let remainder = &data[remainder_start..];
    let rem_clean = if remainder.last() == Some(&b'\n') {
        &remainder[..remainder.len() - 1]
    } else {
        remainder
    };

    let decoded_per_line = line_len * 3 / 4;
    // Account for up to two '=' padding chars in the trailing line.
    let rem_decoded_size = if rem_clean.is_empty() {
        0
    } else {
        let pad = rem_clean
            .iter()
            .rev()
            .take(2)
            .filter(|&&b| b == b'=')
            .count();
        rem_clean.len() * 3 / 4 - pad
    };
    let total_decoded = full_lines * decoded_per_line + rem_decoded_size;
    let clean_len = full_lines * line_len;

    if clean_len >= PARALLEL_DECODE_THRESHOLD && num_cpus() > 1 {
        // Parallel path: each thread strips newlines into a private
        // bounce buffer and decodes into its disjoint output range.
        let mut output: Vec<u8> = Vec::with_capacity(total_decoded);
        #[allow(clippy::uninit_vec)]
        // SAFETY: every byte of output[..total_decoded] is written by
        // exactly one task (or the remainder decode) before any read.
        unsafe {
            output.set_len(total_decoded);
        }
        #[cfg(target_os = "linux")]
        hint_hugepage(&mut output);

        // Pointers passed as usize so the closures are Send.
        let out_ptr = output.as_mut_ptr() as usize;
        let src_ptr = data.as_ptr() as usize;
        let num_threads = num_cpus().max(1);
        let lines_per_thread = (full_lines + num_threads - 1) / num_threads;
        let lines_per_sub = (512 * 1024 / line_len).max(1);

        // Decode errors can't cross the scope boundary, so record them
        // in a shared flag and bail out cooperatively.
        let err_flag = std::sync::atomic::AtomicBool::new(false);
        rayon::scope(|s| {
            for t in 0..num_threads {
                let err_flag = &err_flag;
                s.spawn(move |_| {
                    let start_line = t * lines_per_thread;
                    if start_line >= full_lines {
                        return;
                    }
                    let end_line = (start_line + lines_per_thread).min(full_lines);
                    let chunk_lines = end_line - start_line;

                    let sub_buf_size = lines_per_sub.min(chunk_lines) * line_len;
                    let mut local_buf: Vec<u8> = Vec::with_capacity(sub_buf_size);
                    #[allow(clippy::uninit_vec)]
                    // SAFETY: only local_buf[..sub_clean] is read, and
                    // the copy loop below fills that prefix first.
                    unsafe {
                        local_buf.set_len(sub_buf_size);
                    }

                    let src = src_ptr as *const u8;
                    let out_base = out_ptr as *mut u8;
                    let local_dst = local_buf.as_mut_ptr();

                    let mut sub_start = 0usize;
                    while sub_start < chunk_lines {
                        if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let sub_count = (chunk_lines - sub_start).min(lines_per_sub);
                        let sub_clean = sub_count * line_len;

                        // Strip newlines: gather line payloads into the
                        // contiguous local buffer.
                        for i in 0..sub_count {
                            // SAFETY: each source line lies within data
                            // and each destination within local_buf.
                            unsafe {
                                std::ptr::copy_nonoverlapping(
                                    src.add((start_line + sub_start + i) * stride),
                                    local_dst.add(i * line_len),
                                    line_len,
                                );
                            }
                        }

                        let out_offset = (start_line + sub_start) * decoded_per_line;
                        let out_size = sub_count * decoded_per_line;
                        // SAFETY: disjoint per-task range inside output.
                        let out_slice = unsafe {
                            std::slice::from_raw_parts_mut(out_base.add(out_offset), out_size)
                        };
                        if BASE64_ENGINE
                            .decode(&local_buf[..sub_clean], out_slice.as_out())
                            .is_err()
                        {
                            err_flag.store(true, std::sync::atomic::Ordering::Relaxed);
                            return;
                        }

                        sub_start += sub_count;
                    }
                });
            }
        });
        let result: Result<(), io::Error> = if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
            Err(io::Error::new(io::ErrorKind::InvalidData, "invalid input"))
        } else {
            Ok(())
        };

        if let Err(e) = result {
            return Some(Err(e));
        }

        // Trailing (possibly padded) line decodes serially at the end.
        if !rem_clean.is_empty() {
            let rem_out = &mut output[full_lines * decoded_per_line..total_decoded];
            match BASE64_ENGINE.decode(rem_clean, rem_out.as_out()) {
                Ok(_) => {}
                Err(_) => return Some(decode_error()),
            }
        }

        return Some(out.write_all(&output[..total_decoded]));
    }

    // Serial path: gather lines into a reused bounce buffer, decode it
    // in place, and stream the decoded bytes out batch by batch.
    let lines_per_sub = (256 * 1024 / line_len).max(1);
    let sub_buf_size = lines_per_sub * line_len;
    let mut local_buf: Vec<u8> = Vec::with_capacity(sub_buf_size);
    #[allow(clippy::uninit_vec)]
    // SAFETY: only local_buf[..sub_clean] is read, after the copy loop
    // below fills that prefix.
    unsafe {
        local_buf.set_len(sub_buf_size);
    }

    let src = data.as_ptr();
    let local_dst = local_buf.as_mut_ptr();

    let mut line_idx = 0usize;
    while line_idx < full_lines {
        let sub_count = (full_lines - line_idx).min(lines_per_sub);
        let sub_clean = sub_count * line_len;

        for i in 0..sub_count {
            // SAFETY: source line within data, destination within
            // local_buf, both by construction of the counts above.
            unsafe {
                std::ptr::copy_nonoverlapping(
                    src.add((line_idx + i) * stride),
                    local_dst.add(i * line_len),
                    line_len,
                );
            }
        }

        match BASE64_ENGINE.decode_inplace(&mut local_buf[..sub_clean]) {
            Ok(decoded) => {
                if let Err(e) = out.write_all(decoded) {
                    return Some(Err(e));
                }
            }
            Err(_) => return Some(decode_error()),
        }

        line_idx += sub_count;
    }

    if !rem_clean.is_empty() {
        let mut rem_buf = rem_clean.to_vec();
        match BASE64_ENGINE.decode_inplace(&mut rem_buf) {
            Ok(decoded) => {
                if let Err(e) = out.write_all(decoded) {
                    return Some(Err(e));
                }
            }
            Err(_) => return Some(decode_error()),
        }
    }

    Some(Ok(()))
}
1395
/// Decodes base64 `data` that may contain interspersed ASCII whitespace,
/// writing the decoded bytes to `out`.
///
/// Strategy, fastest path first:
/// 1. For inputs of at least 77 bytes (one 76-column line plus its newline —
///    presumably why the constant is 77; TODO confirm), try the uniform-line
///    fast path.
/// 2. If there are no CR/LF bytes at all, decode directly, or strip the
///    rarer whitespace bytes (SP/TAB/VT/FF) first when present.
/// 3. Otherwise compact the buffer by copying the runs between CR/LF
///    positions, and only make a second per-byte pass if the run scan
///    actually saw one of the rare whitespace bytes.
fn decode_stripping_whitespace(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    // Fast path for uniformly wrapped input. A `Some` return means the
    // helper fully handled (or rejected) the input; `None` means the shape
    // didn't match and we fall through to the generic paths below.
    if data.len() >= 77 {
        if let Some(result) = try_decode_uniform_lines(data, out) {
            return result;
        }
    }

    // No line breaks anywhere in the input.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        // SP, TAB, VT (0x0b) and FF (0x0c) are the remaining whitespace
        // bytes that must be ignored before decoding.
        if !data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            return decode_borrowed_clean(out, data);
        }
        // Rare whitespace present: build a stripped copy.
        // NOT_WHITESPACE is a 256-entry lookup table defined elsewhere in
        // this file; `true` means "keep the byte".
        let mut cleaned: Vec<u8> = Vec::with_capacity(data.len());
        for &b in data {
            if NOT_WHITESPACE[b as usize] {
                cleaned.push(b);
            }
        }
        return decode_clean_slice(&mut cleaned, out);
    }

    // General path: copy the byte runs between CR/LF positions into `clean`.
    // `dst` points at `clean`'s uninitialized capacity; `wp` counts bytes
    // written so far and never exceeds data.len() == capacity.
    let mut clean: Vec<u8> = Vec::with_capacity(data.len());
    let dst = clean.as_mut_ptr();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    // Set as soon as any run contains SP/TAB/VT/FF, so the second
    // compaction pass below only runs when actually needed.
    let mut has_rare_ws = false;

    for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                has_rare_ws = data[gap_start..pos]
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
            }
            // SAFETY: wp + gap_len <= data.len() == clean's reserved
            // capacity, and `data` and `clean` are distinct allocations.
            unsafe {
                std::ptr::copy_nonoverlapping(data.as_ptr().add(gap_start), dst.add(wp), gap_len);
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Copy the run after the last CR/LF, if any.
    let tail_len = data.len() - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = data[gap_start..]
                .iter()
                .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
        }
        // SAFETY: same bound argument as above.
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr().add(gap_start), dst.add(wp), tail_len);
        }
        wp += tail_len;
    }
    // SAFETY: exactly `wp` bytes of `clean` have been initialized above.
    unsafe {
        clean.set_len(wp);
    }

    // Second pass, only if the scan saw rare whitespace: compact in place
    // keeping only non-whitespace bytes (classic read/write two-cursor walk).
    if has_rare_ws {
        let ptr = clean.as_mut_ptr();
        let len = clean.len();
        let mut rp = 0;
        let mut cwp = 0;
        while rp < len {
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        clean.truncate(cwp);
    }

    // Large cleaned payloads go to the multi-threaded decoder.
    if clean.len() >= PARALLEL_DECODE_THRESHOLD {
        decode_borrowed_clean_parallel(out, &clean)
    } else {
        decode_clean_slice(&mut clean, out)
    }
}
1501
/// Attempts to decode `data` assuming it consists of equal-length base64
/// lines — each exactly `line_len` symbols followed by one '\n' — plus an
/// optional shorter final line.
///
/// Returns `None` when the input does not match that shape (the caller is
/// expected to fall back to a generic path), and `Some(result)` when the
/// input was handled here, successfully or not.
fn try_line_decode(data: &[u8], out: &mut impl Write) -> Option<io::Result<()>> {
    // The first newline fixes the candidate line length. Lines must be a
    // multiple of 4 symbols so each decodes independently with no base64
    // state crossing a line boundary.
    let first_nl = memchr::memchr(b'\n', data)?;
    let line_len = first_nl; if line_len == 0 || line_len % 4 != 0 {
        return None;
    }

    // `line_stride` includes the '\n'; a full line always decodes to
    // exactly `decoded_per_line` bytes (multiple of 4, so no padding).
    let line_stride = line_len + 1; let decoded_per_line = line_len * 3 / 4;

    // Cheap shape probe: verify the newline position of up to the first
    // four lines before committing to this layout.
    // NOTE(review): lines past the fourth are not verified here; if an
    // interior line has a different length, the per-line decode below will
    // hit a '\n' and report invalid input rather than falling back —
    // confirm that is the intended trade-off.
    let check_lines = 4.min(data.len() / line_stride);
    for i in 1..check_lines {
        let expected_nl = i * line_stride - 1;
        // Defensive: cannot trigger given the `check_lines` bound.
        if expected_nl >= data.len() {
            break;
        }
        if data[expected_nl] != b'\n' {
            return None; }
    }

    // Count whole lines and confirm the last full line really ends in '\n'.
    let full_lines = if data.len() >= line_stride {
        let candidate = data.len() / line_stride;
        if candidate > 0 && data[candidate * line_stride - 1] != b'\n' {
            return None; }
        candidate
    } else {
        0
    };

    let remainder_start = full_lines * line_stride;
    let remainder = &data[remainder_start..];

    // Decoded size of the (possibly '\n'-terminated) short final line. Its
    // symbol count must be a multiple of 4; up to two trailing '=' shrink
    // the decoded size.
    let remainder_clean_len = if remainder.is_empty() {
        0
    } else {
        let rem = if remainder.last() == Some(&b'\n') {
            &remainder[..remainder.len() - 1]
        } else {
            remainder
        };
        if rem.is_empty() {
            0
        } else {
            let pad = rem.iter().rev().take(2).filter(|&&b| b == b'=').count();
            if rem.len() % 4 != 0 {
                return None; }
            rem.len() * 3 / 4 - pad
        }
    };

    // One output buffer for the whole result. Every line writes into a
    // disjoint range of it, which is what makes the out-of-order /
    // parallel fills below sound.
    let total_decoded = full_lines * decoded_per_line + remainder_clean_len;
    let mut out_buf: Vec<u8> = Vec::with_capacity(total_decoded);
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(total_decoded);
    }

    let dst = out_buf.as_mut_ptr();

    if data.len() >= PARALLEL_DECODE_THRESHOLD && full_lines >= 64 {
        // Parallel path: split the full lines into contiguous per-task
        // ranges. The destination pointer crosses threads as a usize; each
        // task writes only lines [start_line, end_line), so ranges never
        // overlap.
        let out_addr = dst as usize;
        let num_threads = num_cpus().max(1);
        let lines_per_chunk = (full_lines / num_threads).max(1);

        let mut tasks: Vec<(usize, usize)> = Vec::new();
        let mut line_off = 0;
        while line_off < full_lines {
            let end = (line_off + lines_per_chunk).min(full_lines);
            tasks.push((line_off, end));
            line_off = end;
        }

        // First failed line sets the flag; sibling tasks bail out at their
        // next check instead of finishing useless work.
        let decode_err = std::sync::atomic::AtomicBool::new(false);
        rayon::scope(|s| {
            for &(start_line, end_line) in &tasks {
                let decode_err = &decode_err;
                s.spawn(move |_| {
                    let out_ptr = out_addr as *mut u8;
                    let mut i = start_line;

                    // 4x-unrolled inner loop over whole lines.
                    while i + 4 <= end_line {
                        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let in_base = i * line_stride;
                        let ob = i * decoded_per_line;
                        // SAFETY: each slice covers this task's private
                        // [line, line+1) range of the shared output buffer.
                        unsafe {
                            let s0 =
                                std::slice::from_raw_parts_mut(out_ptr.add(ob), decoded_per_line);
                            if BASE64_ENGINE
                                .decode(&data[in_base..in_base + line_len], s0.as_out())
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s1 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + line_stride..in_base + line_stride + line_len],
                                    s1.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s2 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + 2 * decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + 2 * line_stride
                                        ..in_base + 2 * line_stride + line_len],
                                    s2.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s3 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + 3 * decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + 3 * line_stride
                                        ..in_base + 3 * line_stride + line_len],
                                    s3.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                        }
                        i += 4;
                    }

                    // Remaining 0-3 lines of this task's range.
                    while i < end_line {
                        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let in_start = i * line_stride;
                        let out_off = i * decoded_per_line;
                        let out_slice = unsafe {
                            std::slice::from_raw_parts_mut(out_ptr.add(out_off), decoded_per_line)
                        };
                        if BASE64_ENGINE
                            .decode(&data[in_start..in_start + line_len], out_slice.as_out())
                            .is_err()
                        {
                            decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                            return;
                        }
                        i += 1;
                    }
                });
            }
        });

        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
            return Some(decode_error());
        }
    } else {
        // Sequential path: same per-line decode, 4x unrolled.
        let mut i = 0;

        while i + 4 <= full_lines {
            let in_base = i * line_stride;
            let out_base = i * decoded_per_line;
            // SAFETY: each slice covers a distinct line-sized range of
            // out_buf; out_base + 4 * decoded_per_line <= total_decoded.
            unsafe {
                let s0 = std::slice::from_raw_parts_mut(dst.add(out_base), decoded_per_line);
                if BASE64_ENGINE
                    .decode(&data[in_base..in_base + line_len], s0.as_out())
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s1 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + line_stride..in_base + line_stride + line_len],
                        s1.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s2 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + 2 * decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + 2 * line_stride..in_base + 2 * line_stride + line_len],
                        s2.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s3 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + 3 * decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + 3 * line_stride..in_base + 3 * line_stride + line_len],
                        s3.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }
            }
            i += 4;
        }

        while i < full_lines {
            let in_start = i * line_stride;
            let in_end = in_start + line_len;
            let out_off = i * decoded_per_line;
            let out_slice =
                unsafe { std::slice::from_raw_parts_mut(dst.add(out_off), decoded_per_line) };
            match BASE64_ENGINE.decode(&data[in_start..in_end], out_slice.as_out()) {
                Ok(_) => {}
                Err(_) => return Some(decode_error()),
            }
            i += 1;
        }
    }

    // Final short line, with its trailing '\n' (if any) sliced off again.
    if remainder_clean_len > 0 {
        let rem = if remainder.last() == Some(&b'\n') {
            &remainder[..remainder.len() - 1]
        } else {
            remainder
        };
        let out_off = full_lines * decoded_per_line;
        let out_slice =
            unsafe { std::slice::from_raw_parts_mut(dst.add(out_off), remainder_clean_len) };
        match BASE64_ENGINE.decode(rem, out_slice.as_out()) {
            Ok(_) => {}
            Err(_) => return Some(decode_error()),
        }
    }

    // Single write of the fully assembled output.
    Some(out.write_all(&out_buf[..total_decoded]))
}
1793
1794fn decode_clean_slice(data: &mut [u8], out: &mut impl Write) -> io::Result<()> {
1796 if data.is_empty() {
1797 return Ok(());
1798 }
1799 decode_inplace_with_padding(data, out)
1800}
1801
/// Builds the shared "invalid input" decode failure.
///
/// Kept out-of-line and marked cold so the hot decode loops stay compact
/// and the error path doesn't pollute branch layout.
#[cold]
#[inline(never)]
fn decode_error() -> io::Result<()> {
    let err = io::Error::new(io::ErrorKind::InvalidData, "invalid input");
    Err(err)
}
1808
1809fn decode_inplace_with_padding(data: &mut [u8], out: &mut impl Write) -> io::Result<()> {
1813 match BASE64_ENGINE.decode_inplace(data) {
1814 Ok(decoded) => out.write_all(decoded),
1815 Err(_) => {
1816 let remainder = data.len() % 4;
1817 if remainder == 2 || remainder == 3 {
1818 let has_existing_padding = memchr::memchr(b'=', data).is_some();
1819 let mut padded = Vec::with_capacity(data.len() + (4 - remainder));
1820 padded.extend_from_slice(data);
1821 padded.extend(std::iter::repeat_n(b'=', 4 - remainder));
1822 if let Ok(decoded) = BASE64_ENGINE.decode_inplace(&mut padded) {
1823 out.write_all(decoded)?;
1824 if has_existing_padding {
1825 return decode_error();
1826 }
1827 return Ok(());
1828 }
1829 }
1830 decode_error()
1831 }
1832 }
1833}
1834
1835fn decode_borrowed_clean(out: &mut impl Write, data: &[u8]) -> io::Result<()> {
1837 if data.is_empty() {
1838 return Ok(());
1839 }
1840 if data.len() >= PARALLEL_DECODE_THRESHOLD {
1843 return decode_borrowed_clean_parallel(out, data);
1844 }
1845 let remainder = data.len() % 4;
1847 if remainder == 2 || remainder == 3 {
1848 let has_existing_padding = memchr::memchr(b'=', data).is_some();
1851 let mut padded = Vec::with_capacity(data.len() + (4 - remainder));
1852 padded.extend_from_slice(data);
1853 padded.extend(std::iter::repeat_n(b'=', 4 - remainder));
1854 let result = decode_borrowed_clean(out, &padded);
1855 if has_existing_padding && result.is_ok() {
1856 return decode_error();
1857 }
1858 return result;
1859 }
1860 let pad = data.iter().rev().take(2).filter(|&&b| b == b'=').count();
1863 let decoded_size = data.len() * 3 / 4 - pad;
1864 let mut buf: Vec<u8> = Vec::with_capacity(decoded_size);
1865 #[allow(clippy::uninit_vec)]
1866 unsafe {
1867 buf.set_len(decoded_size);
1868 }
1869 match BASE64_ENGINE.decode(data, buf[..decoded_size].as_out()) {
1870 Ok(decoded) => {
1871 out.write_all(decoded)?;
1872 Ok(())
1873 }
1874 Err(_) => decode_error(),
1875 }
1876}
1877
/// Decodes a large, whitespace-free base64 slice across all CPUs.
///
/// The input is split on 4-symbol boundaries so every chunk decodes
/// independently; each task writes into a precomputed, disjoint range of
/// one shared output buffer, and the whole result is written to `out` in a
/// single call at the end.
fn decode_borrowed_clean_parallel(out: &mut impl Write, data: &[u8]) -> io::Result<()> {
    // Round the per-thread chunk up to a multiple of 4 so no base64
    // quantum straddles a chunk boundary.
    let num_threads = num_cpus().max(1);
    let raw_chunk = data.len() / num_threads;
    let chunk_size = ((raw_chunk + 3) / 4) * 4;

    let chunks: Vec<&[u8]> = data.chunks(chunk_size.max(4)).collect();

    // Prefix-sum of decoded sizes -> each chunk's output offset. Only the
    // last chunk may legally carry '=' padding; '=' in an interior chunk
    // makes that chunk's decode fail below.
    // NOTE(review): for a malformed tail of length 1 ending in '=', the
    // `* 3 / 4 - pad` subtraction can underflow — confirm callers cannot
    // feed such data here (see decode_borrowed_clean).
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    offsets.push(0);
    let mut total_decoded = 0usize;
    for (i, chunk) in chunks.iter().enumerate() {
        let decoded_size = if i == chunks.len() - 1 {
            let pad = chunk.iter().rev().take(2).filter(|&&b| b == b'=').count();
            chunk.len() * 3 / 4 - pad
        } else {
            chunk.len() * 3 / 4
        };
        total_decoded += decoded_size;
        offsets.push(total_decoded);
    }

    // Uninitialized output buffer; every byte is written by exactly one
    // task before it is read.
    let mut output_buf: Vec<u8> = Vec::with_capacity(total_decoded);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output_buf.set_len(total_decoded);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output_buf);

    // The buffer pointer crosses threads as a usize; tasks reconstitute it
    // and write only their [offset, offset + expected_size) range.
    let out_addr = output_buf.as_mut_ptr() as usize;
    let err_flag = std::sync::atomic::AtomicBool::new(false);
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let offset = offsets[i];
            let expected_size = offsets[i + 1] - offset;
            let err_flag = &err_flag;
            s.spawn(move |_| {
                // A peer already failed: skip the work, the result is
                // discarded anyway.
                if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
                    return;
                }
                // SAFETY: disjoint range of the shared output buffer,
                // owned exclusively by this task.
                let out_slice = unsafe {
                    std::slice::from_raw_parts_mut((out_addr as *mut u8).add(offset), expected_size)
                };
                if BASE64_ENGINE.decode(chunk, out_slice.as_out()).is_err() {
                    err_flag.store(true, std::sync::atomic::Ordering::Relaxed);
                }
            });
        }
    });

    if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid input"));
    }

    out.write_all(&output_buf[..total_decoded])
}
1942
1943fn strip_non_base64(data: &[u8]) -> Vec<u8> {
1945 data.iter()
1946 .copied()
1947 .filter(|&b| is_base64_char(b))
1948 .collect()
1949}
1950
/// True for bytes in the standard base64 alphabet, plus the '=' pad byte.
#[inline]
fn is_base64_char(b: u8) -> bool {
    matches!(b, b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'+' | b'/' | b'=')
}
1956
1957pub fn encode_stream(
1960 reader: &mut impl Read,
1961 wrap_col: usize,
1962 writer: &mut impl Write,
1963) -> io::Result<()> {
1964 if wrap_col == 0 {
1965 return encode_stream_nowrap(reader, writer);
1966 }
1967 encode_stream_wrapped(reader, wrap_col, writer)
1968}
1969
1970fn encode_stream_nowrap(reader: &mut impl Read, writer: &mut impl Write) -> io::Result<()> {
1975 const NOWRAP_READ: usize = 24 * 1024 * 1024; let mut buf: Vec<u8> = Vec::with_capacity(NOWRAP_READ);
1981 #[allow(clippy::uninit_vec)]
1982 unsafe {
1983 buf.set_len(NOWRAP_READ);
1984 }
1985 let encode_buf_size = BASE64_ENGINE.encoded_length(NOWRAP_READ);
1986 let mut encode_buf: Vec<u8> = Vec::with_capacity(encode_buf_size);
1987 #[allow(clippy::uninit_vec)]
1988 unsafe {
1989 encode_buf.set_len(encode_buf_size);
1990 }
1991
1992 loop {
1993 let n = read_full(reader, &mut buf)?;
1994 if n == 0 {
1995 break;
1996 }
1997 let enc_len = BASE64_ENGINE.encoded_length(n);
1998 let encoded = BASE64_ENGINE.encode(&buf[..n], encode_buf[..enc_len].as_out());
1999 writer.write_all(encoded)?;
2000 }
2001 Ok(())
2002}
2003
2004fn encode_stream_wrapped(
2012 reader: &mut impl Read,
2013 wrap_col: usize,
2014 writer: &mut impl Write,
2015) -> io::Result<()> {
2016 let bytes_per_line = wrap_col * 3 / 4;
2017 if bytes_per_line > 0 && bytes_per_line.is_multiple_of(3) {
2021 return encode_stream_wrapped_fused(reader, wrap_col, bytes_per_line, writer);
2022 }
2023
2024 const STREAM_READ: usize = 12 * 1024 * 1024;
2026 let mut buf: Vec<u8> = Vec::with_capacity(STREAM_READ);
2027 #[allow(clippy::uninit_vec)]
2028 unsafe {
2029 buf.set_len(STREAM_READ);
2030 }
2031 let encode_buf_size = BASE64_ENGINE.encoded_length(STREAM_READ);
2032 let mut encode_buf: Vec<u8> = Vec::with_capacity(encode_buf_size);
2033 #[allow(clippy::uninit_vec)]
2034 unsafe {
2035 encode_buf.set_len(encode_buf_size);
2036 }
2037
2038 let mut col = 0usize;
2039
2040 loop {
2041 let n = read_full(reader, &mut buf)?;
2042 if n == 0 {
2043 break;
2044 }
2045 let enc_len = BASE64_ENGINE.encoded_length(n);
2046 let encoded = BASE64_ENGINE.encode(&buf[..n], encode_buf[..enc_len].as_out());
2047
2048 write_wrapped_iov_streaming(encoded, wrap_col, &mut col, writer)?;
2049 }
2050
2051 if col > 0 {
2052 writer.write_all(b"\n")?;
2053 }
2054
2055 Ok(())
2056}
2057
/// Streaming encoder for the fast case where every output line holds a
/// whole number of 3-byte input groups (`bytes_per_line % 3 == 0`): each
/// line then encodes independently to exactly `wrap_col` characters, and
/// the newline is written into the same output buffer ("fused").
fn encode_stream_wrapped_fused(
    reader: &mut impl Read,
    wrap_col: usize,
    bytes_per_line: usize,
    writer: &mut impl Write,
) -> io::Result<()> {
    // Read in ~24 MiB batches aligned to whole input lines so no encoder
    // state carries across batches.
    let lines_per_chunk = (24 * 1024 * 1024) / bytes_per_line;
    let read_size = lines_per_chunk * bytes_per_line;
    // `line_out` = encoded line plus its '\n'.
    let line_out = wrap_col + 1; let mut buf: Vec<u8> = Vec::with_capacity(read_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(read_size);
    }
    // Worst-case output: all full lines, plus one padded remainder line
    // and its newline.
    let max_output = lines_per_chunk * line_out + BASE64_ENGINE.encoded_length(bytes_per_line) + 2;
    let mut out_buf: Vec<u8> = Vec::with_capacity(max_output);
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(max_output);
    }

    loop {
        let n = read_full(reader, &mut buf)?;
        if n == 0 {
            break;
        }

        let full_lines = n / bytes_per_line;
        let remainder = n % bytes_per_line;

        let dst = out_buf.as_mut_ptr();
        let mut line_idx = 0;

        // 4x-unrolled encode of whole lines. SAFETY for all raw writes in
        // this loop: each line writes a disjoint `line_out`-sized region
        // and full_lines * line_out <= max_output.
        while line_idx + 4 <= full_lines {
            let in_base = line_idx * bytes_per_line;
            let out_base = line_idx * line_out;
            unsafe {
                let s0 = std::slice::from_raw_parts_mut(dst.add(out_base), wrap_col);
                let _ = BASE64_ENGINE.encode(&buf[in_base..in_base + bytes_per_line], s0.as_out());
                *dst.add(out_base + wrap_col) = b'\n';

                let s1 = std::slice::from_raw_parts_mut(dst.add(out_base + line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + bytes_per_line..in_base + 2 * bytes_per_line],
                    s1.as_out(),
                );
                *dst.add(out_base + line_out + wrap_col) = b'\n';

                let s2 = std::slice::from_raw_parts_mut(dst.add(out_base + 2 * line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + 2 * bytes_per_line..in_base + 3 * bytes_per_line],
                    s2.as_out(),
                );
                *dst.add(out_base + 2 * line_out + wrap_col) = b'\n';

                let s3 = std::slice::from_raw_parts_mut(dst.add(out_base + 3 * line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + 3 * bytes_per_line..in_base + 4 * bytes_per_line],
                    s3.as_out(),
                );
                *dst.add(out_base + 3 * line_out + wrap_col) = b'\n';
            }
            line_idx += 4;
        }

        // Remaining 0-3 whole lines.
        while line_idx < full_lines {
            let in_base = line_idx * bytes_per_line;
            let out_base = line_idx * line_out;
            unsafe {
                let s = std::slice::from_raw_parts_mut(dst.add(out_base), wrap_col);
                let _ = BASE64_ENGINE.encode(&buf[in_base..in_base + bytes_per_line], s.as_out());
                *dst.add(out_base + wrap_col) = b'\n';
            }
            line_idx += 1;
        }

        let mut wp = full_lines * line_out;

        // A short final read leaves a partial last line; encode it (with
        // padding) and newline-terminate it. This only happens at EOF
        // because read_full returns short only at end of stream.
        if remainder > 0 {
            let enc_len = BASE64_ENGINE.encoded_length(remainder);
            let line_input = &buf[full_lines * bytes_per_line..n];
            unsafe {
                let s = std::slice::from_raw_parts_mut(dst.add(wp), enc_len);
                let _ = BASE64_ENGINE.encode(line_input, s.as_out());
                *dst.add(wp + enc_len) = b'\n';
            }
            wp += enc_len + 1;
        }

        writer.write_all(&out_buf[..wp])?;
    }

    Ok(())
}
2169
/// Streaming base64 decoder: reads 32 MiB chunks, strips whitespace (or,
/// with `ignore_garbage`, every non-alphabet byte), and decodes
/// incrementally to `writer`.
///
/// Up to 3 cleaned symbols can be left over at a 4-symbol boundary between
/// chunks; they are carried into the front of the next chunk's buffer.
pub fn decode_stream(
    reader: &mut impl Read,
    ignore_garbage: bool,
    writer: &mut impl Write,
) -> io::Result<()> {
    const READ_CHUNK: usize = 32 * 1024 * 1024;
    // +4 leaves room for the carried-over symbols (at most 3) ahead of the
    // freshly read bytes.
    let mut buf: Vec<u8> = Vec::with_capacity(READ_CHUNK + 4);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(READ_CHUNK + 4);
    }
    let mut carry = [0u8; 4];
    let mut carry_len = 0usize;

    loop {
        // Re-seed the buffer head with the previous chunk's leftover.
        if carry_len > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(carry.as_ptr(), buf.as_mut_ptr(), carry_len);
            }
        }
        let n = read_full(reader, &mut buf[carry_len..carry_len + READ_CHUNK])?;
        if n == 0 {
            break;
        }
        let total_raw = carry_len + n;

        // Compact buf[..total_raw] in place, dropping ignored bytes, and
        // compute the cleaned length.
        let clean_len = if ignore_garbage {
            // Ignore-garbage mode: keep only alphabet bytes and '='.
            let ptr = buf.as_mut_ptr();
            let mut wp = 0usize;
            for i in 0..total_raw {
                let b = unsafe { *ptr.add(i) };
                if is_base64_char(b) {
                    unsafe { *ptr.add(wp) = b };
                    wp += 1;
                }
            }
            wp
        } else {
            // Default mode: slide the runs between CR/LF positions left
            // with overlapping copies; a per-byte second pass runs only if
            // SP/TAB/VT/FF was seen in some run.
            // NOTE(review): writes through `ptr` overlap the shared borrow
            // `data`; every read happens at indices beyond the last write,
            // so no value is read after being overwritten, but the formal
            // aliasing is worth confirming under Miri.
            let ptr = buf.as_mut_ptr();
            let data = &buf[..total_raw];
            let mut wp = 0usize;
            let mut gap_start = 0usize;
            let mut has_rare_ws = false;

            for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
                let gap_len = pos - gap_start;
                if gap_len > 0 {
                    if !has_rare_ws {
                        has_rare_ws = data[gap_start..pos]
                            .iter()
                            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
                    }
                    // Skip the copy when nothing has been dropped yet.
                    if wp != gap_start {
                        unsafe {
                            std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len);
                        }
                    }
                    wp += gap_len;
                }
                gap_start = pos + 1;
            }
            // Tail after the final CR/LF.
            let tail_len = total_raw - gap_start;
            if tail_len > 0 {
                if !has_rare_ws {
                    has_rare_ws = data[gap_start..total_raw]
                        .iter()
                        .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
                }
                if wp != gap_start {
                    unsafe {
                        std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len);
                    }
                }
                wp += tail_len;
            }

            // Second pass (rare): drop SP/TAB/VT/FF via the
            // NOT_WHITESPACE lookup table defined elsewhere in this file.
            if has_rare_ws {
                let mut rp = 0;
                let mut cwp = 0;
                while rp < wp {
                    let b = unsafe { *ptr.add(rp) };
                    if NOT_WHITESPACE[b as usize] {
                        unsafe { *ptr.add(cwp) = b };
                        cwp += 1;
                    }
                    rp += 1;
                }
                cwp
            } else {
                wp
            }
        };

        carry_len = 0;
        // read_full only returns short at EOF, so a short read marks the
        // final chunk.
        let is_last = n < READ_CHUNK;

        if is_last {
            // Final chunk: decode everything, including any odd tail (the
            // in-place decoder handles missing padding).
            decode_clean_slice(&mut buf[..clean_len], writer)?;
        } else {
            // Mid-stream: decode only whole 4-symbol quanta and carry the
            // 1-3 leftover symbols into the next iteration.
            let decode_len = (clean_len / 4) * 4;
            let leftover = clean_len - decode_len;
            if leftover > 0 {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        buf.as_ptr().add(decode_len),
                        carry.as_mut_ptr(),
                        leftover,
                    );
                }
                carry_len = leftover;
            }
            if decode_len > 0 {
                decode_clean_slice(&mut buf[..decode_len], writer)?;
            }
        }
    }

    // EOF with symbols still in the carry (stream ended exactly on a read
    // boundary): decode the dangling tail.
    if carry_len > 0 {
        let mut carry_buf = carry[..carry_len].to_vec();
        decode_clean_slice(&mut carry_buf, writer)?;
    }

    Ok(())
}
2313
2314#[inline(always)]
2318fn write_all_vectored(out: &mut impl Write, slices: &[io::IoSlice]) -> io::Result<()> {
2319 if slices.is_empty() {
2320 return Ok(());
2321 }
2322 let total: usize = slices.iter().map(|s| s.len()).sum();
2323 let written = out.write_vectored(slices)?;
2324 if written >= total {
2325 return Ok(());
2326 }
2327 if written == 0 {
2328 return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
2329 }
2330 write_all_vectored_slow(out, slices, written)
2331}
2332
/// Fallback for a short vectored write: writes each slice in order,
/// skipping the first `skip` bytes that were already written.
#[cold]
#[inline(never)]
fn write_all_vectored_slow(
    out: &mut impl Write,
    slices: &[io::IoSlice],
    mut skip: usize,
) -> io::Result<()> {
    for s in slices {
        if skip == 0 {
            // Everything already-written has been skipped; write whole
            // slices from here on.
            out.write_all(s)?;
        } else if skip < s.len() {
            // The short write ended inside this slice: emit its tail.
            out.write_all(&s[skip..])?;
            skip = 0;
        } else {
            // This slice was fully covered by the short write.
            skip -= s.len();
        }
    }
    Ok(())
}
2352
/// Reads from `reader` until `buf` is full or EOF, returning the number of
/// bytes read. A return value smaller than `buf.len()` therefore means the
/// stream is exhausted.
///
/// `ErrorKind::Interrupted` is retried everywhere. (Previously the *first*
/// read propagated EINTR via `?` while the continuation loop retried it —
/// a signal arriving before any data was read caused a spurious failure.)
///
/// # Errors
/// Propagates any non-`Interrupted` I/O error from the reader.
#[inline]
fn read_full(reader: &mut impl Read, buf: &mut [u8]) -> io::Result<usize> {
    let mut total = 0usize;
    while total < buf.len() {
        match reader.read(&mut buf[total..]) {
            Ok(0) => break, // EOF
            Ok(n) => total += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(total)
}