use std::io::Write;
use std::sync::Arc;
use crate::file_compression_type::FileCompressionType;
use crate::file_sink_config::FileSinkConfig;
use datafusion_common::error::Result;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use bytes::Bytes;
use object_store::ObjectStore;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use tokio::io::AsyncWrite;
pub mod demux;
pub mod orchestration;
/// A write buffer with interior mutability that can be cheaply cloned and
/// shared between a synchronous serializer (via the [`Write`] impl below) and
/// an async upload task holding another clone of the same `Arc`.
#[derive(Clone)]
pub struct SharedBuffer {
    /// The underlying byte buffer. The async-aware mutex provides interior
    /// mutability; the `Write` impl only ever uses `try_lock`, so it assumes
    /// the lock is uncontended whenever synchronous writes happen.
    pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}
impl SharedBuffer {
pub fn new(capacity: usize) -> Self {
Self {
buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
}
}
}
impl Write for SharedBuffer {
    /// Appends `buf` to the shared byte buffer.
    ///
    /// # Panics
    /// Panics if the mutex is currently held. The buffer is only expected to
    /// be locked from synchronous code paths, so a contended lock here
    /// indicates a concurrent-use bug rather than a recoverable condition.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut buffer = self
            .buffer
            .try_lock()
            .expect("SharedBuffer mutex is contended: concurrent access during synchronous write");
        Write::write(&mut *buffer, buf)
    }

    /// Flushes the inner `Vec<u8>` (a no-op for `Vec`, kept for `Write` contract).
    ///
    /// # Panics
    /// Panics if the mutex is currently held — same invariant as [`Self::write`].
    fn flush(&mut self) -> std::io::Result<()> {
        let mut buffer = self
            .buffer
            .try_lock()
            .expect("SharedBuffer mutex is contended: concurrent access during synchronous flush");
        Write::flush(&mut *buffer)
    }
}
/// A trait for serializing a [`RecordBatch`] to a byte payload suitable for
/// writing to an output file.
pub trait BatchSerializer: Sync + Send {
    /// Serializes `batch` and returns the resulting bytes.
    ///
    /// `initial` is `true` for the first batch of an output file — presumably
    /// so format-specific serializers can emit headers only once
    /// (e.g. a CSV header row); confirm against the implementations.
    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}
/// Builds an [`AsyncWrite`] that streams bytes to `location` in
/// `object_store`, compressing them on the fly according to
/// `file_compression_type`.
///
/// Deprecated thin wrapper kept for backward compatibility; prefer
/// [`ObjectWriterBuilder`], which also exposes buffer size and
/// compression level options.
#[deprecated(since = "48.0.0", note = "Use ObjectWriterBuilder::new(...) instead")]
pub async fn create_writer(
    file_compression_type: FileCompressionType,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    let builder = ObjectWriterBuilder::new(file_compression_type, location, object_store);
    builder.build()
}
/// Returns the schema that should be used when writing output files for
/// `config`.
///
/// If the sink has partition columns and is configured to drop them from the
/// written files (`keep_partition_by_columns == false`), the returned schema
/// excludes those columns; otherwise the output schema is returned unchanged.
pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
    let schema = config.output_schema();

    // No partition columns, or they should be kept: write the schema as-is.
    if config.table_partition_cols.is_empty() || config.keep_partition_by_columns {
        return Arc::clone(schema);
    }

    // Collect the names of the partition columns so they can be filtered out.
    let partition_names: Vec<_> = config
        .table_partition_cols
        .iter()
        .map(|(name, _)| name)
        .collect();

    // Keep only the non-partition fields, preserving the original metadata.
    let kept_fields: Vec<_> = schema
        .fields()
        .iter()
        .filter(|field| !partition_names.contains(&field.name()))
        .map(|field| field.as_ref().clone())
        .collect();

    Arc::new(Schema::new_with_metadata(kept_fields, schema.metadata().clone()))
}
/// Builder for an [`AsyncWrite`] that uploads to an object store location,
/// optionally applying on-the-fly compression and a custom upload buffer size.
#[derive(Debug)]
pub struct ObjectWriterBuilder {
    /// Compression applied to the byte stream before upload.
    file_compression_type: FileCompressionType,
    /// Destination path within the object store.
    location: Path,
    /// The object store the writer uploads to.
    object_store: Arc<dyn ObjectStore>,
    /// Optional capacity for the underlying [`BufWriter`]; `None` uses the
    /// object_store crate's default.
    buffer_size: Option<usize>,
    /// Optional codec-specific compression level; `None` uses the codec default.
    compression_level: Option<u32>,
}
impl ObjectWriterBuilder {
pub fn new(
file_compression_type: FileCompressionType,
location: &Path,
object_store: Arc<dyn ObjectStore>,
) -> Self {
Self {
file_compression_type,
location: location.clone(),
object_store,
buffer_size: None,
compression_level: None,
}
}
pub fn set_buffer_size(&mut self, buffer_size: Option<usize>) {
self.buffer_size = buffer_size;
}
pub fn with_buffer_size(mut self, buffer_size: Option<usize>) -> Self {
self.buffer_size = buffer_size;
self
}
pub fn get_buffer_size(&self) -> Option<usize> {
self.buffer_size
}
pub fn set_compression_level(&mut self, compression_level: Option<u32>) {
self.compression_level = compression_level;
}
pub fn with_compression_level(mut self, compression_level: Option<u32>) -> Self {
self.compression_level = compression_level;
self
}
pub fn get_compression_level(&self) -> Option<u32> {
self.compression_level
}
pub fn build(self) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
let Self {
file_compression_type,
location,
object_store,
buffer_size,
compression_level,
} = self;
let buf_writer = match buffer_size {
Some(size) => BufWriter::with_capacity(object_store, location, size),
None => BufWriter::new(object_store, location),
};
file_compression_type
.convert_async_writer_with_level(buf_writer, compression_level)
}
}