datafusion_physical_plan/
streaming.rs1use std::fmt::Debug;
21use std::sync::Arc;
22
23use super::{DisplayAs, DisplayFormatType, PlanProperties};
24use crate::coop::make_cooperative;
25use crate::display::{ProjectSchemaDisplay, display_orderings};
26use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
27use crate::limit::LimitStream;
28use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
29use crate::projection::{
30 ProjectionExec, all_alias_free_columns, new_projections_for_columns, update_ordering,
31};
32use crate::stream::RecordBatchStreamAdapter;
33use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
34
35use arrow::datatypes::{Schema, SchemaRef};
36use datafusion_common::{Result, internal_err, plan_err};
37use datafusion_execution::TaskContext;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
39
40use async_trait::async_trait;
41use futures::stream::StreamExt;
42use log::debug;
43
44pub trait PartitionStream: Debug + Send + Sync {
50 fn schema(&self) -> &SchemaRef;
52
53 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
55}
56
57#[derive(Clone)]
62pub struct StreamingTableExec {
63 partitions: Vec<Arc<dyn PartitionStream>>,
64 projection: Option<Arc<[usize]>>,
65 projected_schema: SchemaRef,
66 projected_output_ordering: Vec<LexOrdering>,
67 infinite: bool,
68 limit: Option<usize>,
69 cache: Arc<PlanProperties>,
70 metrics: ExecutionPlanMetricsSet,
71}
72
73impl StreamingTableExec {
74 pub fn try_new(
76 schema: SchemaRef,
77 partitions: Vec<Arc<dyn PartitionStream>>,
78 projection: Option<&Vec<usize>>,
79 projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
80 infinite: bool,
81 limit: Option<usize>,
82 ) -> Result<Self> {
83 for x in partitions.iter() {
84 let partition_schema = x.schema();
85 if !schema.eq(partition_schema) {
86 debug!(
87 "Target schema does not match with partition schema. \
88 Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
89 );
90 return plan_err!("Mismatch between schema and batches");
91 }
92 }
93
94 let projected_schema = match projection {
95 Some(p) => Arc::new(schema.project(p)?),
96 None => schema,
97 };
98 let projected_output_ordering =
99 projected_output_ordering.into_iter().collect::<Vec<_>>();
100 let cache = Self::compute_properties(
101 Arc::clone(&projected_schema),
102 projected_output_ordering.clone(),
103 &partitions,
104 infinite,
105 );
106 Ok(Self {
107 partitions,
108 projected_schema,
109 projection: projection.cloned().map(Into::into),
110 projected_output_ordering,
111 infinite,
112 limit,
113 cache: Arc::new(cache),
114 metrics: ExecutionPlanMetricsSet::new(),
115 })
116 }
117
118 pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
119 &self.partitions
120 }
121
122 pub fn partition_schema(&self) -> &SchemaRef {
123 self.partitions[0].schema()
124 }
125
126 pub fn projection(&self) -> &Option<Arc<[usize]>> {
127 &self.projection
128 }
129
130 pub fn projected_schema(&self) -> &Schema {
131 &self.projected_schema
132 }
133
134 pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
135 self.projected_output_ordering.clone()
136 }
137
138 pub fn is_infinite(&self) -> bool {
139 self.infinite
140 }
141
142 pub fn limit(&self) -> Option<usize> {
143 self.limit
144 }
145
146 fn compute_properties(
148 schema: SchemaRef,
149 orderings: Vec<LexOrdering>,
150 partitions: &[Arc<dyn PartitionStream>],
151 infinite: bool,
152 ) -> PlanProperties {
153 let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
155
156 let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
158 let boundedness = if infinite {
159 Boundedness::Unbounded {
160 requires_infinite_memory: false,
161 }
162 } else {
163 Boundedness::Bounded
164 };
165 PlanProperties::new(
166 eq_properties,
167 output_partitioning,
168 EmissionType::Incremental,
169 boundedness,
170 )
171 .with_scheduling_type(SchedulingType::Cooperative)
172 }
173}
174
175impl Debug for StreamingTableExec {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
178 }
179}
180
181impl DisplayAs for StreamingTableExec {
182 fn fmt_as(
183 &self,
184 t: DisplayFormatType,
185 f: &mut std::fmt::Formatter,
186 ) -> std::fmt::Result {
187 match t {
188 DisplayFormatType::Default | DisplayFormatType::Verbose => {
189 write!(
190 f,
191 "StreamingTableExec: partition_sizes={:?}",
192 self.partitions.len(),
193 )?;
194 if !self.projected_schema.fields().is_empty() {
195 write!(
196 f,
197 ", projection={}",
198 ProjectSchemaDisplay(&self.projected_schema)
199 )?;
200 }
201 if self.infinite {
202 write!(f, ", infinite_source=true")?;
203 }
204 if let Some(fetch) = self.limit {
205 write!(f, ", fetch={fetch}")?;
206 }
207
208 display_orderings(f, &self.projected_output_ordering)?;
209
210 Ok(())
211 }
212 DisplayFormatType::TreeRender => {
213 if self.infinite {
214 writeln!(f, "infinite={}", self.infinite)?;
215 }
216 if let Some(limit) = self.limit {
217 write!(f, "limit={limit}")?;
218 } else {
219 write!(f, "limit=None")?;
220 }
221
222 Ok(())
223 }
224 }
225 }
226}
227
228#[async_trait]
229impl ExecutionPlan for StreamingTableExec {
230 fn name(&self) -> &'static str {
231 "StreamingTableExec"
232 }
233
234 fn properties(&self) -> &Arc<PlanProperties> {
235 &self.cache
236 }
237
238 fn fetch(&self) -> Option<usize> {
239 self.limit
240 }
241
242 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
243 vec![]
244 }
245
246 fn with_new_children(
247 self: Arc<Self>,
248 children: Vec<Arc<dyn ExecutionPlan>>,
249 ) -> Result<Arc<dyn ExecutionPlan>> {
250 if children.is_empty() {
251 Ok(self)
252 } else {
253 internal_err!("Children cannot be replaced in {self:?}")
254 }
255 }
256
257 fn execute(
258 &self,
259 partition: usize,
260 ctx: Arc<TaskContext>,
261 ) -> Result<SendableRecordBatchStream> {
262 let stream = self.partitions[partition].execute(Arc::clone(&ctx));
263 let projected_stream = match self.projection.clone() {
264 Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
265 Arc::clone(&self.projected_schema),
266 stream.map(move |x| {
267 x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
268 }),
269 )),
270 None => stream,
271 };
272 let stream = make_cooperative(projected_stream);
273
274 Ok(match self.limit {
275 None => stream,
276 Some(fetch) => {
277 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
278 Box::pin(LimitStream::new(stream, 0, Some(fetch), baseline_metrics))
279 }
280 })
281 }
282
283 fn try_swapping_with_projection(
287 &self,
288 projection: &ProjectionExec,
289 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
290 if !all_alias_free_columns(projection.expr()) {
291 return Ok(None);
292 }
293
294 let streaming_table_projections =
295 self.projection().as_ref().map(|i| i.as_ref().to_vec());
296 let new_projections = new_projections_for_columns(
297 projection.expr(),
298 &streaming_table_projections
299 .unwrap_or_else(|| (0..self.schema().fields().len()).collect()),
300 );
301
302 let mut lex_orderings = vec![];
303 for ordering in self.projected_output_ordering().into_iter() {
304 let Some(ordering) = update_ordering(ordering, projection.expr())? else {
305 return Ok(None);
306 };
307 lex_orderings.push(ordering);
308 }
309
310 StreamingTableExec::try_new(
311 Arc::clone(self.partition_schema()),
312 self.partitions().clone(),
313 Some(new_projections.as_ref()),
314 lex_orderings,
315 self.is_infinite(),
316 self.limit(),
317 )
318 .map(|e| Some(Arc::new(e) as _))
319 }
320
321 fn metrics(&self) -> Option<MetricsSet> {
322 Some(self.metrics.clone_inner())
323 }
324
325 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
326 Some(Arc::new(StreamingTableExec {
327 partitions: self.partitions.clone(),
328 projection: self.projection.clone(),
329 projected_schema: Arc::clone(&self.projected_schema),
330 projected_output_ordering: self.projected_output_ordering.clone(),
331 infinite: self.infinite,
332 limit,
333 cache: Arc::clone(&self.cache),
334 metrics: self.metrics.clone(),
335 }))
336 }
337}
338
#[cfg(test)]
mod test {
    use super::*;
    use crate::collect_partitioned;
    use crate::streaming::PartitionStream;
    use crate::test::{TestPartitionStream, make_partition};
    use arrow::record_batch::RecordBatch;

    /// One partition holding two 100-row batches, no limit: all rows come out.
    #[tokio::test]
    async fn test_no_limit() {
        let exec = TestBuilder::new()
            .with_batches(vec![make_partition(100), make_partition(100)])
            .build();

        assert_eq!(collect_num_rows(Arc::new(exec)).await, vec![200]);
    }

    /// Same input with a limit of 75: the partition stops after 75 rows.
    #[tokio::test]
    async fn test_limit() {
        let exec = TestBuilder::new()
            .with_batches(vec![make_partition(100), make_partition(100)])
            .with_limit(Some(75))
            .build();

        assert_eq!(collect_num_rows(Arc::new(exec)).await, vec![75]);
    }

    /// Runs `exec` and returns the total number of rows in each output partition.
    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
        let ctx = Arc::new(TaskContext::default());
        collect_partitioned(exec, ctx)
            .await
            .unwrap()
            .iter()
            .map(|batches| batches.iter().map(RecordBatch::num_rows).sum::<usize>())
            .collect()
    }

    /// Minimal builder for `StreamingTableExec` test fixtures.
    #[derive(Default)]
    struct TestBuilder {
        schema: Option<SchemaRef>,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<Vec<usize>>,
        projected_output_ordering: Vec<LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    }

    impl TestBuilder {
        fn new() -> Self {
            Self::default()
        }

        /// Uses a single partition that yields `batches`, taking the schema from it.
        fn with_batches(self, batches: Vec<RecordBatch>) -> Self {
            let stream = TestPartitionStream::new_with_batches(batches);
            let schema = Arc::clone(stream.schema());
            Self {
                schema: Some(schema),
                partitions: vec![Arc::new(stream)],
                ..self
            }
        }

        /// Sets the per-partition row limit.
        fn with_limit(self, limit: Option<usize>) -> Self {
            Self { limit, ..self }
        }

        fn build(self) -> StreamingTableExec {
            let Self {
                schema,
                partitions,
                projection,
                projected_output_ordering,
                infinite,
                limit,
            } = self;
            StreamingTableExec::try_new(
                schema.unwrap(),
                partitions,
                projection.as_ref(),
                projected_output_ordering,
                infinite,
                limit,
            )
            .unwrap()
        }
    }
}