datafusion-datasource 48.0.1

datafusion-datasource
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module containing helper methods for the various file formats.
//! See `write.rs` for write-related helper methods.

use ::arrow::array::RecordBatch;

use arrow::error::ArrowError;
use bytes::Buf;
use bytes::Bytes;
use datafusion_common::Result;
use futures::stream::BoxStream;
use futures::StreamExt as _;
use futures::{ready, Stream};
use std::collections::VecDeque;
use std::fmt;
use std::task::Poll;

/// Possible outputs of a [`BatchDeserializer`].
///
/// This is the success value of [`BatchDeserializer::next`]: it tells the
/// caller whether a batch was produced, whether more input has to be digested
/// before progress can be made, or whether nothing further will be produced.
#[derive(Debug, PartialEq)]
pub enum DeserializerOutput {
    /// A successfully deserialized [`RecordBatch`].
    RecordBatch(RecordBatch),
    /// The deserializer requires more data to make progress.
    RequiresMoreData,
    /// The input data has been exhausted.
    InputExhausted,
}

/// Trait defining a scheme for deserializing byte streams into structured data.
/// Implementors of this trait are responsible for converting raw bytes into
/// `RecordBatch` objects.
///
/// The type parameter `T` is the message type accepted by [`digest`].
///
/// [`digest`]: BatchDeserializer::digest
pub trait BatchDeserializer<T>: Send + fmt::Debug {
    /// Feeds a message for deserialization, updating the internal state of
    /// this `BatchDeserializer`. Note that one can call this function multiple
    /// times before calling `next`, which will queue multiple messages for
    /// deserialization. Returns the number of bytes consumed.
    fn digest(&mut self, message: T) -> usize;

    /// Attempts to deserialize any pending messages and returns a
    /// `DeserializerOutput` to indicate progress.
    ///
    /// # Errors
    ///
    /// Returns an [`ArrowError`] if deserialization fails.
    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;

    /// Informs the deserializer that no more messages will be provided for
    /// deserialization.
    fn finish(&mut self);
}

/// A general interface for decoders such as [`arrow::json::reader::Decoder`] and
/// [`arrow::csv::reader::Decoder`]. Defines an interface similar to
/// [`Decoder::decode`] and [`Decoder::flush`] methods, but also includes
/// a method to check if the decoder can flush early. Intended to be used in
/// conjunction with [`DecoderDeserializer`].
///
/// [`arrow::json::reader::Decoder`]: ::arrow::json::reader::Decoder
/// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
/// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
/// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
pub trait Decoder: Send + fmt::Debug {
    /// See [`arrow::json::reader::Decoder::decode`].
    ///
    /// [`DecoderDeserializer`] treats the returned value as the number of
    /// bytes of `buf` that were consumed, and attempts a flush when it is `0`.
    ///
    /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;

    /// See [`arrow::json::reader::Decoder::flush`].
    ///
    /// `Ok(None)` indicates that no batch was produced by this flush.
    ///
    /// [`arrow::json::reader::Decoder::flush`]: ::arrow::json::reader::Decoder::flush
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;

    /// Whether the decoder can flush early in its current state.
    ///
    /// When this returns `true`, [`DecoderDeserializer`] attempts a flush
    /// right after a decode step even if that step consumed bytes.
    fn can_flush_early(&self) -> bool;
}

impl<T: Decoder> fmt::Debug for DecoderDeserializer<T> {
    /// Formats the buffering state of this deserializer (the queued chunks and
    /// the `finalized` flag). The wrapped decoder is not included in the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            buffered_queue,
            finalized,
            ..
        } = self;

        let mut out = f.debug_struct("Deserializer");
        out.field("buffered_queue", buffered_queue);
        out.field("finalized", finalized);
        out.finish()
    }
}

impl<T: Decoder> BatchDeserializer<Bytes> for DecoderDeserializer<T> {
    /// Queues `message` for later decoding and returns its length in bytes.
    ///
    /// Empty messages are not queued and yield `0`.
    fn digest(&mut self, message: Bytes) -> usize {
        let len = message.len();
        if len != 0 {
            self.buffered_queue.push_back(message);
        }
        len
    }

    /// Decodes queued chunks front to back until a batch can be flushed or the
    /// queue runs dry.
    ///
    /// Returns `RecordBatch` as soon as a flush yields a batch. Once the queue
    /// is empty, returns `InputExhausted` if `finish` has been called and
    /// `RequiresMoreData` otherwise.
    ///
    /// # Errors
    ///
    /// Propagates any [`ArrowError`] raised by the decoder's `decode` or
    /// `flush`.
    fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
        while let Some(chunk) = self.buffered_queue.front_mut() {
            let consumed = self.decoder.decode(chunk)?;
            chunk.advance(consumed);

            // A fully consumed chunk is dropped from the queue.
            if chunk.is_empty() {
                self.buffered_queue.pop_front();
            }

            // A flush is attempted when the decoder consumed nothing (end of
            // stream or batch size reached) or when it reports that it can
            // flush early; otherwise keep feeding it.
            let should_flush = consumed == 0 || self.decoder.can_flush_early();
            if !should_flush {
                continue;
            }

            // A flush that produces nothing just moves on to the next chunk.
            if let Some(batch) = self.decoder.flush()? {
                return Ok(DeserializerOutput::RecordBatch(batch));
            }
        }

        let outcome = if self.finalized {
            DeserializerOutput::InputExhausted
        } else {
            DeserializerOutput::RequiresMoreData
        };
        Ok(outcome)
    }

    /// Marks the input as complete.
    fn finish(&mut self) {
        self.finalized = true;
        // Queue an empty chunk: decoding it consumes zero bytes, which makes
        // `next` flush whatever the decoder still holds.
        self.buffered_queue.push_back(Bytes::new());
    }
}

/// A generic, decoder-based deserialization scheme for processing encoded data.
///
/// This struct is responsible for converting a stream of bytes, which represent
/// encoded data, into a stream of `RecordBatch` objects, following the specified
/// schema and formatting options. It also handles any buffering necessary to satisfy
/// the `Decoder` interface.
pub struct DecoderDeserializer<T: Decoder> {
    /// The underlying decoder used for deserialization
    pub(crate) decoder: T,
    /// The buffer used to store the remaining bytes to be decoded.
    /// Chunks are appended at the back by `digest` and decoded from the front
    /// by `next`.
    pub(crate) buffered_queue: VecDeque<Bytes>,
    /// Whether the input stream has been fully consumed (set by `finish`)
    pub(crate) finalized: bool,
}

impl<T: Decoder> DecoderDeserializer<T> {
    /// Wraps `decoder` in a new `DecoderDeserializer` that starts with an
    /// empty chunk queue and is not yet finalized.
    pub fn new(decoder: T) -> Self {
        let buffered_queue = VecDeque::new();
        Self {
            decoder,
            buffered_queue,
            finalized: false,
        }
    }
}

/// Deserializes a stream of bytes into a stream of [`RecordBatch`] objects using the
/// provided deserializer.
///
/// Every chunk pulled from `input` is handed to [`BatchDeserializer::digest`]. Once
/// `input` ends, [`BatchDeserializer::finish`] is called exactly once and the
/// deserializer is drained through [`BatchDeserializer::next`] without polling
/// `input` again.
///
/// Returns a boxed stream of `Result<RecordBatch, ArrowError>`. The stream yields [`RecordBatch`]
/// objects as they are produced by the deserializer, or an [`ArrowError`] if an error
/// occurs while polling the input or deserializing. The stream ends when the
/// deserializer reports `InputExhausted`, or when it still reports
/// `RequiresMoreData` after `input` has ended (no further data can arrive).
pub fn deserialize_stream<'a>(
    mut input: impl Stream<Item = Result<Bytes>> + Unpin + Send + 'a,
    mut deserializer: impl BatchDeserializer<Bytes> + 'a,
) -> BoxStream<'a, Result<RecordBatch, ArrowError>> {
    // Set once `input` has yielded `None`. A stream must not be polled again
    // after it has finished, and `finish` only needs to be signalled once.
    let mut input_done = false;

    futures::stream::poll_fn(move |cx| loop {
        if !input_done {
            match ready!(input.poll_next_unpin(cx)).transpose()? {
                Some(chunk) => _ = deserializer.digest(chunk),
                None => {
                    input_done = true;
                    deserializer.finish();
                }
            }
        }

        return match deserializer.next()? {
            DeserializerOutput::RecordBatch(rb) => Poll::Ready(Some(Ok(rb))),
            DeserializerOutput::InputExhausted => Poll::Ready(None),
            // No more input can arrive, so looping again would spin forever.
            DeserializerOutput::RequiresMoreData if input_done => Poll::Ready(None),
            DeserializerOutput::RequiresMoreData => continue,
        };
    })
    .boxed()
}