datafusion_datasource/decoder.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Module containing helper methods for the various file formats.
//! See `write.rs` for write-related helper methods.
20
21use ::arrow::array::RecordBatch;
22
23use arrow::error::ArrowError;
24use bytes::Buf;
25use bytes::Bytes;
26use datafusion_common::Result;
27use futures::stream::BoxStream;
28use futures::StreamExt as _;
29use futures::{ready, Stream};
30use std::collections::VecDeque;
31use std::fmt;
32use std::task::Poll;
33
34/// Possible outputs of a [`BatchDeserializer`].
35#[derive(Debug, PartialEq)]
36pub enum DeserializerOutput {
37 /// A successfully deserialized [`RecordBatch`].
38 RecordBatch(RecordBatch),
39 /// The deserializer requires more data to make progress.
40 RequiresMoreData,
41 /// The input data has been exhausted.
42 InputExhausted,
43}
44
45/// Trait defining a scheme for deserializing byte streams into structured data.
46/// Implementors of this trait are responsible for converting raw bytes into
47/// `RecordBatch` objects.
48pub trait BatchDeserializer<T>: Send + fmt::Debug {
49 /// Feeds a message for deserialization, updating the internal state of
50 /// this `BatchDeserializer`. Note that one can call this function multiple
51 /// times before calling `next`, which will queue multiple messages for
52 /// deserialization. Returns the number of bytes consumed.
53 fn digest(&mut self, message: T) -> usize;
54
55 /// Attempts to deserialize any pending messages and returns a
56 /// `DeserializerOutput` to indicate progress.
57 fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;
58
59 /// Informs the deserializer that no more messages will be provided for
60 /// deserialization.
61 fn finish(&mut self);
62}
63
64/// A general interface for decoders such as [`arrow::json::reader::Decoder`] and
65/// [`arrow::csv::reader::Decoder`]. Defines an interface similar to
66/// [`Decoder::decode`] and [`Decoder::flush`] methods, but also includes
67/// a method to check if the decoder can flush early. Intended to be used in
68/// conjunction with [`DecoderDeserializer`].
69///
70/// [`arrow::json::reader::Decoder`]: ::arrow::json::reader::Decoder
71/// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
72/// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
73/// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
74pub trait Decoder: Send + fmt::Debug {
75 /// See [`arrow::json::reader::Decoder::decode`].
76 ///
77 /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
78 fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;
79
80 /// See [`arrow::json::reader::Decoder::flush`].
81 ///
82 /// [`arrow::json::reader::Decoder::flush`]: ::arrow::json::reader::Decoder::flush
83 fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
84
85 /// Whether the decoder can flush early in its current state.
86 fn can_flush_early(&self) -> bool;
87}
88
89impl<T: Decoder> fmt::Debug for DecoderDeserializer<T> {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 f.debug_struct("Deserializer")
92 .field("buffered_queue", &self.buffered_queue)
93 .field("finalized", &self.finalized)
94 .finish()
95 }
96}
97
98impl<T: Decoder> BatchDeserializer<Bytes> for DecoderDeserializer<T> {
99 fn digest(&mut self, message: Bytes) -> usize {
100 if message.is_empty() {
101 return 0;
102 }
103
104 let consumed = message.len();
105 self.buffered_queue.push_back(message);
106 consumed
107 }
108
109 fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
110 while let Some(buffered) = self.buffered_queue.front_mut() {
111 let decoded = self.decoder.decode(buffered)?;
112 buffered.advance(decoded);
113
114 if buffered.is_empty() {
115 self.buffered_queue.pop_front();
116 }
117
118 // Flush when the stream ends or batch size is reached
119 // Certain implementations can flush early
120 if decoded == 0 || self.decoder.can_flush_early() {
121 return match self.decoder.flush() {
122 Ok(Some(batch)) => Ok(DeserializerOutput::RecordBatch(batch)),
123 Ok(None) => continue,
124 Err(e) => Err(e),
125 };
126 }
127 }
128 if self.finalized {
129 Ok(DeserializerOutput::InputExhausted)
130 } else {
131 Ok(DeserializerOutput::RequiresMoreData)
132 }
133 }
134
135 fn finish(&mut self) {
136 self.finalized = true;
137 // Ensure the decoder is flushed:
138 self.buffered_queue.push_back(Bytes::new());
139 }
140}
141
142/// A generic, decoder-based deserialization scheme for processing encoded data.
143///
144/// This struct is responsible for converting a stream of bytes, which represent
145/// encoded data, into a stream of `RecordBatch` objects, following the specified
146/// schema and formatting options. It also handles any buffering necessary to satisfy
147/// the `Decoder` interface.
148pub struct DecoderDeserializer<T: Decoder> {
149 /// The underlying decoder used for deserialization
150 pub(crate) decoder: T,
151 /// The buffer used to store the remaining bytes to be decoded
152 pub(crate) buffered_queue: VecDeque<Bytes>,
153 /// Whether the input stream has been fully consumed
154 pub(crate) finalized: bool,
155}
156
157impl<T: Decoder> DecoderDeserializer<T> {
158 /// Creates a new `DecoderDeserializer` with the provided decoder.
159 pub fn new(decoder: T) -> Self {
160 DecoderDeserializer {
161 decoder,
162 buffered_queue: VecDeque::new(),
163 finalized: false,
164 }
165 }
166}
167
168/// Deserializes a stream of bytes into a stream of [`RecordBatch`] objects using the
169/// provided deserializer.
170///
171/// Returns a boxed stream of `Result<RecordBatch, ArrowError>`. The stream yields [`RecordBatch`]
172/// objects as they are produced by the deserializer, or an [`ArrowError`] if an error
173/// occurs while polling the input or deserializing.
174pub fn deserialize_stream<'a>(
175 mut input: impl Stream<Item = Result<Bytes>> + Unpin + Send + 'a,
176 mut deserializer: impl BatchDeserializer<Bytes> + 'a,
177) -> BoxStream<'a, Result<RecordBatch, ArrowError>> {
178 futures::stream::poll_fn(move |cx| loop {
179 match ready!(input.poll_next_unpin(cx)).transpose()? {
180 Some(b) => _ = deserializer.digest(b),
181 None => deserializer.finish(),
182 };
183
184 return match deserializer.next()? {
185 DeserializerOutput::RecordBatch(rb) => Poll::Ready(Some(Ok(rb))),
186 DeserializerOutput::InputExhausted => Poll::Ready(None),
187 DeserializerOutput::RequiresMoreData => continue,
188 };
189 })
190 .boxed()
191}