datafusion_datasource/write/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module containing helper methods/traits related to enabling
19//! write support for the various file formats
20
21use std::io::Write;
22use std::sync::Arc;
23
24use crate::file_compression_type::FileCompressionType;
25use crate::file_sink_config::FileSinkConfig;
26use datafusion_common::error::Result;
27
28use arrow::array::RecordBatch;
29use arrow::datatypes::Schema;
30use bytes::Bytes;
31use object_store::buffered::BufWriter;
32use object_store::path::Path;
33use object_store::ObjectStore;
34use tokio::io::AsyncWrite;
35
36pub mod demux;
37pub mod orchestration;
38
/// A buffer with interior mutability shared by the SerializedFileWriter and
/// ObjectStore writer.
///
/// The bytes live behind an [`Arc`], so cloning a `SharedBuffer` yields a
/// second handle to the *same* underlying `Vec<u8>` rather than a copy of it.
#[derive(Clone)]
pub struct SharedBuffer {
    /// The inner buffer for reading and writing.
    ///
    /// The lock exists only to provide interior mutability; it is not
    /// expected to be contended.
    pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}
49
50impl SharedBuffer {
51 pub fn new(capacity: usize) -> Self {
52 Self {
53 buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
54 }
55 }
56}
57
58impl Write for SharedBuffer {
59 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
60 let mut buffer = self.buffer.try_lock().unwrap();
61 Write::write(&mut *buffer, buf)
62 }
63
64 fn flush(&mut self) -> std::io::Result<()> {
65 let mut buffer = self.buffer.try_lock().unwrap();
66 Write::flush(&mut *buffer)
67 }
68}
69
/// A trait that defines the methods required for a RecordBatch serializer.
pub trait BatchSerializer: Sync + Send {
    /// Serializes a `RecordBatch` and returns the serialized bytes.
    ///
    /// This is a plain synchronous call: the bytes (or an error) are
    /// returned directly rather than through a future.
    ///
    /// Parameter `initial` signals whether the given batch is the first batch.
    /// This distinction is important for certain serializers (like CSV).
    fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}
77
78/// Returns an [`AsyncWrite`] which writes to the given object store location
79/// with the specified compression.
80/// We drop the `AbortableWrite` struct and the writer will not try to cleanup on failure.
81/// Users can configure automatic cleanup with their cloud provider.
82pub async fn create_writer(
83 file_compression_type: FileCompressionType,
84 location: &Path,
85 object_store: Arc<dyn ObjectStore>,
86) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
87 let buf_writer = BufWriter::new(object_store, location.clone());
88 file_compression_type.convert_async_writer(buf_writer)
89}
90
91/// Converts table schema to writer schema, which may differ in the case
92/// of hive style partitioning where some columns are removed from the
93/// underlying files.
94pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
95 if !config.table_partition_cols.is_empty() && !config.keep_partition_by_columns {
96 let schema = config.output_schema();
97 let partition_names: Vec<_> =
98 config.table_partition_cols.iter().map(|(s, _)| s).collect();
99 Arc::new(Schema::new_with_metadata(
100 schema
101 .fields()
102 .iter()
103 .filter(|f| !partition_names.contains(&f.name()))
104 .map(|f| (**f).clone())
105 .collect::<Vec<_>>(),
106 schema.metadata().clone(),
107 ))
108 } else {
109 Arc::clone(config.output_schema())
110 }
111}