Skip to main content

base64_ng/stream/
encoder.rs

1use super::{OutputQueue, encode_error_to_io, redacted_inner_state, stream_encoder_failed_error};
2use crate::{Alphabet, Engine};
3use std::io::{self, Write};
4
5/// A streaming Base64 encoder for `std::io::Write`.
6///
7/// Like any [`Write`] implementation, [`Write::write`] may accept only
8/// part of the provided input. Accepted input may be held as encoded
9/// output until [`Write::flush`], [`Self::try_finish`], [`Self::finish`],
10/// or a later write drains the wrapped writer. Use [`Write::write_all`]
11/// when the whole input slice must be consumed.
12pub struct Encoder<W, A, const PAD: bool>
13where
14    A: Alphabet,
15{
16    inner: Option<W>,
17    engine: Engine<A, PAD>,
18    pending: [u8; 2],
19    pending_len: usize,
20    output: OutputQueue<1024>,
21    finalized: bool,
22    failed: bool,
23}
24
25impl<W, A, const PAD: bool> Encoder<W, A, PAD>
26where
27    A: Alphabet,
28{
29    /// Creates a new streaming encoder.
30    #[must_use]
31    pub const fn new(inner: W, engine: Engine<A, PAD>) -> Self {
32        Self {
33            inner: Some(inner),
34            engine,
35            pending: [0; 2],
36            pending_len: 0,
37            output: OutputQueue::new(),
38            finalized: false,
39            failed: false,
40        }
41    }
42
43    /// Returns a shared reference to the wrapped writer.
44    #[must_use]
45    pub fn get_ref(&self) -> &W {
46        self.inner_ref()
47    }
48
49    /// Returns a mutable reference to the wrapped writer.
50    pub fn get_mut(&mut self) -> &mut W {
51        self.inner_mut()
52    }
53
54    /// Returns the Base64 engine used by this adapter.
55    #[must_use]
56    pub const fn engine(&self) -> Engine<A, PAD> {
57        self.engine
58    }
59
60    /// Returns whether this adapter uses padded Base64.
61    #[must_use]
62    pub const fn is_padded(&self) -> bool {
63        PAD
64    }
65
66    /// Returns the number of raw input bytes currently buffered until a
67    /// complete 3-byte Base64 encode quantum is available.
68    #[must_use]
69    pub const fn pending_len(&self) -> usize {
70        self.pending_len
71    }
72
73    /// Returns whether this encoder currently holds a partial input
74    /// quantum.
75    #[must_use]
76    pub const fn has_pending_input(&self) -> bool {
77        self.pending_len != 0
78    }
79
80    /// Returns how many additional input bytes are needed to complete the
81    /// currently buffered encode quantum.
82    ///
83    /// Returns `0` when no partial input quantum is buffered.
84    #[must_use]
85    pub const fn pending_input_needed_len(&self) -> usize {
86        if self.has_pending_input() {
87            3 - self.pending_len
88        } else {
89            0
90        }
91    }
92
93    /// Returns the number of encoded bytes buffered for the wrapped
94    /// writer after a previous write or flush could not fully drain them.
95    #[must_use]
96    pub const fn buffered_output_len(&self) -> usize {
97        self.output.len()
98    }
99
100    /// Returns the maximum number of encoded bytes this adapter can buffer
101    /// before returning bytes to the caller.
102    #[must_use]
103    pub const fn buffered_output_capacity(&self) -> usize {
104        self.output.capacity()
105    }
106
107    /// Returns how many more encoded bytes can be buffered before this
108    /// adapter must drain the wrapped writer.
109    #[must_use]
110    pub const fn buffered_output_remaining_capacity(&self) -> usize {
111        self.output.available_capacity()
112    }
113
114    /// Returns whether this encoder has encoded output waiting to be
115    /// written to the wrapped writer.
116    #[must_use]
117    pub const fn has_buffered_output(&self) -> bool {
118        !self.output.is_empty()
119    }
120
121    /// Returns whether this encoder has been finalized.
122    ///
123    /// Once this returns `true`, later writes return an error.
124    #[must_use]
125    pub const fn is_finalized(&self) -> bool {
126        self.finalized
127    }
128
129    /// Returns whether this encoder has failed closed after an unrecoverable
130    /// internal encoding or buffering error.
131    ///
132    /// Ordinary wrapped-writer I/O errors are retryable and do not set this
133    /// flag. Once this returns `true`, later writes, flushes, and finalization
134    /// attempts return an error. The unchecked [`Self::into_inner`] method can
135    /// still be used for explicit recovery of the wrapped writer.
136    #[must_use]
137    pub const fn is_failed(&self) -> bool {
138        self.failed
139    }
140
141    /// Returns whether [`Self::try_into_inner`] can recover the wrapped
142    /// writer without discarding pending input.
143    #[must_use]
144    pub const fn can_into_inner(&self) -> bool {
145        !self.is_failed() && !self.has_pending_input() && !self.has_buffered_output()
146    }
147
148    /// Consumes the encoder without flushing pending input.
149    ///
150    /// Prefer [`Self::finish`] when the encoded output must be complete.
151    #[must_use]
152    pub fn into_inner(mut self) -> W {
153        self.take_inner()
154    }
155
156    /// Consumes the encoder only when no partial input quantum is buffered.
157    ///
158    /// This does not flush or finalize the wrapped writer. It is a checked
159    /// alternative to [`Self::into_inner`] for callers that want to avoid
160    /// accidentally discarding pending input bytes.
161    #[allow(clippy::result_large_err)]
162    pub fn try_into_inner(mut self) -> Result<W, Self> {
163        if !self.can_into_inner() {
164            return Err(self);
165        }
166        Ok(self.take_inner())
167    }
168
169    fn inner_ref(&self) -> &W {
170        match &self.inner {
171            Some(inner) => inner,
172            None => unreachable!("stream encoder inner writer was already taken"),
173        }
174    }
175
176    fn inner_mut(&mut self) -> &mut W {
177        match &mut self.inner {
178            Some(inner) => inner,
179            None => unreachable!("stream encoder inner writer was already taken"),
180        }
181    }
182
183    fn take_inner(&mut self) -> W {
184        match self.inner.take() {
185            Some(inner) => inner,
186            None => unreachable!("stream encoder inner writer was already taken"),
187        }
188    }
189
190    fn clear_pending(&mut self) {
191        crate::wipe_bytes(&mut self.pending);
192        self.pending_len = 0;
193    }
194
195    fn clear_output(&mut self) {
196        self.output.clear_all();
197    }
198}
199
200impl<W, A, const PAD: bool> Drop for Encoder<W, A, PAD>
201where
202    A: Alphabet,
203{
204    fn drop(&mut self) {
205        self.clear_pending();
206        self.clear_output();
207    }
208}
209
210impl<W, A, const PAD: bool> core::fmt::Debug for Encoder<W, A, PAD>
211where
212    A: Alphabet,
213{
214    fn fmt(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
215        formatter
216            .debug_struct("Encoder")
217            .field("inner", &redacted_inner_state(self.inner.is_some()))
218            .field("engine", &self.engine)
219            .field("pending", &"<redacted>")
220            .field("pending_len", &self.pending_len)
221            .field("pending_input_needed_len", &self.pending_input_needed_len())
222            .field("buffered_output_len", &self.output.len())
223            .field("buffered_output_capacity", &self.output.capacity())
224            .field(
225                "buffered_output_remaining_capacity",
226                &self.output.available_capacity(),
227            )
228            .field("can_into_inner", &self.can_into_inner())
229            .field("finalized", &self.finalized)
230            .field("failed", &self.failed)
231            .finish()
232    }
233}
234
235impl<W, A, const PAD: bool> Encoder<W, A, PAD>
236where
237    W: Write,
238    A: Alphabet,
239{
240    /// Writes any pending input and flushes the wrapped writer without
241    /// consuming this encoder.
242    ///
243    /// After this succeeds, [`Self::pending_len`] returns `0`, later
244    /// writes are rejected, and [`Self::finish`] can still be used to
245    /// recover the wrapped writer.
246    /// This is useful when a caller needs to finalize a framed payload
247    /// while keeping the stream adapter available for diagnostics or
248    /// explicit recovery.
249    pub fn try_finish(&mut self) -> io::Result<()> {
250        if self.failed {
251            return Err(stream_encoder_failed_error());
252        }
253        if !self.finalized {
254            self.queue_pending_final()?;
255            self.finalized = true;
256        }
257        self.flush()
258    }
259
260    /// Writes any pending input, flushes the wrapped writer, and returns it.
261    pub fn finish(mut self) -> io::Result<W> {
262        self.try_finish()?;
263        Ok(self.take_inner())
264    }
265
266    fn queue_pending_final(&mut self) -> io::Result<()> {
267        if self.pending_len == 0 {
268            return Ok(());
269        }
270
271        let mut pending = [0u8; 2];
272        pending[..self.pending_len].copy_from_slice(&self.pending[..self.pending_len]);
273        let pending_len = self.pending_len;
274        let mut encoded = [0u8; 4];
275        let result = self.queue_encoded_temp(&pending[..pending_len], &mut encoded);
276        crate::wipe_bytes(&mut pending);
277        result?;
278        self.clear_pending();
279        Ok(())
280    }
281
282    fn queue_encoded_temp(&mut self, input: &[u8], encoded: &mut [u8]) -> io::Result<()> {
283        let written = match self.engine.encode_slice(input, encoded) {
284            Ok(written) => written,
285            Err(err) => {
286                crate::wipe_bytes(encoded);
287                self.failed = true;
288                return Err(encode_error_to_io(err));
289            }
290        };
291
292        let result = self.output.push_slice(&encoded[..written]);
293        crate::wipe_bytes(encoded);
294        if result.is_err() {
295            self.failed = true;
296        }
297        result
298    }
299
300    fn drain_output(&mut self) -> io::Result<()> {
301        let mut chunk = [0u8; 1024];
302        while !self.output.is_empty() {
303            let pending = self.output.copy_front(&mut chunk);
304            let result = self.inner_mut().write(&chunk[..pending]);
305            crate::wipe_bytes(&mut chunk[..pending]);
306            match result {
307                Ok(0) => {
308                    return Err(io::Error::new(
309                        io::ErrorKind::WriteZero,
310                        "base64 stream encoder could not drain buffered output",
311                    ));
312                }
313                Ok(written) => {
314                    if written > pending {
315                        self.failed = true;
316                        return Err(io::Error::new(
317                            io::ErrorKind::InvalidData,
318                            "wrapped writer reported more bytes than provided",
319                        ));
320                    }
321                    self.output.discard_front(written);
322                }
323                Err(err) => return Err(err),
324            }
325        }
326
327        Ok(())
328    }
329}
330
331impl<W, A, const PAD: bool> Write for Encoder<W, A, PAD>
332where
333    W: Write,
334    A: Alphabet,
335{
336    fn write(&mut self, input: &[u8]) -> io::Result<usize> {
337        if self.failed {
338            return Err(stream_encoder_failed_error());
339        }
340        self.drain_output()?;
341        if self.finalized {
342            return Err(io::Error::new(
343                io::ErrorKind::InvalidInput,
344                "base64 stream encoder received input after finalization",
345            ));
346        }
347        if input.is_empty() {
348            return Ok(0);
349        }
350
351        let mut consumed = 0;
352        if self.pending_len > 0 {
353            let needed = 3 - self.pending_len;
354            if input.len() < needed {
355                self.pending[self.pending_len..self.pending_len + input.len()]
356                    .copy_from_slice(input);
357                self.pending_len += input.len();
358                return Ok(input.len());
359            }
360
361            let mut chunk = [0u8; 3];
362            chunk[..self.pending_len].copy_from_slice(&self.pending[..self.pending_len]);
363            chunk[self.pending_len..].copy_from_slice(&input[..needed]);
364
365            let mut encoded = [0u8; 4];
366            let result = self.queue_encoded_temp(&chunk, &mut encoded);
367            crate::wipe_bytes(&mut chunk);
368            result?;
369            self.clear_pending();
370            consumed += needed;
371        }
372
373        let remaining = &input[consumed..];
374        let full_len = remaining.len() / 3 * 3;
375        if full_len > 0 {
376            let max_by_queue = self.output.available_capacity() / 4 * 3;
377            let mut take = core::cmp::min(full_len, core::cmp::min(768, max_by_queue));
378            take -= take % 3;
379
380            if take == 0 {
381                return Ok(consumed);
382            }
383
384            let mut encoded = [0u8; 1024];
385            self.queue_encoded_temp(&remaining[..take], &mut encoded)?;
386            consumed += take;
387
388            if take < full_len {
389                return Ok(consumed);
390            }
391        }
392
393        let tail = &input[consumed..];
394        self.pending[..tail.len()].copy_from_slice(tail);
395        self.pending_len = tail.len();
396        consumed += tail.len();
397
398        Ok(consumed)
399    }
400
401    fn flush(&mut self) -> io::Result<()> {
402        if self.failed {
403            return Err(stream_encoder_failed_error());
404        }
405        self.drain_output()?;
406        self.inner_mut().flush()
407    }
408}