Skip to main content

qubit_codec/buffered/
buffered_convert_engine.rs

/*******************************************************************************
 *
 *    Copyright (c) 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! Reusable buffered converter engine.
11
12use super::{
13    buffered_convert_hooks::BufferedConvertHooks,
14    buffered_decode_engine::BufferedDecodeEngine,
15    buffered_encode_engine::BufferedEncodeEngine,
16    convert_error_of::{
17        ConvertErrorOf,
18        ConvertProgressResult,
19    },
20    convert_state::ConvertState,
21    convert_step_result::ConvertStepResult,
22    encode_context::EncodeContext,
23    encode_step::EncodeStep,
24    finish_error::FinishError,
25    pending_encode_step::PendingEncodeStep,
26    pending_value::PendingValue,
27    pending_value_slot::PendingValueSlot,
28};
29use crate::{
30    CapacityError,
31    Codec,
32    codec::assert_unit_bounds,
33};
34
35/// Reusable buffered conversion engine.
36///
37/// The engine owns reusable buffered decode and encode engines plus a small
38/// conversion-level hook object. It keeps common converter control flow private:
39/// index validation, pending-value retention, pending flush, decode-error
40/// policy dispatch, encode planning, output-capacity checks, and progress
41/// reporting.
42///
43/// `BufferedConvertEngine` is intentionally batch-oriented. Its public
44/// `transcode` method drives a source/output buffer loop and reuses the same
45/// unchecked codec and hook primitives as [`crate::BufferedDecodeEngine`] and
46/// [`crate::BufferedEncodeEngine`]. It does not call one-value public
47/// transcoders in the hot path.
48///
49/// # Type Parameters
50///
51/// - `D`: Source-side decoder codec.
52/// - `E`: Target-side encoder codec.
53/// - `H`: Conversion-level policy hooks.
54#[derive(Clone, Debug, Eq, Hash, PartialEq)]
55pub struct BufferedConvertEngine<D, E, H>
56where
57    D: Codec,
58    E: Codec<Value = D::Value>,
59    H: BufferedConvertHooks<D, E>,
60{
61    /// Source-side buffered decoder engine.
62    decode_engine: BufferedDecodeEngine<D, H::DecodeHooks>,
63    /// Target-side buffered encoder engine.
64    encode_engine: BufferedEncodeEngine<E, H::EncodeHooks>,
65    /// Conversion-level policy hooks.
66    hooks: H,
67    /// Decoded value waiting for target output capacity.
68    pending: PendingValueSlot<D::Value>,
69}
70
71impl<D, E, H> BufferedConvertEngine<D, E, H>
72where
73    D: Codec,
74    E: Codec<Value = D::Value>,
75    H: BufferedConvertHooks<D, E>,
76{
77    /// Creates a buffered converter engine.
78    ///
79    /// The supplied conversion hooks create the internal decode and encode hook
80    /// instances. This keeps codec-specific hook initialization with the
81    /// conversion policy instead of requiring those hook types to implement
82    /// [`Default`].
83    ///
84    /// # Parameters
85    ///
86    /// - `decoder`: Low-level codec used for source decoding.
87    /// - `encoder`: Low-level codec used for target encoding.
88    /// - `hooks`: Conversion-level policy hooks.
89    ///
90    /// # Returns
91    ///
92    /// Returns a buffered converter engine.
93    #[must_use]
94    #[inline]
95    pub fn new(decoder: D, encoder: E, hooks: H) -> Self {
96        let decode_hooks = hooks.create_decode_hooks(&decoder, &encoder);
97        let encode_hooks = hooks.create_encode_hooks(&decoder, &encoder);
98        Self::from_parts(decoder, encoder, hooks, decode_hooks, encode_hooks)
99    }
100
101    /// Builds the engine from already-created component hooks.
102    ///
103    /// # Type Parameters
104    ///
105    /// - `D`: Source-side decoder codec.
106    /// - `E`: Target-side encoder codec.
107    /// - `H`: Conversion-level policy hooks.
108    ///
109    /// # Parameters
110    ///
111    /// - `decoder`: Low-level decode codec.
112    /// - `encoder`: Low-level encode codec.
113    /// - `hooks`: Conversion-level hook aggregator.
114    /// - `decode_hooks`: Decode hooks instance created from `hooks`.
115    /// - `encode_hooks`: Encode hooks instance created from `hooks`.
116    ///
117    /// # Returns
118    ///
119    /// Returns an engine assembled from the provided codecs and hooks.
120    #[inline(always)]
121    const fn from_parts(
122        decoder: D,
123        encoder: E,
124        hooks: H,
125        decode_hooks: H::DecodeHooks,
126        encode_hooks: H::EncodeHooks,
127    ) -> Self {
128        Self {
129            decode_engine: BufferedDecodeEngine::new(decoder, decode_hooks),
130            encode_engine: BufferedEncodeEngine::new(encoder, encode_hooks),
131            hooks,
132            pending: PendingValueSlot::empty(),
133        }
134    }
135
136    /// Returns the source-side decode codec.
137    ///
138    /// # Returns
139    ///
140    /// Returns the reference to the internal decode codec.
141    #[must_use]
142    #[inline(always)]
143    fn decode_codec(&self) -> &D {
144        &self.decode_engine.codec
145    }
146
147    /// Returns the target-side encode codec.
148    ///
149    /// # Returns
150    ///
151    /// Returns the reference to the internal encode codec.
152    #[must_use]
153    #[inline(always)]
154    fn encode_codec(&self) -> &E {
155        &self.encode_engine.codec
156    }
157
158    /// Returns an upper bound for target units produced from `input_len` units.
159    #[must_use = "capacity planning can fail on overflow"]
160    pub fn max_output_len(&self, input_len: usize) -> Result<usize, CapacityError> {
161        let pending_units = self.pending_output_len()?;
162        let decoded_values = self.decode_engine.max_output_len(input_len)?;
163        let converted_units = self.encode_engine.max_output_len(decoded_values)?;
164        pending_units
165            .checked_add(converted_units)
166            .ok_or(CapacityError::OutputLengthOverflow)
167    }
168
169    /// Returns the maximum target units emitted by finishing retained state.
170    #[must_use = "capacity planning can fail on overflow"]
171    pub fn max_finish_output_len(&self) -> Result<usize, CapacityError> {
172        let pending_units = self.pending_output_len()?;
173        let decoder_finish_values = self.decode_engine.max_finish_output_len();
174        let decoder_finish_units = self.encode_engine.max_output_len(decoder_finish_values)?;
175        let encoder_finish_units = self.encode_engine.max_finish_output_len();
176        let pending_and_decoder = pending_units
177            .checked_add(decoder_finish_units)
178            .ok_or(CapacityError::OutputLengthOverflow)?;
179        pending_and_decoder
180            .checked_add(encoder_finish_units)
181            .ok_or(CapacityError::OutputLengthOverflow)
182    }
183
184    /// Converts source units into target units.
185    ///
186    /// # Parameters
187    ///
188    /// - `input`: Complete input unit slice visible to the converter.
189    /// - `input_index`: Absolute input index where conversion starts.
190    /// - `output`: Complete output unit slice visible to the converter.
191    /// - `output_index`: Absolute output index where writing starts.
192    ///
193    /// # Returns
194    ///
195    /// Returns conversion progress.
196    ///
197    /// # Errors
198    ///
199    /// Returns hook errors when indices are invalid or concrete conversion fails.
200    /// Invalid output indices are reported through the encode-side error path.
201    pub fn transcode(
202        &mut self,
203        input: &[D::Unit],
204        input_index: usize,
205        output: &mut [E::Unit],
206        output_index: usize,
207    ) -> ConvertProgressResult<D, E, H> {
208        if input_index > input.len() {
209            return Err(self
210                .hooks
211                .invalid_input_index(self.decode_codec(), input_index, input.len()));
212        }
213        if output_index > output.len() {
214            let error = self.encode_engine.invalid_output_index(output_index, output.len());
215            return Err(self.hooks.map_encode_error(error));
216        }
217        assert_unit_bounds::<D>(self.decode_codec());
218        assert_unit_bounds::<E>(self.encode_codec());
219
220        let mut state = ConvertState::new(input, input_index, output, output_index);
221
222        // A retained decoded value must be written before consuming more input,
223        // otherwise callers could observe output reordered across buffer turns.
224        if let Some(progress) = self.drain_pending(&mut state)? {
225            return Ok(progress);
226        }
227
228        while state.has_input() {
229            let previous_read = state.read();
230            // Each hot-path step decodes one source value and immediately tries
231            // to encode it, preserving backpressure at the target output.
232            if let Some(progress) = self.convert_next(&mut state)? {
233                return Ok(progress);
234            }
235            debug_assert!(
236                state.read() > previous_read,
237                "BufferedConvertEngine conversion step must consume input or stop",
238            );
239        }
240
241        Ok(state.complete_progress())
242    }
243
244    /// Finishes retained output after EOF.
245    ///
246    /// Finalization drains a pending decoded value first, then lets the
247    /// source-side decode hooks emit final values, encodes those values through
248    /// the target-side encode hooks, and finally finishes target-side encode hook
249    /// state. The decode-finish value buffer used for this cold path requires
250    /// `D::Value: Default`; the normal `transcode` loop does not.
251    ///
252    /// # Parameters
253    ///
254    /// - `output`: Complete output unit slice visible to the converter.
255    /// - `output_index`: Absolute output index where writing starts.
256    ///
257    /// # Returns
258    ///
259    /// Returns the number of target units written during finalization.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`FinishError`] when capacity planning overflows, when the caller
264    /// provides invalid or insufficient output capacity, or when hook
265    /// finalization fails.
266    pub fn finish(
267        &mut self,
268        output: &mut [E::Unit],
269        output_index: usize,
270    ) -> Result<usize, FinishError<ConvertErrorOf<D, E, H>>>
271    where
272        D::Value: Default,
273    {
274        assert_unit_bounds::<D>(self.decode_codec());
275        assert_unit_bounds::<E>(self.encode_codec());
276        let required = self.max_finish_output_len().map_err(FinishError::capacity)?;
277        FinishError::ensure_output_capacity(output.len(), output_index, required)?;
278
279        let empty_input: &[D::Unit] = &[];
280        let mut state = ConvertState::new(empty_input, 0, output, output_index);
281        // Finish keeps the same priority as transcode: output any retained
282        // decoded value before asking source-side hooks for final values.
283        if self.drain_pending(&mut state).map_err(FinishError::source)?.is_some() {
284            unreachable!("converter finish bound must reserve space for pending values");
285        }
286
287        // Source-side finish may emit one or more final values. Drain them into
288        // the target encoder before finishing target-side hook state.
289        self.drain_decoder_finish(&mut state)?;
290
291        let output_cursor = state.output_cursor();
292        let written = self
293            .encode_engine
294            .finish(state.output_mut(), output_cursor)
295            .map_err(|error| error.map_source(|error| self.hooks.map_encode_error(error)))?;
296        state.advance_output(written);
297        Ok(state.written())
298    }
299
300    /// Resets hook-owned and component-owned state.
301    ///
302    /// # Parameters
303    ///
304    /// - `self`: Converter instance whose retained state is cleared.
305    ///
306    /// # Returns
307    ///
308    /// Returns unit `()`.
309    #[inline(always)]
310    pub fn reset(&mut self) {
311        self.pending.clear();
312        self.decode_engine.reset();
313        self.encode_engine.reset();
314        self.hooks.reset();
315    }
316
317    /// Converts one value from the current state cursors.
318    #[inline]
319    fn convert_next(&mut self, state: &mut ConvertState<'_, D::Unit, E::Unit>) -> ConvertStepResult<D, E, H> {
320        let step = self
321            .decode_engine
322            .decode_step(state.input(), state.decode_context())
323            .map_err(|error| self.hooks.map_decode_error(error))?;
324        step.apply_to_convert_state(state, |pending, state| self.encode_pending(pending, state))
325    }
326
327    /// Returns the output bound for the retained pending value.
328    #[inline(always)]
329    fn pending_output_len(&self) -> Result<usize, CapacityError> {
330        self.pending.max_output_len(&self.encode_engine)
331    }
332
333    /// Writes a retained decoded value before new input is consumed.
334    #[inline]
335    fn drain_pending(&mut self, state: &mut ConvertState<'_, D::Unit, E::Unit>) -> ConvertStepResult<D, E, H> {
336        let Some(pending) = self.pending.take() else {
337            return Ok(None);
338        };
339        self.encode_pending(pending, state)
340    }
341
342    /// Drains source-side decode finish output and encodes emitted final values.
343    fn drain_decoder_finish(
344        &mut self,
345        state: &mut ConvertState<'_, D::Unit, E::Unit>,
346    ) -> Result<(), FinishError<ConvertErrorOf<D, E, H>>>
347    where
348        D::Value: Default,
349    {
350        let value_count = self.decode_engine.max_finish_output_len();
351        let mut decoded: Vec<D::Value> = (0..value_count).map(|_| D::Value::default()).collect();
352        let written = self
353            .decode_engine
354            .finish(&mut decoded, 0)
355            .map_err(|error| error.map_source(|error| self.hooks.map_decode_error(error)))?;
356        for value in decoded.into_iter().take(written) {
357            let pending = PendingValue::new(value, 0);
358            if self
359                .encode_pending(pending, state)
360                .map_err(FinishError::source)?
361                .is_some()
362            {
363                unreachable!("converter finish bound must reserve space for decode finish values");
364            }
365        }
366        Ok(())
367    }
368
369    /// Encodes one pending value and applies output/pending state changes.
370    #[inline]
371    fn encode_pending(
372        &mut self,
373        pending: PendingValue<D::Value>,
374        state: &mut ConvertState<'_, D::Unit, E::Unit>,
375    ) -> ConvertStepResult<D, E, H> {
376        let input_index = pending.input_index();
377        let output_index = state.output_cursor();
378        let context = EncodeContext {
379            input_value: pending.value(),
380            input_index,
381            output: state.output_mut(),
382            output_index,
383        };
384        let step = self
385            .encode_engine
386            .encode_step(context)
387            .map_err(|error| self.hooks.map_encode_error(error))?;
388        let step = match step {
389            EncodeStep::Written { written } => PendingEncodeStep::written(written),
390            EncodeStep::NeedOutput { additional, available } => {
391                PendingEncodeStep::need_output(pending, additional, available)
392            }
393        };
394        Ok(self.pending.apply_pending_encode_step(step, state))
395    }
396}
397
398impl<D, E, H> Default for BufferedConvertEngine<D, E, H>
399where
400    D: Codec + Default,
401    E: Codec<Value = D::Value> + Default,
402    H: BufferedConvertHooks<D, E> + Default,
403{
404    /// Creates a default buffered converter engine.
405    ///
406    /// # Returns
407    ///
408    /// Returns a converter engine constructed from default codecs and hooks.
409    #[inline(always)]
410    fn default() -> Self {
411        Self::new(D::default(), E::default(), H::default())
412    }
413}