// datafusion_datasource/file_sink_config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::ListingTableUrl;
21use crate::file_groups::FileGroup;
22use crate::sink::DataSink;
23use crate::write::demux::{DemuxedStreamReceiver, start_demuxer_task};
24
25use arrow::datatypes::{DataType, SchemaRef};
26use datafusion_common::Result;
27use datafusion_common_runtime::SpawnedTask;
28use datafusion_execution::object_store::ObjectStoreUrl;
29use datafusion_execution::{SendableRecordBatchStream, TaskContext};
30use datafusion_expr::dml::InsertOp;
31
32use async_trait::async_trait;
33use object_store::ObjectStore;
34
/// Determines how `FileSink` output paths are interpreted.
//
// `Hash` is derived in addition to the original traits: the enum is a
// fieldless `Copy + Eq` type, so hashing is free and lets the mode be used
// as a `HashMap`/`HashSet` key. This is purely additive and backward
// compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum FileOutputMode {
    /// Infer output mode from the output URL (for example, by extension / trailing `/`).
    #[default]
    Automatic,
    /// Write to a single output file at the exact output path.
    SingleFile,
    /// Write to a directory under the output path with generated filenames.
    Directory,
}
46
47impl FileOutputMode {
48 /// Resolve this mode into a `single_file_output` boolean for the demuxer.
49 pub fn single_file_output(self, base_output_path: &ListingTableUrl) -> bool {
50 match self {
51 Self::Automatic => {
52 !base_output_path.is_collection()
53 && base_output_path.file_extension().is_some()
54 }
55 Self::SingleFile => true,
56 Self::Directory => false,
57 }
58 }
59}
60
61impl From<Option<bool>> for FileOutputMode {
62 fn from(value: Option<bool>) -> Self {
63 match value {
64 None => Self::Automatic,
65 Some(true) => Self::SingleFile,
66 Some(false) => Self::Directory,
67 }
68 }
69}
70
71impl From<FileOutputMode> for Option<bool> {
72 fn from(value: FileOutputMode) -> Self {
73 match value {
74 FileOutputMode::Automatic => None,
75 FileOutputMode::SingleFile => Some(true),
76 FileOutputMode::Directory => Some(false),
77 }
78 }
79}
80
81/// General behaviors for files that do `DataSink` operations
82#[async_trait]
83pub trait FileSink: DataSink {
84 /// Retrieves the file sink configuration.
85 fn config(&self) -> &FileSinkConfig;
86
87 /// Spawns writer tasks and joins them to perform file writing operations.
88 /// Is a critical part of `FileSink` trait, since it's the very last step for `write_all`.
89 ///
90 /// This function handles the process of writing data to files by:
91 /// 1. Spawning tasks for writing data to individual files.
92 /// 2. Coordinating the tasks using a demuxer to distribute data among files.
93 /// 3. Collecting results using `tokio::join`, ensuring that all tasks complete successfully.
94 ///
95 /// # Parameters
96 /// - `context`: The execution context (`TaskContext`) that provides resources
97 /// like memory management and runtime environment.
98 /// - `demux_task`: A spawned task that handles demuxing, responsible for splitting
99 /// an input [`SendableRecordBatchStream`] into dynamically determined partitions.
100 /// See `start_demuxer_task()`
101 /// - `file_stream_rx`: A receiver that yields streams of record batches and their
102 /// corresponding file paths for writing. See `start_demuxer_task()`
103 /// - `object_store`: A handle to the object store where the files are written.
104 ///
105 /// # Returns
106 /// - `Result<u64>`: Returns the total number of rows written across all files.
107 async fn spawn_writer_tasks_and_join(
108 &self,
109 context: &Arc<TaskContext>,
110 demux_task: SpawnedTask<Result<()>>,
111 file_stream_rx: DemuxedStreamReceiver,
112 object_store: Arc<dyn ObjectStore>,
113 ) -> Result<u64>;
114
115 /// File sink implementation of the [`DataSink::write_all`] method.
116 async fn write_all(
117 &self,
118 data: SendableRecordBatchStream,
119 context: &Arc<TaskContext>,
120 ) -> Result<u64> {
121 let config = self.config();
122 let object_store = context
123 .runtime_env()
124 .object_store(&config.object_store_url)?;
125 let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
126 self.spawn_writer_tasks_and_join(
127 context,
128 demux_task,
129 file_stream_rx,
130 object_store,
131 )
132 .await
133 }
134}
135
/// The base configurations to provide when creating a physical plan for
/// writing to any given file format.
#[derive(Debug, Clone)]
pub struct FileSinkConfig {
    /// The unresolved URL specified by the user
    pub original_url: String,
    /// Object store URL, used to get an ObjectStore instance
    /// (resolved through the task context's runtime environment)
    pub object_store_url: ObjectStoreUrl,
    /// A collection of files organized into groups.
    /// Each FileGroup contains one or more PartitionedFile objects.
    pub file_group: FileGroup,
    /// Vector of partition paths
    pub table_paths: Vec<ListingTableUrl>,
    /// The schema of the output file
    pub output_schema: SchemaRef,
    /// A vector of column names and their corresponding data types,
    /// representing the partitioning columns for the file
    pub table_partition_cols: Vec<(String, DataType)>,
    /// Controls how new data should be written to the file, determining whether
    /// to append to, overwrite, or replace records in existing files.
    pub insert_op: InsertOp,
    /// Controls whether partition columns are kept for the file
    pub keep_partition_by_columns: bool,
    /// File extension without a dot(.)
    pub file_extension: String,
    /// Determines how the output path is interpreted: forced single file,
    /// forced directory, or inferred from the URL (see [`FileOutputMode`])
    pub file_output_mode: FileOutputMode,
}
164
165impl FileSinkConfig {
166 /// Get output schema
167 pub fn output_schema(&self) -> &SchemaRef {
168 &self.output_schema
169 }
170}