datafusion_datasource/write/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module containing helper methods/traits related to enabling
19//! write support for the various file formats
20
21use std::io::Write;
22use std::sync::Arc;
23
24use crate::file_compression_type::FileCompressionType;
25use crate::file_sink_config::FileSinkConfig;
26use datafusion_common::error::Result;
27
28use arrow::array::RecordBatch;
29use arrow::datatypes::Schema;
30use bytes::Bytes;
31use object_store::buffered::BufWriter;
32use object_store::path::Path;
33use object_store::ObjectStore;
34use tokio::io::AsyncWrite;
35
36pub mod demux;
37pub mod orchestration;
38
39/// A buffer with interior mutability shared by the SerializedFileWriter and
40/// ObjectStore writer
41#[derive(Clone)]
42pub struct SharedBuffer {
43    /// The inner buffer for reading and writing
44    ///
45    /// The lock is used to obtain internal mutability, so no worry about the
46    /// lock contention.
47    pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
48}
49
50impl SharedBuffer {
51    pub fn new(capacity: usize) -> Self {
52        Self {
53            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
54        }
55    }
56}
57
58impl Write for SharedBuffer {
59    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
60        let mut buffer = self.buffer.try_lock().unwrap();
61        Write::write(&mut *buffer, buf)
62    }
63
64    fn flush(&mut self) -> std::io::Result<()> {
65        let mut buffer = self.buffer.try_lock().unwrap();
66        Write::flush(&mut *buffer)
67    }
68}
69
70/// A trait that defines the methods required for a RecordBatch serializer.
71pub trait BatchSerializer: Sync + Send {
72    /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
73    /// Parameter `initial` signals whether the given batch is the first batch.
74    /// This distinction is important for certain serializers (like CSV).
75    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
76}
77
78/// Returns an [`AsyncWrite`] which writes to the given object store location
79/// with the specified compression.
80/// We drop the `AbortableWrite` struct and the writer will not try to cleanup on failure.
81/// Users can configure automatic cleanup with their cloud provider.
82pub async fn create_writer(
83    file_compression_type: FileCompressionType,
84    location: &Path,
85    object_store: Arc<dyn ObjectStore>,
86) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
87    let buf_writer = BufWriter::new(object_store, location.clone());
88    file_compression_type.convert_async_writer(buf_writer)
89}
90
91/// Converts table schema to writer schema, which may differ in the case
92/// of hive style partitioning where some columns are removed from the
93/// underlying files.
94pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
95    if !config.table_partition_cols.is_empty() && !config.keep_partition_by_columns {
96        let schema = config.output_schema();
97        let partition_names: Vec<_> =
98            config.table_partition_cols.iter().map(|(s, _)| s).collect();
99        Arc::new(Schema::new_with_metadata(
100            schema
101                .fields()
102                .iter()
103                .filter(|f| !partition_names.contains(&f.name()))
104                .map(|f| (**f).clone())
105                .collect::<Vec<_>>(),
106            schema.metadata().clone(),
107        ))
108    } else {
109        Arc::clone(config.output_schema())
110    }
111}