datafusion_datasource/
sink.rs1use std::any::Any;
21use std::fmt;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
26use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
27use datafusion_common::{Result, assert_eq_or_internal_err};
28use datafusion_execution::TaskContext;
29use datafusion_physical_expr::{Distribution, EquivalenceProperties};
30use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements};
31use datafusion_physical_plan::metrics::MetricsSet;
32use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
33use datafusion_physical_plan::{
34 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
35 PlanProperties, SendableRecordBatchStream, execute_input_stream,
36};
37
38use async_trait::async_trait;
39use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
40use futures::StreamExt;
41
42#[async_trait]
48pub trait DataSink: Any + DisplayAs + Debug + Send + Sync {
49 fn metrics(&self) -> Option<MetricsSet> {
54 None
55 }
56
57 fn schema(&self) -> &SchemaRef;
59
60 async fn write_all(
69 &self,
70 data: SendableRecordBatchStream,
71 context: &Arc<TaskContext>,
72 ) -> Result<u64>;
73}
74
75impl dyn DataSink {
76 pub fn is<T: DataSink>(&self) -> bool {
78 (self as &dyn Any).is::<T>()
79 }
80
81 pub fn downcast_ref<T: DataSink>(&self) -> Option<&T> {
83 (self as &dyn Any).downcast_ref()
84 }
85}
86
87#[derive(Clone)]
91pub struct DataSinkExec {
92 input: Arc<dyn ExecutionPlan>,
94 sink: Arc<dyn DataSink>,
96 count_schema: SchemaRef,
98 sort_order: Option<LexRequirement>,
100 cache: Arc<PlanProperties>,
101}
102
103impl Debug for DataSinkExec {
104 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105 write!(f, "DataSinkExec schema: {}", self.count_schema)
106 }
107}
108
109impl DataSinkExec {
110 pub fn new(
117 input: Arc<dyn ExecutionPlan>,
118 sink: Arc<dyn DataSink>,
119 sort_order: Option<LexRequirement>,
120 ) -> Self {
121 let count_schema = make_count_schema();
122 let cache = Self::create_schema(&input, count_schema);
123 Self {
124 input,
125 sink,
126 count_schema: make_count_schema(),
127 sort_order,
128 cache: Arc::new(cache),
129 }
130 }
131
132 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
134 &self.input
135 }
136
137 pub fn sink(&self) -> &dyn DataSink {
139 self.sink.as_ref()
140 }
141
142 pub fn sort_order(&self) -> &Option<LexRequirement> {
144 &self.sort_order
145 }
146
147 fn create_schema(
148 input: &Arc<dyn ExecutionPlan>,
149 schema: SchemaRef,
150 ) -> PlanProperties {
151 let eq_properties = EquivalenceProperties::new(schema);
152 PlanProperties::new(
153 eq_properties,
154 Partitioning::UnknownPartitioning(1),
155 input.pipeline_behavior(),
156 input.boundedness(),
157 )
158 .with_scheduling_type(SchedulingType::Cooperative)
159 .with_evaluation_type(EvaluationType::Eager)
160 }
161}
162
163impl DisplayAs for DataSinkExec {
164 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
165 match t {
166 DisplayFormatType::Default | DisplayFormatType::Verbose => {
167 write!(f, "DataSinkExec: sink=")?;
168 self.sink.fmt_as(t, f)
169 }
170 DisplayFormatType::TreeRender => self.sink().fmt_as(t, f),
171 }
172 }
173}
174
175impl ExecutionPlan for DataSinkExec {
176 fn name(&self) -> &'static str {
177 "DataSinkExec"
178 }
179
180 fn properties(&self) -> &Arc<PlanProperties> {
182 &self.cache
183 }
184
185 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
186 vec![false]
189 }
190
191 fn required_input_distribution(&self) -> Vec<Distribution> {
192 vec![Distribution::SinglePartition; self.children().len()]
195 }
196
197 fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
198 vec![self.sort_order.as_ref().cloned().map(Into::into)]
201 }
202
203 fn maintains_input_order(&self) -> Vec<bool> {
204 vec![true]
209 }
210
211 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
212 vec![&self.input]
213 }
214
215 fn with_new_children(
216 self: Arc<Self>,
217 children: Vec<Arc<dyn ExecutionPlan>>,
218 ) -> Result<Arc<dyn ExecutionPlan>> {
219 Ok(Arc::new(Self::new(
220 Arc::clone(&children[0]),
221 Arc::clone(&self.sink),
222 self.sort_order.clone(),
223 )))
224 }
225
226 fn execute(
229 &self,
230 partition: usize,
231 context: Arc<TaskContext>,
232 ) -> Result<SendableRecordBatchStream> {
233 assert_eq_or_internal_err!(
234 partition,
235 0,
236 "DataSinkExec can only be called on partition 0!"
237 );
238 let data = execute_input_stream(
239 Arc::clone(&self.input),
240 Arc::clone(self.sink.schema()),
241 0,
242 Arc::clone(&context),
243 )?;
244
245 let count_schema = Arc::clone(&self.count_schema);
246 let sink = Arc::clone(&self.sink);
247
248 let stream = futures::stream::once(async move {
249 sink.write_all(data, &context).await.map(make_count_batch)
250 })
251 .boxed();
252
253 Ok(Box::pin(RecordBatchStreamAdapter::new(
254 count_schema,
255 stream,
256 )))
257 }
258
259 fn metrics(&self) -> Option<MetricsSet> {
261 self.sink.metrics()
262 }
263}
264
265fn make_count_batch(count: u64) -> RecordBatch {
275 let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
276
277 RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
278}
279
280fn make_count_schema() -> SchemaRef {
281 Arc::new(Schema::new(vec![Field::new(
283 "count",
284 DataType::UInt64,
285 false,
286 )]))
287}