1use std::io::{self, Read, Write};
2
3use base64_simd::AsOut;
4
/// The base64 flavor used throughout this module: base64_simd's STANDARD
/// engine (standard alphabet; SIMD-accelerated encode/decode).
const BASE64_ENGINE: &base64_simd::Base64 = &base64_simd::STANDARD;
6
/// Number of logical CPUs available to this process. Falls back to 1 when
/// the query fails (unsupported platform or restricted environment).
#[inline]
fn num_cpus() -> usize {
    match std::thread::available_parallelism() {
        Ok(n) => n.get(),
        Err(_) => 1,
    }
}
16
/// Input chunk size for single-threaded unwrapped encoding: 8 MiB rounded
/// down to a multiple of 3, so no chunk except the final one can introduce
/// base64 padding mid-stream.
const NOWRAP_CHUNK: usize = 8 * 1024 * 1024 - (8 * 1024 * 1024 % 3);

/// Minimum input size before unwrapped encoding goes parallel.
const PARALLEL_NOWRAP_THRESHOLD: usize = 16 * 1024 * 1024;

/// Minimum input size before wrapped encoding goes parallel.
const PARALLEL_WRAPPED_THRESHOLD: usize = 12 * 1024 * 1024;

/// Minimum cleaned-input size before decoding goes parallel.
const PARALLEL_DECODE_THRESHOLD: usize = 1024 * 1024;
41
/// Best-effort hint to the Linux kernel to back `buf`'s allocation with
/// transparent huge pages when its capacity is at least one 2 MiB page.
/// The `madvise` return value is deliberately ignored — failure (e.g. an
/// unaligned base address) just means no huge pages.
#[cfg(target_os = "linux")]
fn hint_hugepage(buf: &mut Vec<u8>) {
    if buf.capacity() >= 2 * 1024 * 1024 {
        // SAFETY: the pointer/capacity pair describes memory owned by `buf`,
        // and madvise does not invalidate or move the allocation.
        unsafe {
            libc::madvise(
                buf.as_mut_ptr() as *mut libc::c_void,
                buf.capacity(),
                libc::MADV_HUGEPAGE,
            );
        }
    }
}
57
58pub fn encode_to_writer(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
61 if data.is_empty() {
62 return Ok(());
63 }
64
65 if wrap_col == 0 {
66 return encode_no_wrap(data, out);
67 }
68
69 encode_wrapped(data, wrap_col, out)
70}
71
/// Sequential unwrapped encode: stream `data` through one reusable buffer
/// in NOWRAP_CHUNK-sized pieces. Large inputs on multi-core machines are
/// handed to the parallel path instead.
fn encode_no_wrap(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    if data.len() >= PARALLEL_NOWRAP_THRESHOLD && num_cpus() > 1 {
        return encode_no_wrap_parallel(data, out);
    }

    // One output buffer sized for the largest chunk, reused for every chunk.
    let enc_len = BASE64_ENGINE.encoded_length(data.len().min(NOWRAP_CHUNK));
    let mut buf: Vec<u8> = Vec::with_capacity(enc_len);
    // SAFETY: the buffer starts uninitialized; each iteration hands only the
    // `..clen` prefix to `encode`, which overwrites it before it is read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(enc_len);
    }

    // NOWRAP_CHUNK is a multiple of 3, so only the final chunk can produce
    // '=' padding.
    for chunk in data.chunks(NOWRAP_CHUNK) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let encoded = BASE64_ENGINE.encode(chunk, buf[..clen].as_out());
        out.write_all(encoded)?;
    }
    Ok(())
}
94
/// Parallel unwrapped encode: split `data` into per-thread chunks whose
/// lengths are multiples of 3 (so only the last chunk may carry padding),
/// have each rayon task encode its chunk into a precomputed disjoint slot
/// of one shared output buffer, then write the buffer in a single call.
fn encode_no_wrap_parallel(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    let num_threads = num_cpus().max(1);
    let raw_chunk = data.len() / num_threads;
    // Round up to a multiple of 3 so chunk boundaries fall on input
    // triplets and never introduce mid-stream '=' padding.
    let chunk_size = ((raw_chunk + 2) / 3) * 3;

    let chunks: Vec<&[u8]> = data.chunks(chunk_size.max(3)).collect();

    // Prefix sums of per-chunk encoded lengths: each chunk's output offset.
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    let mut total_out = 0usize;
    for chunk in &chunks {
        offsets.push(total_out);
        total_out += BASE64_ENGINE.encoded_length(chunk.len());
    }

    let mut output: Vec<u8> = Vec::with_capacity(total_out);
    // SAFETY: every byte of `output[..total_out]` is written by exactly one
    // spawned task below before the buffer is read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(total_out);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output);

    // Pass the base pointer as usize so the closures are Send; each task
    // reconstructs a slice over its own disjoint output range.
    let output_base = output.as_mut_ptr() as usize;
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let out_off = offsets[i];
            let enc_len = BASE64_ENGINE.encoded_length(chunk.len());
            let base = output_base;
            s.spawn(move |_| {
                // SAFETY: [out_off, out_off + enc_len) ranges are pairwise
                // disjoint and `output` outlives the rayon scope.
                let dest =
                    unsafe { std::slice::from_raw_parts_mut((base + out_off) as *mut u8, enc_len) };
                let _ = BASE64_ENGINE.encode(chunk, dest.as_out());
            });
        }
    });

    out.write_all(&output[..total_out])
}
142
/// Wrapped-encode dispatcher. When a full output line maps to a whole
/// number of input triplets (`bytes_per_line % 3 == 0`), lines can be
/// produced independently and faster strategies apply; otherwise encode
/// everything first and splice newlines in a second pass.
fn encode_wrapped(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
    // Input bytes consumed per full output line.
    let bytes_per_line = wrap_col * 3 / 4;
    if bytes_per_line == 0 {
        // wrap_col in 1..=3: too narrow for the fast paths.
        return encode_wrapped_small(data, wrap_col, out);
    }

    if data.len() >= PARALLEL_WRAPPED_THRESHOLD && bytes_per_line.is_multiple_of(3) {
        return encode_wrapped_parallel(data, wrap_col, bytes_per_line, out);
    }

    if bytes_per_line.is_multiple_of(3) {
        return encode_wrapped_expand(data, wrap_col, bytes_per_line, out);
    }

    // General case: encode the whole input, then fuse in newlines.
    let enc_max = BASE64_ENGINE.encoded_length(data.len());
    let num_full = enc_max / wrap_col;
    let rem = enc_max % wrap_col;
    // Each line — full or trailing partial — gains one '\n'.
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    let mut enc_buf: Vec<u8> = Vec::with_capacity(enc_max);
    // SAFETY: `encode` overwrites all `enc_max` bytes before any read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        enc_buf.set_len(enc_max);
    }
    let _ = BASE64_ENGINE.encode(data, enc_buf[..enc_max].as_out());

    let mut out_buf: Vec<u8> = Vec::with_capacity(out_len);
    // SAFETY: `fuse_wrap` writes the first `n` bytes; only those are read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(out_len);
    }
    let n = fuse_wrap(&enc_buf, wrap_col, &mut out_buf);
    out.write_all(&out_buf[..n])
}
187
/// Wrapped encode for triplet-aligned lines: encode into the *front* of a
/// buffer already sized for the newline-bearing result, then shift lines
/// toward the back in place, inserting a '\n' after each — one allocation,
/// no second buffer.
fn encode_wrapped_expand(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    debug_assert!(bytes_per_line.is_multiple_of(3));
    let enc_len = BASE64_ENGINE.encoded_length(data.len());
    if enc_len == 0 {
        return Ok(());
    }

    let num_full = enc_len / wrap_col;
    let rem = enc_len % wrap_col;
    // One '\n' per line, including a trailing partial line.
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    let mut buf: Vec<u8> = Vec::with_capacity(out_len);
    // SAFETY: bytes `..enc_len` are written by `encode`; the remainder is
    // filled by `expand_backward` before any byte is read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(out_len);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut buf);

    let encoded = BASE64_ENGINE.encode(data, buf[..enc_len].as_out());
    debug_assert_eq!(encoded.len(), enc_len, "encode wrote unexpected length");

    // In-place newline insertion, walking back-to-front so no source byte
    // is clobbered before it has been moved.
    expand_backward(buf.as_mut_ptr(), enc_len, out_len, wrap_col);

    out.write_all(&buf[..out_len])
}
229
/// Alternative wrapped encoder (currently unused): encode GROUP_LINES-sized
/// groups into a small cache-resident temp buffer, then scatter each line
/// into the output followed by '\n'. The copy loop is hand-unrolled 8x.
#[allow(dead_code)]
fn encode_wrapped_scatter(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let enc_len = BASE64_ENGINE.encoded_length(data.len());
    if enc_len == 0 {
        return Ok(());
    }

    let num_full = enc_len / wrap_col;
    let rem = enc_len % wrap_col;
    // One '\n' per line, including a trailing partial line.
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    let mut buf: Vec<u8> = Vec::with_capacity(out_len);
    // SAFETY: the `..wp` prefix read at the end is fully written by the
    // scatter loops below.
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(out_len);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut buf);

    // Encode this many lines at a time through a small temp buffer.
    const GROUP_LINES: usize = 256;
    let group_input = GROUP_LINES * bytes_per_line;
    let temp_size = GROUP_LINES * wrap_col;
    let mut temp: Vec<u8> = Vec::with_capacity(temp_size);
    // SAFETY: only the `..clen` prefix written by `encode` is read back.
    #[allow(clippy::uninit_vec)]
    unsafe {
        temp.set_len(temp_size);
    }

    let line_out = wrap_col + 1;
    let mut wp = 0usize;
    for chunk in data.chunks(group_input) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let _ = BASE64_ENGINE.encode(chunk, temp[..clen].as_out());

        let lines = clen / wrap_col;
        let chunk_rem = clen % wrap_col;

        // 8x-unrolled scatter of full lines from temp into the output.
        let mut i = 0;
        while i + 8 <= lines {
            // SAFETY: `wp + 8 * line_out <= out_len` and
            // `(i + 8) * wrap_col <= clen` by construction of out_len/clen.
            unsafe {
                let src = temp.as_ptr().add(i * wrap_col);
                let dst = buf.as_mut_ptr().add(wp);
                std::ptr::copy_nonoverlapping(src, dst, wrap_col);
                *dst.add(wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(src.add(wrap_col), dst.add(line_out), wrap_col);
                *dst.add(line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(2 * wrap_col),
                    dst.add(2 * line_out),
                    wrap_col,
                );
                *dst.add(2 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(3 * wrap_col),
                    dst.add(3 * line_out),
                    wrap_col,
                );
                *dst.add(3 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(4 * wrap_col),
                    dst.add(4 * line_out),
                    wrap_col,
                );
                *dst.add(4 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(5 * wrap_col),
                    dst.add(5 * line_out),
                    wrap_col,
                );
                *dst.add(5 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(6 * wrap_col),
                    dst.add(6 * line_out),
                    wrap_col,
                );
                *dst.add(6 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(7 * wrap_col),
                    dst.add(7 * line_out),
                    wrap_col,
                );
                *dst.add(7 * line_out + wrap_col) = b'\n';
            }
            wp += 8 * line_out;
            i += 8;
        }
        // Leftover full lines, one at a time.
        while i < lines {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(i * wrap_col),
                    buf.as_mut_ptr().add(wp),
                    wrap_col,
                );
                *buf.as_mut_ptr().add(wp + wrap_col) = b'\n';
            }
            wp += line_out;
            i += 1;
        }
        // Trailing partial line of this group (only on the final chunk).
        if chunk_rem > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(lines * wrap_col),
                    buf.as_mut_ptr().add(wp),
                    chunk_rem,
                );
                *buf.as_mut_ptr().add(wp + chunk_rem) = b'\n';
            }
            wp += chunk_rem + 1;
        }
    }

    out.write_all(&buf[..wp])
}
367
/// Copy `count` lines of `wrap_col` bytes from the front of `temp` into
/// `buf`, placing line `i` at offset `(line_start + i) * line_out` and
/// appending a `'\n'` after each. `buf` must be large enough to hold the
/// last line plus its newline.
#[inline]
#[allow(dead_code)]
fn scatter_lines(
    temp: &[u8],
    buf: &mut [u8],
    line_start: usize,
    count: usize,
    wrap_col: usize,
    line_out: usize,
) {
    for (line, src) in temp.chunks(wrap_col).take(count).enumerate() {
        let dst = (line_start + line) * line_out;
        buf[dst..dst + wrap_col].copy_from_slice(src);
        buf[dst + wrap_col] = b'\n';
    }
}
391
/// In-place newline insertion: the first `enc_len` bytes of the buffer hold
/// base64 text; shift each `wrap_col`-byte line toward the back of the same
/// buffer so every line (and the trailing partial line, if any) is followed
/// by '\n', filling exactly `out_len` bytes. Walks back-to-front so each
/// source region is read before anything overwrites it.
///
/// Caller contract: `ptr` addresses at least `out_len` writable bytes, the
/// first `enc_len` of which are initialized, and `out_len` equals
/// `(enc_len / wrap_col) * (wrap_col + 1)` plus `rem + 1` when
/// `rem = enc_len % wrap_col` is nonzero.
#[inline]
fn expand_backward(ptr: *mut u8, enc_len: usize, out_len: usize, wrap_col: usize) {
    let num_full = enc_len / wrap_col;
    let rem = enc_len % wrap_col;

    unsafe {
        // rp reads from the packed text; wp writes the expanded layout.
        let mut rp = enc_len;
        let mut wp = out_len;

        // Trailing partial line first (it sits at the very end of output).
        if rem > 0 {
            wp -= 1;
            *ptr.add(wp) = b'\n';
            wp -= rem;
            rp -= rem;
            if rp != wp {
                // Overlapping regions possible — copy handles overlap.
                std::ptr::copy(ptr.add(rp), ptr.add(wp), rem);
            }
        }

        // Full lines, 8 per iteration (hand-unrolled).
        let mut lines_left = num_full;
        while lines_left >= 8 {
            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);

            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);

            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);

            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);

            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);

            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);

            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);

            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);

            lines_left -= 8;
        }

        // Remaining lines one at a time; the first line may already be in
        // place (rp == wp), in which case only the '\n' is written.
        while lines_left > 0 {
            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            if rp != wp {
                std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);
            }
            lines_left -= 1;
        }
    }
}
487
/// Single newline byte with a stable address, reusable as an `IoSlice`.
static NEWLINE: [u8; 1] = [b'\n'];
490
/// Write already-encoded text as wrapped lines (currently unused). Small
/// outputs go out in one vectored write of (line, '\n') slice pairs; large
/// outputs are fused into a reusable batch buffer via `fuse_wrap` instead,
/// to stay under the iovec limit. Relies on the sibling helper
/// `write_all_vectored` (defined elsewhere in this file).
#[inline]
#[allow(dead_code)]
fn write_wrapped_iov(encoded: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
    // Cap on iovec entries per vectored write (mirrors typical IOV_MAX).
    const MAX_IOV: usize = 1024;

    let num_full_lines = encoded.len() / wrap_col;
    let remainder = encoded.len() % wrap_col;
    // Two slices per line: the text and the shared newline.
    let total_iov = num_full_lines * 2 + if remainder > 0 { 2 } else { 0 };

    if total_iov <= MAX_IOV {
        let mut iov: Vec<io::IoSlice> = Vec::with_capacity(total_iov);
        let mut pos = 0;
        for _ in 0..num_full_lines {
            iov.push(io::IoSlice::new(&encoded[pos..pos + wrap_col]));
            iov.push(io::IoSlice::new(&NEWLINE));
            pos += wrap_col;
        }
        if remainder > 0 {
            iov.push(io::IoSlice::new(&encoded[pos..pos + remainder]));
            iov.push(io::IoSlice::new(&NEWLINE));
        }
        return write_all_vectored(out, &iov);
    }

    // Too many lines for one iovec batch: fuse newlines into a buffer in
    // BATCH_LINES-sized groups and write each group with write_all.
    let line_out = wrap_col + 1;
    const BATCH_LINES: usize = 512;
    let batch_fused_size = BATCH_LINES * line_out;
    let mut fused: Vec<u8> = Vec::with_capacity(batch_fused_size);
    // SAFETY: only the `..n` prefix written by `fuse_wrap` is ever read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        fused.set_len(batch_fused_size);
    }

    let mut rp = 0;
    let mut lines_done = 0;

    while lines_done + BATCH_LINES <= num_full_lines {
        let n = fuse_wrap(
            &encoded[rp..rp + BATCH_LINES * wrap_col],
            wrap_col,
            &mut fused,
        );
        out.write_all(&fused[..n])?;
        rp += BATCH_LINES * wrap_col;
        lines_done += BATCH_LINES;
    }

    // Leftover full lines (fewer than BATCH_LINES).
    let remaining_lines = num_full_lines - lines_done;
    if remaining_lines > 0 {
        let n = fuse_wrap(
            &encoded[rp..rp + remaining_lines * wrap_col],
            wrap_col,
            &mut fused,
        );
        out.write_all(&fused[..n])?;
        rp += remaining_lines * wrap_col;
    }

    // Trailing partial line, newline-terminated like the others.
    if remainder > 0 {
        out.write_all(&encoded[rp..rp + remainder])?;
        out.write_all(b"\n")?;
    }
    Ok(())
}
571
/// Streaming variant of wrapped vectored output: `col` carries the current
/// output column across calls, so `encoded` may arrive in arbitrary-sized
/// pieces. Builds (text, '\n') IoSlice pairs and flushes through the
/// sibling helper `write_all_vectored` whenever the batch nears MAX_IOV.
/// Caller invariant: `*col < wrap_col` on entry (a completed line resets
/// it to 0 here).
#[inline]
fn write_wrapped_iov_streaming(
    encoded: &[u8],
    wrap_col: usize,
    col: &mut usize,
    out: &mut impl Write,
) -> io::Result<()> {
    const MAX_IOV: usize = 1024;
    let mut iov: Vec<io::IoSlice> = Vec::with_capacity(MAX_IOV);
    let mut rp = 0;

    while rp < encoded.len() {
        // Room left on the current output line.
        let space = wrap_col - *col;
        let avail = encoded.len() - rp;

        if avail <= space {
            // Everything left fits on this line; emit a newline only when
            // the line is exactly filled.
            iov.push(io::IoSlice::new(&encoded[rp..rp + avail]));
            *col += avail;
            if *col == wrap_col {
                iov.push(io::IoSlice::new(&NEWLINE));
                *col = 0;
            }
            break;
        } else {
            // Fill the current line and terminate it.
            iov.push(io::IoSlice::new(&encoded[rp..rp + space]));
            iov.push(io::IoSlice::new(&NEWLINE));
            rp += space;
            *col = 0;
        }

        // Flush before overflowing the iovec batch (the next iteration may
        // push two entries).
        if iov.len() >= MAX_IOV - 1 {
            write_all_vectored(out, &iov)?;
            iov.clear();
        }
    }

    if !iov.is_empty() {
        write_all_vectored(out, &iov)?;
    }
    Ok(())
}
618
/// Parallel wrapped encode for triplet-aligned lines: split the input on
/// whole-line boundaries (`bytes_per_line` per line), let each rayon task
/// encode-and-wrap its chunk into a precomputed disjoint slot of a shared
/// output buffer, then write the buffer in one call.
fn encode_wrapped_parallel(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let num_threads = num_cpus().max(1);
    let lines_per_chunk = ((data.len() / bytes_per_line) / num_threads).max(1);
    let chunk_input = lines_per_chunk * bytes_per_line;

    // Chunks are whole lines, so only the last chunk can end mid-line.
    let chunks: Vec<&[u8]> = data.chunks(chunk_input.max(bytes_per_line)).collect();

    // Per-chunk output offsets: encoded length plus one '\n' per line
    // (including a trailing partial line).
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    let mut total_out = 0usize;
    for chunk in &chunks {
        offsets.push(total_out);
        let enc_len = BASE64_ENGINE.encoded_length(chunk.len());
        let full_lines = enc_len / wrap_col;
        let remainder = enc_len % wrap_col;
        total_out += full_lines * (wrap_col + 1) + if remainder > 0 { remainder + 1 } else { 0 };
    }

    let mut output: Vec<u8> = Vec::with_capacity(total_out);
    // SAFETY: every byte of `output[..total_out]` is written by exactly one
    // spawned task before the buffer is read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(total_out);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output);

    // Base pointer shared as usize so the closures are Send; ranges are
    // pairwise disjoint by construction of `offsets`.
    let output_base = output.as_mut_ptr() as usize;
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let out_off = offsets[i];
            let out_end = if i + 1 < offsets.len() {
                offsets[i + 1]
            } else {
                total_out
            };
            let out_size = out_end - out_off;
            let base = output_base;
            s.spawn(move |_| {
                // SAFETY: this task's [out_off, out_end) range is disjoint
                // from all others and `output` outlives the scope.
                let out_slice = unsafe {
                    std::slice::from_raw_parts_mut((base + out_off) as *mut u8, out_size)
                };
                encode_chunk_l1_scatter_into(chunk, out_slice, wrap_col, bytes_per_line);
            });
        }
    });

    out.write_all(&output[..total_out])
}
680
/// Worker for `encode_wrapped_parallel`: encode `data` in GROUP_LINES-sized
/// groups through a small temp buffer, then scatter each `wrap_col`-byte
/// line into `output` followed by '\n'. The scatter loop is hand-unrolled
/// 8x. `output` must be exactly the wrapped size for `data` (the caller
/// precomputes it).
fn encode_chunk_l1_scatter_into(
    data: &[u8],
    output: &mut [u8],
    wrap_col: usize,
    bytes_per_line: usize,
) {
    // Lines encoded per group; keeps `temp` small and cache-resident.
    const GROUP_LINES: usize = 256;
    let group_input = GROUP_LINES * bytes_per_line;
    let temp_size = GROUP_LINES * wrap_col;
    let mut temp: Vec<u8> = Vec::with_capacity(temp_size);
    // SAFETY: only the `..clen` prefix written by `encode` is read back.
    #[allow(clippy::uninit_vec)]
    unsafe {
        temp.set_len(temp_size);
    }

    let line_out = wrap_col + 1;
    let mut wp = 0usize;

    for chunk in data.chunks(group_input) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let _ = BASE64_ENGINE.encode(chunk, temp[..clen].as_out());

        let lines = clen / wrap_col;
        let chunk_rem = clen % wrap_col;

        // 8x-unrolled scatter of full lines into the output slice.
        let mut i = 0;
        while i + 8 <= lines {
            // SAFETY: wp stays within `output` because the caller sized it
            // as lines * (wrap_col + 1) (+ partial); temp reads stay < clen.
            unsafe {
                let src = temp.as_ptr().add(i * wrap_col);
                let dst = output.as_mut_ptr().add(wp);
                std::ptr::copy_nonoverlapping(src, dst, wrap_col);
                *dst.add(wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(src.add(wrap_col), dst.add(line_out), wrap_col);
                *dst.add(line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(2 * wrap_col),
                    dst.add(2 * line_out),
                    wrap_col,
                );
                *dst.add(2 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(3 * wrap_col),
                    dst.add(3 * line_out),
                    wrap_col,
                );
                *dst.add(3 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(4 * wrap_col),
                    dst.add(4 * line_out),
                    wrap_col,
                );
                *dst.add(4 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(5 * wrap_col),
                    dst.add(5 * line_out),
                    wrap_col,
                );
                *dst.add(5 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(6 * wrap_col),
                    dst.add(6 * line_out),
                    wrap_col,
                );
                *dst.add(6 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(7 * wrap_col),
                    dst.add(7 * line_out),
                    wrap_col,
                );
                *dst.add(7 * line_out + wrap_col) = b'\n';
            }
            wp += 8 * line_out;
            i += 8;
        }
        // Leftover full lines.
        while i < lines {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(i * wrap_col),
                    output.as_mut_ptr().add(wp),
                    wrap_col,
                );
                *output.as_mut_ptr().add(wp + wrap_col) = b'\n';
            }
            wp += line_out;
            i += 1;
        }
        // Trailing partial line (only on the final group of the last chunk).
        if chunk_rem > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(lines * wrap_col),
                    output.as_mut_ptr().add(wp),
                    chunk_rem,
                );
                *output.as_mut_ptr().add(wp + chunk_rem) = b'\n';
            }
            wp += chunk_rem + 1;
        }
    }
}
784
/// Copy `encoded` into `out_buf`, inserting b'\n' after every `wrap_col`
/// bytes and after a trailing partial line (if any). Returns the number of
/// bytes written. `out_buf` must be large enough to hold the fused result.
#[inline]
fn fuse_wrap(encoded: &[u8], wrap_col: usize, out_buf: &mut [u8]) -> usize {
    let line_out = wrap_col + 1;
    let total = encoded.len();
    let mut rp = 0;
    let mut wp = 0;

    // Consume full lines in fixed-size batches (8, then 4, then 1); the
    // constant inner trip counts let the compiler unroll the copies.
    for batch in [8usize, 4, 1] {
        while rp + batch * wrap_col <= total {
            // SAFETY: the caller guarantees out_buf holds the fused output,
            // and rp + batch * wrap_col is within `encoded` by the loop test.
            unsafe {
                let src = encoded.as_ptr().add(rp);
                let dst = out_buf.as_mut_ptr().add(wp);
                for k in 0..batch {
                    std::ptr::copy_nonoverlapping(
                        src.add(k * wrap_col),
                        dst.add(k * line_out),
                        wrap_col,
                    );
                    *dst.add(k * line_out + wrap_col) = b'\n';
                }
            }
            rp += batch * wrap_col;
            wp += batch * line_out;
        }
    }

    // Trailing partial line, newline-terminated like the full ones.
    if rp < total {
        let remaining = total - rp;
        // SAFETY: same sizing guarantee as above.
        unsafe {
            std::ptr::copy_nonoverlapping(
                encoded.as_ptr().add(rp),
                out_buf.as_mut_ptr().add(wp),
                remaining,
            );
        }
        wp += remaining;
        out_buf[wp] = b'\n';
        wp += 1;
    }

    wp
}
881
882fn encode_wrapped_small(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
884 let enc_max = BASE64_ENGINE.encoded_length(data.len());
885 let mut buf: Vec<u8> = Vec::with_capacity(enc_max);
886 #[allow(clippy::uninit_vec)]
887 unsafe {
888 buf.set_len(enc_max);
889 }
890 let encoded = BASE64_ENGINE.encode(data, buf[..enc_max].as_out());
891
892 let wc = wrap_col.max(1);
893 for line in encoded.chunks(wc) {
894 out.write_all(line)?;
895 out.write_all(b"\n")?;
896 }
897 Ok(())
898}
899
900pub fn decode_to_writer(data: &[u8], ignore_garbage: bool, out: &mut impl Write) -> io::Result<()> {
904 if data.is_empty() {
905 return Ok(());
906 }
907
908 if ignore_garbage {
909 let mut cleaned = strip_non_base64(data);
910 return decode_clean_slice(&mut cleaned, out);
911 }
912
913 if data.len() < 512 * 1024 && data.len() >= 77 {
919 if let Some(result) = try_line_decode(data, out) {
920 return result;
921 }
922 }
923
924 decode_stripping_whitespace(data, out)
926}
927
/// Decode base64 held in a mutable (e.g. mmap-backed) buffer, reusing the
/// buffer itself as scratch: garbage/whitespace is compacted out in place,
/// then the compacted prefix is decoded in place. Avoids allocating a copy
/// of the input for all but the parallel path.
pub fn decode_mmap_inplace(
    data: &mut [u8],
    ignore_garbage: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    // Small line-wrapped inputs: structured probe first. 77 is one
    // 76-column wrapped line plus its '\n' — presumably the common width.
    if !ignore_garbage && data.len() >= 77 && data.len() < 512 * 1024 {
        if let Some(result) = try_line_decode(data, out) {
            return result;
        }
    }

    if ignore_garbage {
        // Compact: keep only base64-alphabet bytes, shifting them left.
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut wp = 0;
        for rp in 0..len {
            // SAFETY: rp < len and wp <= rp, both within the buffer.
            let b = unsafe { *ptr.add(rp) };
            if is_base64_char(b) {
                unsafe { *ptr.add(wp) = b };
                wp += 1;
            }
        }
        match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
            Ok(decoded) => return out.write_all(decoded),
            Err(_) => return decode_error(),
        }
    }

    // Large inputs: try the uniform-line-length fast path.
    if data.len() >= 77 {
        if let Some(result) = try_decode_uniform_lines(data, out) {
            return result;
        }
    }

    // No newlines at all: decode directly, or strip the rarer whitespace
    // bytes first if any are present.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        if !data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            match BASE64_ENGINE.decode_inplace(data) {
                Ok(decoded) => return out.write_all(decoded),
                Err(_) => return decode_error(),
            }
        }
        // Compact out all whitespace byte-by-byte, then decode the prefix.
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut wp = 0;
        for rp in 0..len {
            // SAFETY: rp < len and wp <= rp, both within the buffer.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(wp) = b };
                wp += 1;
            }
        }
        match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
            Ok(decoded) => return out.write_all(decoded),
            Err(_) => return decode_error(),
        }
    }

    // Newlines present: close the gaps between newline positions with bulk
    // copies (memchr2_iter finds each '\n'/'\r'), noting along the way
    // whether any rarer whitespace byte appears inside a gap.
    let ptr = data.as_mut_ptr();
    let len = data.len();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                // SAFETY: [gap_start, gap_start + gap_len) lies within the
                // buffer; constructing a shared view of bytes not being
                // mutated in this step.
                has_rare_ws = unsafe {
                    std::slice::from_raw_parts(ptr.add(gap_start), gap_len)
                        .iter()
                        .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
                };
            }
            if wp != gap_start {
                // SAFETY: overlapping ranges allowed — `copy` is memmove.
                unsafe { std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len) };
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Tail after the final newline.
    let tail_len = len - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            // SAFETY: same bounds reasoning as the loop above.
            has_rare_ws = unsafe {
                std::slice::from_raw_parts(ptr.add(gap_start), tail_len)
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
            };
        }
        if wp != gap_start {
            unsafe { std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len) };
        }
        wp += tail_len;
    }

    // Second, slower pass only when rarer whitespace was actually seen.
    if has_rare_ws {
        let mut rp = 0;
        let mut cwp = 0;
        while rp < wp {
            // SAFETY: rp < wp <= len and cwp <= rp.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        wp = cwp;
    }

    if wp >= PARALLEL_DECODE_THRESHOLD {
        return decode_borrowed_clean_parallel(out, &data[..wp]);
    }
    match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
        Ok(decoded) => out.write_all(decoded),
        Err(_) => decode_error(),
    }
}
1078
1079pub fn decode_owned(
1081 data: &mut Vec<u8>,
1082 ignore_garbage: bool,
1083 out: &mut impl Write,
1084) -> io::Result<()> {
1085 if data.is_empty() {
1086 return Ok(());
1087 }
1088
1089 if ignore_garbage {
1090 data.retain(|&b| is_base64_char(b));
1091 } else {
1092 strip_whitespace_inplace(data);
1093 }
1094
1095 decode_clean_slice(data, out)
1096}
1097
/// Remove ASCII whitespace (' ', '\t', '\n', '\r', VT, FF) from `data` in
/// place. Optimized for newline-wrapped input: gaps between '\n'/'\r'
/// positions are closed with bulk copies, and a byte-by-byte pass runs only
/// if one of the rarer whitespace bytes was actually observed.
fn strip_whitespace_inplace(data: &mut Vec<u8>) {
    // No newlines: either nothing to do, or strip the rare bytes directly.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        if data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            data.retain(|&b| NOT_WHITESPACE[b as usize]);
        }
        return;
    }

    let ptr = data.as_mut_ptr();
    let len = data.len();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    // Shift each newline-free gap left over the newlines, tracking whether
    // any rarer whitespace appears inside a gap.
    for pos in memchr::memchr2_iter(b'\n', b'\r', data.as_slice()) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                has_rare_ws = data[gap_start..pos]
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
            }
            if wp != gap_start {
                // SAFETY: both ranges lie within the buffer; `copy` is
                // memmove, so overlap is fine.
                unsafe {
                    std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len);
                }
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Tail after the final newline.
    let tail_len = len - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = data[gap_start..]
                .iter()
                .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
        }
        if wp != gap_start {
            // SAFETY: same reasoning as above.
            unsafe {
                std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len);
            }
        }
        wp += tail_len;
    }

    data.truncate(wp);

    // Second, byte-by-byte pass only when rare whitespace was seen.
    if has_rare_ws {
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut rp = 0;
        let mut cwp = 0;
        while rp < len {
            // SAFETY: rp < len and cwp <= rp, both within the buffer.
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        data.truncate(cwp);
    }
}
1180
/// Per-byte lookup table: `true` for every byte that is NOT ASCII
/// whitespace (space, tab, LF, CR, vertical tab 0x0b, form feed 0x0c).
static NOT_WHITESPACE: [bool; 256] = {
    let mut table = [true; 256];
    let ws = [b' ', b'\t', b'\n', b'\r', 0x0b, 0x0c];
    let mut i = 0;
    while i < ws.len() {
        table[ws[i] as usize] = false;
        i += 1;
    }
    table
};
1193
/// Fast path for input whose lines all share one length that is a multiple
/// of 4: verify the layout cheaply (first few newlines plus the last one),
/// then decode line-batches without ever materializing a fully cleaned
/// copy. Returns `None` when the layout assumption does not hold so the
/// caller can fall back to the general path.
fn try_decode_uniform_lines(data: &[u8], out: &mut impl Write) -> Option<io::Result<()>> {
    let first_nl = memchr::memchr(b'\n', data)?;
    let line_len = first_nl;
    // Base64 without mid-line padding requires 4-aligned line lengths.
    if line_len == 0 || line_len % 4 != 0 {
        return None;
    }

    // Bytes per line including its '\n'.
    let stride = line_len + 1;

    // Spot-check the next few line boundaries before committing.
    let check_lines = 4.min(data.len() / stride);
    for i in 1..check_lines {
        let expected_nl = i * stride - 1;
        if expected_nl >= data.len() || data[expected_nl] != b'\n' {
            return None;
        }
    }

    // Also require the final full-stride boundary to be a newline; this
    // rejects mixed line lengths that pass the spot-check.
    let full_lines = if data.len() >= stride {
        let candidate = data.len() / stride;
        if candidate > 0 && data[candidate * stride - 1] != b'\n' {
            return None;
        }
        candidate
    } else {
        0
    };

    // Trailing partial line (shorter than stride), minus its newline if any.
    let remainder_start = full_lines * stride;
    let remainder = &data[remainder_start..];
    let rem_clean = if remainder.last() == Some(&b'\n') {
        &remainder[..remainder.len() - 1]
    } else {
        remainder
    };

    let decoded_per_line = line_len * 3 / 4;
    // Decoded size of the remainder, accounting for up to two '=' pads.
    let rem_decoded_size = if rem_clean.is_empty() {
        0
    } else {
        let pad = rem_clean
            .iter()
            .rev()
            .take(2)
            .filter(|&&b| b == b'=')
            .count();
        rem_clean.len() * 3 / 4 - pad
    };
    let total_decoded = full_lines * decoded_per_line + rem_decoded_size;
    let clean_len = full_lines * line_len;

    // Parallel path: each thread strips newlines into a private sub-buffer
    // and decodes into its own disjoint slice of one shared output buffer.
    if clean_len >= PARALLEL_DECODE_THRESHOLD && num_cpus() > 1 {
        let mut output: Vec<u8> = Vec::with_capacity(total_decoded);
        // SAFETY: every byte of `output[..total_decoded]` is written (by a
        // task or by the remainder decode) before the buffer is read.
        #[allow(clippy::uninit_vec)]
        unsafe {
            output.set_len(total_decoded);
        }
        #[cfg(target_os = "linux")]
        hint_hugepage(&mut output);

        // Raw pointers passed as usize so the closures are Send.
        let out_ptr = output.as_mut_ptr() as usize;
        let src_ptr = data.as_ptr() as usize;
        let num_threads = num_cpus().max(1);
        let lines_per_thread = (full_lines + num_threads - 1) / num_threads;
        // Sub-batches keep each thread's scratch buffer ~512 KiB.
        let lines_per_sub = (512 * 1024 / line_len).max(1);

        // First decode error observed by any task aborts the others.
        let err_flag = std::sync::atomic::AtomicBool::new(false);
        rayon::scope(|s| {
            for t in 0..num_threads {
                let err_flag = &err_flag;
                s.spawn(move |_| {
                    let start_line = t * lines_per_thread;
                    if start_line >= full_lines {
                        return;
                    }
                    let end_line = (start_line + lines_per_thread).min(full_lines);
                    let chunk_lines = end_line - start_line;

                    let sub_buf_size = lines_per_sub.min(chunk_lines) * line_len;
                    let mut local_buf: Vec<u8> = Vec::with_capacity(sub_buf_size);
                    // SAFETY: only the `..sub_clean` prefix, fully written
                    // by the copy loop below, is ever read.
                    #[allow(clippy::uninit_vec)]
                    unsafe {
                        local_buf.set_len(sub_buf_size);
                    }

                    let src = src_ptr as *const u8;
                    let out_base = out_ptr as *mut u8;
                    let local_dst = local_buf.as_mut_ptr();

                    let mut sub_start = 0usize;
                    while sub_start < chunk_lines {
                        if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let sub_count = (chunk_lines - sub_start).min(lines_per_sub);
                        let sub_clean = sub_count * line_len;

                        // Gather this sub-batch's lines, skipping newlines.
                        for i in 0..sub_count {
                            // SAFETY: the line index stays < full_lines, so
                            // reads stay within `data`; writes stay within
                            // local_buf[..sub_clean].
                            unsafe {
                                std::ptr::copy_nonoverlapping(
                                    src.add((start_line + sub_start + i) * stride),
                                    local_dst.add(i * line_len),
                                    line_len,
                                );
                            }
                        }

                        let out_offset = (start_line + sub_start) * decoded_per_line;
                        let out_size = sub_count * decoded_per_line;
                        // SAFETY: each task's line range is disjoint, so
                        // these output slices never overlap; `output`
                        // outlives the scope.
                        let out_slice = unsafe {
                            std::slice::from_raw_parts_mut(out_base.add(out_offset), out_size)
                        };
                        if BASE64_ENGINE
                            .decode(&local_buf[..sub_clean], out_slice.as_out())
                            .is_err()
                        {
                            err_flag.store(true, std::sync::atomic::Ordering::Relaxed);
                            return;
                        }

                        sub_start += sub_count;
                    }
                });
            }
        });
        let result: Result<(), io::Error> = if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
            Err(io::Error::new(io::ErrorKind::InvalidData, "invalid input"))
        } else {
            Ok(())
        };

        if let Err(e) = result {
            return Some(Err(e));
        }

        // Remainder decoded serially into the tail of the output buffer.
        if !rem_clean.is_empty() {
            let rem_out = &mut output[full_lines * decoded_per_line..total_decoded];
            match BASE64_ENGINE.decode(rem_clean, rem_out.as_out()) {
                Ok(_) => {}
                Err(_) => return Some(decode_error()),
            }
        }

        return Some(out.write_all(&output[..total_decoded]));
    }

    // Serial path: strip-and-decode through a ~256 KiB scratch buffer,
    // writing each decoded batch directly to `out`.
    let lines_per_sub = (256 * 1024 / line_len).max(1);
    let sub_buf_size = lines_per_sub * line_len;
    let mut local_buf: Vec<u8> = Vec::with_capacity(sub_buf_size);
    // SAFETY: only the `..sub_clean` prefix, fully written below, is read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        local_buf.set_len(sub_buf_size);
    }

    let src = data.as_ptr();
    let local_dst = local_buf.as_mut_ptr();

    let mut line_idx = 0usize;
    while line_idx < full_lines {
        let sub_count = (full_lines - line_idx).min(lines_per_sub);
        let sub_clean = sub_count * line_len;

        // Gather newline-free lines into the scratch buffer.
        for i in 0..sub_count {
            // SAFETY: (line_idx + i) < full_lines keeps reads inside `data`;
            // writes stay inside local_buf[..sub_clean].
            unsafe {
                std::ptr::copy_nonoverlapping(
                    src.add((line_idx + i) * stride),
                    local_dst.add(i * line_len),
                    line_len,
                );
            }
        }

        match BASE64_ENGINE.decode_inplace(&mut local_buf[..sub_clean]) {
            Ok(decoded) => {
                if let Err(e) = out.write_all(decoded) {
                    return Some(Err(e));
                }
            }
            Err(_) => return Some(decode_error()),
        }

        line_idx += sub_count;
    }

    // Trailing partial line.
    if !rem_clean.is_empty() {
        let mut rem_buf = rem_clean.to_vec();
        match BASE64_ENGINE.decode_inplace(&mut rem_buf) {
            Ok(decoded) => {
                if let Err(e) = out.write_all(decoded) {
                    return Some(Err(e));
                }
            }
            Err(_) => return Some(decode_error()),
        }
    }

    Some(Ok(()))
}
1406
/// Decodes base64 `data` that may contain interspersed ASCII whitespace,
/// writing the decoded bytes to `out`.
///
/// Strategy, fastest path first:
/// 1. For inputs of at least 77 bytes (presumably one 76-column wrapped
///    line plus its newline — TODO confirm), try the uniform-line fast path.
/// 2. If there is no `\n`/`\r` at all, either decode directly (no other
///    whitespace) or filter the rare whitespace bytes first.
/// 3. Otherwise compact the newline-free runs into a fresh buffer, strip
///    any remaining rare whitespace in place, and decode (in parallel for
///    inputs past `PARALLEL_DECODE_THRESHOLD`).
fn decode_stripping_whitespace(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    if data.len() >= 77 {
        if let Some(result) = try_decode_uniform_lines(data, out) {
            return result;
        }
    }

    // No CR/LF anywhere: the "single long line" case.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        if !data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            // Completely clean input — decode without copying.
            return decode_borrowed_clean(out, data);
        }
        // Rare whitespace (space/tab/VT/FF) present: filter into a scratch
        // buffer via the NOT_WHITESPACE lookup table, then decode in place.
        let mut cleaned: Vec<u8> = Vec::with_capacity(data.len());
        for &b in data {
            if NOT_WHITESPACE[b as usize] {
                cleaned.push(b);
            }
        }
        return decode_clean_slice(&mut cleaned, out);
    }

    // General case: strip newlines by memcpy-ing the runs between them into
    // `clean`. The Vec is written through a raw pointer and its length is
    // only set once all bytes are initialized.
    let mut clean: Vec<u8> = Vec::with_capacity(data.len());
    let dst = clean.as_mut_ptr();
    let mut wp = 0usize; // write position in `clean`
    let mut gap_start = 0usize; // start of the current newline-free run
    let mut has_rare_ws = false; // saw space/tab/VT/FF inside any run?

    for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                has_rare_ws = data[gap_start..pos]
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
            }
            // SAFETY: wp never exceeds the number of input bytes consumed,
            // so wp + gap_len <= data.len() == clean.capacity().
            unsafe {
                std::ptr::copy_nonoverlapping(data.as_ptr().add(gap_start), dst.add(wp), gap_len);
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Copy the run after the last newline, if any.
    let tail_len = data.len() - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = data[gap_start..]
                .iter()
                .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
        }
        // SAFETY: as above, wp + tail_len <= clean.capacity().
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr().add(gap_start), dst.add(wp), tail_len);
        }
        wp += tail_len;
    }
    // SAFETY: exactly `wp` bytes of `clean` were initialized above.
    unsafe {
        clean.set_len(wp);
    }

    if has_rare_ws {
        // Second pass: in-place compaction removing the rare whitespace
        // bytes detected during the copy.
        let ptr = clean.as_mut_ptr();
        let len = clean.len();
        let mut rp = 0;
        let mut cwp = 0;
        while rp < len {
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        clean.truncate(cwp);
    }

    if clean.len() >= PARALLEL_DECODE_THRESHOLD {
        decode_borrowed_clean_parallel(out, &clean)
    } else {
        decode_clean_slice(&mut clean, out)
    }
}
1512
/// Fast path: decodes input laid out as uniform, newline-terminated base64
/// lines (padding allowed only on the final partial line).
///
/// Returns `None` when the layout assumption does not hold, so the caller
/// can fall back to a generic whitespace-stripping decode; otherwise
/// `Some(result)` of decoding everything and writing it to `out`.
fn try_line_decode(data: &[u8], out: &mut impl Write) -> Option<io::Result<()>> {
    // Infer the line length from the first newline; each line must be a
    // positive multiple of 4 so it decodes independently of its neighbors.
    let first_nl = memchr::memchr(b'\n', data)?;
    let line_len = first_nl;
    if line_len == 0 || line_len % 4 != 0 {
        return None;
    }

    let line_stride = line_len + 1; // line content plus its '\n'
    let decoded_per_line = line_len * 3 / 4;

    // Spot-check that the next few lines end where expected before
    // committing to this layout.
    let check_lines = 4.min(data.len() / line_stride);
    for i in 1..check_lines {
        let expected_nl = i * line_stride - 1;
        if expected_nl >= data.len() {
            break;
        }
        if data[expected_nl] != b'\n' {
            return None;
        }
    }

    // Count complete `line + '\n'` records; the last one must genuinely end
    // in '\n' or the lines are not uniform after all.
    let full_lines = if data.len() >= line_stride {
        let candidate = data.len() / line_stride;
        if candidate > 0 && data[candidate * line_stride - 1] != b'\n' {
            return None;
        }
        candidate
    } else {
        0
    };

    let remainder_start = full_lines * line_stride;
    let remainder = &data[remainder_start..];

    // Decoded size of the trailing partial line, accounting for an optional
    // final '\n' and up to two '=' padding bytes.
    let remainder_clean_len = if remainder.is_empty() {
        0
    } else {
        let rem = if remainder.last() == Some(&b'\n') {
            &remainder[..remainder.len() - 1]
        } else {
            remainder
        };
        if rem.is_empty() {
            0
        } else {
            let pad = rem.iter().rev().take(2).filter(|&&b| b == b'=').count();
            if rem.len() % 4 != 0 {
                return None;
            }
            rem.len() * 3 / 4 - pad
        }
    };

    // Output buffer: every byte is written exactly once below, so it is
    // deliberately left uninitialized here.
    let total_decoded = full_lines * decoded_per_line + remainder_clean_len;
    let mut out_buf: Vec<u8> = Vec::with_capacity(total_decoded);
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(total_decoded);
    }

    let dst = out_buf.as_mut_ptr();

    if data.len() >= PARALLEL_DECODE_THRESHOLD && full_lines >= 64 {
        // Parallel path: each task decodes a disjoint range of lines into a
        // disjoint region of `out_buf` (line i starts at i * decoded_per_line),
        // so the raw-pointer writes never alias across threads. The pointer
        // is smuggled as a usize because raw pointers are not `Send`.
        let out_addr = dst as usize;
        let num_threads = num_cpus().max(1);
        let lines_per_chunk = (full_lines / num_threads).max(1);

        let mut tasks: Vec<(usize, usize)> = Vec::new();
        let mut line_off = 0;
        while line_off < full_lines {
            let end = (line_off + lines_per_chunk).min(full_lines);
            tasks.push((line_off, end));
            line_off = end;
        }

        // First decode failure flips the flag; other tasks bail out early.
        let decode_err = std::sync::atomic::AtomicBool::new(false);
        rayon::scope(|s| {
            for &(start_line, end_line) in &tasks {
                let decode_err = &decode_err;
                s.spawn(move |_| {
                    let out_ptr = out_addr as *mut u8;
                    let mut i = start_line;

                    // 4-line unrolled loop.
                    while i + 4 <= end_line {
                        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let in_base = i * line_stride;
                        let ob = i * decoded_per_line;
                        // SAFETY: lines i..i+4 map to four adjacent,
                        // non-overlapping output windows owned by this task.
                        unsafe {
                            let s0 =
                                std::slice::from_raw_parts_mut(out_ptr.add(ob), decoded_per_line);
                            if BASE64_ENGINE
                                .decode(&data[in_base..in_base + line_len], s0.as_out())
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s1 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + line_stride..in_base + line_stride + line_len],
                                    s1.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s2 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + 2 * decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + 2 * line_stride
                                        ..in_base + 2 * line_stride + line_len],
                                    s2.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s3 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + 3 * decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + 3 * line_stride
                                        ..in_base + 3 * line_stride + line_len],
                                    s3.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                        }
                        i += 4;
                    }

                    // Remaining (< 4) lines, one at a time.
                    while i < end_line {
                        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let in_start = i * line_stride;
                        let out_off = i * decoded_per_line;
                        // SAFETY: same disjoint-window argument as above.
                        let out_slice = unsafe {
                            std::slice::from_raw_parts_mut(out_ptr.add(out_off), decoded_per_line)
                        };
                        if BASE64_ENGINE
                            .decode(&data[in_start..in_start + line_len], out_slice.as_out())
                            .is_err()
                        {
                            decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                            return;
                        }
                        i += 1;
                    }
                });
            }
        });

        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
            return Some(decode_error());
        }
    } else {
        // Serial path, unrolled 4 lines at a time.
        let mut i = 0;

        while i + 4 <= full_lines {
            let in_base = i * line_stride;
            let out_base = i * decoded_per_line;
            // SAFETY: the four output windows are adjacent, in-bounds and
            // non-overlapping slices of `out_buf`.
            unsafe {
                let s0 = std::slice::from_raw_parts_mut(dst.add(out_base), decoded_per_line);
                if BASE64_ENGINE
                    .decode(&data[in_base..in_base + line_len], s0.as_out())
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s1 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + line_stride..in_base + line_stride + line_len],
                        s1.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s2 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + 2 * decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + 2 * line_stride..in_base + 2 * line_stride + line_len],
                        s2.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s3 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + 3 * decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + 3 * line_stride..in_base + 3 * line_stride + line_len],
                        s3.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }
            }
            i += 4;
        }

        while i < full_lines {
            let in_start = i * line_stride;
            let in_end = in_start + line_len;
            let out_off = i * decoded_per_line;
            // SAFETY: out_off + decoded_per_line <= total_decoded.
            let out_slice =
                unsafe { std::slice::from_raw_parts_mut(dst.add(out_off), decoded_per_line) };
            match BASE64_ENGINE.decode(&data[in_start..in_end], out_slice.as_out()) {
                Ok(_) => {}
                Err(_) => return Some(decode_error()),
            }
            i += 1;
        }
    }

    // Decode the trailing partial line into the tail of the buffer.
    if remainder_clean_len > 0 {
        let rem = if remainder.last() == Some(&b'\n') {
            &remainder[..remainder.len() - 1]
        } else {
            remainder
        };
        let out_off = full_lines * decoded_per_line;
        // SAFETY: out_off + remainder_clean_len == total_decoded.
        let out_slice =
            unsafe { std::slice::from_raw_parts_mut(dst.add(out_off), remainder_clean_len) };
        match BASE64_ENGINE.decode(rem, out_slice.as_out()) {
            Ok(_) => {}
            Err(_) => return Some(decode_error()),
        }
    }

    Some(out.write_all(&out_buf[..total_decoded]))
}
1804
1805fn decode_clean_slice(data: &mut [u8], out: &mut impl Write) -> io::Result<()> {
1807 if data.is_empty() {
1808 return Ok(());
1809 }
1810 match BASE64_ENGINE.decode_inplace(data) {
1811 Ok(decoded) => out.write_all(decoded),
1812 Err(_) => decode_error(),
1813 }
1814}
1815
/// Builds the uniform "invalid input" decode failure.
///
/// Out-of-line and `#[cold]` so the hot decode paths keep the error
/// construction off their fast path.
#[cold]
#[inline(never)]
fn decode_error() -> io::Result<()> {
    let err = io::Error::new(io::ErrorKind::InvalidData, "invalid input");
    Err(err)
}
1822
1823fn decode_borrowed_clean(out: &mut impl Write, data: &[u8]) -> io::Result<()> {
1825 if data.is_empty() {
1826 return Ok(());
1827 }
1828 if data.len() >= PARALLEL_DECODE_THRESHOLD {
1831 return decode_borrowed_clean_parallel(out, data);
1832 }
1833 let pad = data.iter().rev().take(2).filter(|&&b| b == b'=').count();
1836 let decoded_size = data.len() * 3 / 4 - pad;
1837 let mut buf: Vec<u8> = Vec::with_capacity(decoded_size);
1838 #[allow(clippy::uninit_vec)]
1839 unsafe {
1840 buf.set_len(decoded_size);
1841 }
1842 match BASE64_ENGINE.decode(data, buf[..decoded_size].as_out()) {
1843 Ok(decoded) => {
1844 out.write_all(decoded)?;
1845 Ok(())
1846 }
1847 Err(_) => decode_error(),
1848 }
1849}
1850
1851fn decode_borrowed_clean_parallel(out: &mut impl Write, data: &[u8]) -> io::Result<()> {
1855 let num_threads = num_cpus().max(1);
1856 let raw_chunk = data.len() / num_threads;
1857 let chunk_size = ((raw_chunk + 3) / 4) * 4;
1859
1860 let chunks: Vec<&[u8]> = data.chunks(chunk_size.max(4)).collect();
1861
1862 let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
1864 offsets.push(0);
1865 let mut total_decoded = 0usize;
1866 for (i, chunk) in chunks.iter().enumerate() {
1867 let decoded_size = if i == chunks.len() - 1 {
1868 let pad = chunk.iter().rev().take(2).filter(|&&b| b == b'=').count();
1869 chunk.len() * 3 / 4 - pad
1870 } else {
1871 chunk.len() * 3 / 4
1872 };
1873 total_decoded += decoded_size;
1874 offsets.push(total_decoded);
1875 }
1876
1877 let mut output_buf: Vec<u8> = Vec::with_capacity(total_decoded);
1878 #[allow(clippy::uninit_vec)]
1879 unsafe {
1880 output_buf.set_len(total_decoded);
1881 }
1882 #[cfg(target_os = "linux")]
1883 hint_hugepage(&mut output_buf);
1884
1885 let out_addr = output_buf.as_mut_ptr() as usize;
1888 let err_flag = std::sync::atomic::AtomicBool::new(false);
1889 rayon::scope(|s| {
1890 for (i, chunk) in chunks.iter().enumerate() {
1891 let offset = offsets[i];
1892 let expected_size = offsets[i + 1] - offset;
1893 let err_flag = &err_flag;
1894 s.spawn(move |_| {
1895 if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
1896 return;
1897 }
1898 let out_slice = unsafe {
1900 std::slice::from_raw_parts_mut((out_addr as *mut u8).add(offset), expected_size)
1901 };
1902 if BASE64_ENGINE.decode(chunk, out_slice.as_out()).is_err() {
1903 err_flag.store(true, std::sync::atomic::Ordering::Relaxed);
1904 }
1905 });
1906 }
1907 });
1908
1909 if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
1910 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid input"));
1911 }
1912
1913 out.write_all(&output_buf[..total_decoded])
1914}
1915
1916fn strip_non_base64(data: &[u8]) -> Vec<u8> {
1918 data.iter()
1919 .copied()
1920 .filter(|&b| is_base64_char(b))
1921 .collect()
1922}
1923
/// True for bytes in the standard base64 alphabet, plus `=` padding.
#[inline]
fn is_base64_char(b: u8) -> bool {
    matches!(b, b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'+' | b'/' | b'=')
}
1929
1930pub fn encode_stream(
1933 reader: &mut impl Read,
1934 wrap_col: usize,
1935 writer: &mut impl Write,
1936) -> io::Result<()> {
1937 if wrap_col == 0 {
1938 return encode_stream_nowrap(reader, writer);
1939 }
1940 encode_stream_wrapped(reader, wrap_col, writer)
1941}
1942
1943fn encode_stream_nowrap(reader: &mut impl Read, writer: &mut impl Write) -> io::Result<()> {
1948 const NOWRAP_READ: usize = 24 * 1024 * 1024; let mut buf: Vec<u8> = Vec::with_capacity(NOWRAP_READ);
1954 #[allow(clippy::uninit_vec)]
1955 unsafe {
1956 buf.set_len(NOWRAP_READ);
1957 }
1958 let encode_buf_size = BASE64_ENGINE.encoded_length(NOWRAP_READ);
1959 let mut encode_buf: Vec<u8> = Vec::with_capacity(encode_buf_size);
1960 #[allow(clippy::uninit_vec)]
1961 unsafe {
1962 encode_buf.set_len(encode_buf_size);
1963 }
1964
1965 loop {
1966 let n = read_full(reader, &mut buf)?;
1967 if n == 0 {
1968 break;
1969 }
1970 let enc_len = BASE64_ENGINE.encoded_length(n);
1971 let encoded = BASE64_ENGINE.encode(&buf[..n], encode_buf[..enc_len].as_out());
1972 writer.write_all(encoded)?;
1973 }
1974 Ok(())
1975}
1976
/// Streams base64 encoding wrapped at `wrap_col` output characters.
///
/// When one output line corresponds to a whole number of input 3-byte
/// groups (`bytes_per_line` divisible by 3), delegates to the fused fast
/// path. Otherwise encodes fixed-size reads (12 MiB, a multiple of 3 so no
/// mid-stream padding) and lets `write_wrapped_iov_streaming` carry the
/// current column (`col`) across chunks; a final newline terminates any
/// partial last line.
fn encode_stream_wrapped(
    reader: &mut impl Read,
    wrap_col: usize,
    writer: &mut impl Write,
) -> io::Result<()> {
    // Input bytes that fill one wrapped output line.
    let bytes_per_line = wrap_col * 3 / 4;
    if bytes_per_line > 0 && bytes_per_line.is_multiple_of(3) {
        return encode_stream_wrapped_fused(reader, wrap_col, bytes_per_line, writer);
    }

    const STREAM_READ: usize = 12 * 1024 * 1024;
    // Both buffers are only read after being filled; left uninitialized.
    let mut buf: Vec<u8> = Vec::with_capacity(STREAM_READ);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(STREAM_READ);
    }
    let encode_buf_size = BASE64_ENGINE.encoded_length(STREAM_READ);
    let mut encode_buf: Vec<u8> = Vec::with_capacity(encode_buf_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        encode_buf.set_len(encode_buf_size);
    }

    // Output column reached so far on the current (possibly partial) line.
    let mut col = 0usize;

    loop {
        let n = read_full(reader, &mut buf)?;
        if n == 0 {
            break;
        }
        let enc_len = BASE64_ENGINE.encoded_length(n);
        let encoded = BASE64_ENGINE.encode(&buf[..n], encode_buf[..enc_len].as_out());

        write_wrapped_iov_streaming(encoded, wrap_col, &mut col, writer)?;
    }

    // Terminate a trailing partial line.
    if col > 0 {
        writer.write_all(b"\n")?;
    }

    Ok(())
}
2030
/// Fused wrapped-encode fast path, used when `bytes_per_line` (input bytes
/// per output line) is a multiple of 3: each line encodes independently
/// with no padding, so line content and its '\n' can be written straight
/// into one output buffer with raw pointers, 4 lines unrolled.
fn encode_stream_wrapped_fused(
    reader: &mut impl Read,
    wrap_col: usize,
    bytes_per_line: usize,
    writer: &mut impl Write,
) -> io::Result<()> {
    // Size reads to a whole number of lines (~24 MiB per iteration).
    let lines_per_chunk = (24 * 1024 * 1024) / bytes_per_line;
    let read_size = lines_per_chunk * bytes_per_line;
    let line_out = wrap_col + 1; // encoded line plus its '\n'
    // Buffers are written before being read; left uninitialized.
    let mut buf: Vec<u8> = Vec::with_capacity(read_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(read_size);
    }
    // Room for all full lines plus one encoded partial line and slack.
    let max_output = lines_per_chunk * line_out + BASE64_ENGINE.encoded_length(bytes_per_line) + 2;
    let mut out_buf: Vec<u8> = Vec::with_capacity(max_output);
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(max_output);
    }

    loop {
        let n = read_full(reader, &mut buf)?;
        if n == 0 {
            break;
        }

        let full_lines = n / bytes_per_line;
        let remainder = n % bytes_per_line; // only non-zero on the last read

        let dst = out_buf.as_mut_ptr();
        let mut line_idx = 0;

        // 4-line unrolled encode; line i's output starts at i * line_out.
        while line_idx + 4 <= full_lines {
            let in_base = line_idx * bytes_per_line;
            let out_base = line_idx * line_out;
            // SAFETY: the four output windows (wrap_col bytes + 1 newline
            // each) are adjacent, disjoint, and within `max_output`.
            unsafe {
                let s0 = std::slice::from_raw_parts_mut(dst.add(out_base), wrap_col);
                let _ = BASE64_ENGINE.encode(&buf[in_base..in_base + bytes_per_line], s0.as_out());
                *dst.add(out_base + wrap_col) = b'\n';

                let s1 = std::slice::from_raw_parts_mut(dst.add(out_base + line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + bytes_per_line..in_base + 2 * bytes_per_line],
                    s1.as_out(),
                );
                *dst.add(out_base + line_out + wrap_col) = b'\n';

                let s2 = std::slice::from_raw_parts_mut(dst.add(out_base + 2 * line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + 2 * bytes_per_line..in_base + 3 * bytes_per_line],
                    s2.as_out(),
                );
                *dst.add(out_base + 2 * line_out + wrap_col) = b'\n';

                let s3 = std::slice::from_raw_parts_mut(dst.add(out_base + 3 * line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + 3 * bytes_per_line..in_base + 4 * bytes_per_line],
                    s3.as_out(),
                );
                *dst.add(out_base + 3 * line_out + wrap_col) = b'\n';
            }
            line_idx += 4;
        }

        // Remaining (< 4) full lines.
        while line_idx < full_lines {
            let in_base = line_idx * bytes_per_line;
            let out_base = line_idx * line_out;
            // SAFETY: same disjoint in-bounds window argument as above.
            unsafe {
                let s = std::slice::from_raw_parts_mut(dst.add(out_base), wrap_col);
                let _ = BASE64_ENGINE.encode(&buf[in_base..in_base + bytes_per_line], s.as_out());
                *dst.add(out_base + wrap_col) = b'\n';
            }
            line_idx += 1;
        }

        let mut wp = full_lines * line_out;

        // Trailing partial line (padded encode) plus its newline.
        if remainder > 0 {
            let enc_len = BASE64_ENGINE.encoded_length(remainder);
            let line_input = &buf[full_lines * bytes_per_line..n];
            // SAFETY: wp + enc_len + 1 <= max_output by construction.
            unsafe {
                let s = std::slice::from_raw_parts_mut(dst.add(wp), enc_len);
                let _ = BASE64_ENGINE.encode(line_input, s.as_out());
                *dst.add(wp + enc_len) = b'\n';
            }
            wp += enc_len + 1;
        }

        writer.write_all(&out_buf[..wp])?;
    }

    Ok(())
}
2142
/// Streaming base64 decode: reads 32 MiB chunks, strips whitespace (or all
/// non-base64 bytes when `ignore_garbage` is set), and decodes in place.
///
/// Because a chunk boundary can split a 4-byte base64 quad, up to 3 clean
/// bytes are carried over in `carry` and prepended to the next chunk; any
/// final carry is decoded after EOF.
pub fn decode_stream(
    reader: &mut impl Read,
    ignore_garbage: bool,
    writer: &mut impl Write,
) -> io::Result<()> {
    const READ_CHUNK: usize = 32 * 1024 * 1024;
    // +4 leaves room to prepend the carried-over bytes. The buffer is only
    // read after being filled, so it is left uninitialized.
    let mut buf: Vec<u8> = Vec::with_capacity(READ_CHUNK + 4);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(READ_CHUNK + 4);
    }
    let mut carry = [0u8; 4];
    let mut carry_len = 0usize;

    loop {
        // Prepend the clean bytes left over from the previous chunk.
        if carry_len > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(carry.as_ptr(), buf.as_mut_ptr(), carry_len);
            }
        }
        let n = read_full(reader, &mut buf[carry_len..carry_len + READ_CHUNK])?;
        if n == 0 {
            break;
        }
        let total_raw = carry_len + n;

        // Compact `buf[..total_raw]` in place down to clean base64 bytes;
        // `clean_len` is the compacted length.
        let clean_len = if ignore_garbage {
            // Keep only alphabet/padding bytes.
            let ptr = buf.as_mut_ptr();
            let mut wp = 0usize;
            for i in 0..total_raw {
                let b = unsafe { *ptr.add(i) };
                if is_base64_char(b) {
                    unsafe { *ptr.add(wp) = b };
                    wp += 1;
                }
            }
            wp
        } else {
            // Strip newlines by sliding the runs between them leftwards
            // (overlapping copy), tracking whether rarer whitespace
            // (space/tab/VT/FF) was seen so a second pass can remove it.
            let ptr = buf.as_mut_ptr();
            let data = &buf[..total_raw];
            let mut wp = 0usize;
            let mut gap_start = 0usize;
            let mut has_rare_ws = false;

            for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
                let gap_len = pos - gap_start;
                if gap_len > 0 {
                    if !has_rare_ws {
                        has_rare_ws = data[gap_start..pos]
                            .iter()
                            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
                    }
                    // Skip the copy while no newline has been removed yet.
                    if wp != gap_start {
                        // SAFETY: src/dst may overlap; `copy` handles that.
                        unsafe {
                            std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len);
                        }
                    }
                    wp += gap_len;
                }
                gap_start = pos + 1;
            }
            // Slide the run after the last newline.
            let tail_len = total_raw - gap_start;
            if tail_len > 0 {
                if !has_rare_ws {
                    has_rare_ws = data[gap_start..total_raw]
                        .iter()
                        .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
                }
                if wp != gap_start {
                    unsafe {
                        std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len);
                    }
                }
                wp += tail_len;
            }

            if has_rare_ws {
                // Second pass: drop the remaining rare whitespace in place.
                let mut rp = 0;
                let mut cwp = 0;
                while rp < wp {
                    let b = unsafe { *ptr.add(rp) };
                    if NOT_WHITESPACE[b as usize] {
                        unsafe { *ptr.add(cwp) = b };
                        cwp += 1;
                    }
                    rp += 1;
                }
                cwp
            } else {
                wp
            }
        };

        carry_len = 0;
        // A short read means EOF was reached on this iteration.
        let is_last = n < READ_CHUNK;

        if is_last {
            decode_clean_slice(&mut buf[..clean_len], writer)?;
        } else {
            // Decode only whole quads; hold back the 1-3 leftover bytes for
            // the next iteration.
            let decode_len = (clean_len / 4) * 4;
            let leftover = clean_len - decode_len;
            if leftover > 0 {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        buf.as_ptr().add(decode_len),
                        carry.as_mut_ptr(),
                        leftover,
                    );
                }
                carry_len = leftover;
            }
            if decode_len > 0 {
                decode_clean_slice(&mut buf[..decode_len], writer)?;
            }
        }
    }

    // Flush any bytes still held back at EOF.
    if carry_len > 0 {
        let mut carry_buf = carry[..carry_len].to_vec();
        decode_clean_slice(&mut carry_buf, writer)?;
    }

    Ok(())
}
2286
2287#[inline(always)]
2291fn write_all_vectored(out: &mut impl Write, slices: &[io::IoSlice]) -> io::Result<()> {
2292 if slices.is_empty() {
2293 return Ok(());
2294 }
2295 let total: usize = slices.iter().map(|s| s.len()).sum();
2296 let written = out.write_vectored(slices)?;
2297 if written >= total {
2298 return Ok(());
2299 }
2300 if written == 0 {
2301 return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
2302 }
2303 write_all_vectored_slow(out, slices, written)
2304}
2305
/// Completes a partially-finished vectored write: the first `skip` bytes
/// across `slices` were already written, so they are skipped and every
/// remaining byte is written with `write_all`.
#[cold]
#[inline(never)]
fn write_all_vectored_slow(
    out: &mut impl Write,
    slices: &[io::IoSlice],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        if skip >= slice.len() {
            // Entire slice was already written.
            skip -= slice.len();
        } else {
            out.write_all(&slice[skip..])?;
            skip = 0;
        }
    }
    Ok(())
}
2325
/// Reads from `reader` until `buf` is full or EOF, returning the number of
/// bytes actually read. Unlike `Read::read_exact`, an early EOF is not an
/// error — callers inspect the returned count.
///
/// `ErrorKind::Interrupted` is retried for every read (previously an
/// interrupted *first* read was propagated as an error while subsequent
/// reads retried it); any other error is returned immediately.
#[inline]
fn read_full(reader: &mut impl Read, buf: &mut [u8]) -> io::Result<usize> {
    let mut total = 0usize;
    while total < buf.len() {
        match reader.read(&mut buf[total..]) {
            Ok(0) => break, // EOF
            Ok(n) => total += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(total)
}