datafusion_datasource/sink.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for writing data to [`DataSink`]s

use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, EquivalenceProperties};
use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
};

use async_trait::async_trait;
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
use futures::StreamExt;

/// `DataSink` implements writing streams of [`RecordBatch`]es to
/// user-defined destinations.
///
/// The `Display` impl is used to format the sink for explain plan
/// output.
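///
/// # Example
///
/// A minimal sketch of an implementation that counts rows and discards the
/// data (the `DiscardingSink` name and behavior are illustrative only, not
/// part of DataFusion):
///
/// ```ignore
/// #[derive(Debug)]
/// struct DiscardingSink {
///     schema: SchemaRef,
/// }
///
/// impl DisplayAs for DiscardingSink {
///     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
///         write!(f, "DiscardingSink")
///     }
/// }
///
/// #[async_trait]
/// impl DataSink for DiscardingSink {
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     async fn write_all(
///         &self,
///         mut data: SendableRecordBatchStream,
///         _context: &Arc<TaskContext>,
///     ) -> Result<u64> {
///         // Drain the input stream, counting rows. A real sink would write
///         // each batch out and commit (or roll back) before returning.
///         let mut count = 0;
///         while let Some(batch) = data.next().await {
///             count += batch?.num_rows() as u64;
///         }
///         Ok(count)
///     }
/// }
/// ```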
#[async_trait]
pub trait DataSink: DisplayAs + Debug + Send + Sync {
    /// Returns the data sink as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Return a snapshot of the [MetricsSet] for this
    /// [DataSink].
    ///
    /// See [ExecutionPlan::metrics()] for more details
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    /// Returns the sink schema
    fn schema(&self) -> &SchemaRef;

    // TODO add desired input ordering
    // How does this sink want its input ordered?

    /// Writes the data to the sink and returns the number of values written
    ///
    /// This method will be called exactly once during each DML
    /// statement. Thus, before returning, the sink should perform any
    /// required commit or rollback.
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}

/// Execution plan for writing record batches to a [`DataSink`]
///
/// Returns a single row with the number of values written
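///
/// # Example
///
/// A rough sketch of wiring an input plan into a sink (`input` and `sink`
/// are assumed to already exist; this is illustrative, not a runnable test):
///
/// ```ignore
/// // `input: Arc<dyn ExecutionPlan>` produces the batches to be written and
/// // `sink: Arc<dyn DataSink>` receives them.
/// let exec = Arc::new(DataSinkExec::new(input, sink, None));
///
/// // Executing partition 0 drives the write and yields a single batch with
/// // one `count` column containing the number of values written.
/// let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
/// ```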
#[derive(Clone)]
pub struct DataSinkExec {
    /// Input plan that produces the record batches to be written.
    input: Arc<dyn ExecutionPlan>,
    /// Sink to which to write
    sink: Arc<dyn DataSink>,
    /// Schema of the single-row `count` batch returned by this node.
    count_schema: SchemaRef,
    /// Optional required sort order for output data.
    sort_order: Option<LexRequirement>,
    /// Cached plan properties (equivalences, partitioning, boundedness).
    cache: PlanProperties,
}

impl Debug for DataSinkExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "DataSinkExec schema: {:?}", self.count_schema)
    }
}

impl DataSinkExec {
    /// Create a plan to write to `sink`
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        let count_schema = make_count_schema();
        let cache = Self::create_schema(&input, Arc::clone(&count_schema));
        Self {
            input,
            sink,
            count_schema,
            sort_order,
            cache,
        }
    }

    /// Input execution plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Returns the insert sink
    pub fn sink(&self) -> &dyn DataSink {
        self.sink.as_ref()
    }

    /// Optional sort order for output data
    pub fn sort_order(&self) -> &Option<LexRequirement> {
        &self.sort_order
    }

    /// Compute the [`PlanProperties`] for this node: a single output
    /// partition with the count schema, inheriting the input's pipeline
    /// behavior and boundedness.
    fn create_schema(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        )
        .with_scheduling_type(SchedulingType::Cooperative)
        .with_evaluation_type(EvaluationType::Eager)
    }
}

impl DisplayAs for DataSinkExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DataSinkExec: sink=")?;
                self.sink.fmt_as(t, f)
            }
            DisplayFormatType::TreeRender => self.sink().fmt_as(t, f),
        }
    }
}

impl ExecutionPlan for DataSinkExec {
    fn name(&self) -> &'static str {
        "DataSinkExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // DataSink is responsible for dynamically partitioning its
        // own input at execution time.
        vec![false]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // DataSink is responsible for dynamically partitioning its
        // own input at execution time, and so requires a single input partition.
        vec![Distribution::SinglePartition; self.children().len()]
    }

    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        // The required input ordering is set externally (e.g. by a `ListingTable`).
        // Otherwise, there is no specific requirement (i.e. `sort_order` is `None`).
        vec![self.sort_order.as_ref().cloned().map(Into::into)]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Maintains ordering in the sense that the written file will reflect
        // the ordering of the input. For more context, see:
        //
        // https://github.com/apache/datafusion/pull/6354#discussion_r1195284178
        vec![true]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(
            Arc::clone(&children[0]),
            Arc::clone(&self.sink),
            self.sort_order.clone(),
        )))
    }

    /// Execute the plan and return a stream of `RecordBatch`es for
    /// the specified partition.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!("DataSinkExec can only be called on partition 0!");
        }
        let data = execute_input_stream(
            Arc::clone(&self.input),
            Arc::clone(self.sink.schema()),
            0,
            Arc::clone(&context),
        )?;

        let count_schema = Arc::clone(&self.count_schema);
        let sink = Arc::clone(&self.sink);

        let stream = futures::stream::once(async move {
            sink.write_all(data, &context).await.map(make_count_batch)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            count_schema,
            stream,
        )))
    }

    /// Returns the metrics of the underlying [DataSink]
    fn metrics(&self) -> Option<MetricsSet> {
        self.sink.metrics()
    }
}

/// Create an output record batch with a count
///
/// ```text
/// +-------+
/// | count |
/// +-------+
/// | 6     |
/// +-------+
/// ```
fn make_count_batch(count: u64) -> RecordBatch {
    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;

    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
}

fn make_count_schema() -> SchemaRef {
    // A single non-nullable UInt64 column named "count".
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]))
}