use std::io::Write;
use std::sync::Arc;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use bytes::Bytes;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWrite;
pub(crate) mod demux;
pub(crate) mod orchestration;
/// A cheaply-cloneable, shared in-memory byte buffer.
///
/// Cloning a `SharedBuffer` clones only the [`Arc`], so multiple owners
/// (e.g. a synchronous serializer and an async uploader) observe the same
/// underlying `Vec<u8>`. Access is guarded by a `futures::lock::Mutex`;
/// note that the [`Write`] impl below acquires it via `try_lock`, so it
/// assumes no concurrent holder — see that impl for details.
#[derive(Clone)]
pub(crate) struct SharedBuffer {
    /// The underlying buffer for serializing data
    pub(crate) buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}
impl SharedBuffer {
pub fn new(capacity: usize) -> Self {
Self {
buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
}
}
}
/// Synchronous [`Write`] adapter over the shared buffer.
///
/// The blocking `Write` trait cannot `.await`, so both methods use
/// `try_lock`. Exclusive access is therefore a *precondition*: exactly one
/// writer may drive this buffer at a time. Previously a bare `unwrap()`
/// hid that invariant; `expect` now states it in the panic message.
impl Write for SharedBuffer {
    /// Appends `buf` to the shared buffer, returning the number of bytes
    /// written (delegates to `Vec<u8>`'s `Write` impl).
    ///
    /// # Panics
    /// Panics if the mutex is currently held elsewhere — callers must
    /// ensure no concurrent access while writing.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut buffer = self
            .buffer
            .try_lock()
            .expect("SharedBuffer mutex is contended: Write requires exclusive access");
        Write::write(&mut *buffer, buf)
    }

    /// Flushes the underlying `Vec<u8>` (a no-op for in-memory buffers).
    ///
    /// # Panics
    /// Panics if the mutex is currently held elsewhere — same invariant
    /// as [`Self::write`].
    fn flush(&mut self) -> std::io::Result<()> {
        let mut buffer = self
            .buffer
            .try_lock()
            .expect("SharedBuffer mutex is contended: Write requires exclusive access");
        Write::flush(&mut *buffer)
    }
}
/// A trait that defines the methods required for a RecordBatch serializer.
pub trait BatchSerializer: Sync + Send {
    /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
    /// Parameter `initial` signals whether the given batch is the first batch.
    /// This distinction is important for certain serializers (like CSV), which
    /// may only need to emit headers for the first batch.
    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}
/// Returns an [`AsyncWrite`] destined for `location` in `object_store`,
/// wrapped with the compression encoder selected by `file_compression_type`.
///
/// Writes are buffered through an [`object_store::buffered::BufWriter`]
/// before being uploaded to the store.
pub(crate) async fn create_writer(
    file_compression_type: FileCompressionType,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    file_compression_type
        .convert_async_writer(BufWriter::new(object_store, location.clone()))
}
/// Computes the schema that should be written to output files.
///
/// When the sink has partition columns and `keep_partition_by_columns` is
/// `false`, those columns are encoded in the directory structure rather
/// than the file contents, so they are stripped from the schema. Otherwise
/// the configured output schema is returned unchanged.
pub(crate) fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
    // Fast path: nothing to strip, reuse the configured schema.
    if config.table_partition_cols.is_empty() || config.keep_partition_by_columns {
        return Arc::clone(config.output_schema());
    }

    let schema = config.output_schema();
    let partition_names: Vec<_> = config
        .table_partition_cols
        .iter()
        .map(|(name, _)| name)
        .collect();

    // Keep only the non-partition fields, preserving schema metadata.
    let data_fields: Vec<_> = schema
        .fields()
        .iter()
        .filter(|field| !partition_names.contains(&field.name()))
        .map(|field| (**field).clone())
        .collect();

    Arc::new(Schema::new_with_metadata(
        data_fields,
        schema.metadata().clone(),
    ))
}