Skip to main content

base64_ng/stream/
decoder.rs

1use super::{
2    OutputQueue, decode_error_to_io, redacted_inner_state, stream_decoder_failed_error,
3    trailing_input_after_padding_error,
4};
5use crate::{Alphabet, Engine};
6use std::io::{self, Write};
7
8/// A streaming Base64 decoder for `std::io::Write`.
9///
10/// Like any [`Write`] implementation, [`Write::write`] may accept only
11/// part of the provided input. Accepted input may be held as decoded
12/// output until [`Write::flush`], [`Self::try_finish`], [`Self::finish`],
13/// or a later write drains the wrapped writer. Use [`Write::write_all`]
14/// when the whole input slice must be consumed.
15///
16/// # Security
17///
18/// This adapter uses the normal strict decoder, not the [`crate::ct`]
19/// module. It may branch or return early based on malformed input and it
20/// preserves strict error diagnostics. Do not use it for secret-bearing
21/// payloads when malformed-input timing matters; decode a complete frame
22/// with the matching `ct` engine instead.
23///
24/// Decoded bytes are written to the wrapped writer as valid quads are
25/// accepted. If a later quad in the same logical frame is malformed, already
26/// written bytes cannot be recalled from sockets, pipes, files, or other
27/// external sinks. For atomic frame semantics, decode into an in-memory buffer
28/// first and transfer to the final writer only after [`Self::finish`] succeeds.
29///
30/// If malformed input is detected after earlier quads in the same
31/// [`Write::write`] call were accepted, the adapter may return `Ok(consumed)`
32/// for the accepted prefix while latching itself as failed. The next write,
33/// flush, or finish call then returns the stored failure state. Callers must
34/// follow normal `Write` partial-progress rules and continue checking for the
35/// terminal error.
36pub struct Decoder<W, A, const PAD: bool>
37where
38    A: Alphabet,
39{
40    inner: Option<W>,
41    engine: Engine<A, PAD>,
42    pending: [u8; 4],
43    pending_len: usize,
44    output: OutputQueue<1024>,
45    finished: bool,
46    failed: bool,
47    finalized: bool,
48}
49
50impl<W, A, const PAD: bool> Decoder<W, A, PAD>
51where
52    A: Alphabet,
53{
54    /// Creates a new streaming decoder.
55    ///
56    /// # Security
57    ///
58    /// Streaming decoders use the normal strict decode path. They are not
59    /// constant-time-oriented secret decoders.
60    #[must_use]
61    pub const fn new(inner: W, engine: Engine<A, PAD>) -> Self {
62        Self {
63            inner: Some(inner),
64            engine,
65            pending: [0; 4],
66            pending_len: 0,
67            output: OutputQueue::new(),
68            finished: false,
69            finalized: false,
70            failed: false,
71        }
72    }
73
74    /// Returns a shared reference to the wrapped writer.
75    #[must_use]
76    pub fn get_ref(&self) -> &W {
77        self.inner_ref()
78    }
79
80    /// Returns a mutable reference to the wrapped writer.
81    pub fn get_mut(&mut self) -> &mut W {
82        self.inner_mut()
83    }
84
85    /// Returns the Base64 engine used by this adapter.
86    #[must_use]
87    pub const fn engine(&self) -> Engine<A, PAD> {
88        self.engine
89    }
90
91    /// Returns whether this adapter uses padded Base64.
92    #[must_use]
93    pub const fn is_padded(&self) -> bool {
94        PAD
95    }
96
97    /// Returns the number of encoded input bytes currently buffered until
98    /// a complete 4-byte Base64 decode quantum is available.
99    #[must_use]
100    pub const fn pending_len(&self) -> usize {
101        self.pending_len
102    }
103
104    /// Returns whether this decoder currently holds a partial input
105    /// quantum.
106    #[must_use]
107    pub const fn has_pending_input(&self) -> bool {
108        self.pending_len != 0
109    }
110
111    /// Returns how many additional input bytes are needed to complete the
112    /// currently buffered decode quantum.
113    ///
114    /// Returns `0` when no partial input quantum is buffered.
115    #[must_use]
116    pub const fn pending_input_needed_len(&self) -> usize {
117        if self.has_pending_input() {
118            4 - self.pending_len
119        } else {
120            0
121        }
122    }
123
124    /// Returns the number of decoded bytes buffered for the wrapped writer
125    /// after a previous write or flush could not fully drain them.
126    #[must_use]
127    pub const fn buffered_output_len(&self) -> usize {
128        self.output.len()
129    }
130
131    /// Returns the maximum number of decoded bytes this adapter can buffer
132    /// before returning bytes to the caller.
133    #[must_use]
134    pub const fn buffered_output_capacity(&self) -> usize {
135        self.output.capacity()
136    }
137
138    /// Returns how many more decoded bytes can be buffered before this
139    /// adapter must drain the wrapped writer.
140    #[must_use]
141    pub const fn buffered_output_remaining_capacity(&self) -> usize {
142        self.output.available_capacity()
143    }
144
145    /// Returns whether this decoder has decoded output waiting to be
146    /// written to the wrapped writer.
147    #[must_use]
148    pub const fn has_buffered_output(&self) -> bool {
149        !self.output.is_empty()
150    }
151
152    /// Returns whether this decoder has processed a terminal padded block.
153    ///
154    /// Once this returns `true`, later calls to [`Write::write`] with
155    /// additional input return an error because strict Base64 does not
156    /// permit trailing payload bytes after padding.
157    #[must_use]
158    pub const fn has_terminal_padding(&self) -> bool {
159        self.finished
160    }
161
162    /// Returns whether this decoder has been finalized.
163    ///
164    /// Once this returns `true`, later non-empty writes return an error.
165    #[must_use]
166    pub const fn is_finalized(&self) -> bool {
167        self.finalized
168    }
169
170    /// Returns whether this decoder has rejected malformed Base64 input.
171    ///
172    /// Once this returns `true`, later writes, flushes, and finalization
173    /// attempts return an error. The unchecked [`Self::into_inner`] method
174    /// can still be used for explicit recovery of the wrapped writer.
175    #[must_use]
176    pub const fn is_failed(&self) -> bool {
177        self.failed
178    }
179
180    /// Returns whether [`Self::try_into_inner`] can recover the wrapped
181    /// writer without discarding pending encoded input.
182    #[must_use]
183    pub const fn can_into_inner(&self) -> bool {
184        !self.is_failed() && !self.has_pending_input() && !self.has_buffered_output()
185    }
186
187    /// Consumes the decoder without flushing pending input.
188    ///
189    /// Prefer [`Self::finish`] when the decoded output must be complete.
190    #[must_use]
191    pub fn into_inner(mut self) -> W {
192        self.take_inner()
193    }
194
195    /// Consumes the decoder only when no partial input quantum is buffered.
196    ///
197    /// This does not flush or finalize the wrapped writer. It is a checked
198    /// alternative to [`Self::into_inner`] for callers that want to avoid
199    /// accidentally discarding pending encoded input bytes.
200    #[allow(clippy::result_large_err)]
201    pub fn try_into_inner(mut self) -> Result<W, Self> {
202        if !self.can_into_inner() {
203            return Err(self);
204        }
205        Ok(self.take_inner())
206    }
207
208    fn inner_ref(&self) -> &W {
209        match &self.inner {
210            Some(inner) => inner,
211            None => unreachable!("stream decoder inner writer was already taken"),
212        }
213    }
214
215    fn inner_mut(&mut self) -> &mut W {
216        match &mut self.inner {
217            Some(inner) => inner,
218            None => unreachable!("stream decoder inner writer was already taken"),
219        }
220    }
221
222    fn take_inner(&mut self) -> W {
223        match self.inner.take() {
224            Some(inner) => inner,
225            None => unreachable!("stream decoder inner writer was already taken"),
226        }
227    }
228
229    fn clear_pending(&mut self) {
230        crate::wipe_bytes(&mut self.pending);
231        self.pending_len = 0;
232    }
233
234    fn clear_output(&mut self) {
235        self.output.clear_all();
236    }
237}
238
239impl<W, A, const PAD: bool> Drop for Decoder<W, A, PAD>
240where
241    A: Alphabet,
242{
243    fn drop(&mut self) {
244        self.clear_pending();
245        self.clear_output();
246    }
247}
248
249impl<W, A, const PAD: bool> core::fmt::Debug for Decoder<W, A, PAD>
250where
251    A: Alphabet,
252{
253    fn fmt(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
254        formatter
255            .debug_struct("Decoder")
256            .field("inner", &redacted_inner_state(self.inner.is_some()))
257            .field("engine", &self.engine)
258            .field("pending", &"<redacted>")
259            .field("pending_len", &self.pending_len)
260            .field("pending_input_needed_len", &self.pending_input_needed_len())
261            .field("buffered_output_len", &self.output.len())
262            .field("buffered_output_capacity", &self.output.capacity())
263            .field(
264                "buffered_output_remaining_capacity",
265                &self.output.available_capacity(),
266            )
267            .field("can_into_inner", &self.can_into_inner())
268            .field("terminal_padding", &self.finished)
269            .field("finalized", &self.finalized)
270            .field("failed", &self.failed)
271            .finish()
272    }
273}
274
275impl<W, A, const PAD: bool> Decoder<W, A, PAD>
276where
277    W: Write,
278    A: Alphabet,
279{
280    /// Validates any final pending input and flushes the wrapped writer
281    /// without consuming this decoder.
282    ///
283    /// After this succeeds, [`Self::pending_len`] returns `0`, later
284    /// writes are rejected, and [`Self::finish`] can still be used to
285    /// recover the wrapped writer.
286    /// If the final buffered input is malformed, an error is returned and
287    /// the caller still owns the decoder for diagnostics or explicit
288    /// recovery.
289    pub fn try_finish(&mut self) -> io::Result<()> {
290        if self.failed {
291            return Err(stream_decoder_failed_error());
292        }
293        if !self.finalized {
294            self.queue_pending_final()?;
295            self.finalized = true;
296        }
297        self.flush()
298    }
299
300    /// Validates final pending input, flushes the wrapped writer, and returns it.
301    pub fn finish(mut self) -> io::Result<W> {
302        self.try_finish()?;
303        Ok(self.take_inner())
304    }
305
306    fn queue_pending_final(&mut self) -> io::Result<()> {
307        if self.pending_len == 0 {
308            return Ok(());
309        }
310
311        let mut pending = [0u8; 4];
312        pending[..self.pending_len].copy_from_slice(&self.pending[..self.pending_len]);
313        let pending_len = self.pending_len;
314        let mut decoded = [0u8; 3];
315        let result = self.queue_decoded_temp(&pending[..pending_len], &mut decoded);
316        crate::wipe_bytes(&mut pending);
317        if let Err(err) = result {
318            self.clear_pending();
319            return Err(err);
320        }
321        self.clear_pending();
322        Ok(())
323    }
324
325    fn queue_full_quad(&mut self, mut input: [u8; 4]) -> io::Result<()> {
326        let mut decoded = [0u8; 3];
327        let result = self.queue_decoded_temp(&input, &mut decoded);
328        crate::wipe_bytes(&mut input);
329        let written = result?;
330        if written < 3 {
331            self.finished = true;
332        }
333        Ok(())
334    }
335
336    fn queue_decoded_temp(&mut self, input: &[u8], decoded: &mut [u8]) -> io::Result<usize> {
337        let written = match self.engine.decode_slice(input, decoded) {
338            Ok(written) => written,
339            Err(err) => {
340                crate::wipe_bytes(decoded);
341                self.failed = true;
342                return Err(decode_error_to_io(err));
343            }
344        };
345
346        let result = self.output.push_slice(&decoded[..written]);
347        crate::wipe_bytes(decoded);
348        if result.is_err() {
349            self.failed = true;
350        }
351        result?;
352        Ok(written)
353    }
354
355    fn drain_output(&mut self) -> io::Result<()> {
356        let mut chunk = [0u8; 1024];
357        while !self.output.is_empty() {
358            let pending = self.output.copy_front(&mut chunk);
359            let result = self.inner_mut().write(&chunk[..pending]);
360            crate::wipe_bytes(&mut chunk[..pending]);
361            match result {
362                Ok(0) => {
363                    return Err(io::Error::new(
364                        io::ErrorKind::WriteZero,
365                        "base64 stream decoder could not drain buffered output",
366                    ));
367                }
368                Ok(written) => {
369                    if written > pending {
370                        self.failed = true;
371                        return Err(io::Error::new(
372                            io::ErrorKind::InvalidData,
373                            "wrapped writer reported more bytes than provided",
374                        ));
375                    }
376                    self.output.discard_front(written);
377                }
378                Err(err) => return Err(err),
379            }
380        }
381
382        Ok(())
383    }
384}
385
386impl<W, A, const PAD: bool> Write for Decoder<W, A, PAD>
387where
388    W: Write,
389    A: Alphabet,
390{
391    fn write(&mut self, input: &[u8]) -> io::Result<usize> {
392        if self.failed {
393            return Err(stream_decoder_failed_error());
394        }
395        if input.is_empty() {
396            self.drain_output()?;
397            return Ok(0);
398        }
399        self.drain_output()?;
400        if self.finalized {
401            return Err(io::Error::new(
402                io::ErrorKind::InvalidInput,
403                "base64 stream decoder received input after finalization",
404            ));
405        }
406        if self.finished {
407            self.failed = true;
408            return Err(trailing_input_after_padding_error());
409        }
410
411        let mut consumed = 0;
412        if self.pending_len > 0 {
413            let needed = 4 - self.pending_len;
414            if input.len() < needed {
415                self.pending[self.pending_len..self.pending_len + input.len()]
416                    .copy_from_slice(input);
417                self.pending_len += input.len();
418                return Ok(input.len());
419            }
420
421            let mut quad = [0u8; 4];
422            quad[..self.pending_len].copy_from_slice(&self.pending[..self.pending_len]);
423            quad[self.pending_len..].copy_from_slice(&input[..needed]);
424            let result = self.queue_full_quad(quad);
425            crate::wipe_bytes(&mut quad);
426            if let Err(err) = result {
427                self.clear_pending();
428                return Err(err);
429            }
430            self.clear_pending();
431            consumed += needed;
432
433            if self.finished {
434                return Ok(consumed);
435            }
436        }
437
438        while input.len() - consumed >= 4 {
439            if self.output.available_capacity() < 3 {
440                return Ok(consumed);
441            }
442
443            let mut quad = [
444                input[consumed],
445                input[consumed + 1],
446                input[consumed + 2],
447                input[consumed + 3],
448            ];
449            let mut decoded = [0u8; 3];
450            let written = match self.engine.decode_slice(&quad, &mut decoded) {
451                Ok(written) => written,
452                Err(err) => {
453                    crate::wipe_bytes(&mut quad);
454                    crate::wipe_bytes(&mut decoded);
455                    self.failed = true;
456                    if consumed > 0 {
457                        // Report accepted prefix progress per `io::Write`.
458                        // The adapter is now failed; the next write/flush/
459                        // finish call returns an error for the malformed quad.
460                        return Ok(consumed);
461                    }
462
463                    return Err(decode_error_to_io(err));
464                }
465            };
466
467            let result = self.output.push_slice(&decoded[..written]);
468            crate::wipe_bytes(&mut quad);
469            crate::wipe_bytes(&mut decoded);
470            result?;
471            consumed += 4;
472
473            if written < 3 {
474                self.finished = true;
475                return Ok(consumed);
476            }
477        }
478
479        let tail = &input[consumed..];
480        self.pending[..tail.len()].copy_from_slice(tail);
481        self.pending_len = tail.len();
482        consumed += tail.len();
483
484        Ok(consumed)
485    }
486
487    fn flush(&mut self) -> io::Result<()> {
488        if self.failed {
489            return Err(stream_decoder_failed_error());
490        }
491        self.drain_output()?;
492        self.inner_mut().flush()
493    }
494}