datafusion_datasource/write/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module containing helper methods/traits related to enabling
//! write support for the various file formats

use std::io::Write;
use std::sync::Arc;

use crate::file_compression_type::FileCompressionType;
use crate::file_sink_config::FileSinkConfig;
use datafusion_common::error::Result;

use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use bytes::Bytes;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWrite;

pub mod demux;
pub mod orchestration;

/// A buffer with interior mutability shared by the `SerializedFileWriter` and
/// the [`ObjectStore`] writer
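///
/// # Example
///
/// A minimal usage sketch: clones share the same underlying `Vec<u8>`, so
/// bytes written through one handle are visible through the other.
/// ```
/// use datafusion_datasource::write::SharedBuffer;
/// use std::io::Write;
///
/// let buffer = SharedBuffer::new(16);
/// let mut writer = buffer.clone();
/// writer.write_all(b"hello").unwrap();
/// assert_eq!(buffer.buffer.try_lock().unwrap().as_slice(), b"hello");
/// ```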
#[derive(Clone)]
pub struct SharedBuffer {
    /// The inner buffer for reading and writing
    ///
    /// The lock exists only to provide interior mutability; access is
    /// effectively serialized, so lock contention is not a concern.
    pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}

impl SharedBuffer {
    /// Creates a new [`SharedBuffer`] with the given initial capacity.
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
        }
    }
}

impl Write for SharedBuffer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::write(&mut *buffer, buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::flush(&mut *buffer)
    }
}

/// A trait that defines the methods required for a [`RecordBatch`] serializer.
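///
/// # Example
///
/// A minimal sketch of an implementation; real serializers (e.g. for CSV)
/// may use `initial` to emit a header only for the first batch.
/// ```
/// use arrow::array::RecordBatch;
/// use bytes::Bytes;
/// use datafusion_common::error::Result;
/// use datafusion_datasource::write::BatchSerializer;
///
/// struct DebugSerializer;
///
/// impl BatchSerializer for DebugSerializer {
///     fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
///         // Render the batch with its `Debug` impl; a real serializer
///         // would produce a concrete file format here.
///         Ok(Bytes::from(format!("{batch:?}")))
///     }
/// }
/// ```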
pub trait BatchSerializer: Sync + Send {
    /// Serializes a [`RecordBatch`] and returns the serialized bytes.
    /// The `initial` parameter signals whether the given batch is the first
    /// batch, a distinction that matters for certain serializers (like CSV).
    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AsyncWrite`] which writes to the given object store location
/// with the specified compression.
///
/// The writer will have a default buffer size as chosen by [`BufWriter::new`].
///
/// The former `AbortableWrite` wrapper has been removed, so the writer does
/// not attempt to clean up after itself on failure. Users can configure
/// automatic cleanup with their cloud provider.
#[deprecated(since = "48.0.0", note = "Use ObjectWriterBuilder::new(...) instead")]
pub async fn create_writer(
    file_compression_type: FileCompressionType,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    ObjectWriterBuilder::new(file_compression_type, location, object_store).build()
}

/// Converts the table schema to the writer schema, which may differ in the
/// case of hive-style partitioning, where the partition columns are removed
/// from the underlying files.
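///
/// # Example
///
/// A minimal sketch of the filtering this function performs, shown on a plain
/// [`Schema`] rather than a full `FileSinkConfig`: hive-style partition
/// columns are dropped from the schema that is written into each file.
/// ```
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let table_schema = Schema::new(vec![
///     Field::new("value", DataType::Int64, false),
///     Field::new("date", DataType::Utf8, false), // hive partition column
/// ]);
/// let partition_names = ["date"];
/// let writer_fields: Vec<Field> = table_schema
///     .fields()
///     .iter()
///     .filter(|f| !partition_names.contains(&f.name().as_str()))
///     .map(|f| (**f).clone())
///     .collect();
/// assert_eq!(Schema::new(writer_fields).fields().len(), 1);
/// ```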
pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
    if !config.table_partition_cols.is_empty() && !config.keep_partition_by_columns {
        let schema = config.output_schema();
        let partition_names: Vec<_> =
            config.table_partition_cols.iter().map(|(s, _)| s).collect();
        // Drop the partition columns: their values are encoded in the file
        // paths (hive-style), not in the files themselves.
        Arc::new(Schema::new_with_metadata(
            schema
                .fields()
                .iter()
                .filter(|f| !partition_names.contains(&f.name()))
                .map(|f| (**f).clone())
                .collect::<Vec<_>>(),
            schema.metadata().clone(),
        ))
    } else {
        Arc::clone(config.output_schema())
    }
}

/// A builder for an [`AsyncWrite`] that writes to an object store location.
///
/// This can be used to specify file compression on the writer. Unless
/// overridden, the writer uses the default buffer size chosen by
/// [`BufWriter::new`].
///
/// The former `AbortableWrite` wrapper has been removed, so the writer does
/// not attempt to clean up after itself on failure. Users can configure
/// automatic cleanup with their cloud provider.
#[derive(Debug)]
pub struct ObjectWriterBuilder {
    /// Compression type for object writer.
    file_compression_type: FileCompressionType,
    /// Output path
    location: Path,
    /// The related store that handles the given path
    object_store: Arc<dyn ObjectStore>,
    /// The size of the buffer for the object writer.
    buffer_size: Option<usize>,
}

impl ObjectWriterBuilder {
    /// Create a new [`ObjectWriterBuilder`] for the specified path and compression type.
    pub fn new(
        file_compression_type: FileCompressionType,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            file_compression_type,
            location: location.clone(),
            object_store,
            buffer_size: None,
        }
    }

    /// Set buffer size in bytes for object writer.
    ///
    /// # Example
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let compression_type = FileCompressionType::UNCOMPRESSED;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let mut builder = ObjectWriterBuilder::new(compression_type, &location, object_store);
    /// builder.set_buffer_size(Some(20 * 1024 * 1024)); // 20 MiB
    /// assert_eq!(builder.get_buffer_size(), Some(20 * 1024 * 1024), "Internal error: Builder buffer size doesn't match");
    /// ```
    pub fn set_buffer_size(&mut self, buffer_size: Option<usize>) {
        self.buffer_size = buffer_size;
    }

    /// Set buffer size in bytes for object writer, returning the builder.
    ///
    /// # Example
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let compression_type = FileCompressionType::UNCOMPRESSED;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let builder = ObjectWriterBuilder::new(compression_type, &location, object_store)
    ///     .with_buffer_size(Some(20 * 1024 * 1024)); // 20 MiB
    /// assert_eq!(builder.get_buffer_size(), Some(20 * 1024 * 1024), "Internal error: Builder buffer size doesn't match");
    /// ```
    pub fn with_buffer_size(mut self, buffer_size: Option<usize>) -> Self {
        self.buffer_size = buffer_size;
        self
    }

    /// Returns the currently specified buffer size in bytes, if any.
    pub fn get_buffer_size(&self) -> Option<usize> {
        self.buffer_size
    }

    /// Return a writer object that writes to the object store location.
    ///
    /// If a buffer size has not been set, the default buffer size will be
    /// used.
    ///
    /// # Errors
    /// If there is an error applying the compression type.
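    ///
    /// # Example
    ///
    /// A minimal sketch: build an uncompressed writer backed by an in-memory
    /// store. The returned value implements [`AsyncWrite`], so it is driven
    /// from async code.
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let writer = ObjectWriterBuilder::new(FileCompressionType::UNCOMPRESSED, &location, object_store)
    ///     .with_buffer_size(Some(8 * 1024 * 1024)) // 8 MiB buffer
    ///     .build()
    ///     .expect("failed to apply compression");
    /// ```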
    pub fn build(self) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
        let Self {
            file_compression_type,
            location,
            object_store,
            buffer_size,
        } = self;

        // Wrap the store in a `BufWriter`, honoring any explicit buffer size.
        let buf_writer = match buffer_size {
            Some(size) => BufWriter::with_capacity(object_store, location, size),
            None => BufWriter::new(object_store, location),
        };

        // Apply the requested compression on top of the buffered writer.
        file_compression_type.convert_async_writer(buf_writer)
    }
}
219}