datafusion_datasource/sink.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for writing data to [`DataSink`]s

use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::{
    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
    PlanProperties, SendableRecordBatchStream,
};

use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, EquivalenceProperties};
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use async_trait::async_trait;
use futures::StreamExt;
/// `DataSink` implements writing streams of [`RecordBatch`]es to
/// user-defined destinations.
///
/// The `Display` impl is used to format the sink for explain plan
/// output.
#[async_trait]
pub trait DataSink: DisplayAs + Debug + Send + Sync {
    /// Returns the data sink as [`Any`](std::any::Any) so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Return a snapshot of the [MetricsSet] for this
    /// [DataSink].
    ///
    /// See [ExecutionPlan::metrics()] for more details.
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    /// Returns the sink schema
    fn schema(&self) -> &SchemaRef;

    // TODO add desired input ordering
    // How does this sink want its input ordered?

    /// Writes the data to the sink and returns the number of values written.
    ///
    /// This method will be called exactly once during each DML
    /// statement. Thus, prior to returning, the sink should do any commit
    /// or rollback required.
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}
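
// The following is an illustrative sketch, not part of the upstream API: a
// minimal `DataSink` that drains its input and only counts rows, showing the
// typical shape of an implementation (hold the schema, consume the stream in
// `write_all`, commit, and return the row count). The name `DiscardingSink`
// is hypothetical.
#[allow(dead_code)]
#[derive(Debug)]
struct DiscardingSink {
    schema: SchemaRef,
}

impl DisplayAs for DiscardingSink {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "DiscardingSink")
    }
}

#[async_trait]
impl DataSink for DiscardingSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        // Drain the stream, counting rows; a real sink would also persist the
        // batches and commit (or roll back) before returning.
        let mut count = 0u64;
        while let Some(batch) = data.next().await {
            count += batch?.num_rows() as u64;
        }
        Ok(count)
    }
}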

/// Execution plan for writing record batches to a [`DataSink`]
///
/// Returns a single row with the number of values written
#[derive(Clone)]
pub struct DataSinkExec {
    /// Input plan that produces the record batches to be written.
    input: Arc<dyn ExecutionPlan>,
    /// Sink to which to write
    sink: Arc<dyn DataSink>,
    /// Schema of the single count row returned by this node.
    count_schema: SchemaRef,
    /// Optional required sort order for output data.
    sort_order: Option<LexRequirement>,
    /// Cached plan properties (equivalences, partitioning, boundedness).
    cache: PlanProperties,
}

impl Debug for DataSinkExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "DataSinkExec schema: {:?}", self.count_schema)
    }
}

impl DataSinkExec {
    /// Create a plan to write to `sink`
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        let count_schema = make_count_schema();
        let cache = Self::compute_properties(&input, Arc::clone(&count_schema));
        Self {
            input,
            sink,
            count_schema,
            sort_order,
            cache,
        }
    }

    /// Input execution plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Returns the [`DataSink`] to which the data is written
    pub fn sink(&self) -> &dyn DataSink {
        self.sink.as_ref()
    }

    /// Optional sort order for output data
    pub fn sort_order(&self) -> &Option<LexRequirement> {
        &self.sort_order
    }

    /// Compute the [`PlanProperties`] for this node: a single (unknown)
    /// output partition that follows the input's pipeline behavior and
    /// boundedness.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }
}

impl DisplayAs for DataSinkExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DataSinkExec: sink=")?;
                self.sink.fmt_as(t, f)
            }
            DisplayFormatType::TreeRender => self.sink().fmt_as(t, f),
        }
    }
}

impl ExecutionPlan for DataSinkExec {
    fn name(&self) -> &'static str {
        "DataSinkExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // DataSink is responsible for dynamically partitioning its
        // own input at execution time.
        vec![false]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // DataSink is responsible for dynamically partitioning its
        // own input at execution time, and so requires a single input partition.
        vec![Distribution::SinglePartition; self.children().len()]
    }

    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
        // The required input ordering is set externally (e.g. by a `ListingTable`).
        // Otherwise, there is no specific requirement (i.e. `sort_expr` is `None`).
        vec![self.sort_order.as_ref().cloned()]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Maintains ordering in the sense that the written file will reflect
        // the ordering of the input. For more context, see:
        //
        // https://github.com/apache/datafusion/pull/6354#discussion_r1195284178
        vec![true]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(
            Arc::clone(&children[0]),
            Arc::clone(&self.sink),
            self.sort_order.clone(),
        )))
    }

    /// Execute the plan and return a stream of `RecordBatch`es for
    /// the specified partition.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!("DataSinkExec can only be called on partition 0!");
        }
        let data = execute_input_stream(
            Arc::clone(&self.input),
            Arc::clone(self.sink.schema()),
            0,
            Arc::clone(&context),
        )?;

        let count_schema = Arc::clone(&self.count_schema);
        let sink = Arc::clone(&self.sink);

        let stream = futures::stream::once(async move {
            sink.write_all(data, &context).await.map(make_count_batch)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            count_schema,
            stream,
        )))
    }

    /// Returns the metrics of the underlying [DataSink]
    fn metrics(&self) -> Option<MetricsSet> {
        self.sink.metrics()
    }
}
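
// Illustrative usage sketch, not part of the upstream API: wrap an input plan
// and a sink in a `DataSinkExec`, run the write by executing partition 0, and
// return the single-row count batch it produces. `write_with_sink` is a
// hypothetical helper; the caller supplies `input`, `sink`, and `context`
// (the `DiscardingSink` sketch above would do for the sink).
#[allow(dead_code)]
async fn write_with_sink(
    input: Arc<dyn ExecutionPlan>,
    sink: Arc<dyn DataSink>,
    context: Arc<TaskContext>,
) -> Result<RecordBatch> {
    let exec = DataSinkExec::new(input, sink, None);
    // `execute` returns a stream that yields one batch with the row count
    // once `write_all` has finished.
    let mut stream = exec.execute(0, context)?;
    match stream.next().await {
        Some(batch) => batch,
        None => internal_err!("DataSinkExec produced no output"),
    }
}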

/// Create an output record batch with a count
///
/// ```text
/// +-------+
/// | count |
/// +-------+
/// | 6     |
/// +-------+
/// ```
fn make_count_batch(count: u64) -> RecordBatch {
    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;

    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
}

fn make_count_schema() -> SchemaRef {
    // Single non-nullable UInt64 column named "count".
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]))
}
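
#[cfg(test)]
mod tests {
    use super::*;

    // An illustrative sanity check (not part of the upstream test suite): the
    // batch produced by `make_count_batch` matches the schema reported by
    // `make_count_schema` and contains exactly one row.
    #[test]
    fn count_batch_matches_count_schema() {
        let batch = make_count_batch(6);
        assert_eq!(batch.schema(), make_count_schema());
        assert_eq!(batch.num_rows(), 1);
    }
}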