//! Execution plan for writing record batches to a [`DataSink`].

use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, EquivalenceProperties};
use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
};

use async_trait::async_trait;
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
use futures::StreamExt;

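/// `DataSink` implements writing streams of [`RecordBatch`]es to user
/// defined destinations.
///
/// The `Display` impl is used to format the sink for explain plan output.
///
/// # Example
///
/// A minimal sketch of a custom sink: a hypothetical `DiscardSink` that
/// drains its input and reports only the row count (it is not part of this
/// crate):
///
/// ```ignore
/// #[derive(Debug)]
/// struct DiscardSink {
///     schema: SchemaRef,
/// }
///
/// impl DisplayAs for DiscardSink {
///     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
///         write!(f, "DiscardSink")
///     }
/// }
///
/// #[async_trait]
/// impl DataSink for DiscardSink {
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     async fn write_all(
///         &self,
///         mut data: SendableRecordBatchStream,
///         _context: &Arc<TaskContext>,
///     ) -> Result<u64> {
///         // Drain the stream, counting rows instead of persisting them.
///         let mut count = 0u64;
///         while let Some(batch) = data.next().await {
///             count += batch?.num_rows() as u64;
///         }
///         Ok(count)
///     }
/// }
/// ```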
#[async_trait]
pub trait DataSink: DisplayAs + Debug + Send + Sync {
    /// Returns the data sink as [`Any`] so that it can be downcast to a
    /// specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns a snapshot of this sink's metrics, if it collects any.
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    /// Returns the schema of the data this sink expects to receive.
    fn schema(&self) -> &SchemaRef;

    /// Writes the stream to the sink and returns the number of rows written.
    ///
    /// This method is called exactly once per DML statement, so the sink
    /// should perform any required commit or rollback before returning.
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}

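/// Execution plan that writes the record batches produced by its input to a
/// [`DataSink`] and returns a single row reporting the number of rows
/// written.
///
/// # Example
///
/// Wiring a sink under an input plan (a sketch: `input` and `DiscardSink`
/// are hypothetical, and `None` means no required input ordering):
///
/// ```ignore
/// let sink = Arc::new(DiscardSink { schema: input.schema() });
/// let plan: Arc<dyn ExecutionPlan> =
///     Arc::new(DataSinkExec::new(input, sink, None));
/// ```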
#[derive(Clone)]
pub struct DataSinkExec {
    /// Input plan that produces the record batches to be written.
    input: Arc<dyn ExecutionPlan>,
    /// Sink to which the rows are written.
    sink: Arc<dyn DataSink>,
    /// Schema of this node's output: a single non-nullable `count` column.
    count_schema: SchemaRef,
    /// Optional ordering required of this node's input.
    sort_order: Option<LexRequirement>,
    /// Cached plan properties.
    cache: PlanProperties,
}

impl Debug for DataSinkExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "DataSinkExec schema: {:?}", self.count_schema)
    }
}

impl DataSinkExec {
    /// Creates a plan that writes `input` to `sink`, optionally requiring
    /// that the input is ordered by `sort_order`.
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        let count_schema = make_count_schema();
        let cache = Self::create_schema(&input, Arc::clone(&count_schema));
        Self {
            input,
            sink,
            count_schema,
            sort_order,
            cache,
        }
    }

    /// Returns the input [`ExecutionPlan`].
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Returns the [`DataSink`] this plan writes to.
    pub fn sink(&self) -> &dyn DataSink {
        self.sink.as_ref()
    }

    /// Returns the optional ordering required of the input, if any.
    pub fn sort_order(&self) -> &Option<LexRequirement> {
        &self.sort_order
    }

    /// Computes the [`PlanProperties`] for this node: a single output
    /// partition whose pipeline behavior and boundedness follow the input.
    fn create_schema(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        )
        .with_scheduling_type(SchedulingType::Cooperative)
        .with_evaluation_type(EvaluationType::Eager)
    }
}

impl DisplayAs for DataSinkExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DataSinkExec: sink=")?;
                self.sink.fmt_as(t, f)
            }
            DisplayFormatType::TreeRender => self.sink().fmt_as(t, f),
        }
    }
}

impl ExecutionPlan for DataSinkExec {
    fn name(&self) -> &'static str {
        "DataSinkExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // The sink is responsible for dynamically partitioning its
        // own input at execution time.
        vec![false]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // All input rows must funnel through a single partition so the
        // sink sees the data as one stream.
        vec![Distribution::SinglePartition; self.children().len()]
    }

    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        // The required input ordering is set externally (e.g. by a
        // `SortExec`); otherwise there is no requirement (`sort_order`
        // is `None`).
        vec![self.sort_order.as_ref().cloned().map(Into::into)]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // The written output reflects the ordering of the input.
        vec![true]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(
            Arc::clone(&children[0]),
            Arc::clone(&self.sink),
            self.sort_order.clone(),
        )))
    }

    /// Executes the plan: writes all input data to the sink and returns a
    /// stream containing one batch with the resulting row count.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!("DataSinkExec can only be called on partition 0!");
        }
        let data = execute_input_stream(
            Arc::clone(&self.input),
            Arc::clone(self.sink.schema()),
            0,
            Arc::clone(&context),
        )?;

        let count_schema = Arc::clone(&self.count_schema);
        let sink = Arc::clone(&self.sink);

        // Call `write_all` exactly once, then emit a single batch carrying
        // the number of rows written.
        let stream = futures::stream::once(async move {
            sink.write_all(data, &context).await.map(make_count_batch)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            count_schema,
            stream,
        )))
    }

    /// Returns the metrics of the underlying sink, if any.
    fn metrics(&self) -> Option<MetricsSet> {
        self.sink.metrics()
    }
}

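/// Creates a single-row [`RecordBatch`] reporting `count`, for example:
///
/// ```text
/// +-------+
/// | count |
/// +-------+
/// | 6     |
/// +-------+
/// ```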
fn make_count_batch(count: u64) -> RecordBatch {
    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;

    // A single non-nullable column built from an array of matching type and
    // length; this construction cannot fail, so `unwrap` is safe.
    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
}

/// Creates the schema for the count output: one non-nullable `UInt64`
/// column named `count`.
fn make_count_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]))
}
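
// A small sanity check of the count-batch helpers. This module and test are
// illustrative sketches, not part of the upstream test suite.
#[cfg(test)]
mod count_batch_tests {
    use super::*;

    #[test]
    fn count_batch_matches_count_schema() {
        // One row, one non-nullable UInt64 column named "count".
        let batch = make_count_batch(6);
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.schema(), make_count_schema());
    }
}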