datafusion_datasource/sink.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for writing data to [`DataSink`]s

use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, EquivalenceProperties};
use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
};

use async_trait::async_trait;
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
use futures::StreamExt;

/// `DataSink` implements writing streams of [`RecordBatch`]es to
/// user-defined destinations.
///
/// The `Display` impl is used to format the sink for explain plan
/// output.
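///
/// # Example
///
/// A minimal sketch of an implementation that discards its input and reports
/// only the number of rows received (the `NullSink` type is hypothetical and
/// not part of DataFusion; a real sink would also implement [`DisplayAs`]):
///
/// ```ignore
/// #[derive(Debug)]
/// struct NullSink {
///     schema: SchemaRef,
/// }
///
/// #[async_trait]
/// impl DataSink for NullSink {
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     async fn write_all(
///         &self,
///         mut data: SendableRecordBatchStream,
///         _context: &Arc<TaskContext>,
///     ) -> Result<u64> {
///         // Drain the input, counting rows instead of persisting them
///         let mut count = 0u64;
///         while let Some(batch) = data.next().await {
///             count += batch?.num_rows() as u64;
///         }
///         Ok(count)
///     }
/// }
/// ```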
#[async_trait]
pub trait DataSink: DisplayAs + Debug + Send + Sync {
    /// Returns the data sink as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Return a snapshot of the [MetricsSet] for this
    /// [DataSink].
    ///
    /// See [ExecutionPlan::metrics()] for more details
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    /// Returns the sink schema
    fn schema(&self) -> &SchemaRef;

    // TODO add desired input ordering
    // How does this sink want its input ordered?

    /// Writes the data to the sink and returns the number of values written
    ///
    /// This method will be called exactly once during each DML
    /// statement. Thus, before it returns, the sink should perform any
    /// required commit or rollback.
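    ///
    /// A hedged sketch of the expected call pattern (`sink`, `input_stream`, and
    /// `task_ctx` are illustrative names, not items defined in this crate):
    ///
    /// ```ignore
    /// let rows_written = sink.write_all(input_stream, &task_ctx).await?;
    /// // By the time this returns, the sink has committed (or rolled back) its work
    /// ```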
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}

/// Execution plan for writing record batches to a [`DataSink`]
///
/// Returns a single row with the number of values written
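///
/// # Example
///
/// A hedged sketch of wiring an input plan into a sink (`input`, `sink`, and
/// `task_ctx` are assumed to already exist; error handling is elided):
///
/// ```ignore
/// // `input` should produce batches matching `sink.schema()` in a single partition
/// let plan = Arc::new(DataSinkExec::new(input, sink, None));
/// // Executing partition 0 drives the write and yields one batch containing a
/// // single `count` column with the number of rows written
/// let stream = plan.execute(0, task_ctx)?;
/// ```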
#[derive(Clone)]
pub struct DataSinkExec {
    /// Input plan that produces the record batches to be written.
    input: Arc<dyn ExecutionPlan>,
    /// Sink to which to write
    sink: Arc<dyn DataSink>,
    /// Schema of the single-row `count` batch returned by this plan.
    count_schema: SchemaRef,
    /// Optional required sort order for the data written to the sink.
    sort_order: Option<LexRequirement>,
    /// Cached plan properties (schema, partitioning, etc.).
    cache: PlanProperties,
}

impl Debug for DataSinkExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "DataSinkExec schema: {:?}", self.count_schema)
    }
}

impl DataSinkExec {
    /// Create a plan to write to `sink`
    ///
    /// Note: `DataSinkExec` requires its input to have a single partition.
    /// If the input has multiple partitions, the physical optimizer will
    /// automatically insert a merging operator (e.g. `CoalescePartitionsExec`)
    /// to combine them. If you construct a physical plan without going through
    /// the physical optimizer, you must ensure the input has a single partition
    /// yourself.
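    ///
    /// A minimal sketch of doing so by hand (assumes `CoalescePartitionsExec`
    /// from `datafusion_physical_plan`; `multi_partition_input` and `sink` are
    /// illustrative names):
    ///
    /// ```ignore
    /// let single = Arc::new(CoalescePartitionsExec::new(multi_partition_input));
    /// let write_plan = DataSinkExec::new(single, sink, None);
    /// ```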
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        let count_schema = make_count_schema();
        let cache = Self::create_schema(&input, Arc::clone(&count_schema));
        Self {
            input,
            sink,
            count_schema,
            sort_order,
            cache,
        }
    }

    /// Input execution plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Returns the [`DataSink`] that this plan writes to
    pub fn sink(&self) -> &dyn DataSink {
        self.sink.as_ref()
    }

    /// Optional sort order for output data
    pub fn sort_order(&self) -> &Option<LexRequirement> {
        &self.sort_order
    }

    fn create_schema(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        )
        .with_scheduling_type(SchedulingType::Cooperative)
        .with_evaluation_type(EvaluationType::Eager)
    }
}

impl DisplayAs for DataSinkExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DataSinkExec: sink=")?;
                self.sink.fmt_as(t, f)
            }
            DisplayFormatType::TreeRender => self.sink().fmt_as(t, f),
        }
    }
}

impl ExecutionPlan for DataSinkExec {
    fn name(&self) -> &'static str {
        "DataSinkExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // DataSink is responsible for dynamically partitioning its
        // own input at execution time.
        vec![false]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // DataSink is responsible for dynamically partitioning its
        // own input at execution time, and so requires a single input partition.
        vec![Distribution::SinglePartition; self.children().len()]
    }

    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        // The required input ordering is set externally (e.g. by a `ListingTable`).
        // Otherwise, there is no specific requirement (i.e. `sort_order` is `None`).
        vec![self.sort_order.as_ref().cloned().map(Into::into)]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Maintains ordering in the sense that the written file will reflect
        // the ordering of the input. For more context, see:
        //
        // https://github.com/apache/datafusion/pull/6354#discussion_r1195284178
        vec![true]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(
            Arc::clone(&children[0]),
            Arc::clone(&self.sink),
            self.sort_order.clone(),
        )))
    }

    /// Execute the plan and return a stream of `RecordBatch`es for
    /// the specified partition.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!("DataSinkExec can only be called on partition 0!");
        }
        let data = execute_input_stream(
            Arc::clone(&self.input),
            Arc::clone(self.sink.schema()),
            0,
            Arc::clone(&context),
        )?;

        let count_schema = Arc::clone(&self.count_schema);
        let sink = Arc::clone(&self.sink);

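        // Drive `write_all` exactly once and surface its returned row count
        // as a single-row `RecordBatch`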
        let stream = futures::stream::once(async move {
            sink.write_all(data, &context).await.map(make_count_batch)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            count_schema,
            stream,
        )))
    }

    /// Returns the metrics of the underlying [DataSink]
    fn metrics(&self) -> Option<MetricsSet> {
        self.sink.metrics()
    }
}

/// Create an output record batch with a count
///
/// ```text
/// +-------+
/// | count |
/// +-------+
/// | 6     |
/// +-------+
/// ```
fn make_count_batch(count: u64) -> RecordBatch {
    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;

    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
}

fn make_count_schema() -> SchemaRef {
    // Single non-nullable UInt64 column named "count"
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]))
}