datafusion_physical_plan/
streaming.rs1use std::any::Any;
21use std::fmt::Debug;
22use std::sync::Arc;
23
24use super::{DisplayAs, DisplayFormatType, PlanProperties};
25use crate::display::{display_orderings, ProjectSchemaDisplay};
26use crate::execution_plan::{Boundedness, EmissionType};
27use crate::limit::LimitStream;
28use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
29use crate::projection::{
30 all_alias_free_columns, new_projections_for_columns, update_expr, ProjectionExec,
31};
32use crate::stream::RecordBatchStreamAdapter;
33use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
34
35use arrow::datatypes::{Schema, SchemaRef};
36use datafusion_common::{internal_err, plan_err, Result};
37use datafusion_execution::TaskContext;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
39
40use async_trait::async_trait;
41use futures::stream::StreamExt;
42use log::debug;
43
/// A single source partition that can be turned into a [`SendableRecordBatchStream`].
///
/// Used as the input of [`StreamingTableExec`], which maps each `PartitionStream`
/// to one output partition and rejects (in `StreamingTableExec::try_new`) any
/// partition whose schema differs from the table schema.
pub trait PartitionStream: Debug + Send + Sync {
    /// Returns the schema of the record batches produced by this partition.
    fn schema(&self) -> &SchemaRef;

    /// Returns a stream of this partition's record batches, using `ctx` as the
    /// task context.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}
56
/// A leaf [`ExecutionPlan`] whose output partitions are read from a list of
/// [`PartitionStream`]s, one output partition per stream.
///
/// Each partition's batches are optionally projected to a subset of columns and,
/// when a limit is set, truncated to at most that many rows per partition.
#[derive(Clone)]
pub struct StreamingTableExec {
    /// Source streams; output partition `i` is produced by `partitions[i]`.
    partitions: Vec<Arc<dyn PartitionStream>>,
    /// Column indices (into the partitions' schema) to keep; `None` keeps all columns.
    projection: Option<Arc<[usize]>>,
    /// Schema of the output after `projection` has been applied.
    projected_schema: SchemaRef,
    /// Sort orderings declared for the projected output.
    projected_output_ordering: Vec<LexOrdering>,
    /// Whether the source is unbounded.
    infinite: bool,
    /// Optional maximum number of rows returned per output partition.
    limit: Option<usize>,
    /// Plan properties computed once at construction time.
    cache: PlanProperties,
    /// Metrics set; per-partition baseline metrics for the limit stream are registered here.
    metrics: ExecutionPlanMetricsSet,
}
72
73impl StreamingTableExec {
74 pub fn try_new(
76 schema: SchemaRef,
77 partitions: Vec<Arc<dyn PartitionStream>>,
78 projection: Option<&Vec<usize>>,
79 projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
80 infinite: bool,
81 limit: Option<usize>,
82 ) -> Result<Self> {
83 for x in partitions.iter() {
84 let partition_schema = x.schema();
85 if !schema.eq(partition_schema) {
86 debug!(
87 "Target schema does not match with partition schema. \
88 Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
89 );
90 return plan_err!("Mismatch between schema and batches");
91 }
92 }
93
94 let projected_schema = match projection {
95 Some(p) => Arc::new(schema.project(p)?),
96 None => schema,
97 };
98 let projected_output_ordering =
99 projected_output_ordering.into_iter().collect::<Vec<_>>();
100 let cache = Self::compute_properties(
101 Arc::clone(&projected_schema),
102 &projected_output_ordering,
103 &partitions,
104 infinite,
105 );
106 Ok(Self {
107 partitions,
108 projected_schema,
109 projection: projection.cloned().map(Into::into),
110 projected_output_ordering,
111 infinite,
112 limit,
113 cache,
114 metrics: ExecutionPlanMetricsSet::new(),
115 })
116 }
117
118 pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
119 &self.partitions
120 }
121
122 pub fn partition_schema(&self) -> &SchemaRef {
123 self.partitions[0].schema()
124 }
125
126 pub fn projection(&self) -> &Option<Arc<[usize]>> {
127 &self.projection
128 }
129
130 pub fn projected_schema(&self) -> &Schema {
131 &self.projected_schema
132 }
133
134 pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
135 self.projected_output_ordering.clone()
136 }
137
138 pub fn is_infinite(&self) -> bool {
139 self.infinite
140 }
141
142 pub fn limit(&self) -> Option<usize> {
143 self.limit
144 }
145
146 fn compute_properties(
148 schema: SchemaRef,
149 orderings: &[LexOrdering],
150 partitions: &[Arc<dyn PartitionStream>],
151 infinite: bool,
152 ) -> PlanProperties {
153 let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
155
156 let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
158 let boundedness = if infinite {
159 Boundedness::Unbounded {
160 requires_infinite_memory: false,
161 }
162 } else {
163 Boundedness::Bounded
164 };
165 PlanProperties::new(
166 eq_properties,
167 output_partitioning,
168 EmissionType::Incremental,
169 boundedness,
170 )
171 }
172}
173
174impl Debug for StreamingTableExec {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
177 }
178}
179
180impl DisplayAs for StreamingTableExec {
181 fn fmt_as(
182 &self,
183 t: DisplayFormatType,
184 f: &mut std::fmt::Formatter,
185 ) -> std::fmt::Result {
186 match t {
187 DisplayFormatType::Default | DisplayFormatType::Verbose => {
188 write!(
189 f,
190 "StreamingTableExec: partition_sizes={:?}",
191 self.partitions.len(),
192 )?;
193 if !self.projected_schema.fields().is_empty() {
194 write!(
195 f,
196 ", projection={}",
197 ProjectSchemaDisplay(&self.projected_schema)
198 )?;
199 }
200 if self.infinite {
201 write!(f, ", infinite_source=true")?;
202 }
203 if let Some(fetch) = self.limit {
204 write!(f, ", fetch={fetch}")?;
205 }
206
207 display_orderings(f, &self.projected_output_ordering)?;
208
209 Ok(())
210 }
211 DisplayFormatType::TreeRender => {
212 if self.infinite {
213 writeln!(f, "infinite={}", self.infinite)?;
214 }
215 if let Some(limit) = self.limit {
216 write!(f, "limit={limit}")?;
217 } else {
218 write!(f, "limit=None")?;
219 }
220
221 Ok(())
222 }
223 }
224 }
225}
226
227#[async_trait]
228impl ExecutionPlan for StreamingTableExec {
229 fn name(&self) -> &'static str {
230 "StreamingTableExec"
231 }
232
233 fn as_any(&self) -> &dyn Any {
234 self
235 }
236
237 fn properties(&self) -> &PlanProperties {
238 &self.cache
239 }
240
241 fn fetch(&self) -> Option<usize> {
242 self.limit
243 }
244
245 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
246 vec![]
247 }
248
249 fn with_new_children(
250 self: Arc<Self>,
251 children: Vec<Arc<dyn ExecutionPlan>>,
252 ) -> Result<Arc<dyn ExecutionPlan>> {
253 if children.is_empty() {
254 Ok(self)
255 } else {
256 internal_err!("Children cannot be replaced in {self:?}")
257 }
258 }
259
260 fn execute(
261 &self,
262 partition: usize,
263 ctx: Arc<TaskContext>,
264 ) -> Result<SendableRecordBatchStream> {
265 let stream = self.partitions[partition].execute(ctx);
266 let projected_stream = match self.projection.clone() {
267 Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
268 Arc::clone(&self.projected_schema),
269 stream.map(move |x| {
270 x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
271 }),
272 )),
273 None => stream,
274 };
275 Ok(match self.limit {
276 None => projected_stream,
277 Some(fetch) => {
278 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
279 Box::pin(LimitStream::new(
280 projected_stream,
281 0,
282 Some(fetch),
283 baseline_metrics,
284 ))
285 }
286 })
287 }
288
289 fn try_swapping_with_projection(
293 &self,
294 projection: &ProjectionExec,
295 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
296 if !all_alias_free_columns(projection.expr()) {
297 return Ok(None);
298 }
299
300 let streaming_table_projections =
301 self.projection().as_ref().map(|i| i.as_ref().to_vec());
302 let new_projections = new_projections_for_columns(
303 projection,
304 &streaming_table_projections
305 .unwrap_or_else(|| (0..self.schema().fields().len()).collect()),
306 );
307
308 let mut lex_orderings = vec![];
309 for lex_ordering in self.projected_output_ordering().into_iter() {
310 let mut orderings = LexOrdering::default();
311 for order in lex_ordering {
312 let Some(new_ordering) =
313 update_expr(&order.expr, projection.expr(), false)?
314 else {
315 return Ok(None);
316 };
317 orderings.push(PhysicalSortExpr {
318 expr: new_ordering,
319 options: order.options,
320 });
321 }
322 lex_orderings.push(orderings);
323 }
324
325 StreamingTableExec::try_new(
326 Arc::clone(self.partition_schema()),
327 self.partitions().clone(),
328 Some(new_projections.as_ref()),
329 lex_orderings,
330 self.is_infinite(),
331 self.limit(),
332 )
333 .map(|e| Some(Arc::new(e) as _))
334 }
335
336 fn metrics(&self) -> Option<MetricsSet> {
337 Some(self.metrics.clone_inner())
338 }
339
340 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
341 Some(Arc::new(StreamingTableExec {
342 partitions: self.partitions.clone(),
343 projection: self.projection.clone(),
344 projected_schema: Arc::clone(&self.projected_schema),
345 projected_output_ordering: self.projected_output_ordering.clone(),
346 infinite: self.infinite,
347 limit,
348 cache: self.cache.clone(),
349 metrics: self.metrics.clone(),
350 }))
351 }
352}
353
#[cfg(test)]
mod test {
    use super::*;
    use crate::collect_partitioned;
    use crate::streaming::PartitionStream;
    use crate::test::{make_partition, TestPartitionStream};
    use arrow::record_batch::RecordBatch;

    #[tokio::test]
    async fn test_no_limit() {
        let plan = TestBuilder::new()
            .with_batches(vec![make_partition(100), make_partition(100)])
            .build();

        let row_counts = collect_num_rows(Arc::new(plan)).await;
        assert_eq!(row_counts, vec![200]);
    }

    #[tokio::test]
    async fn test_limit() {
        let plan = TestBuilder::new()
            .with_batches(vec![make_partition(100), make_partition(100)])
            .with_limit(Some(75))
            .build();

        let row_counts = collect_num_rows(Arc::new(plan)).await;
        assert_eq!(row_counts, vec![75]);
    }

    /// Runs `exec` to completion and returns the total row count of each output partition.
    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
        let task_ctx = Arc::new(TaskContext::default());
        let per_partition = collect_partitioned(exec, task_ctx).await.unwrap();

        let mut totals = Vec::with_capacity(per_partition.len());
        for batches in &per_partition {
            totals.push(batches.iter().map(|b| b.num_rows()).sum::<usize>());
        }
        totals
    }

    /// Small fluent helper for building a [`StreamingTableExec`] in tests.
    #[derive(Default)]
    struct TestBuilder {
        schema: Option<SchemaRef>,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<Vec<usize>>,
        projected_output_ordering: Vec<LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    }

    impl TestBuilder {
        fn new() -> Self {
            Default::default()
        }

        /// Uses a single partition yielding `batches`; the table schema is taken from it.
        fn with_batches(self, batches: Vec<RecordBatch>) -> Self {
            let stream = TestPartitionStream::new_with_batches(batches);
            Self {
                schema: Some(Arc::clone(stream.schema())),
                partitions: vec![Arc::new(stream)],
                ..self
            }
        }

        /// Sets the per-partition row limit.
        fn with_limit(self, limit: Option<usize>) -> Self {
            Self { limit, ..self }
        }

        /// Builds the plan; panics if no batches were configured.
        fn build(self) -> StreamingTableExec {
            StreamingTableExec::try_new(
                self.schema.unwrap(),
                self.partitions,
                self.projection.as_ref(),
                self.projected_output_ordering,
                self.infinite,
                self.limit,
            )
            .unwrap()
        }
    }
}