datafusion_datasource/file_sink_config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::file_groups::FileGroup;
21use crate::sink::DataSink;
22use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
23use crate::ListingTableUrl;
24
25use arrow::datatypes::{DataType, SchemaRef};
26use datafusion_common::Result;
27use datafusion_common_runtime::SpawnedTask;
28use datafusion_execution::object_store::ObjectStoreUrl;
29use datafusion_execution::{SendableRecordBatchStream, TaskContext};
30use datafusion_expr::dml::InsertOp;
31
32use async_trait::async_trait;
33use object_store::ObjectStore;
34
35/// General behaviors for files that do `DataSink` operations
36#[async_trait]
37pub trait FileSink: DataSink {
38 /// Retrieves the file sink configuration.
39 fn config(&self) -> &FileSinkConfig;
40
41 /// Spawns writer tasks and joins them to perform file writing operations.
42 /// Is a critical part of `FileSink` trait, since it's the very last step for `write_all`.
43 ///
44 /// This function handles the process of writing data to files by:
45 /// 1. Spawning tasks for writing data to individual files.
46 /// 2. Coordinating the tasks using a demuxer to distribute data among files.
47 /// 3. Collecting results using `tokio::join`, ensuring that all tasks complete successfully.
48 ///
49 /// # Parameters
50 /// - `context`: The execution context (`TaskContext`) that provides resources
51 /// like memory management and runtime environment.
52 /// - `demux_task`: A spawned task that handles demuxing, responsible for splitting
53 /// an input [`SendableRecordBatchStream`] into dynamically determined partitions.
54 /// See `start_demuxer_task()`
55 /// - `file_stream_rx`: A receiver that yields streams of record batches and their
56 /// corresponding file paths for writing. See `start_demuxer_task()`
57 /// - `object_store`: A handle to the object store where the files are written.
58 ///
59 /// # Returns
60 /// - `Result<u64>`: Returns the total number of rows written across all files.
61 async fn spawn_writer_tasks_and_join(
62 &self,
63 context: &Arc<TaskContext>,
64 demux_task: SpawnedTask<Result<()>>,
65 file_stream_rx: DemuxedStreamReceiver,
66 object_store: Arc<dyn ObjectStore>,
67 ) -> Result<u64>;
68
69 /// File sink implementation of the [`DataSink::write_all`] method.
70 async fn write_all(
71 &self,
72 data: SendableRecordBatchStream,
73 context: &Arc<TaskContext>,
74 ) -> Result<u64> {
75 let config = self.config();
76 let object_store = context
77 .runtime_env()
78 .object_store(&config.object_store_url)?;
79 let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
80 self.spawn_writer_tasks_and_join(
81 context,
82 demux_task,
83 file_stream_rx,
84 object_store,
85 )
86 .await
87 }
88}
89
90/// The base configurations to provide when creating a physical plan for
91/// writing to any given file format.
92#[derive(Debug, Clone)]
93pub struct FileSinkConfig {
94 /// The unresolved URL specified by the user
95 pub original_url: String,
96 /// Object store URL, used to get an ObjectStore instance
97 pub object_store_url: ObjectStoreUrl,
98 /// A collection of files organized into groups.
99 /// Each FileGroup contains one or more PartitionedFile objects.
100 pub file_group: FileGroup,
101 /// Vector of partition paths
102 pub table_paths: Vec<ListingTableUrl>,
103 /// The schema of the output file
104 pub output_schema: SchemaRef,
105 /// A vector of column names and their corresponding data types,
106 /// representing the partitioning columns for the file
107 pub table_partition_cols: Vec<(String, DataType)>,
108 /// Controls how new data should be written to the file, determining whether
109 /// to append to, overwrite, or replace records in existing files.
110 pub insert_op: InsertOp,
111 /// Controls whether partition columns are kept for the file
112 pub keep_partition_by_columns: bool,
113 /// File extension without a dot(.)
114 pub file_extension: String,
115}
116
117impl FileSinkConfig {
118 /// Get output schema
119 pub fn output_schema(&self) -> &SchemaRef {
120 &self.output_schema
121 }
122}