// datafusion_datasource/decoder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Module containing helpers for decoding the byte streams of the various file
//! formats into `RecordBatch`es. See `write.rs` for write-related helper methods.
20
21use ::arrow::array::RecordBatch;
22
23use arrow::error::ArrowError;
24use bytes::Buf;
25use bytes::Bytes;
26use datafusion_common::Result;
27use futures::stream::BoxStream;
28use futures::StreamExt as _;
29use futures::{ready, Stream};
30use std::collections::VecDeque;
31use std::fmt;
32use std::task::Poll;
33
34/// Possible outputs of a [`BatchDeserializer`].
35#[derive(Debug, PartialEq)]
36pub enum DeserializerOutput {
37    /// A successfully deserialized [`RecordBatch`].
38    RecordBatch(RecordBatch),
39    /// The deserializer requires more data to make progress.
40    RequiresMoreData,
41    /// The input data has been exhausted.
42    InputExhausted,
43}
44
45/// Trait defining a scheme for deserializing byte streams into structured data.
46/// Implementors of this trait are responsible for converting raw bytes into
47/// `RecordBatch` objects.
48pub trait BatchDeserializer<T>: Send + fmt::Debug {
49    /// Feeds a message for deserialization, updating the internal state of
50    /// this `BatchDeserializer`. Note that one can call this function multiple
51    /// times before calling `next`, which will queue multiple messages for
52    /// deserialization. Returns the number of bytes consumed.
53    fn digest(&mut self, message: T) -> usize;
54
55    /// Attempts to deserialize any pending messages and returns a
56    /// `DeserializerOutput` to indicate progress.
57    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;
58
59    /// Informs the deserializer that no more messages will be provided for
60    /// deserialization.
61    fn finish(&mut self);
62}
63
64/// A general interface for decoders such as [`arrow::json::reader::Decoder`] and
65/// [`arrow::csv::reader::Decoder`]. Defines an interface similar to
66/// [`Decoder::decode`] and [`Decoder::flush`] methods, but also includes
67/// a method to check if the decoder can flush early. Intended to be used in
68/// conjunction with [`DecoderDeserializer`].
69///
70/// [`arrow::json::reader::Decoder`]: ::arrow::json::reader::Decoder
71/// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
72/// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
73/// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
74pub trait Decoder: Send + fmt::Debug {
75    /// See [`arrow::json::reader::Decoder::decode`].
76    ///
77    /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
78    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;
79
80    /// See [`arrow::json::reader::Decoder::flush`].
81    ///
82    /// [`arrow::json::reader::Decoder::flush`]: ::arrow::json::reader::Decoder::flush
83    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
84
85    /// Whether the decoder can flush early in its current state.
86    fn can_flush_early(&self) -> bool;
87}
88
89impl<T: Decoder> fmt::Debug for DecoderDeserializer<T> {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        f.debug_struct("Deserializer")
92            .field("buffered_queue", &self.buffered_queue)
93            .field("finalized", &self.finalized)
94            .finish()
95    }
96}
97
98impl<T: Decoder> BatchDeserializer<Bytes> for DecoderDeserializer<T> {
99    fn digest(&mut self, message: Bytes) -> usize {
100        if message.is_empty() {
101            return 0;
102        }
103
104        let consumed = message.len();
105        self.buffered_queue.push_back(message);
106        consumed
107    }
108
109    fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
110        while let Some(buffered) = self.buffered_queue.front_mut() {
111            let decoded = self.decoder.decode(buffered)?;
112            buffered.advance(decoded);
113
114            if buffered.is_empty() {
115                self.buffered_queue.pop_front();
116            }
117
118            // Flush when the stream ends or batch size is reached
119            // Certain implementations can flush early
120            if decoded == 0 || self.decoder.can_flush_early() {
121                return match self.decoder.flush() {
122                    Ok(Some(batch)) => Ok(DeserializerOutput::RecordBatch(batch)),
123                    Ok(None) => continue,
124                    Err(e) => Err(e),
125                };
126            }
127        }
128        if self.finalized {
129            Ok(DeserializerOutput::InputExhausted)
130        } else {
131            Ok(DeserializerOutput::RequiresMoreData)
132        }
133    }
134
135    fn finish(&mut self) {
136        self.finalized = true;
137        // Ensure the decoder is flushed:
138        self.buffered_queue.push_back(Bytes::new());
139    }
140}
141
142/// A generic, decoder-based deserialization scheme for processing encoded data.
143///
144/// This struct is responsible for converting a stream of bytes, which represent
145/// encoded data, into a stream of `RecordBatch` objects, following the specified
146/// schema and formatting options. It also handles any buffering necessary to satisfy
147/// the `Decoder` interface.
148pub struct DecoderDeserializer<T: Decoder> {
149    /// The underlying decoder used for deserialization
150    pub(crate) decoder: T,
151    /// The buffer used to store the remaining bytes to be decoded
152    pub(crate) buffered_queue: VecDeque<Bytes>,
153    /// Whether the input stream has been fully consumed
154    pub(crate) finalized: bool,
155}
156
157impl<T: Decoder> DecoderDeserializer<T> {
158    /// Creates a new `DecoderDeserializer` with the provided decoder.
159    pub fn new(decoder: T) -> Self {
160        DecoderDeserializer {
161            decoder,
162            buffered_queue: VecDeque::new(),
163            finalized: false,
164        }
165    }
166}
167
168/// Deserializes a stream of bytes into a stream of [`RecordBatch`] objects using the
169/// provided deserializer.
170///
171/// Returns a boxed stream of `Result<RecordBatch, ArrowError>`. The stream yields [`RecordBatch`]
172/// objects as they are produced by the deserializer, or an [`ArrowError`] if an error
173/// occurs while polling the input or deserializing.
174pub fn deserialize_stream<'a>(
175    mut input: impl Stream<Item = Result<Bytes>> + Unpin + Send + 'a,
176    mut deserializer: impl BatchDeserializer<Bytes> + 'a,
177) -> BoxStream<'a, Result<RecordBatch, ArrowError>> {
178    futures::stream::poll_fn(move |cx| loop {
179        match ready!(input.poll_next_unpin(cx)).transpose()? {
180            Some(b) => _ = deserializer.digest(b),
181            None => deserializer.finish(),
182        };
183
184        return match deserializer.next()? {
185            DeserializerOutput::RecordBatch(rb) => Poll::Ready(Some(Ok(rb))),
186            DeserializerOutput::InputExhausted => Poll::Ready(None),
187            DeserializerOutput::RequiresMoreData => continue,
188        };
189    })
190    .boxed()
191}