use std::sync::Arc;
use crate::file_groups::FileGroup;
use crate::sink::DataSink;
use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
use crate::ListingTableUrl;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::Result;
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use async_trait::async_trait;
use object_store::ObjectStore;
/// A [`DataSink`] whose output is written as files into an object store.
///
/// Implementors supply [`FileSink::config`] and
/// [`FileSink::spawn_writer_tasks_and_join`]. The provided
/// [`FileSink::write_all`] resolves the object store, starts the demuxer
/// and then hands everything to `spawn_writer_tasks_and_join`.
#[async_trait]
pub trait FileSink: DataSink {
    /// Returns the [`FileSinkConfig`] this sink writes with.
    ///
    /// `write_all` reads `object_store_url` from it and passes the whole
    /// config to the demuxer.
    fn config(&self) -> &FileSinkConfig;

    /// Spawns the writer tasks for the demuxed streams and waits for them.
    ///
    /// * `context` - task context of the current write.
    /// * `demux_task` - handle to the demuxer task that `write_all` has
    ///   already started.
    /// * `file_stream_rx` - receiving side of the demuxer's output.
    /// * `object_store` - store resolved from `config().object_store_url`.
    ///
    /// The returned `u64` is forwarded unchanged to callers of `write_all`.
    /// NOTE(review): presumably the number of rows written; confirm against
    /// implementors.
    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64>;

    /// Writes every batch from `data` through this sink.
    ///
    /// Looks up the object store for `config().object_store_url` in the
    /// runtime environment and starts the demuxer task over `data`. It then
    /// awaits [`FileSink::spawn_writer_tasks_and_join`] and returns its result.
    ///
    /// # Errors
    ///
    /// Returns an error in either of two cases:
    /// * The object-store lookup fails. This check runs before the demuxer
    ///   is started, so in that case no demuxer task is spawned.
    /// * `spawn_writer_tasks_and_join` itself returns an error.
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let sink_config = self.config();

        // Resolve the destination store first: a lookup failure must short-circuit
        // before the demuxer begins consuming `data`.
        let store_url = &sink_config.object_store_url;
        let store = context.runtime_env().object_store(store_url)?;

        let (demuxer, demuxed_streams) = start_demuxer_task(sink_config, data, context);

        self.spawn_writer_tasks_and_join(context, demuxer, demuxed_streams, store)
            .await
    }
}
/// Configuration shared by [`FileSink`] implementations.
///
/// `FileSink::write_all` uses `object_store_url` to resolve the destination
/// object store. It also passes this whole struct to the demuxer task, which
/// presumably reads the partitioning/output fields below (verify in the
/// demux module).
#[derive(Debug, Clone)]
pub struct FileSinkConfig {
/// URL of the write target as a plain string.
/// NOTE(review): presumably the location exactly as the user supplied it,
/// before it was parsed into `object_store_url` / `table_paths`; confirm.
pub original_url: String,
/// URL used to look up the destination [`ObjectStore`] in the runtime
/// environment when writing.
pub object_store_url: ObjectStoreUrl,
/// Group of files associated with this sink.
/// NOTE(review): usage is not visible here; confirm whether these are
/// existing files or planned outputs.
pub file_group: FileGroup,
/// Table location(s) being written to.
/// NOTE(review): assumed to be the output root path(s); confirm against
/// the demuxer.
pub table_paths: Vec<ListingTableUrl>,
/// Schema of the output; also returned by [`FileSinkConfig::output_schema`].
pub output_schema: SchemaRef,
/// `(column name, data type)` pairs, presumably the columns used to
/// partition output files (TODO confirm).
pub table_partition_cols: Vec<(String, DataType)>,
/// The kind of insert being performed (see [`InsertOp`]).
/// NOTE(review): how each variant affects existing data is not shown here.
pub insert_op: InsertOp,
/// Flag controlling partition-column handling.
/// NOTE(review): presumably `true` keeps the partition-by columns in the
/// written files rather than dropping them; confirm.
pub keep_partition_by_columns: bool,
/// Extension for written files.
/// NOTE(review): confirm whether a leading `.` is expected.
pub file_extension: String,
}
impl FileSinkConfig {
    /// Borrows the schema stored in this config's `output_schema` field.
    pub fn output_schema(&self) -> &SchemaRef {
        let Self { output_schema, .. } = self;
        output_schema
    }
}