Skip to main content

datafusion_datasource/
sink.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution plan for writing data to [`DataSink`]s
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
26use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
27use datafusion_common::{Result, assert_eq_or_internal_err};
28use datafusion_execution::TaskContext;
29use datafusion_physical_expr::{Distribution, EquivalenceProperties};
30use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements};
31use datafusion_physical_plan::metrics::MetricsSet;
32use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
33use datafusion_physical_plan::{
34    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
35    PlanProperties, SendableRecordBatchStream, execute_input_stream,
36};
37
38use async_trait::async_trait;
39use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
40use futures::StreamExt;
41
/// `DataSink` implements writing streams of [`RecordBatch`]es to
/// user defined destinations.
///
/// The [`DisplayAs`] impl (a required supertrait) is used to format the
/// sink for explain plan output.
#[async_trait]
pub trait DataSink: Any + DisplayAs + Debug + Send + Sync {
    /// Return a snapshot of the [MetricsSet] for this
    /// [DataSink].
    ///
    /// The default implementation returns `None`; sinks that record
    /// metrics should override it.
    ///
    /// See [ExecutionPlan::metrics()] for more details
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    /// Returns the sink schema
    fn schema(&self) -> &SchemaRef;

    // TODO add desired input ordering
    // How does this sink want its input ordered?

    /// Writes the data to the sink, returns the number of values written
    ///
    /// This method will be called exactly once during each DML
    /// statement. Thus prior to return, the sink should do any commit
    /// or rollback required.
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}
74
75impl dyn DataSink {
76    /// Returns true if the inner type is `T`.
77    pub fn is<T: DataSink>(&self) -> bool {
78        (self as &dyn Any).is::<T>()
79    }
80
81    /// Returns a reference to the inner value as the type `T` if it is of that type.
82    pub fn downcast_ref<T: DataSink>(&self) -> Option<&T> {
83        (self as &dyn Any).downcast_ref()
84    }
85}
86
/// Execution plan for writing record batches to a [`DataSink`]
///
/// Returns a single row with the number of values written
#[derive(Clone)]
pub struct DataSinkExec {
    /// Input plan that produces the record batches to be written.
    input: Arc<dyn ExecutionPlan>,
    /// Sink to which to write
    sink: Arc<dyn DataSink>,
    /// Schema of the single-column `count` batch this plan outputs
    /// (not the schema of the data handed to the sink).
    count_schema: SchemaRef,
    /// Optional ordering requirement, reported for the input through
    /// `required_input_ordering`.
    sort_order: Option<LexRequirement>,
    /// Plan properties computed once in `new` and returned by `properties()`.
    cache: Arc<PlanProperties>,
}
102
103impl Debug for DataSinkExec {
104    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105        write!(f, "DataSinkExec schema: {}", self.count_schema)
106    }
107}
108
109impl DataSinkExec {
110    /// Create a plan to write to `sink`
111    /// Note: DataSinkExec requires its input to have a single partition.
112    /// If the input has multiple partitions, the physical optimizer will
113    /// automatically insert a Merge-related operator to merge them.
114    /// If you construct PhysicalPlan without going through the physical optimizer,
115    /// you must ensure that the input has a single partition.
116    pub fn new(
117        input: Arc<dyn ExecutionPlan>,
118        sink: Arc<dyn DataSink>,
119        sort_order: Option<LexRequirement>,
120    ) -> Self {
121        let count_schema = make_count_schema();
122        let cache = Self::create_schema(&input, count_schema);
123        Self {
124            input,
125            sink,
126            count_schema: make_count_schema(),
127            sort_order,
128            cache: Arc::new(cache),
129        }
130    }
131
132    /// Input execution plan
133    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
134        &self.input
135    }
136
137    /// Returns insert sink
138    pub fn sink(&self) -> &dyn DataSink {
139        self.sink.as_ref()
140    }
141
142    /// Optional sort order for output data
143    pub fn sort_order(&self) -> &Option<LexRequirement> {
144        &self.sort_order
145    }
146
147    fn create_schema(
148        input: &Arc<dyn ExecutionPlan>,
149        schema: SchemaRef,
150    ) -> PlanProperties {
151        let eq_properties = EquivalenceProperties::new(schema);
152        PlanProperties::new(
153            eq_properties,
154            Partitioning::UnknownPartitioning(1),
155            input.pipeline_behavior(),
156            input.boundedness(),
157        )
158        .with_scheduling_type(SchedulingType::Cooperative)
159        .with_evaluation_type(EvaluationType::Eager)
160    }
161}
162
163impl DisplayAs for DataSinkExec {
164    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
165        match t {
166            DisplayFormatType::Default | DisplayFormatType::Verbose => {
167                write!(f, "DataSinkExec: sink=")?;
168                self.sink.fmt_as(t, f)
169            }
170            DisplayFormatType::TreeRender => self.sink().fmt_as(t, f),
171        }
172    }
173}
174
175impl ExecutionPlan for DataSinkExec {
176    fn name(&self) -> &'static str {
177        "DataSinkExec"
178    }
179
180    /// Return a reference to Any that can be used for downcasting
181    fn properties(&self) -> &Arc<PlanProperties> {
182        &self.cache
183    }
184
185    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
186        // DataSink is responsible for dynamically partitioning its
187        // own input at execution time.
188        vec![false]
189    }
190
191    fn required_input_distribution(&self) -> Vec<Distribution> {
192        // DataSink is responsible for dynamically partitioning its
193        // own input at execution time, and so requires a single input partition.
194        vec![Distribution::SinglePartition; self.children().len()]
195    }
196
197    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
198        // The required input ordering is set externally (e.g. by a `ListingTable`).
199        // Otherwise, there is no specific requirement (i.e. `sort_order` is `None`).
200        vec![self.sort_order.as_ref().cloned().map(Into::into)]
201    }
202
203    fn maintains_input_order(&self) -> Vec<bool> {
204        // Maintains ordering in the sense that the written file will reflect
205        // the ordering of the input. For more context, see:
206        //
207        // https://github.com/apache/datafusion/pull/6354#discussion_r1195284178
208        vec![true]
209    }
210
211    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
212        vec![&self.input]
213    }
214
215    fn with_new_children(
216        self: Arc<Self>,
217        children: Vec<Arc<dyn ExecutionPlan>>,
218    ) -> Result<Arc<dyn ExecutionPlan>> {
219        Ok(Arc::new(Self::new(
220            Arc::clone(&children[0]),
221            Arc::clone(&self.sink),
222            self.sort_order.clone(),
223        )))
224    }
225
226    /// Execute the plan and return a stream of `RecordBatch`es for
227    /// the specified partition.
228    fn execute(
229        &self,
230        partition: usize,
231        context: Arc<TaskContext>,
232    ) -> Result<SendableRecordBatchStream> {
233        assert_eq_or_internal_err!(
234            partition,
235            0,
236            "DataSinkExec can only be called on partition 0!"
237        );
238        let data = execute_input_stream(
239            Arc::clone(&self.input),
240            Arc::clone(self.sink.schema()),
241            0,
242            Arc::clone(&context),
243        )?;
244
245        let count_schema = Arc::clone(&self.count_schema);
246        let sink = Arc::clone(&self.sink);
247
248        let stream = futures::stream::once(async move {
249            sink.write_all(data, &context).await.map(make_count_batch)
250        })
251        .boxed();
252
253        Ok(Box::pin(RecordBatchStreamAdapter::new(
254            count_schema,
255            stream,
256        )))
257    }
258
259    /// Returns the metrics of the underlying [DataSink]
260    fn metrics(&self) -> Option<MetricsSet> {
261        self.sink.metrics()
262    }
263}
264
265/// Create a output record batch with a count
266///
267/// ```text
268/// +-------+,
269/// | count |,
270/// +-------+,
271/// | 6     |,
272/// +-------+,
273/// ```
274fn make_count_batch(count: u64) -> RecordBatch {
275    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
276
277    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
278}
279
280fn make_count_schema() -> SchemaRef {
281    // Define a schema.
282    Arc::new(Schema::new(vec![Field::new(
283        "count",
284        DataType::UInt64,
285        false,
286    )]))
287}