pub mod do_put;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_flight::{FlightData, SchemaAsIpc};
use arrow_ipc::writer;
use crate::bulk::CompressionType;
/// A unit of data that [`FlightEncoder::encode`] converts into a [`FlightData`] frame.
#[derive(Clone, Debug)]
pub enum FlightMessage {
    /// An Arrow schema; encoded as an IPC schema message.
    Schema(SchemaRef),
    /// An Arrow record batch; encoded with the encoder's IPC write options.
    RecordBatch(RecordBatch),
}
/// Stateful encoder that turns [`FlightMessage`]s into [`FlightData`].
///
/// The encoder keeps the IPC generator together with the dictionary and
/// compression state that the generator needs on every record-batch encode,
/// which is why encoding takes `&mut self`.
pub struct FlightEncoder {
    /// IPC write options (including the batch compression codec) used for every message.
    write_options: writer::IpcWriteOptions,
    /// Arrow IPC generator that serializes record batches.
    data_gen: writer::IpcDataGenerator,
    /// Dictionary state passed mutably to the generator on each record-batch encode.
    dictionary_tracker: writer::DictionaryTracker,
    /// Compression state passed mutably to the generator on each record-batch encode.
    compression_context: writer::CompressionContext,
}
impl Default for FlightEncoder {
fn default() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(Some(arrow::ipc::CompressionType::LZ4_FRAME))
.unwrap();
Self {
write_options,
data_gen: writer::IpcDataGenerator::default(),
dictionary_tracker: writer::DictionaryTracker::new(false),
compression_context: writer::CompressionContext::default(),
}
}
}
impl FlightEncoder {
    /// Creates an encoder that applies the given `compression` to encoded record batches.
    ///
    /// The project-level [`CompressionType`] is mapped onto the matching Arrow IPC
    /// codec; `CompressionType::None` disables compression.
    ///
    /// # Panics
    ///
    /// Panics if the default IPC write options reject the selected codec.
    pub fn with_compression(compression: CompressionType) -> Self {
        let arrow_compression = match compression {
            CompressionType::None => None,
            CompressionType::Lz4 => Some(arrow::ipc::CompressionType::LZ4_FRAME),
            CompressionType::Zstd => Some(arrow::ipc::CompressionType::ZSTD),
        };
        // NOTE(review): per the arrow-ipc docs, `try_with_compression` only rejects
        // compression for IPC metadata versions older than V5, and the default
        // options are expected to use V5 - so a failure here is a bug, not a
        // recoverable condition.
        let write_options = writer::IpcWriteOptions::default()
            .try_with_compression(arrow_compression)
            .expect("default IPC write options must accept every supported compression codec");
        Self {
            write_options,
            data_gen: writer::IpcDataGenerator::default(),
            // `false`: do not fail when a dictionary is replaced (see `encode`).
            dictionary_tracker: writer::DictionaryTracker::new(false),
            compression_context: writer::CompressionContext::default(),
        }
    }

    /// Creates an encoder that does not compress record batches.
    pub fn without_compression() -> Self {
        Self::with_compression(CompressionType::None)
    }

    /// Encodes a single [`FlightMessage`] into one [`FlightData`] frame.
    ///
    /// * `FlightMessage::Schema` is encoded as an IPC schema message.
    /// * `FlightMessage::RecordBatch` is encoded with the encoder's write options.
    ///
    /// Only the record-batch message itself is returned. Any dictionary messages
    /// produced by the generator are discarded, so record batches are expected
    /// not to contain dictionary-encoded columns.
    ///
    /// # Panics
    ///
    /// * Panics if the IPC generator fails to encode the record batch.
    /// * In debug builds, panics if encoding produced dictionary messages, since
    ///   those would otherwise be dropped silently.
    pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
        match flight_message {
            FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),
            FlightMessage::RecordBatch(record_batch) => {
                let (encoded_dictionaries, encoded_batch) = self
                    .data_gen
                    .encode(
                        &record_batch,
                        &mut self.dictionary_tracker,
                        &self.write_options,
                        &mut self.compression_context,
                    )
                    .expect("DictionaryTracker configured above to not fail on replacement");
                // Testing guard: this encoder returns exactly one frame, so it has
                // nowhere to put dictionary messages. Kept debug-only to preserve
                // release-build behavior.
                debug_assert!(
                    encoded_dictionaries.is_empty(),
                    "dictionary-encoded columns are not supported: {} dictionary message(s) would be dropped",
                    encoded_dictionaries.len()
                );
                encoded_batch.into()
            }
        }
    }
}