datafusion_datasource/write/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module containing helper methods/traits related to enabling
//! write support for the various file formats

use std::io::Write;
use std::sync::Arc;

use crate::file_compression_type::FileCompressionType;
use crate::file_sink_config::FileSinkConfig;
use datafusion_common::error::Result;

use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use bytes::Bytes;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWrite;

pub mod demux;
pub mod orchestration;

/// A buffer with interior mutability shared by the `SerializedFileWriter` and
/// [`ObjectStore`] writer
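///
/// # Example
/// A minimal usage sketch: clones share the same underlying `Vec<u8>`, so
/// bytes written through one handle are visible through any other.
/// ```
/// # use datafusion_datasource::write::SharedBuffer;
/// # use std::io::Write;
/// let mut writer = SharedBuffer::new(1024);
/// let reader = writer.clone();
/// writer.write_all(b"hello").unwrap();
/// assert_eq!(reader.buffer.try_lock().unwrap().as_slice(), b"hello");
/// ```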
#[derive(Clone)]
pub struct SharedBuffer {
    /// The inner buffer for reading and writing
    ///
    /// The lock is only used to obtain interior mutability; contention is
    /// not expected.
    pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}

impl SharedBuffer {
    /// Create a new [`SharedBuffer`] with the given initial capacity
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
        }
    }
}

impl Write for SharedBuffer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // The lock is expected to be uncontended (see the struct docs), so
        // `try_lock` should always succeed
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::write(&mut *buffer, buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::flush(&mut *buffer)
    }
}

/// A trait that defines the methods required for a `RecordBatch` serializer.
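///
/// # Example
/// A minimal sketch of an implementation; the `DebugSerializer` name is
/// purely illustrative and not part of DataFusion:
/// ```
/// # use arrow::array::RecordBatch;
/// # use bytes::Bytes;
/// # use datafusion_common::error::Result;
/// # use datafusion_datasource::write::BatchSerializer;
/// struct DebugSerializer;
///
/// impl BatchSerializer for DebugSerializer {
///     fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
///         // Serialize the batch using its `Debug` representation
///         Ok(Bytes::from(format!("{batch:?}")))
///     }
/// }
/// ```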
pub trait BatchSerializer: Sync + Send {
    /// Serializes a `RecordBatch` and returns the serialized bytes.
    /// Parameter `initial` signals whether the given batch is the first batch.
    /// This distinction is important for certain serializers (like CSV).
    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AsyncWrite`] which writes to the given object store location
/// with the specified compression.
///
/// The writer will have a default buffer size as chosen by [`BufWriter::new`].
///
/// Note: the `AbortableWrite` struct was removed, so the writer will not try
/// to clean up on failure. Users can configure automatic cleanup with their
/// cloud provider.
#[deprecated(since = "48.0.0", note = "Use ObjectWriterBuilder::new(...) instead")]
pub async fn create_writer(
    file_compression_type: FileCompressionType,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    ObjectWriterBuilder::new(file_compression_type, location, object_store).build()
}

/// Converts the table schema to the writer schema, which may differ in the
/// case of hive style partitioning, where some columns are removed from the
/// underlying files.
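///
/// # Example
/// A sketch of the column filtering this performs, shown directly on an
/// Arrow [`Schema`] (constructing a full [`FileSinkConfig`] is elided here):
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// let table_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("year", DataType::Utf8, false), // hive partition column
/// ]);
/// let partition_names = vec!["year".to_string()];
/// // Keep only the fields that are not partition columns
/// let writer_schema = Schema::new(
///     table_schema
///         .fields()
///         .iter()
///         .filter(|f| !partition_names.contains(f.name()))
///         .map(|f| (**f).clone())
///         .collect::<Vec<_>>(),
/// );
/// assert_eq!(writer_schema.fields().len(), 1);
/// assert_eq!(writer_schema.field(0).name(), "a");
/// ```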
pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
    if !config.table_partition_cols.is_empty() && !config.keep_partition_by_columns {
        let schema = config.output_schema();
        let partition_names: Vec<_> =
            config.table_partition_cols.iter().map(|(s, _)| s).collect();
        Arc::new(Schema::new_with_metadata(
            schema
                .fields()
                .iter()
                .filter(|f| !partition_names.contains(&f.name()))
                .map(|f| (**f).clone())
                .collect::<Vec<_>>(),
            schema.metadata().clone(),
        ))
    } else {
        Arc::clone(config.output_schema())
    }
}

/// A builder for an [`AsyncWrite`] that writes to an object store location.
///
/// This can be used to specify file compression on the writer. The writer
/// will have a default buffer size unless altered. The specific default size
/// is chosen by [`BufWriter::new`].
///
/// Note: the `AbortableWrite` struct was removed, so the writer will not try
/// to clean up on failure. Users can configure automatic cleanup with their
/// cloud provider.
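///
/// # Example
/// A sketch of typical usage, targeting an in-memory [`ObjectStore`]:
/// ```
/// # use datafusion_datasource::file_compression_type::FileCompressionType;
/// # use datafusion_datasource::write::ObjectWriterBuilder;
/// # use object_store::memory::InMemory;
/// # use object_store::path::Path;
/// # use std::sync::Arc;
/// let object_store = Arc::new(InMemory::new());
/// let location = Path::from("/foo/bar");
/// let writer = ObjectWriterBuilder::new(FileCompressionType::UNCOMPRESSED, &location, object_store)
///     .with_buffer_size(Some(10 * 1024 * 1024)) // 10 MiB buffer
///     .build()
///     .expect("failed to create object writer");
/// ```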
#[derive(Debug)]
pub struct ObjectWriterBuilder {
    /// Compression type for object writer.
    file_compression_type: FileCompressionType,
    /// Output path
    location: Path,
    /// The related store that handles the given path
    object_store: Arc<dyn ObjectStore>,
    /// The size of the buffer for the object writer.
    buffer_size: Option<usize>,
}

impl ObjectWriterBuilder {
    /// Create a new [`ObjectWriterBuilder`] for the specified path and compression type.
    pub fn new(
        file_compression_type: FileCompressionType,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            file_compression_type,
            location: location.clone(),
            object_store,
            buffer_size: None,
        }
    }

    /// Set buffer size in bytes for object writer.
    ///
    /// # Example
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let compression_type = FileCompressionType::UNCOMPRESSED;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let mut builder = ObjectWriterBuilder::new(compression_type, &location, object_store);
    /// builder.set_buffer_size(Some(20 * 1024 * 1024)); // 20 MiB
    /// assert_eq!(builder.get_buffer_size(), Some(20 * 1024 * 1024), "Internal error: Builder buffer size doesn't match");
    /// ```
    pub fn set_buffer_size(&mut self, buffer_size: Option<usize>) {
        self.buffer_size = buffer_size;
    }

    /// Set buffer size in bytes for object writer, returning the builder.
    ///
    /// # Example
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let compression_type = FileCompressionType::UNCOMPRESSED;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let builder = ObjectWriterBuilder::new(compression_type, &location, object_store)
    ///     .with_buffer_size(Some(20 * 1024 * 1024)); // 20 MiB
    /// assert_eq!(builder.get_buffer_size(), Some(20 * 1024 * 1024), "Internal error: Builder buffer size doesn't match");
    /// ```
    pub fn with_buffer_size(mut self, buffer_size: Option<usize>) -> Self {
        self.buffer_size = buffer_size;
        self
    }

    /// Currently specified buffer size in bytes.
    pub fn get_buffer_size(&self) -> Option<usize> {
        self.buffer_size
    }

    /// Return a writer object that writes to the object store location.
    ///
    /// If a buffer size has not been set, the default buffer size will
    /// be used.
    ///
    /// # Errors
    /// If there is an error applying the compression type.
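    ///
    /// # Example
    /// A minimal sketch relying on the default buffer size:
    /// ```
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::write::ObjectWriterBuilder;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use std::sync::Arc;
    /// # let compression_type = FileCompressionType::UNCOMPRESSED;
    /// # let location = Path::from("/foo/bar");
    /// # let object_store = Arc::new(InMemory::new());
    /// let writer = ObjectWriterBuilder::new(compression_type, &location, object_store)
    ///     .build()
    ///     .expect("failed to create object writer");
    /// ```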
    pub fn build(self) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
        let Self {
            file_compression_type,
            location,
            object_store,
            buffer_size,
        } = self;

        let buf_writer = match buffer_size {
            Some(size) => BufWriter::with_capacity(object_store, location, size),
            None => BufWriter::new(object_store, location),
        };

        file_compression_type.convert_async_writer(buf_writer)
    }
}