datafusion_datasource/write/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module containing helper methods/traits related to enabling
//! write support for the various file formats

use std::io::Write;
use std::sync::Arc;

use crate::file_compression_type::FileCompressionType;
use crate::file_sink_config::FileSinkConfig;
use datafusion_common::error::Result;

use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use bytes::Bytes;
use object_store::ObjectStore;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use tokio::io::AsyncWrite;

pub mod demux;
pub mod orchestration;

/// A buffer with interior mutability shared by the `SerializedFileWriter`
/// and the [`ObjectStore`] writer
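///
/// # Example
///
/// A minimal sketch of the sharing pattern: one handle feeds bytes in via
/// [`Write`], while a clone reads the accumulated bytes back out through
/// the shared lock.
///
/// ```
/// # use std::io::Write;
/// # use datafusion_datasource::write::SharedBuffer;
/// let mut writer = SharedBuffer::new(16);
/// let reader = writer.clone();
/// writer.write_all(b"hello").unwrap();
/// let bytes = reader.buffer.try_lock().unwrap();
/// assert_eq!(bytes.as_slice(), b"hello");
/// ```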
#[derive(Clone)]
pub struct SharedBuffer {
    /// The inner buffer for reading and writing
    ///
    /// The lock provides interior mutability; accesses to the buffer are
    /// expected to be serialized, so lock contention is not a concern.
    pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}

impl SharedBuffer {
    /// Create a new [`SharedBuffer`] with the given initial capacity
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
        }
    }
}

impl Write for SharedBuffer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // `try_lock` is expected to succeed: accesses to the shared buffer
        // are serialized, as noted on the `buffer` field.
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::write(&mut *buffer, buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::flush(&mut *buffer)
    }
}

/// A trait that defines the methods required for a [`RecordBatch`] serializer.
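///
/// # Example
///
/// A minimal sketch of an implementation; `DebugSerializer` is a
/// hypothetical serializer that renders each batch using Arrow's `Debug`
/// output rather than a real file format.
///
/// ```
/// # use arrow::array::RecordBatch;
/// # use bytes::Bytes;
/// # use datafusion_common::error::Result;
/// # use datafusion_datasource::write::BatchSerializer;
/// struct DebugSerializer;
///
/// impl BatchSerializer for DebugSerializer {
///     fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
///         // Illustrative only: serialize via the batch's Debug representation
///         Ok(Bytes::from(format!("{batch:?}")))
///     }
/// }
/// ```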
pub trait BatchSerializer: Sync + Send {
    /// Serializes a [`RecordBatch`] and returns the serialized bytes.
    ///
    /// Parameter `initial` signals whether the given batch is the first batch.
    /// This distinction is important for certain serializers (like CSV).
    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AsyncWrite`] which writes to the given object store location
/// with the specified compression.
///
/// The writer will have a default buffer size as chosen by [`BufWriter::new`].
///
/// Note that the writer will not attempt to clean up partially written data
/// on failure (the former `AbortableWrite` wrapper has been removed); users
/// can configure automatic cleanup of incomplete uploads with their cloud
/// provider.
#[deprecated(since = "48.0.0", note = "Use ObjectWriterBuilder::new(...) instead")]
pub async fn create_writer(
    file_compression_type: FileCompressionType,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    ObjectWriterBuilder::new(file_compression_type, location, object_store).build()
}

/// Converts the table schema to the writer schema, which may differ in the
/// case of Hive-style partitioning, where partition columns are removed
/// from the underlying files.
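///
/// # Example
///
/// A minimal sketch of the transformation this function performs, shown
/// directly on a [`Schema`] (constructing a full [`FileSinkConfig`] is
/// omitted here). With `keep_partition_by_columns` disabled, a partition
/// column such as `date` is dropped from the writer schema:
///
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// let table_schema = Schema::new(vec![
///     Field::new("value", DataType::Int64, false),
///     Field::new("date", DataType::Utf8, false), // Hive partition column
/// ]);
/// let partition_names = ["date".to_string()];
/// let writer_schema = Schema::new(
///     table_schema
///         .fields()
///         .iter()
///         .filter(|f| !partition_names.contains(f.name()))
///         .map(|f| (**f).clone())
///         .collect::<Vec<_>>(),
/// );
/// assert_eq!(writer_schema.fields().len(), 1);
/// assert_eq!(writer_schema.field(0).name(), "value");
/// ```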
pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
    if !config.table_partition_cols.is_empty() && !config.keep_partition_by_columns {
        let schema = config.output_schema();
        let partition_names: Vec<_> =
            config.table_partition_cols.iter().map(|(s, _)| s).collect();
        Arc::new(Schema::new_with_metadata(
            schema
                .fields()
                .iter()
                .filter(|f| !partition_names.contains(&f.name()))
                .map(|f| (**f).clone())
                .collect::<Vec<_>>(),
            schema.metadata().clone(),
        ))
    } else {
        Arc::clone(config.output_schema())
    }
}

/// A builder for an [`AsyncWrite`] that writes to an object store location.
///
/// This can be used to specify file compression on the writer. The writer
/// will have a default buffer size unless altered; the specific default size
/// is chosen by [`BufWriter::new`].
///
/// Note that the writer will not attempt to clean up partially written data
/// on failure (the former `AbortableWrite` wrapper has been removed); users
/// can configure automatic cleanup of incomplete uploads with their cloud
/// provider.
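///
/// # Example
///
/// A minimal sketch that builds a writer for an in-memory store; the path
/// and buffer size are arbitrary illustrative values:
///
/// ```
/// # use datafusion_datasource::file_compression_type::FileCompressionType;
/// # use datafusion_datasource::write::ObjectWriterBuilder;
/// # use object_store::memory::InMemory;
/// # use object_store::path::Path;
/// # use std::sync::Arc;
/// # fn main() -> datafusion_common::error::Result<()> {
/// let object_store = Arc::new(InMemory::new());
/// let location = Path::from("/foo/bar");
/// let _writer = ObjectWriterBuilder::new(
///     FileCompressionType::UNCOMPRESSED,
///     &location,
///     object_store,
/// )
/// .with_buffer_size(Some(8 * 1024 * 1024)) // 8 MiB
/// .build()?;
/// # Ok(())
/// # }
/// ```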
#[derive(Debug)]
pub struct ObjectWriterBuilder {
    /// Compression type for object writer.
    file_compression_type: FileCompressionType,
    /// Output path
    location: Path,
    /// The related store that handles the given path
    object_store: Arc<dyn ObjectStore>,
    /// The size of the buffer for the object writer.
    buffer_size: Option<usize>,
    /// The compression level for the object writer.
    compression_level: Option<u32>,
}

impl ObjectWriterBuilder {
    /// Create a new [`ObjectWriterBuilder`] for the specified path and compression type.
    pub fn new(
        file_compression_type: FileCompressionType,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            file_compression_type,
            location: location.clone(),
            object_store,
            buffer_size: None,
            compression_level: None,
        }
    }

    /// Set buffer size in bytes for object writer.
    ///
    /// # Example
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let compression_type = FileCompressionType::UNCOMPRESSED;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let mut builder = ObjectWriterBuilder::new(compression_type, &location, object_store);
    /// builder.set_buffer_size(Some(20 * 1024 * 1024)); // 20 MiB
    /// assert_eq!(
    ///     builder.get_buffer_size(),
    ///     Some(20 * 1024 * 1024),
    ///     "Internal error: Builder buffer size doesn't match"
    /// );
    /// ```
    pub fn set_buffer_size(&mut self, buffer_size: Option<usize>) {
        self.buffer_size = buffer_size;
    }

    /// Set buffer size in bytes for object writer, returning the builder.
    ///
    /// # Example
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let compression_type = FileCompressionType::UNCOMPRESSED;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let builder = ObjectWriterBuilder::new(compression_type, &location, object_store)
    ///     .with_buffer_size(Some(20 * 1024 * 1024)); // 20 MiB
    /// assert_eq!(
    ///     builder.get_buffer_size(),
    ///     Some(20 * 1024 * 1024),
    ///     "Internal error: Builder buffer size doesn't match"
    /// );
    /// ```
    pub fn with_buffer_size(mut self, buffer_size: Option<usize>) -> Self {
        self.buffer_size = buffer_size;
        self
    }

    /// Currently specified buffer size in bytes.
    pub fn get_buffer_size(&self) -> Option<usize> {
        self.buffer_size
    }

    /// Set compression level for object writer.
    pub fn set_compression_level(&mut self, compression_level: Option<u32>) {
        self.compression_level = compression_level;
    }

    /// Set compression level for object writer, returning the builder.
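    ///
    /// # Example
    ///
    /// A minimal sketch mirroring the buffer-size examples above; the level
    /// of `9` is an arbitrary illustrative value:
    ///
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let compression_type = FileCompressionType::GZIP;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let builder = ObjectWriterBuilder::new(compression_type, &location, object_store)
    ///     .with_compression_level(Some(9));
    /// assert_eq!(builder.get_compression_level(), Some(9));
    /// ```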
    pub fn with_compression_level(mut self, compression_level: Option<u32>) -> Self {
        self.compression_level = compression_level;
        self
    }

    /// Currently specified compression level.
    pub fn get_compression_level(&self) -> Option<u32> {
        self.compression_level
    }

    /// Return a writer object that writes to the object store location.
    ///
    /// If a buffer size has not been set, the default buffer size will
    /// be used.
    ///
    /// # Errors
    /// If there is an error applying the compression type.
    pub fn build(self) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
        let Self {
            file_compression_type,
            location,
            object_store,
            buffer_size,
            compression_level,
        } = self;

        let buf_writer = match buffer_size {
            Some(size) => BufWriter::with_capacity(object_store, location, size),
            None => BufWriter::new(object_store, location),
        };

        file_compression_type
            .convert_async_writer_with_level(buf_writer, compression_level)
    }
}