1use std::io::{self, Read, Write};
2
3use base64_simd::AsOut;
4
/// Standard base64 alphabet (RFC 4648 `A-Za-z0-9+/`, padded) from `base64_simd`.
const BASE64_ENGINE: &base64_simd::Base64 = &base64_simd::STANDARD;
6
/// Number of usable CPUs, falling back to 1 when the query fails.
#[inline]
fn num_cpus() -> usize {
    match std::thread::available_parallelism() {
        Ok(n) => n.get(),
        Err(_) => 1,
    }
}
16
/// Encode unwrapped input in ~8 MiB pieces, rounded down to a multiple of 3
/// so every chunk except the last encodes without padding.
const NOWRAP_CHUNK: usize = 8 * 1024 * 1024 - (8 * 1024 * 1024 % 3);

/// Minimum input size before the unwrapped encoder goes parallel.
const PARALLEL_NOWRAP_THRESHOLD: usize = 2 * 1024 * 1024;

/// Minimum input size before the wrapped encoder goes parallel.
const PARALLEL_WRAPPED_THRESHOLD: usize = 1024 * 1024;

/// Minimum cleaned (whitespace-stripped) size before decoding goes parallel.
const PARALLEL_DECODE_THRESHOLD: usize = 1024 * 1024;
/// Advise the kernel to back `buf` with transparent huge pages when it is at
/// least 2 MiB (one x86-64 huge page). Best-effort: the `madvise` return
/// value is deliberately ignored — this only ever reduces TLB pressure and
/// never affects correctness.
#[cfg(target_os = "linux")]
fn hint_hugepage(buf: &mut Vec<u8>) {
    if buf.capacity() >= 2 * 1024 * 1024 {
        // SAFETY: pointer and capacity come from a live Vec allocation;
        // madvise neither moves nor invalidates the mapping.
        unsafe {
            libc::madvise(
                buf.as_mut_ptr() as *mut libc::c_void,
                buf.capacity(),
                libc::MADV_HUGEPAGE,
            );
        }
    }
}
54
55pub fn encode_to_writer(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
58 if data.is_empty() {
59 return Ok(());
60 }
61
62 if wrap_col == 0 {
63 return encode_no_wrap(data, out);
64 }
65
66 encode_wrapped(data, wrap_col, out)
67}
68
/// Encode without line wrapping, streaming in `NOWRAP_CHUNK`-sized pieces so
/// the scratch buffer stays bounded; large inputs on multi-core machines take
/// the parallel path instead.
fn encode_no_wrap(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    if data.len() >= PARALLEL_NOWRAP_THRESHOLD && num_cpus() > 1 {
        return encode_no_wrap_parallel(data, out);
    }

    // Scratch sized for the largest chunk; it is fully overwritten by
    // `encode` before being read, hence the uninitialized `set_len`.
    let enc_len = BASE64_ENGINE.encoded_length(data.len().min(NOWRAP_CHUNK));
    let mut buf: Vec<u8> = Vec::with_capacity(enc_len);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(enc_len);
    }

    // NOWRAP_CHUNK is a multiple of 3, so only the final chunk can pad.
    for chunk in data.chunks(NOWRAP_CHUNK) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let encoded = BASE64_ENGINE.encode(chunk, buf[..clen].as_out());
        out.write_all(encoded)?;
    }
    Ok(())
}
91
/// Parallel unwrapped encode: split the input into per-thread chunks whose
/// lengths are multiples of 3 (so each chunk encodes independently without
/// padding, except possibly the last), encode each into a disjoint region of
/// one shared output buffer, then write that buffer in a single call.
fn encode_no_wrap_parallel(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    let num_threads = num_cpus().max(1);
    let raw_chunk = data.len() / num_threads;
    // Round the per-thread chunk up to a multiple of 3; `.max(3)` below
    // guards against a zero chunk size for tiny inputs.
    let chunk_size = ((raw_chunk + 2) / 3) * 3;

    let chunks: Vec<&[u8]> = data.chunks(chunk_size.max(3)).collect();

    // Prefix sum of encoded lengths gives each chunk's write offset.
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    let mut total_out = 0usize;
    for chunk in &chunks {
        offsets.push(total_out);
        total_out += BASE64_ENGINE.encoded_length(chunk.len());
    }

    // Left uninitialized: every byte is written exactly once by one task.
    let mut output: Vec<u8> = Vec::with_capacity(total_out);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(total_out);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output);

    // The base pointer is smuggled as usize so the closures are Send; the
    // per-task output regions are disjoint by construction, so the
    // concurrent writes never alias.
    let output_base = output.as_mut_ptr() as usize;
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let out_off = offsets[i];
            let enc_len = BASE64_ENGINE.encoded_length(chunk.len());
            let base = output_base;
            s.spawn(move |_| {
                // SAFETY: [out_off, out_off + enc_len) is this task's
                // exclusive slice of the live `output` allocation.
                let dest =
                    unsafe { std::slice::from_raw_parts_mut((base + out_off) as *mut u8, enc_len) };
                let _ = BASE64_ENGINE.encode(chunk, dest.as_out());
            });
        }
    });

    out.write_all(&output[..total_out])
}
139
/// Encode with a newline after every `wrap_col` output characters, choosing a
/// strategy: columns < 4 (no whole 3-byte group per line) take the slow small
/// path; columns whose input stride is a whole number of 3-byte groups take
/// the scatter (or parallel scatter) path; everything else encodes fully and
/// then interleaves newlines via `fuse_wrap`.
fn encode_wrapped(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
    // Input bytes consumed per output line; 0 means wrap_col < 4.
    let bytes_per_line = wrap_col * 3 / 4;
    if bytes_per_line == 0 {
        return encode_wrapped_small(data, wrap_col, out);
    }

    if data.len() >= PARALLEL_WRAPPED_THRESHOLD && bytes_per_line.is_multiple_of(3) {
        return encode_wrapped_parallel(data, wrap_col, bytes_per_line, out);
    }

    if bytes_per_line.is_multiple_of(3) {
        return encode_wrapped_scatter(data, wrap_col, bytes_per_line, out);
    }

    // General path: encode everything, then insert newlines.
    let enc_max = BASE64_ENGINE.encoded_length(data.len());
    let num_full = enc_max / wrap_col;
    let rem = enc_max % wrap_col;
    // Every line — full or trailing partial — gains one '\n'.
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    // Both buffers are fully overwritten before they are read.
    let mut enc_buf: Vec<u8> = Vec::with_capacity(enc_max);
    #[allow(clippy::uninit_vec)]
    unsafe {
        enc_buf.set_len(enc_max);
    }
    let _ = BASE64_ENGINE.encode(data, enc_buf[..enc_max].as_out());

    let mut out_buf: Vec<u8> = Vec::with_capacity(out_len);
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(out_len);
    }
    let n = fuse_wrap(&enc_buf, wrap_col, &mut out_buf);
    out.write_all(&out_buf[..n])
}
184
/// Wrapped encode for the case where `bytes_per_line` (input bytes per output
/// line) is a multiple of 3: lines then align with 3-byte encode groups, so
/// groups of lines can be encoded into a small cache-resident scratch buffer
/// and scattered into the final output with a newline appended to each.
fn encode_wrapped_scatter(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let enc_len = BASE64_ENGINE.encoded_length(data.len());
    if enc_len == 0 {
        return Ok(());
    }

    let num_full = enc_len / wrap_col;
    let rem = enc_len % wrap_col;
    // One '\n' per line, including the trailing partial line.
    let out_len = num_full * (wrap_col + 1) + if rem > 0 { rem + 1 } else { 0 };

    // Every byte of `buf[..wp]` is written below before the final write_all.
    let mut buf: Vec<u8> = Vec::with_capacity(out_len);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(out_len);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut buf);

    // Encode up to GROUP_LINES lines at a time so `temp` stays cache-hot.
    const GROUP_LINES: usize = 256;
    let group_input = GROUP_LINES * bytes_per_line;
    let temp_size = GROUP_LINES * wrap_col;
    let mut temp: Vec<u8> = Vec::with_capacity(temp_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        temp.set_len(temp_size);
    }

    let line_out = wrap_col + 1;
    let mut wp = 0usize;
    for chunk in data.chunks(group_input) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let _ = BASE64_ENGINE.encode(chunk, temp[..clen].as_out());

        // Only the final chunk can produce a partial line (chunk_rem > 0).
        let lines = clen / wrap_col;
        let chunk_rem = clen % wrap_col;

        let mut i = 0;
        // Manually unrolled x8: copy one line, then append its '\n'.
        // SAFETY (all pointer ops below): `temp` holds `clen` valid bytes and
        // `buf` was sized as `out_len`, which accounts for every line plus
        // its newline; `wp` tracks exactly how many bytes were written.
        while i + 8 <= lines {
            unsafe {
                let src = temp.as_ptr().add(i * wrap_col);
                let dst = buf.as_mut_ptr().add(wp);
                std::ptr::copy_nonoverlapping(src, dst, wrap_col);
                *dst.add(wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(src.add(wrap_col), dst.add(line_out), wrap_col);
                *dst.add(line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(2 * wrap_col),
                    dst.add(2 * line_out),
                    wrap_col,
                );
                *dst.add(2 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(3 * wrap_col),
                    dst.add(3 * line_out),
                    wrap_col,
                );
                *dst.add(3 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(4 * wrap_col),
                    dst.add(4 * line_out),
                    wrap_col,
                );
                *dst.add(4 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(5 * wrap_col),
                    dst.add(5 * line_out),
                    wrap_col,
                );
                *dst.add(5 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(6 * wrap_col),
                    dst.add(6 * line_out),
                    wrap_col,
                );
                *dst.add(6 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(7 * wrap_col),
                    dst.add(7 * line_out),
                    wrap_col,
                );
                *dst.add(7 * line_out + wrap_col) = b'\n';
            }
            wp += 8 * line_out;
            i += 8;
        }
        // Remaining full lines, one at a time.
        while i < lines {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(i * wrap_col),
                    buf.as_mut_ptr().add(wp),
                    wrap_col,
                );
                *buf.as_mut_ptr().add(wp + wrap_col) = b'\n';
            }
            wp += line_out;
            i += 1;
        }
        // Trailing partial line of the final chunk.
        if chunk_rem > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(lines * wrap_col),
                    buf.as_mut_ptr().add(wp),
                    chunk_rem,
                );
                *buf.as_mut_ptr().add(wp + chunk_rem) = b'\n';
            }
            wp += chunk_rem + 1;
        }
    }

    out.write_all(&buf[..wp])
}
321
/// Copy `count` lines of `wrap_col` bytes from `temp` into `buf`, writing
/// line `line_start + i` at offset `(line_start + i) * line_out` and
/// terminating each line with a `'\n'`.
#[inline]
#[allow(dead_code)]
fn scatter_lines(
    temp: &[u8],
    buf: &mut [u8],
    line_start: usize,
    count: usize,
    wrap_col: usize,
    line_out: usize,
) {
    for line in 0..count {
        let src = &temp[line * wrap_col..(line + 1) * wrap_col];
        let dst_off = (line_start + line) * line_out;
        buf[dst_off..dst_off + wrap_col].copy_from_slice(src);
        buf[dst_off + wrap_col] = b'\n';
    }
}
345
/// Expand a buffer holding `enc_len` encoded bytes (at its front) into its
/// wrapped form of `out_len` bytes, in place and back-to-front: each
/// `wrap_col`-byte line is shifted toward the end of the buffer and followed
/// by a `'\n'`, including the trailing partial line.
///
/// The buffer behind `ptr` must be valid for `out_len` bytes. Working
/// backward means a source line is never overwritten before it is moved.
#[inline]
#[allow(dead_code)]
fn expand_backward(ptr: *mut u8, enc_len: usize, out_len: usize, wrap_col: usize) {
    let num_full = enc_len / wrap_col;
    let rem = enc_len % wrap_col;

    unsafe {
        // Read and write cursors, both moving from the end toward 0.
        let mut rp = enc_len;
        let mut wp = out_len;

        // Trailing partial line first (it sits highest in the output).
        if rem > 0 {
            wp -= 1;
            *ptr.add(wp) = b'\n';
            wp -= rem;
            rp -= rem;
            if rp != wp {
                std::ptr::copy(ptr.add(rp), ptr.add(wp), rem);
            }
        }

        // Then every full line: newline, shift line upward. `copy` handles
        // the (possibly overlapping) move; skip it once cursors coincide.
        for _ in 0..num_full {
            wp -= 1;
            *ptr.add(wp) = b'\n';
            rp -= wrap_col;
            wp -= wrap_col;
            if rp != wp {
                std::ptr::copy(ptr.add(rp), ptr.add(wp), wrap_col);
            }
        }
    }
}
442
/// Single newline byte, shared as an `IoSlice` by the vectored-write paths.
static NEWLINE: [u8; 1] = [b'\n'];
445
/// Write `encoded` wrapped at `wrap_col` without materializing one big fused
/// buffer: small outputs go out in a single vectored write (line slices
/// interleaved with the shared `NEWLINE`), larger ones fuse newlines through
/// a fixed-size scratch buffer in batches.
#[inline]
#[allow(dead_code)]
fn write_wrapped_iov(encoded: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
    // Keep iovec counts within the typical kernel limit (IOV_MAX = 1024).
    const MAX_IOV: usize = 1024;

    let num_full_lines = encoded.len() / wrap_col;
    let remainder = encoded.len() % wrap_col;
    // Two entries per line: the line itself plus its newline.
    let total_iov = num_full_lines * 2 + if remainder > 0 { 2 } else { 0 };

    if total_iov <= MAX_IOV {
        let mut iov: Vec<io::IoSlice> = Vec::with_capacity(total_iov);
        let mut pos = 0;
        for _ in 0..num_full_lines {
            iov.push(io::IoSlice::new(&encoded[pos..pos + wrap_col]));
            iov.push(io::IoSlice::new(&NEWLINE));
            pos += wrap_col;
        }
        if remainder > 0 {
            iov.push(io::IoSlice::new(&encoded[pos..pos + remainder]));
            iov.push(io::IoSlice::new(&NEWLINE));
        }
        return write_all_vectored(out, &iov);
    }

    // Too many lines for one writev: fuse newlines in BATCH_LINES batches.
    let line_out = wrap_col + 1;
    const BATCH_LINES: usize = 512;
    let batch_fused_size = BATCH_LINES * line_out;
    // Scratch is fully overwritten by fuse_wrap before each write.
    let mut fused: Vec<u8> = Vec::with_capacity(batch_fused_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        fused.set_len(batch_fused_size);
    }

    let mut rp = 0;
    let mut lines_done = 0;

    while lines_done + BATCH_LINES <= num_full_lines {
        let n = fuse_wrap(
            &encoded[rp..rp + BATCH_LINES * wrap_col],
            wrap_col,
            &mut fused,
        );
        out.write_all(&fused[..n])?;
        rp += BATCH_LINES * wrap_col;
        lines_done += BATCH_LINES;
    }

    // Leftover full lines (fewer than one batch).
    let remaining_lines = num_full_lines - lines_done;
    if remaining_lines > 0 {
        let n = fuse_wrap(
            &encoded[rp..rp + remaining_lines * wrap_col],
            wrap_col,
            &mut fused,
        );
        out.write_all(&fused[..n])?;
        rp += remaining_lines * wrap_col;
    }

    // Final partial line.
    if remainder > 0 {
        out.write_all(&encoded[rp..rp + remainder])?;
        out.write_all(b"\n")?;
    }
    Ok(())
}
526
/// Streaming variant of the vectored wrapped write: `col` carries the current
/// output column across calls, so `encoded` chunks need not start or end on
/// line boundaries.
#[inline]
fn write_wrapped_iov_streaming(
    encoded: &[u8],
    wrap_col: usize,
    col: &mut usize,
    out: &mut impl Write,
) -> io::Result<()> {
    const MAX_IOV: usize = 1024;
    let mut iov: Vec<io::IoSlice> = Vec::with_capacity(MAX_IOV);
    let mut rp = 0;

    while rp < encoded.len() {
        // Room left on the current line vs bytes left in this chunk.
        let space = wrap_col - *col;
        let avail = encoded.len() - rp;

        if avail <= space {
            // Chunk ends at or before the line boundary; newline only if the
            // line was completed exactly.
            iov.push(io::IoSlice::new(&encoded[rp..rp + avail]));
            *col += avail;
            if *col == wrap_col {
                iov.push(io::IoSlice::new(&NEWLINE));
                *col = 0;
            }
            break;
        } else {
            // Fill the current line, emit its newline, start a fresh line.
            iov.push(io::IoSlice::new(&encoded[rp..rp + space]));
            iov.push(io::IoSlice::new(&NEWLINE));
            rp += space;
            *col = 0;
        }

        // Flush before overflowing; the next pass pushes up to 2 entries.
        if iov.len() >= MAX_IOV - 1 {
            write_all_vectored(out, &iov)?;
            iov.clear();
        }
    }

    if !iov.is_empty() {
        write_all_vectored(out, &iov)?;
    }
    Ok(())
}
573
/// Parallel wrapped encode. Requires `bytes_per_line % 3 == 0` so chunk
/// boundaries fall on both line and 3-byte-group boundaries; each task then
/// encodes + wraps its chunk into a disjoint region of one output buffer.
fn encode_wrapped_parallel(
    data: &[u8],
    wrap_col: usize,
    bytes_per_line: usize,
    out: &mut impl Write,
) -> io::Result<()> {
    let num_threads = num_cpus().max(1);
    // Whole lines per chunk; only the final chunk may end mid-line.
    let lines_per_chunk = ((data.len() / bytes_per_line) / num_threads).max(1);
    let chunk_input = lines_per_chunk * bytes_per_line;

    let chunks: Vec<&[u8]> = data.chunks(chunk_input.max(bytes_per_line)).collect();

    // Prefix sum of wrapped output sizes gives per-chunk write offsets.
    let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
    let mut total_out = 0usize;
    for chunk in &chunks {
        offsets.push(total_out);
        let enc_len = BASE64_ENGINE.encoded_length(chunk.len());
        let full_lines = enc_len / wrap_col;
        let remainder = enc_len % wrap_col;
        total_out += full_lines * (wrap_col + 1) + if remainder > 0 { remainder + 1 } else { 0 };
    }

    // Left uninitialized: each byte is written exactly once by one task.
    let mut output: Vec<u8> = Vec::with_capacity(total_out);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(total_out);
    }
    #[cfg(target_os = "linux")]
    hint_hugepage(&mut output);

    // Pointer passed as usize so the closures are Send; regions are disjoint.
    let output_base = output.as_mut_ptr() as usize;
    rayon::scope(|s| {
        for (i, chunk) in chunks.iter().enumerate() {
            let out_off = offsets[i];
            let out_end = if i + 1 < offsets.len() {
                offsets[i + 1]
            } else {
                total_out
            };
            let out_size = out_end - out_off;
            let base = output_base;
            s.spawn(move |_| {
                // SAFETY: [out_off, out_end) is this task's exclusive region
                // of the live `output` allocation.
                let out_slice = unsafe {
                    std::slice::from_raw_parts_mut((base + out_off) as *mut u8, out_size)
                };
                encode_chunk_l1_scatter_into(chunk, out_slice, wrap_col, bytes_per_line);
            });
        }
    });

    out.write_all(&output[..total_out])
}
635
/// Worker for `encode_wrapped_parallel`: encode `data` in GROUP_LINES-line
/// groups through a small cache-resident scratch buffer and scatter the
/// lines (each followed by `'\n'`) into `output`. `output` must be exactly
/// the wrapped size of `data`'s encoding.
fn encode_chunk_l1_scatter_into(
    data: &[u8],
    output: &mut [u8],
    wrap_col: usize,
    bytes_per_line: usize,
) {
    const GROUP_LINES: usize = 256;
    let group_input = GROUP_LINES * bytes_per_line;
    let temp_size = GROUP_LINES * wrap_col;
    // Scratch is fully overwritten by `encode` before each read.
    let mut temp: Vec<u8> = Vec::with_capacity(temp_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        temp.set_len(temp_size);
    }

    let line_out = wrap_col + 1;
    let mut wp = 0usize;

    for chunk in data.chunks(group_input) {
        let clen = BASE64_ENGINE.encoded_length(chunk.len());
        let _ = BASE64_ENGINE.encode(chunk, temp[..clen].as_out());

        // Only the final chunk can produce a partial line.
        let lines = clen / wrap_col;
        let chunk_rem = clen % wrap_col;

        let mut i = 0;
        // Manually unrolled x8 line scatter; see encode_wrapped_scatter.
        // SAFETY (all pointer ops below): `temp` holds `clen` valid bytes and
        // the caller sized `output` for every line plus its newline.
        while i + 8 <= lines {
            unsafe {
                let src = temp.as_ptr().add(i * wrap_col);
                let dst = output.as_mut_ptr().add(wp);
                std::ptr::copy_nonoverlapping(src, dst, wrap_col);
                *dst.add(wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(src.add(wrap_col), dst.add(line_out), wrap_col);
                *dst.add(line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(2 * wrap_col),
                    dst.add(2 * line_out),
                    wrap_col,
                );
                *dst.add(2 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(3 * wrap_col),
                    dst.add(3 * line_out),
                    wrap_col,
                );
                *dst.add(3 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(4 * wrap_col),
                    dst.add(4 * line_out),
                    wrap_col,
                );
                *dst.add(4 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(5 * wrap_col),
                    dst.add(5 * line_out),
                    wrap_col,
                );
                *dst.add(5 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(6 * wrap_col),
                    dst.add(6 * line_out),
                    wrap_col,
                );
                *dst.add(6 * line_out + wrap_col) = b'\n';
                std::ptr::copy_nonoverlapping(
                    src.add(7 * wrap_col),
                    dst.add(7 * line_out),
                    wrap_col,
                );
                *dst.add(7 * line_out + wrap_col) = b'\n';
            }
            wp += 8 * line_out;
            i += 8;
        }
        // Remaining full lines, one at a time.
        while i < lines {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(i * wrap_col),
                    output.as_mut_ptr().add(wp),
                    wrap_col,
                );
                *output.as_mut_ptr().add(wp + wrap_col) = b'\n';
            }
            wp += line_out;
            i += 1;
        }
        // Trailing partial line.
        if chunk_rem > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    temp.as_ptr().add(lines * wrap_col),
                    output.as_mut_ptr().add(wp),
                    chunk_rem,
                );
                *output.as_mut_ptr().add(wp + chunk_rem) = b'\n';
            }
            wp += chunk_rem + 1;
        }
    }
}
739
/// Copy `encoded` into `out_buf`, appending a `'\n'` after every
/// `wrap_col`-byte line and after the trailing partial line (if any).
/// Returns the number of bytes written.
///
/// `wrap_col` must be >= 1 and `out_buf` must be large enough for every line
/// plus its newline; callers size it exactly.
#[inline]
fn fuse_wrap(encoded: &[u8], wrap_col: usize, out_buf: &mut [u8]) -> usize {
    let mut wp = 0usize;
    // `chunks` yields full wrap_col-byte lines plus one final partial line,
    // which matches the per-line copy + newline of the unrolled original.
    for line in encoded.chunks(wrap_col) {
        let end = wp + line.len();
        out_buf[wp..end].copy_from_slice(line); // compiles to memcpy
        out_buf[end] = b'\n';
        wp = end + 1;
    }
    wp
}
836
837fn encode_wrapped_small(data: &[u8], wrap_col: usize, out: &mut impl Write) -> io::Result<()> {
839 let enc_max = BASE64_ENGINE.encoded_length(data.len());
840 let mut buf: Vec<u8> = Vec::with_capacity(enc_max);
841 #[allow(clippy::uninit_vec)]
842 unsafe {
843 buf.set_len(enc_max);
844 }
845 let encoded = BASE64_ENGINE.encode(data, buf[..enc_max].as_out());
846
847 let wc = wrap_col.max(1);
848 for line in encoded.chunks(wc) {
849 out.write_all(line)?;
850 out.write_all(b"\n")?;
851 }
852 Ok(())
853}
854
855pub fn decode_to_writer(data: &[u8], ignore_garbage: bool, out: &mut impl Write) -> io::Result<()> {
859 if data.is_empty() {
860 return Ok(());
861 }
862
863 if ignore_garbage {
864 let mut cleaned = strip_non_base64(data);
865 return decode_clean_slice(&mut cleaned, out);
866 }
867
868 if data.len() < 512 * 1024 && data.len() >= 77 {
874 if let Some(result) = try_line_decode(data, out) {
875 return result;
876 }
877 }
878
879 decode_stripping_whitespace(data, out)
881}
882
/// Decode from a mutable (typically mmap-backed) buffer, reusing the buffer
/// itself as scratch: whitespace is compacted in place and `decode_inplace`
/// then overwrites the front of the buffer with the decoded bytes.
pub fn decode_mmap_inplace(
    data: &mut [u8],
    ignore_garbage: bool,
    out: &mut impl Write,
) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }

    // Small line-wrapped inputs: fast path that avoids any compaction.
    if !ignore_garbage && data.len() >= 77 && data.len() < 512 * 1024 {
        if let Some(result) = try_line_decode(data, out) {
            return result;
        }
    }

    if ignore_garbage {
        // -i mode: keep only base64 alphabet bytes, compacting in place.
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut wp = 0;
        for rp in 0..len {
            let b = unsafe { *ptr.add(rp) };
            if is_base64_char(b) {
                unsafe { *ptr.add(wp) = b };
                wp += 1;
            }
        }
        match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
            Ok(decoded) => return out.write_all(decoded),
            Err(_) => return decode_error(),
        }
    }

    // Larger inputs: try the uniform-line layout before compacting.
    if data.len() >= 77 {
        if let Some(result) = try_decode_uniform_lines(data, out) {
            return result;
        }
    }

    // No newlines at all: either decode directly (no whitespace anywhere)
    // or do a single byte-by-byte compaction pass.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        if !data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            match BASE64_ENGINE.decode_inplace(data) {
                Ok(decoded) => return out.write_all(decoded),
                Err(_) => return decode_error(),
            }
        }
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut wp = 0;
        for rp in 0..len {
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(wp) = b };
                wp += 1;
            }
        }
        match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
            Ok(decoded) => return out.write_all(decoded),
            Err(_) => return decode_error(),
        }
    }

    // General case: memchr for newlines and slide the gaps between them
    // down in bulk, noting whether any other ("rare") whitespace was seen.
    let ptr = data.as_mut_ptr();
    let len = data.len();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                // SAFETY: [gap_start, pos) is in-bounds, untouched so far.
                has_rare_ws = unsafe {
                    std::slice::from_raw_parts(ptr.add(gap_start), gap_len)
                        .iter()
                        .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
                };
            }
            if wp != gap_start {
                // Overlapping forward move: ptr::copy (memmove) is required.
                unsafe { std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len) };
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Tail after the last newline.
    let tail_len = len - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = unsafe {
                std::slice::from_raw_parts(ptr.add(gap_start), tail_len)
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
            };
        }
        if wp != gap_start {
            unsafe { std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len) };
        }
        wp += tail_len;
    }

    // Second, slower pass only if non-newline whitespace was detected.
    if has_rare_ws {
        let mut rp = 0;
        let mut cwp = 0;
        while rp < wp {
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        wp = cwp;
    }

    if wp >= PARALLEL_DECODE_THRESHOLD {
        return decode_borrowed_clean_parallel(out, &data[..wp]);
    }
    match BASE64_ENGINE.decode_inplace(&mut data[..wp]) {
        Ok(decoded) => out.write_all(decoded),
        Err(_) => decode_error(),
    }
}
1033
1034pub fn decode_owned(
1036 data: &mut Vec<u8>,
1037 ignore_garbage: bool,
1038 out: &mut impl Write,
1039) -> io::Result<()> {
1040 if data.is_empty() {
1041 return Ok(());
1042 }
1043
1044 if ignore_garbage {
1045 data.retain(|&b| is_base64_char(b));
1046 } else {
1047 strip_whitespace_inplace(data);
1048 }
1049
1050 decode_clean_slice(data, out)
1051}
1052
/// Remove all ASCII whitespace from `data` in place. Optimized for the usual
/// shapes: no newlines at all (single scan, optional retain) or
/// newline-separated lines (bulk gap moves via memchr), with a slow
/// byte-by-byte pass only when rarer whitespace (space/tab/VT/FF) shows up.
fn strip_whitespace_inplace(data: &mut Vec<u8>) {
    // No CR/LF: a retain pass is needed only if other whitespace exists.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        if data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            data.retain(|&b| NOT_WHITESPACE[b as usize]);
        }
        return;
    }

    // Slide the runs between newlines down in bulk, recording whether any
    // non-newline whitespace was seen along the way.
    let ptr = data.as_mut_ptr();
    let len = data.len();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    for pos in memchr::memchr2_iter(b'\n', b'\r', data.as_slice()) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                has_rare_ws = data[gap_start..pos]
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
            }
            if wp != gap_start {
                // Overlapping forward move — ptr::copy (memmove) semantics.
                unsafe {
                    std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len);
                }
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Tail after the final newline.
    let tail_len = len - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = data[gap_start..]
                .iter()
                .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
        }
        if wp != gap_start {
            unsafe {
                std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len);
            }
        }
        wp += tail_len;
    }

    data.truncate(wp);

    // Second pass only when space/tab/VT/FF was actually present.
    if has_rare_ws {
        let ptr = data.as_mut_ptr();
        let len = data.len();
        let mut rp = 0;
        let mut cwp = 0;
        while rp < len {
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        data.truncate(cwp);
    }
}
1135
/// Byte-indexed lookup table: `true` for every byte that is NOT ASCII
/// whitespace (space, tab, LF, CR, vertical tab 0x0b, form feed 0x0c).
static NOT_WHITESPACE: [bool; 256] = {
    let mut table = [true; 256];
    table[b' ' as usize] = false;
    table[b'\t' as usize] = false;
    table[b'\n' as usize] = false;
    table[b'\r' as usize] = false;
    table[0x0b] = false;
    table[0x0c] = false;
    table
};
1148
/// Decode input whose lines all share one length (a multiple of 4) and are
/// '\n'-terminated — the layout standard encoders produce. Returns `None`
/// when the layout doesn't match (caller falls back to general stripping),
/// `Some(result)` once decoding was attempted.
fn try_decode_uniform_lines(data: &[u8], out: &mut impl Write) -> Option<io::Result<()>> {
    let first_nl = memchr::memchr(b'\n', data)?;
    let line_len = first_nl;
    if line_len == 0 || line_len % 4 != 0 {
        return None;
    }

    // Bytes per line including its terminating newline.
    let stride = line_len + 1;

    // Cheap probe: the next few lines must end exactly where expected.
    let check_lines = 4.min(data.len() / stride);
    for i in 1..check_lines {
        let expected_nl = i * stride - 1;
        if expected_nl >= data.len() || data[expected_nl] != b'\n' {
            return None;
        }
    }

    // Confirm the last whole stride also ends in '\n' before trusting the
    // uniform layout for the entire input.
    let full_lines = if data.len() >= stride {
        let candidate = data.len() / stride;
        if candidate > 0 && data[candidate * stride - 1] != b'\n' {
            return None;
        }
        candidate
    } else {
        0
    };

    // Trailing short (possibly unterminated) line after the last stride.
    let remainder_start = full_lines * stride;
    let remainder = &data[remainder_start..];
    let rem_clean = if remainder.last() == Some(&b'\n') {
        &remainder[..remainder.len() - 1]
    } else {
        remainder
    };

    let decoded_per_line = line_len * 3 / 4;
    // Decoded size of the remainder, accounting for up to two '=' pads.
    let rem_decoded_size = if rem_clean.is_empty() {
        0
    } else {
        let pad = rem_clean
            .iter()
            .rev()
            .take(2)
            .filter(|&&b| b == b'=')
            .count();
        rem_clean.len() * 3 / 4 - pad
    };
    let total_decoded = full_lines * decoded_per_line + rem_decoded_size;
    let clean_len = full_lines * line_len;

    if clean_len >= PARALLEL_DECODE_THRESHOLD && num_cpus() > 1 {
        // Parallel path: threads strip newlines and decode disjoint line
        // ranges straight into one shared output buffer.
        let mut output: Vec<u8> = Vec::with_capacity(total_decoded);
        #[allow(clippy::uninit_vec)]
        unsafe {
            output.set_len(total_decoded);
        }
        #[cfg(target_os = "linux")]
        hint_hugepage(&mut output);

        // Pointers smuggled as usize so the closures are Send.
        let out_ptr = output.as_mut_ptr() as usize;
        let src_ptr = data.as_ptr() as usize;
        let num_threads = num_cpus().max(1);
        let lines_per_thread = (full_lines + num_threads - 1) / num_threads;
        // Sub-batches keep each thread's gather scratch around 512 KiB.
        let lines_per_sub = (512 * 1024 / line_len).max(1);

        let err_flag = std::sync::atomic::AtomicBool::new(false);
        rayon::scope(|s| {
            for t in 0..num_threads {
                let err_flag = &err_flag;
                s.spawn(move |_| {
                    let start_line = t * lines_per_thread;
                    if start_line >= full_lines {
                        return;
                    }
                    let end_line = (start_line + lines_per_thread).min(full_lines);
                    let chunk_lines = end_line - start_line;

                    // Scratch is fully overwritten before each decode.
                    let sub_buf_size = lines_per_sub.min(chunk_lines) * line_len;
                    let mut local_buf: Vec<u8> = Vec::with_capacity(sub_buf_size);
                    #[allow(clippy::uninit_vec)]
                    unsafe {
                        local_buf.set_len(sub_buf_size);
                    }

                    let src = src_ptr as *const u8;
                    let out_base = out_ptr as *mut u8;
                    let local_dst = local_buf.as_mut_ptr();

                    let mut sub_start = 0usize;
                    while sub_start < chunk_lines {
                        // Another thread hit invalid input: bail out early.
                        if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let sub_count = (chunk_lines - sub_start).min(lines_per_sub);
                        let sub_clean = sub_count * line_len;

                        // Gather the lines contiguously, dropping newlines.
                        for i in 0..sub_count {
                            unsafe {
                                std::ptr::copy_nonoverlapping(
                                    src.add((start_line + sub_start + i) * stride),
                                    local_dst.add(i * line_len),
                                    line_len,
                                );
                            }
                        }

                        // SAFETY: line ranges are disjoint per thread, so
                        // these output regions never overlap.
                        let out_offset = (start_line + sub_start) * decoded_per_line;
                        let out_size = sub_count * decoded_per_line;
                        let out_slice = unsafe {
                            std::slice::from_raw_parts_mut(out_base.add(out_offset), out_size)
                        };
                        if BASE64_ENGINE
                            .decode(&local_buf[..sub_clean], out_slice.as_out())
                            .is_err()
                        {
                            err_flag.store(true, std::sync::atomic::Ordering::Relaxed);
                            return;
                        }

                        sub_start += sub_count;
                    }
                });
            }
        });
        let result: Result<(), io::Error> = if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
            Err(io::Error::new(io::ErrorKind::InvalidData, "invalid input"))
        } else {
            Ok(())
        };

        if let Err(e) = result {
            return Some(Err(e));
        }

        // Decode the final partial line serially into the output's tail.
        if !rem_clean.is_empty() {
            let rem_out = &mut output[full_lines * decoded_per_line..total_decoded];
            match BASE64_ENGINE.decode(rem_clean, rem_out.as_out()) {
                Ok(_) => {}
                Err(_) => return Some(decode_error()),
            }
        }

        return Some(out.write_all(&output[..total_decoded]));
    }

    // Serial path: gather + decode in ~256 KiB batches, streaming to `out`.
    let lines_per_sub = (256 * 1024 / line_len).max(1);
    let sub_buf_size = lines_per_sub * line_len;
    let mut local_buf: Vec<u8> = Vec::with_capacity(sub_buf_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        local_buf.set_len(sub_buf_size);
    }

    let src = data.as_ptr();
    let local_dst = local_buf.as_mut_ptr();

    let mut line_idx = 0usize;
    while line_idx < full_lines {
        let sub_count = (full_lines - line_idx).min(lines_per_sub);
        let sub_clean = sub_count * line_len;

        // Gather lines contiguously (dropping the '\n' between them).
        for i in 0..sub_count {
            unsafe {
                std::ptr::copy_nonoverlapping(
                    src.add((line_idx + i) * stride),
                    local_dst.add(i * line_len),
                    line_len,
                );
            }
        }

        match BASE64_ENGINE.decode_inplace(&mut local_buf[..sub_clean]) {
            Ok(decoded) => {
                if let Err(e) = out.write_all(decoded) {
                    return Some(Err(e));
                }
            }
            Err(_) => return Some(decode_error()),
        }

        line_idx += sub_count;
    }

    // Final partial line.
    if !rem_clean.is_empty() {
        let mut rem_buf = rem_clean.to_vec();
        match BASE64_ENGINE.decode_inplace(&mut rem_buf) {
            Ok(decoded) => {
                if let Err(e) = out.write_all(decoded) {
                    return Some(Err(e));
                }
            }
            Err(_) => return Some(decode_error()),
        }
    }

    Some(Ok(()))
}
1361
/// General borrowed-input decode: try the uniform-line fast path, then strip
/// whitespace into a fresh buffer (bulk gap copies via memchr, a slow pass
/// only when rarer whitespace appears) and decode, in parallel for large
/// cleaned inputs.
fn decode_stripping_whitespace(data: &[u8], out: &mut impl Write) -> io::Result<()> {
    if data.len() >= 77 {
        if let Some(result) = try_decode_uniform_lines(data, out) {
            return result;
        }
    }

    // No newlines: decode directly, or retain-copy once if other
    // whitespace is present.
    if memchr::memchr2(b'\n', b'\r', data).is_none() {
        if !data
            .iter()
            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c)
        {
            return decode_borrowed_clean(out, data);
        }
        let mut cleaned: Vec<u8> = Vec::with_capacity(data.len());
        for &b in data {
            if NOT_WHITESPACE[b as usize] {
                cleaned.push(b);
            }
        }
        return decode_clean_slice(&mut cleaned, out);
    }

    // Copy the runs between newlines into `clean` in bulk, noting whether
    // any non-newline whitespace was encountered.
    let mut clean: Vec<u8> = Vec::with_capacity(data.len());
    let dst = clean.as_mut_ptr();
    let mut wp = 0usize;
    let mut gap_start = 0usize;
    let mut has_rare_ws = false;

    for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
        let gap_len = pos - gap_start;
        if gap_len > 0 {
            if !has_rare_ws {
                has_rare_ws = data[gap_start..pos]
                    .iter()
                    .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
            }
            // SAFETY: wp + gap_len never exceeds `clean`'s capacity
            // (data.len()); source and destination do not overlap.
            unsafe {
                std::ptr::copy_nonoverlapping(data.as_ptr().add(gap_start), dst.add(wp), gap_len);
            }
            wp += gap_len;
        }
        gap_start = pos + 1;
    }
    // Tail after the final newline.
    let tail_len = data.len() - gap_start;
    if tail_len > 0 {
        if !has_rare_ws {
            has_rare_ws = data[gap_start..]
                .iter()
                .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
        }
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr().add(gap_start), dst.add(wp), tail_len);
        }
        wp += tail_len;
    }
    // SAFETY: exactly `wp` bytes were initialized above.
    unsafe {
        clean.set_len(wp);
    }

    // Second, byte-by-byte pass only when space/tab/VT/FF was present.
    if has_rare_ws {
        let ptr = clean.as_mut_ptr();
        let len = clean.len();
        let mut rp = 0;
        let mut cwp = 0;
        while rp < len {
            let b = unsafe { *ptr.add(rp) };
            if NOT_WHITESPACE[b as usize] {
                unsafe { *ptr.add(cwp) = b };
                cwp += 1;
            }
            rp += 1;
        }
        clean.truncate(cwp);
    }

    if clean.len() >= PARALLEL_DECODE_THRESHOLD {
        decode_borrowed_clean_parallel(out, &clean)
    } else {
        decode_clean_slice(&mut clean, out)
    }
}
1467
fn try_line_decode(data: &[u8], out: &mut impl Write) -> Option<io::Result<()>> {
    // Fast path for base64 input wrapped at one uniform line length.
    //
    // Learns the line length from the first '\n'; if the layout holds, every
    // full line is `line_len` chars + '\n' and decodes to exactly
    // `decoded_per_line` bytes, so each line can be decoded independently,
    // straight from `data` into its own slot of one preallocated output
    // buffer -- no whitespace-stripping copy is needed.
    //
    // Returns None when the layout doesn't match (caller must fall back to a
    // generic path), Some(Err(_)) on invalid base64 or a failed write.
    let first_nl = memchr::memchr(b'\n', data)?;
    // Lines must be a positive multiple of 4 chars so each decodes to a whole
    // number of bytes with no carry into the next line.
    let line_len = first_nl; if line_len == 0 || line_len % 4 != 0 {
        return None;
    }

    // Input stride per line: the chars plus the trailing '\n'.
    let line_stride = line_len + 1; let decoded_per_line = line_len * 3 / 4;

    // Cheap probe: the next few lines must also end with '\n' at the stride.
    let check_lines = 4.min(data.len() / line_stride);
    for i in 1..check_lines {
        let expected_nl = i * line_stride - 1;
        if expected_nl >= data.len() {
            break;
        }
        if data[expected_nl] != b'\n' {
            return None; }
    }

    // Commit only if the LAST full line also ends with '\n' where expected;
    // this catches layout changes beyond the probed prefix.
    let full_lines = if data.len() >= line_stride {
        let candidate = data.len() / line_stride;
        if candidate > 0 && data[candidate * line_stride - 1] != b'\n' {
            return None; }
        candidate
    } else {
        0
    };

    let remainder_start = full_lines * line_stride;
    let remainder = &data[remainder_start..];

    // Decoded size of the final short line: drop an optional trailing '\n',
    // then subtract '=' padding. A remainder that is not a 4-char multiple
    // disqualifies this fast path.
    let remainder_clean_len = if remainder.is_empty() {
        0
    } else {
        let rem = if remainder.last() == Some(&b'\n') {
            &remainder[..remainder.len() - 1]
        } else {
            remainder
        };
        if rem.is_empty() {
            0
        } else {
            let pad = rem.iter().rev().take(2).filter(|&&b| b == b'=').count();
            if rem.len() % 4 != 0 {
                return None; }
            rem.len() * 3 / 4 - pad
        }
    };

    let total_decoded = full_lines * decoded_per_line + remainder_clean_len;
    let mut out_buf: Vec<u8> = Vec::with_capacity(total_decoded);
    // SAFETY: every byte of out_buf[..total_decoded] is written exactly once
    // below (per-line slots plus the remainder slot) before the buffer is read.
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(total_decoded);
    }

    let dst = out_buf.as_mut_ptr();

    if data.len() >= PARALLEL_DECODE_THRESHOLD && full_lines >= 64 {
        // Parallel path: split the full lines into per-thread ranges; each
        // task decodes its lines into a disjoint region of out_buf. The
        // pointer is smuggled as a usize so the closure is Send; the
        // disjointness of the output ranges is what keeps this sound.
        let out_addr = dst as usize;
        let num_threads = num_cpus().max(1);
        let lines_per_chunk = (full_lines / num_threads).max(1);

        let mut tasks: Vec<(usize, usize)> = Vec::new();
        let mut line_off = 0;
        while line_off < full_lines {
            let end = (line_off + lines_per_chunk).min(full_lines);
            tasks.push((line_off, end));
            line_off = end;
        }

        // First decode failure flips the flag; remaining tasks bail out early.
        let decode_err = std::sync::atomic::AtomicBool::new(false);
        rayon::scope(|s| {
            for &(start_line, end_line) in &tasks {
                let decode_err = &decode_err;
                s.spawn(move |_| {
                    let out_ptr = out_addr as *mut u8;
                    let mut i = start_line;

                    // 4x-unrolled: decode four consecutive lines per iteration.
                    while i + 4 <= end_line {
                        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let in_base = i * line_stride;
                        let ob = i * decoded_per_line;
                        // SAFETY: i + 3 < end_line <= full_lines, so all four
                        // input lines are in bounds and all four output slots
                        // lie inside out_buf, written only by this task.
                        unsafe {
                            let s0 =
                                std::slice::from_raw_parts_mut(out_ptr.add(ob), decoded_per_line);
                            if BASE64_ENGINE
                                .decode(&data[in_base..in_base + line_len], s0.as_out())
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s1 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + line_stride..in_base + line_stride + line_len],
                                    s1.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s2 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + 2 * decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + 2 * line_stride
                                        ..in_base + 2 * line_stride + line_len],
                                    s2.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                            let s3 = std::slice::from_raw_parts_mut(
                                out_ptr.add(ob + 3 * decoded_per_line),
                                decoded_per_line,
                            );
                            if BASE64_ENGINE
                                .decode(
                                    &data[in_base + 3 * line_stride
                                        ..in_base + 3 * line_stride + line_len],
                                    s3.as_out(),
                                )
                                .is_err()
                            {
                                decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                                return;
                            }
                        }
                        i += 4;
                    }

                    // Remaining (< 4) lines of this task's range.
                    while i < end_line {
                        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
                            return;
                        }
                        let in_start = i * line_stride;
                        let out_off = i * decoded_per_line;
                        // SAFETY: i < full_lines keeps the slot inside out_buf
                        // and owned by this task.
                        let out_slice = unsafe {
                            std::slice::from_raw_parts_mut(out_ptr.add(out_off), decoded_per_line)
                        };
                        if BASE64_ENGINE
                            .decode(&data[in_start..in_start + line_len], out_slice.as_out())
                            .is_err()
                        {
                            decode_err.store(true, std::sync::atomic::Ordering::Relaxed);
                            return;
                        }
                        i += 1;
                    }
                });
            }
        });

        if decode_err.load(std::sync::atomic::Ordering::Relaxed) {
            return Some(decode_error());
        }
    } else {
        // Serial path: same per-line scheme, 4x-unrolled main loop.
        let mut i = 0;

        while i + 4 <= full_lines {
            let in_base = i * line_stride;
            let out_base = i * decoded_per_line;
            // SAFETY: i + 3 < full_lines keeps all four slots inside out_buf.
            unsafe {
                let s0 = std::slice::from_raw_parts_mut(dst.add(out_base), decoded_per_line);
                if BASE64_ENGINE
                    .decode(&data[in_base..in_base + line_len], s0.as_out())
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s1 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + line_stride..in_base + line_stride + line_len],
                        s1.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s2 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + 2 * decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + 2 * line_stride..in_base + 2 * line_stride + line_len],
                        s2.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }

                let s3 = std::slice::from_raw_parts_mut(
                    dst.add(out_base + 3 * decoded_per_line),
                    decoded_per_line,
                );
                if BASE64_ENGINE
                    .decode(
                        &data[in_base + 3 * line_stride..in_base + 3 * line_stride + line_len],
                        s3.as_out(),
                    )
                    .is_err()
                {
                    return Some(decode_error());
                }
            }
            i += 4;
        }

        while i < full_lines {
            let in_start = i * line_stride;
            let in_end = in_start + line_len;
            let out_off = i * decoded_per_line;
            // SAFETY: i < full_lines keeps the slot inside out_buf.
            let out_slice =
                unsafe { std::slice::from_raw_parts_mut(dst.add(out_off), decoded_per_line) };
            match BASE64_ENGINE.decode(&data[in_start..in_end], out_slice.as_out()) {
                Ok(_) => {}
                Err(_) => return Some(decode_error()),
            }
            i += 1;
        }
    }

    if remainder_clean_len > 0 {
        // Final short line (minus its optional '\n') decodes into the tail
        // slot of out_buf.
        let rem = if remainder.last() == Some(&b'\n') {
            &remainder[..remainder.len() - 1]
        } else {
            remainder
        };
        let out_off = full_lines * decoded_per_line;
        // SAFETY: [out_off, out_off + remainder_clean_len) lies inside
        // out_buf and is untouched by the per-line decodes above.
        let out_slice =
            unsafe { std::slice::from_raw_parts_mut(dst.add(out_off), remainder_clean_len) };
        match BASE64_ENGINE.decode(rem, out_slice.as_out()) {
            Ok(_) => {}
            Err(_) => return Some(decode_error()),
        }
    }

    Some(out.write_all(&out_buf[..total_decoded]))
}
1759
1760fn decode_clean_slice(data: &mut [u8], out: &mut impl Write) -> io::Result<()> {
1762 if data.is_empty() {
1763 return Ok(());
1764 }
1765 match BASE64_ENGINE.decode_inplace(data) {
1766 Ok(decoded) => out.write_all(decoded),
1767 Err(_) => decode_error(),
1768 }
1769}
1770
/// Shared failure result for malformed base64 input.
///
/// Marked cold and never inlined so the hot decode loops stay compact and the
/// branch predictor treats this path as unlikely.
#[cold]
#[inline(never)]
fn decode_error() -> io::Result<()> {
    let err = io::Error::new(io::ErrorKind::InvalidData, "invalid input");
    Err(err)
}
1777
1778fn decode_borrowed_clean(out: &mut impl Write, data: &[u8]) -> io::Result<()> {
1780 if data.is_empty() {
1781 return Ok(());
1782 }
1783 if data.len() >= PARALLEL_DECODE_THRESHOLD {
1786 return decode_borrowed_clean_parallel(out, data);
1787 }
1788 let pad = data.iter().rev().take(2).filter(|&&b| b == b'=').count();
1791 let decoded_size = data.len() * 3 / 4 - pad;
1792 let mut buf: Vec<u8> = Vec::with_capacity(decoded_size);
1793 #[allow(clippy::uninit_vec)]
1794 unsafe {
1795 buf.set_len(decoded_size);
1796 }
1797 match BASE64_ENGINE.decode(data, buf[..decoded_size].as_out()) {
1798 Ok(decoded) => {
1799 out.write_all(decoded)?;
1800 Ok(())
1801 }
1802 Err(_) => decode_error(),
1803 }
1804}
1805
1806fn decode_borrowed_clean_parallel(out: &mut impl Write, data: &[u8]) -> io::Result<()> {
1810 let num_threads = num_cpus().max(1);
1811 let raw_chunk = data.len() / num_threads;
1812 let chunk_size = ((raw_chunk + 3) / 4) * 4;
1814
1815 let chunks: Vec<&[u8]> = data.chunks(chunk_size.max(4)).collect();
1816
1817 let mut offsets: Vec<usize> = Vec::with_capacity(chunks.len() + 1);
1819 offsets.push(0);
1820 let mut total_decoded = 0usize;
1821 for (i, chunk) in chunks.iter().enumerate() {
1822 let decoded_size = if i == chunks.len() - 1 {
1823 let pad = chunk.iter().rev().take(2).filter(|&&b| b == b'=').count();
1824 chunk.len() * 3 / 4 - pad
1825 } else {
1826 chunk.len() * 3 / 4
1827 };
1828 total_decoded += decoded_size;
1829 offsets.push(total_decoded);
1830 }
1831
1832 let mut output_buf: Vec<u8> = Vec::with_capacity(total_decoded);
1833 #[allow(clippy::uninit_vec)]
1834 unsafe {
1835 output_buf.set_len(total_decoded);
1836 }
1837 #[cfg(target_os = "linux")]
1838 hint_hugepage(&mut output_buf);
1839
1840 let out_addr = output_buf.as_mut_ptr() as usize;
1843 let err_flag = std::sync::atomic::AtomicBool::new(false);
1844 rayon::scope(|s| {
1845 for (i, chunk) in chunks.iter().enumerate() {
1846 let offset = offsets[i];
1847 let expected_size = offsets[i + 1] - offset;
1848 let err_flag = &err_flag;
1849 s.spawn(move |_| {
1850 if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
1851 return;
1852 }
1853 let out_slice = unsafe {
1855 std::slice::from_raw_parts_mut((out_addr as *mut u8).add(offset), expected_size)
1856 };
1857 if BASE64_ENGINE.decode(chunk, out_slice.as_out()).is_err() {
1858 err_flag.store(true, std::sync::atomic::Ordering::Relaxed);
1859 }
1860 });
1861 }
1862 });
1863
1864 if err_flag.load(std::sync::atomic::Ordering::Relaxed) {
1865 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid input"));
1866 }
1867
1868 out.write_all(&output_buf[..total_decoded])
1869}
1870
1871fn strip_non_base64(data: &[u8]) -> Vec<u8> {
1873 data.iter()
1874 .copied()
1875 .filter(|&b| is_base64_char(b))
1876 .collect()
1877}
1878
/// True for bytes of the standard base64 alphabet plus the '=' padding byte.
#[inline]
fn is_base64_char(b: u8) -> bool {
    matches!(b, b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'+' | b'/' | b'=')
}
1884
1885pub fn encode_stream(
1888 reader: &mut impl Read,
1889 wrap_col: usize,
1890 writer: &mut impl Write,
1891) -> io::Result<()> {
1892 if wrap_col == 0 {
1893 return encode_stream_nowrap(reader, writer);
1894 }
1895 encode_stream_wrapped(reader, wrap_col, writer)
1896}
1897
1898fn encode_stream_nowrap(reader: &mut impl Read, writer: &mut impl Write) -> io::Result<()> {
1903 const NOWRAP_READ: usize = 24 * 1024 * 1024; let mut buf: Vec<u8> = Vec::with_capacity(NOWRAP_READ);
1909 #[allow(clippy::uninit_vec)]
1910 unsafe {
1911 buf.set_len(NOWRAP_READ);
1912 }
1913 let encode_buf_size = BASE64_ENGINE.encoded_length(NOWRAP_READ);
1914 let mut encode_buf: Vec<u8> = Vec::with_capacity(encode_buf_size);
1915 #[allow(clippy::uninit_vec)]
1916 unsafe {
1917 encode_buf.set_len(encode_buf_size);
1918 }
1919
1920 loop {
1921 let n = read_full(reader, &mut buf)?;
1922 if n == 0 {
1923 break;
1924 }
1925 let enc_len = BASE64_ENGINE.encoded_length(n);
1926 let encoded = BASE64_ENGINE.encode(&buf[..n], encode_buf[..enc_len].as_out());
1927 writer.write_all(encoded)?;
1928 }
1929 Ok(())
1930}
1931
1932fn encode_stream_wrapped(
1940 reader: &mut impl Read,
1941 wrap_col: usize,
1942 writer: &mut impl Write,
1943) -> io::Result<()> {
1944 let bytes_per_line = wrap_col * 3 / 4;
1945 if bytes_per_line > 0 && bytes_per_line.is_multiple_of(3) {
1949 return encode_stream_wrapped_fused(reader, wrap_col, bytes_per_line, writer);
1950 }
1951
1952 const STREAM_READ: usize = 12 * 1024 * 1024;
1954 let mut buf: Vec<u8> = Vec::with_capacity(STREAM_READ);
1955 #[allow(clippy::uninit_vec)]
1956 unsafe {
1957 buf.set_len(STREAM_READ);
1958 }
1959 let encode_buf_size = BASE64_ENGINE.encoded_length(STREAM_READ);
1960 let mut encode_buf: Vec<u8> = Vec::with_capacity(encode_buf_size);
1961 #[allow(clippy::uninit_vec)]
1962 unsafe {
1963 encode_buf.set_len(encode_buf_size);
1964 }
1965
1966 let mut col = 0usize;
1967
1968 loop {
1969 let n = read_full(reader, &mut buf)?;
1970 if n == 0 {
1971 break;
1972 }
1973 let enc_len = BASE64_ENGINE.encoded_length(n);
1974 let encoded = BASE64_ENGINE.encode(&buf[..n], encode_buf[..enc_len].as_out());
1975
1976 write_wrapped_iov_streaming(encoded, wrap_col, &mut col, writer)?;
1977 }
1978
1979 if col > 0 {
1980 writer.write_all(b"\n")?;
1981 }
1982
1983 Ok(())
1984}
1985
fn encode_stream_wrapped_fused(
    reader: &mut impl Read,
    wrap_col: usize,
    bytes_per_line: usize,
    writer: &mut impl Write,
) -> io::Result<()> {
    // Wrapped streaming encoder for the aligned case: `bytes_per_line` (the
    // input bytes backing one output line) is a multiple of 3, so every line
    // encodes independently with no base64 state spilling across lines. Each
    // line plus its '\n' is written directly into one output buffer that is
    // flushed once per read.
    //
    // NOTE(review): if bytes_per_line > 24 MiB, lines_per_chunk == 0 and
    // read_size == 0, so read_full reads nothing and the function returns Ok
    // with empty output -- confirm callers bound wrap_col well below that.
    let lines_per_chunk = (24 * 1024 * 1024) / bytes_per_line;
    let read_size = lines_per_chunk * bytes_per_line;
    // Output stride per line: wrap_col encoded chars + the '\n'.
    let line_out = wrap_col + 1; let mut buf: Vec<u8> = Vec::with_capacity(read_size);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(read_size);
    }
    // Worst case: all full lines, one (possibly padded) remainder line, slack.
    let max_output = lines_per_chunk * line_out + BASE64_ENGINE.encoded_length(bytes_per_line) + 2;
    let mut out_buf: Vec<u8> = Vec::with_capacity(max_output);
    #[allow(clippy::uninit_vec)]
    unsafe {
        out_buf.set_len(max_output);
    }

    loop {
        let n = read_full(reader, &mut buf)?;
        if n == 0 {
            break;
        }

        let full_lines = n / bytes_per_line;
        // read_size is a multiple of bytes_per_line and read_full only comes
        // up short at EOF, so a remainder can only occur on the final read.
        let remainder = n % bytes_per_line;

        let dst = out_buf.as_mut_ptr();
        let mut line_idx = 0;

        // 4x-unrolled: encode four full lines per iteration, each into its
        // fixed output slot followed by '\n'.
        while line_idx + 4 <= full_lines {
            let in_base = line_idx * bytes_per_line;
            let out_base = line_idx * line_out;
            // SAFETY: line_idx + 3 < full_lines <= lines_per_chunk, so all
            // four slots (including their '\n' bytes) fit inside out_buf.
            unsafe {
                let s0 = std::slice::from_raw_parts_mut(dst.add(out_base), wrap_col);
                let _ = BASE64_ENGINE.encode(&buf[in_base..in_base + bytes_per_line], s0.as_out());
                *dst.add(out_base + wrap_col) = b'\n';

                let s1 = std::slice::from_raw_parts_mut(dst.add(out_base + line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + bytes_per_line..in_base + 2 * bytes_per_line],
                    s1.as_out(),
                );
                *dst.add(out_base + line_out + wrap_col) = b'\n';

                let s2 = std::slice::from_raw_parts_mut(dst.add(out_base + 2 * line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + 2 * bytes_per_line..in_base + 3 * bytes_per_line],
                    s2.as_out(),
                );
                *dst.add(out_base + 2 * line_out + wrap_col) = b'\n';

                let s3 = std::slice::from_raw_parts_mut(dst.add(out_base + 3 * line_out), wrap_col);
                let _ = BASE64_ENGINE.encode(
                    &buf[in_base + 3 * bytes_per_line..in_base + 4 * bytes_per_line],
                    s3.as_out(),
                );
                *dst.add(out_base + 3 * line_out + wrap_col) = b'\n';
            }
            line_idx += 4;
        }

        // Remaining (< 4) full lines, one at a time.
        while line_idx < full_lines {
            let in_base = line_idx * bytes_per_line;
            let out_base = line_idx * line_out;
            // SAFETY: line_idx < full_lines keeps the slot within out_buf.
            unsafe {
                let s = std::slice::from_raw_parts_mut(dst.add(out_base), wrap_col);
                let _ = BASE64_ENGINE.encode(&buf[in_base..in_base + bytes_per_line], s.as_out());
                *dst.add(out_base + wrap_col) = b'\n';
            }
            line_idx += 1;
        }

        let mut wp = full_lines * line_out;

        // Final partial line: shorter (possibly padded) encode plus '\n'.
        if remainder > 0 {
            let enc_len = BASE64_ENGINE.encoded_length(remainder);
            let line_input = &buf[full_lines * bytes_per_line..n];
            // SAFETY: max_output reserves encoded_length(bytes_per_line) + 2
            // past the full lines, covering enc_len and the trailing newline.
            unsafe {
                let s = std::slice::from_raw_parts_mut(dst.add(wp), enc_len);
                let _ = BASE64_ENGINE.encode(line_input, s.as_out());
                *dst.add(wp + enc_len) = b'\n';
            }
            wp += enc_len + 1;
        }

        writer.write_all(&out_buf[..wp])?;
    }

    Ok(())
}
2097
pub fn decode_stream(
    reader: &mut impl Read,
    ignore_garbage: bool,
    writer: &mut impl Write,
) -> io::Result<()> {
    // Streaming decoder: read big chunks, compact them in place (dropping
    // whitespace, or with `ignore_garbage` everything outside the base64
    // alphabet), then decode. A chunk boundary can split a 4-char base64
    // group, so up to 3 cleaned bytes are carried into the next iteration.
    const READ_CHUNK: usize = 32 * 1024 * 1024;
    // +4 leaves headroom to prepend the carried bytes ahead of a fresh read.
    let mut buf: Vec<u8> = Vec::with_capacity(READ_CHUNK + 4);
    #[allow(clippy::uninit_vec)]
    unsafe {
        buf.set_len(READ_CHUNK + 4);
    }
    let mut carry = [0u8; 4];
    let mut carry_len = 0usize; // always < 4

    loop {
        // Prepend leftover bytes from the previous chunk.
        if carry_len > 0 {
            unsafe {
                std::ptr::copy_nonoverlapping(carry.as_ptr(), buf.as_mut_ptr(), carry_len);
            }
        }
        let n = read_full(reader, &mut buf[carry_len..carry_len + READ_CHUNK])?;
        if n == 0 {
            break;
        }
        let total_raw = carry_len + n;

        let clean_len = if ignore_garbage {
            // Keep only base64 alphabet bytes, compacting left in place.
            let ptr = buf.as_mut_ptr();
            let mut wp = 0usize;
            for i in 0..total_raw {
                let b = unsafe { *ptr.add(i) };
                if is_base64_char(b) {
                    unsafe { *ptr.add(wp) = b };
                    wp += 1;
                }
            }
            wp
        } else {
            // Strip newlines by sliding the spans between \n/\r ("gaps") to
            // the left, in place; a second pass runs only if rarer whitespace
            // bytes were actually seen inside some gap.
            let ptr = buf.as_mut_ptr();
            let data = &buf[..total_raw];
            let mut wp = 0usize;
            let mut gap_start = 0usize;
            let mut has_rare_ws = false;

            for pos in memchr::memchr2_iter(b'\n', b'\r', data) {
                let gap_len = pos - gap_start;
                if gap_len > 0 {
                    if !has_rare_ws {
                        has_rare_ws = data[gap_start..pos]
                            .iter()
                            .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
                    }
                    if wp != gap_start {
                        // Source and destination may overlap when shifting
                        // forward, hence ptr::copy (memmove semantics).
                        unsafe {
                            std::ptr::copy(ptr.add(gap_start), ptr.add(wp), gap_len);
                        }
                    }
                    wp += gap_len;
                }
                gap_start = pos + 1;
            }
            // Span after the last newline, if any.
            let tail_len = total_raw - gap_start;
            if tail_len > 0 {
                if !has_rare_ws {
                    has_rare_ws = data[gap_start..total_raw]
                        .iter()
                        .any(|&b| b == b' ' || b == b'\t' || b == 0x0b || b == 0x0c);
                }
                if wp != gap_start {
                    unsafe {
                        std::ptr::copy(ptr.add(gap_start), ptr.add(wp), tail_len);
                    }
                }
                wp += tail_len;
            }

            if has_rare_ws {
                // Drop the remaining space/tab/\v/\f bytes in place.
                // (NOT_WHITESPACE: byte-indexed table defined elsewhere in
                // this file; true for non-whitespace bytes, per its use here.)
                let mut rp = 0;
                let mut cwp = 0;
                while rp < wp {
                    let b = unsafe { *ptr.add(rp) };
                    if NOT_WHITESPACE[b as usize] {
                        unsafe { *ptr.add(cwp) = b };
                        cwp += 1;
                    }
                    rp += 1;
                }
                cwp
            } else {
                wp
            }
        };

        carry_len = 0;
        // read_full only returns short at EOF, so a short read marks the
        // final chunk of the stream.
        let is_last = n < READ_CHUNK;

        if is_last {
            // Final chunk: decode everything, '=' padding included.
            decode_clean_slice(&mut buf[..clean_len], writer)?;
        } else {
            // Mid-stream: decode whole 4-char groups only; stash the rest
            // (< 4 bytes) for the next iteration.
            let decode_len = (clean_len / 4) * 4;
            let leftover = clean_len - decode_len;
            if leftover > 0 {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        buf.as_ptr().add(decode_len),
                        carry.as_mut_ptr(),
                        leftover,
                    );
                }
                carry_len = leftover;
            }
            if decode_len > 0 {
                decode_clean_slice(&mut buf[..decode_len], writer)?;
            }
        }
    }

    // Trailing partial group left at EOF; let the decoder validate it.
    if carry_len > 0 {
        let mut carry_buf = carry[..carry_len].to_vec();
        decode_clean_slice(&mut carry_buf, writer)?;
    }

    Ok(())
}
2241
2242#[inline(always)]
2246fn write_all_vectored(out: &mut impl Write, slices: &[io::IoSlice]) -> io::Result<()> {
2247 if slices.is_empty() {
2248 return Ok(());
2249 }
2250 let total: usize = slices.iter().map(|s| s.len()).sum();
2251 let written = out.write_vectored(slices)?;
2252 if written >= total {
2253 return Ok(());
2254 }
2255 if written == 0 {
2256 return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero"));
2257 }
2258 write_all_vectored_slow(out, slices, written)
2259}
2260
/// Fallback after a short vectored write: the first `skip` bytes spread across
/// `slices` were already written; writes out everything that remains, slice by
/// slice.
#[cold]
#[inline(never)]
fn write_all_vectored_slow(
    out: &mut impl Write,
    slices: &[io::IoSlice],
    mut skip: usize,
) -> io::Result<()> {
    for slice in slices {
        let len = slice.len();
        if skip >= len {
            // This slice was fully covered by the earlier vectored write.
            skip -= len;
        } else {
            out.write_all(&slice[skip..])?;
            skip = 0;
        }
    }
    Ok(())
}
2280
/// Reads until `buf` is full or EOF, retrying on Interrupted.
///
/// Returns the number of bytes read; a value below `buf.len()` therefore
/// means end of stream. The single-read fast path avoids any loop overhead
/// when the reader satisfies the request (or is exhausted) in one call.
#[inline]
fn read_full(reader: &mut impl Read, buf: &mut [u8]) -> io::Result<usize> {
    let mut filled = reader.read(buf)?;
    if filled == 0 || filled == buf.len() {
        return Ok(filled);
    }
    loop {
        if filled == buf.len() {
            return Ok(filled);
        }
        match reader.read(&mut buf[filled..]) {
            Ok(0) => return Ok(filled),
            Ok(n) => filled += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
}