// datafusion_datasource/sink.rs

use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::{
    execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
    PlanProperties, SendableRecordBatchStream,
};

use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, EquivalenceProperties};
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use async_trait::async_trait;
use futures::StreamExt;
#[async_trait]
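/// A `DataSink` writes a stream of [`RecordBatch`]es to a user-defined
/// destination and reports how many rows it wrote. The `DisplayAs` bound
/// is used to format the sink in explain-plan output.
///
/// A minimal sketch of an implementor (the `DiscardingSink` name and its
/// row-counting behavior are illustrative, not part of this crate):
///
/// ```ignore
/// #[derive(Debug)]
/// struct DiscardingSink {
///     schema: SchemaRef,
/// }
///
/// impl DisplayAs for DiscardingSink {
///     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
///         write!(f, "DiscardingSink")
///     }
/// }
///
/// #[async_trait]
/// impl DataSink for DiscardingSink {
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     async fn write_all(
///         &self,
///         mut data: SendableRecordBatchStream,
///         _context: &Arc<TaskContext>,
///     ) -> Result<u64> {
///         // Drain the input, discarding each batch after counting its rows.
///         let mut count = 0u64;
///         while let Some(batch) = data.next().await {
///             count += batch?.num_rows() as u64;
///         }
///         Ok(count)
///     }
/// }
/// ```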
pub trait DataSink: DisplayAs + Debug + Send + Sync {
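    /// Returns `self` as [`Any`] so callers can downcast to a concrete sink.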
    fn as_any(&self) -> &dyn Any;

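    /// Returns a snapshot of this sink's [`MetricsSet`] as of the call, if it
    /// records any; the default implementation reports no metrics.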
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

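    /// Returns the schema the sink expects its input stream to have.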
    fn schema(&self) -> &SchemaRef;

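    /// Writes the entire stream to the sink, returning the number of rows
    /// written once the input is exhausted.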
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}

#[derive(Clone)]
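/// Execution plan that streams its input into a [`DataSink`] and returns a
/// single-row, single-column batch containing the number of rows written.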
pub struct DataSinkExec {
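    /// Input plan that produces the record batches to be written.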
    input: Arc<dyn ExecutionPlan>,
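    /// Sink to which the batches are written.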
    sink: Arc<dyn DataSink>,
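    /// Schema of the single-column row-count batch this plan returns.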
    count_schema: SchemaRef,
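    /// Optional ordering required of the input data.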
    sort_order: Option<LexRequirement>,
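    /// Cached plan properties (schema, partitioning, boundedness).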
    cache: PlanProperties,
}

impl Debug for DataSinkExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "DataSinkExec schema: {:?}", self.count_schema)
    }
}

impl DataSinkExec {
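    /// Creates a plan that writes `input` to `sink`, optionally requiring
    /// `sort_order` of the input.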
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        let count_schema = make_count_schema();
        let cache = Self::create_schema(&input, Arc::clone(&count_schema));
        Self {
            input,
            sink,
            count_schema,
            sort_order,
            cache,
        }
    }

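    /// Returns the input [`ExecutionPlan`].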
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

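    /// Returns the [`DataSink`] this plan writes to.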
    pub fn sink(&self) -> &dyn DataSink {
        self.sink.as_ref()
    }

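    /// Returns the optional required sort order of the input.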
    pub fn sort_order(&self) -> &Option<LexRequirement> {
        &self.sort_order
    }

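    /// Computes the [`PlanProperties`] for this node, despite its name: the
    /// row-count schema, a single unknown output partition, and the input's
    /// pipeline behavior and boundedness.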
    fn create_schema(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }
}

impl DisplayAs for DataSinkExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DataSinkExec: sink=")?;
                self.sink.fmt_as(t, f)
            }
            DisplayFormatType::TreeRender => self.sink().fmt_as(t, f),
        }
    }
}

impl ExecutionPlan for DataSinkExec {
    fn name(&self) -> &'static str {
        "DataSinkExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
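        // The sink dynamically partitions its own input at execution time,
        // so pre-partitioning the input does not help.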
        vec![false]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
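        // All input must be funneled into a single partition before writing.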
        vec![Distribution::SinglePartition; self.children().len()]
    }

    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
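        // The required ordering, if any, is supplied at construction time;
        // otherwise there is no requirement (`sort_order` is `None`).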
        vec![self.sort_order.as_ref().cloned()]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
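        // Ordering is maintained in the sense that the written output
        // reflects the ordering of the input.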
        vec![true]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(
            Arc::clone(&children[0]),
            Arc::clone(&self.sink),
            self.sort_order.clone(),
        )))
    }

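    /// Executes the plan: streams the input into the sink and returns a
    /// one-row batch with the total row count. Only partition 0 is valid.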
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!("DataSinkExec can only be called on partition 0!");
        }
        let data = execute_input_stream(
            Arc::clone(&self.input),
            Arc::clone(self.sink.schema()),
            0,
            Arc::clone(&context),
        )?;

        let count_schema = Arc::clone(&self.count_schema);
        let sink = Arc::clone(&self.sink);

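        // Wrap the entire write in a single-element stream: the one batch it
        // yields carries the row count returned by `write_all`.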
        let stream = futures::stream::once(async move {
            sink.write_all(data, &context).await.map(make_count_batch)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            count_schema,
            stream,
        )))
    }

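    /// Forwards the metrics of the underlying [`DataSink`].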
    fn metrics(&self) -> Option<MetricsSet> {
        self.sink.metrics()
    }
}

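/// Creates a single-row [`RecordBatch`] holding `count` in a non-nullable
/// `UInt64` column named "count", e.g. for `count = 6`:
///
/// ```text
/// +-------+
/// | count |
/// +-------+
/// | 6     |
/// +-------+
/// ```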
fn make_count_batch(count: u64) -> RecordBatch {
    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;

    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
}

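/// Builds the schema of the row-count batch: a single non-nullable
/// `UInt64` column named "count".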
fn make_count_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]))
}