datafusion_datasource/
file_sink_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
19use crate::{ListingTableUrl, PartitionedFile};
20use arrow::datatypes::{DataType, SchemaRef};
21use async_trait::async_trait;
22use datafusion_common::Result;
23use datafusion_common_runtime::SpawnedTask;
24use datafusion_execution::object_store::ObjectStoreUrl;
25use datafusion_execution::{SendableRecordBatchStream, TaskContext};
26use datafusion_expr::dml::InsertOp;
27use datafusion_physical_plan::insert::DataSink;
28use object_store::ObjectStore;
29use std::sync::Arc;
30
31/// General behaviors for files that do `DataSink` operations
32#[async_trait]
33pub trait FileSink: DataSink {
34    /// Retrieves the file sink configuration.
35    fn config(&self) -> &FileSinkConfig;
36
37    /// Spawns writer tasks and joins them to perform file writing operations.
38    /// Is a critical part of `FileSink` trait, since it's the very last step for `write_all`.
39    ///
40    /// This function handles the process of writing data to files by:
41    /// 1. Spawning tasks for writing data to individual files.
42    /// 2. Coordinating the tasks using a demuxer to distribute data among files.
43    /// 3. Collecting results using `tokio::join`, ensuring that all tasks complete successfully.
44    ///
45    /// # Parameters
46    /// - `context`: The execution context (`TaskContext`) that provides resources
47    ///   like memory management and runtime environment.
48    /// - `demux_task`: A spawned task that handles demuxing, responsible for splitting
49    ///   an input [`SendableRecordBatchStream`] into dynamically determined partitions.
50    ///   See `start_demuxer_task()`
51    /// - `file_stream_rx`: A receiver that yields streams of record batches and their
52    ///   corresponding file paths for writing. See `start_demuxer_task()`
53    /// - `object_store`: A handle to the object store where the files are written.
54    ///
55    /// # Returns
56    /// - `Result<u64>`: Returns the total number of rows written across all files.
57    async fn spawn_writer_tasks_and_join(
58        &self,
59        context: &Arc<TaskContext>,
60        demux_task: SpawnedTask<Result<()>>,
61        file_stream_rx: DemuxedStreamReceiver,
62        object_store: Arc<dyn ObjectStore>,
63    ) -> Result<u64>;
64
65    /// File sink implementation of the [`DataSink::write_all`] method.
66    async fn write_all(
67        &self,
68        data: SendableRecordBatchStream,
69        context: &Arc<TaskContext>,
70    ) -> Result<u64> {
71        let config = self.config();
72        let object_store = context
73            .runtime_env()
74            .object_store(&config.object_store_url)?;
75        let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
76        self.spawn_writer_tasks_and_join(
77            context,
78            demux_task,
79            file_stream_rx,
80            object_store,
81        )
82        .await
83    }
84}
85
86/// The base configurations to provide when creating a physical plan for
87/// writing to any given file format.
88pub struct FileSinkConfig {
89    /// Object store URL, used to get an ObjectStore instance
90    pub object_store_url: ObjectStoreUrl,
91    /// A vector of [`PartitionedFile`] structs, each representing a file partition
92    pub file_groups: Vec<PartitionedFile>,
93    /// Vector of partition paths
94    pub table_paths: Vec<ListingTableUrl>,
95    /// The schema of the output file
96    pub output_schema: SchemaRef,
97    /// A vector of column names and their corresponding data types,
98    /// representing the partitioning columns for the file
99    pub table_partition_cols: Vec<(String, DataType)>,
100    /// Controls how new data should be written to the file, determining whether
101    /// to append to, overwrite, or replace records in existing files.
102    pub insert_op: InsertOp,
103    /// Controls whether partition columns are kept for the file
104    pub keep_partition_by_columns: bool,
105    /// File extension without a dot(.)
106    pub file_extension: String,
107}
108
109impl FileSinkConfig {
110    /// Get output schema
111    pub fn output_schema(&self) -> &SchemaRef {
112        &self.output_schema
113    }
114}