// datafusion_datasource/sink.rs

use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, EquivalenceProperties};
use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
};

use async_trait::async_trait;
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
use futures::StreamExt;

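/// The interface for writing a stream of [`RecordBatch`]es to a destination,
/// such as a table or a set of files.
///
/// Implementations consume the entire input stream in [`write_all`] and
/// report the number of rows written.
///
/// # Example
///
/// A minimal sketch of an implementation (illustrative only, imports omitted;
/// `NullSink` is a hypothetical sink that drains its input and counts rows
/// without writing them anywhere):
///
/// ```ignore
/// #[derive(Debug)]
/// struct NullSink {
///     schema: SchemaRef,
/// }
///
/// impl DisplayAs for NullSink {
///     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
///         write!(f, "NullSink")
///     }
/// }
///
/// #[async_trait]
/// impl DataSink for NullSink {
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     async fn write_all(
///         &self,
///         mut data: SendableRecordBatchStream,
///         _context: &Arc<TaskContext>,
///     ) -> Result<u64> {
///         // Drain the input and count rows instead of writing them.
///         let mut rows = 0u64;
///         while let Some(batch) = data.next().await {
///             rows += batch?.num_rows() as u64;
///         }
///         Ok(rows)
///     }
/// }
/// ```
///
/// [`write_all`]: DataSink::write_all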
#[async_trait]
pub trait DataSink: DisplayAs + Debug + Send + Sync {
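    /// Returns the sink as [`Any`] so that it can be downcast to a specific
    /// implementation.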
    fn as_any(&self) -> &dyn Any;

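    /// Returns the metrics collected by this sink, if any.
    ///
    /// The default implementation returns `None`.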
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

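    /// Returns the schema that this sink expects its input to have.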
    fn schema(&self) -> &SchemaRef;

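    /// Writes the entire `data` stream to the sink and returns the total
    /// number of rows written.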
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}

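/// Execution plan node that writes its input to a [`DataSink`].
///
/// The node consumes all of its input on partition 0 and produces a single
/// record batch with one `count` column containing the number of rows
/// written.
///
/// # Example
///
/// An illustrative sketch (the variables `input` and `sink`, an `Arc`'d input
/// plan and `DataSink` implementation respectively, are hypothetical):
///
/// ```ignore
/// // Wrap the input plan; executing partition 0 of `write_plan` writes all
/// // rows to `sink` and yields a single row with the written row count.
/// let write_plan: Arc<dyn ExecutionPlan> =
///     Arc::new(DataSinkExec::new(input, sink, None));
/// ```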
#[derive(Clone)]
pub struct DataSinkExec {
    /// The input plan producing the data to write
    input: Arc<dyn ExecutionPlan>,
    /// The sink the data is written to
    sink: Arc<dyn DataSink>,
    /// Schema of the single-column `count` output
    count_schema: SchemaRef,
    /// Optional ordering required of the input
    sort_order: Option<LexRequirement>,
    /// Cached plan properties
    cache: PlanProperties,
}

impl Debug for DataSinkExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "DataSinkExec schema: {:?}", self.count_schema)
    }
}

impl DataSinkExec {
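    /// Creates a plan node that writes `input` to `sink`, optionally
    /// requiring the input to satisfy `sort_order`.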
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        let count_schema = make_count_schema();
        let cache = Self::create_schema(&input, Arc::clone(&count_schema));
        Self {
            input,
            sink,
            count_schema,
            sort_order,
            cache,
        }
    }

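    /// Returns the input [`ExecutionPlan`] producing the data to write.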
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

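    /// Returns the [`DataSink`] the data is written to.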
    pub fn sink(&self) -> &dyn DataSink {
        self.sink.as_ref()
    }

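    /// Returns the ordering required of the input, if any.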
    pub fn sort_order(&self) -> &Option<LexRequirement> {
        &self.sort_order
    }

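    /// Computes the [`PlanProperties`] of this node from the given output
    /// schema and the input plan.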
    fn create_schema(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        )
        .with_scheduling_type(SchedulingType::Cooperative)
        .with_evaluation_type(EvaluationType::Eager)
    }
}

impl DisplayAs for DataSinkExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DataSinkExec: sink=")?;
                self.sink.fmt_as(t, f)
            }
            DisplayFormatType::TreeRender => self.sink().fmt_as(t, f),
        }
    }
}

impl ExecutionPlan for DataSinkExec {
    fn name(&self) -> &'static str {
        "DataSinkExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // Data is merged into a single partition before being written, so
        // repartitioning the input does not help.
        vec![false]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // All input partitions must be combined into one before writing.
        vec![Distribution::SinglePartition; self.children().len()]
    }

    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        // Require the configured sort order, if any, of the input.
        vec![self.sort_order.as_ref().cloned().map(Into::into)]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Rows are written in the order they are received from the input.
        vec![true]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(
            Arc::clone(&children[0]),
            Arc::clone(&self.sink),
            self.sort_order.clone(),
        )))
    }

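    /// Executes the plan: consumes the entire input, writes it to the sink,
    /// and returns a stream containing a single batch with the row count.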
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!("DataSinkExec can only be called on partition 0!");
        }
        let data = execute_input_stream(
            Arc::clone(&self.input),
            Arc::clone(self.sink.schema()),
            0,
            Arc::clone(&context),
        )?;

        let count_schema = Arc::clone(&self.count_schema);
        let sink = Arc::clone(&self.sink);

        // Writing happens lazily: the sink consumes the input when the
        // returned stream is polled, and the stream then yields a single
        // batch containing the number of rows written.
        let stream = futures::stream::once(async move {
            sink.write_all(data, &context).await.map(make_count_batch)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            count_schema,
            stream,
        )))
    }

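    /// Returns the metrics of the underlying [`DataSink`], if any.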
    fn metrics(&self) -> Option<MetricsSet> {
        self.sink.metrics()
    }
}

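/// Creates a single-row [`RecordBatch`] containing `count` in a non-nullable
/// `UInt64` column named `count`, e.g. for `count = 6`:
///
/// ```text
/// +-------+
/// | count |
/// +-------+
/// | 6     |
/// +-------+
/// ```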
fn make_count_batch(count: u64) -> RecordBatch {
    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;

    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
}

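/// Creates the single-column, non-nullable `count: UInt64` schema used for
/// the output of [`DataSinkExec`].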
fn make_count_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]))
}