Skip to main content

laminar_sql/datafusion/
exec.rs

1//! Streaming scan execution plan for `DataFusion`
2//!
3//! This module provides `StreamingScanExec`, a `DataFusion` execution plan
4//! that reads from a `StreamSource`. It serves as the leaf node in query
5//! plans for streaming data.
6
7use std::any::Any;
8use std::fmt::{Debug, Formatter};
9use std::sync::Arc;
10
11use arrow_schema::{SchemaRef, SortOptions};
12use datafusion::execution::{SendableRecordBatchStream, TaskContext};
13use datafusion::physical_expr::{
14    expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr,
15};
16use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType, SchedulingType};
17use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
18use datafusion_common::DataFusionError;
19use datafusion_expr::Expr;
20
21use super::source::{SortColumn, StreamSourceRef};
22use super::watermark_filter::{WatermarkDynamicFilter, WatermarkFilterStream};
23
/// A `DataFusion` execution plan that scans from a streaming source.
///
/// This is a leaf node in the query plan tree that pulls data from
/// a `StreamSource` implementation. It handles projection and filter
/// pushdown to the source when supported.
///
/// # Properties
///
/// - Single partition (streaming sources are typically not partitioned)
/// - Unbounded execution mode (streaming)
/// - No inherent ordering (unless specified by source)
pub struct StreamingScanExec {
    /// The streaming source to read from.
    source: StreamSourceRef,
    /// Output schema (source schema after applying `projection`).
    schema: SchemaRef,
    /// Column projection as indices into the source schema (None = all columns).
    projection: Option<Vec<usize>>,
    /// Filter expressions forwarded to the source's `stream()` call.
    filters: Vec<Expr>,
    /// Plan properties computed once in `new()` (partitioning, boundedness,
    /// emission type, scheduling type, equivalence/ordering info).
    properties: PlanProperties,
    /// Optional watermark filter applied at scan level; when set, `execute()`
    /// wraps the source stream in a `WatermarkFilterStream`.
    watermark_filter: Option<Arc<WatermarkDynamicFilter>>,
}
49
50impl StreamingScanExec {
51    /// Creates a new streaming scan execution plan.
52    ///
53    /// If the source declares an `output_ordering`, the plan's
54    /// `EquivalenceProperties` will include it so `DataFusion` can elide
55    /// `SortExec` for matching ORDER BY queries.
56    pub fn new(
57        source: StreamSourceRef,
58        projection: Option<Vec<usize>>,
59        filters: Vec<Expr>,
60    ) -> Self {
61        let source_schema = source.schema();
62        let source_ordering = source.output_ordering();
63
64        let schema = match &projection {
65            Some(indices) => {
66                let fields: Vec<_> = indices
67                    .iter()
68                    .map(|&i| source_schema.field(i).clone())
69                    .collect();
70                Arc::new(arrow_schema::Schema::new(fields))
71            }
72            None => source_schema,
73        };
74
75        let eq_properties = Self::build_equivalence_properties(&schema, source_ordering.as_deref());
76
77        // SchedulingType::NonCooperative causes DataFusion's EnsureCooperative
78        // optimizer rule to auto-wrap this leaf with CooperativeExec, which
79        // yields to the Tokio executor periodically.
80        let properties = PlanProperties::new(
81            eq_properties,
82            Partitioning::UnknownPartitioning(1),
83            EmissionType::Incremental,
84            Boundedness::Unbounded {
85                requires_infinite_memory: false,
86            },
87        )
88        .with_scheduling_type(SchedulingType::NonCooperative);
89
90        Self {
91            source,
92            schema,
93            projection,
94            filters,
95            properties,
96            watermark_filter: None,
97        }
98    }
99
100    /// Attaches a watermark filter that drops late rows at scan time.
101    ///
102    /// When set, `execute()` wraps the inner stream with a
103    /// `WatermarkFilterStream` that applies `ts >= watermark` before
104    /// any downstream processing.
105    #[must_use]
106    pub fn with_watermark_filter(mut self, filter: Arc<WatermarkDynamicFilter>) -> Self {
107        self.watermark_filter = Some(filter);
108        self
109    }
110
111    /// Returns the watermark filter, if set.
112    #[must_use]
113    pub fn watermark_filter(&self) -> Option<&Arc<WatermarkDynamicFilter>> {
114        self.watermark_filter.as_ref()
115    }
116
117    /// Builds `EquivalenceProperties` with optional source ordering.
118    ///
119    /// Converts `SortColumn` declarations into `DataFusion` `PhysicalSortExpr`
120    /// entries. Only columns present in the output schema are included.
121    fn build_equivalence_properties(
122        schema: &SchemaRef,
123        ordering: Option<&[SortColumn]>,
124    ) -> EquivalenceProperties {
125        let mut eq = EquivalenceProperties::new(Arc::clone(schema));
126
127        if let Some(sort_columns) = ordering {
128            let sort_exprs: Vec<PhysicalSortExpr> = sort_columns
129                .iter()
130                .filter_map(|sc| {
131                    // Find column index in the output schema
132                    schema.index_of(&sc.name).ok().map(|idx| {
133                        PhysicalSortExpr::new(
134                            Arc::new(Column::new(&sc.name, idx)),
135                            SortOptions {
136                                descending: sc.descending,
137                                nulls_first: sc.nulls_first,
138                            },
139                        )
140                    })
141                })
142                .collect();
143
144            if !sort_exprs.is_empty() {
145                eq.add_ordering(sort_exprs);
146            }
147        }
148
149        eq
150    }
151
152    /// Returns the streaming source.
153    #[must_use]
154    pub fn source(&self) -> &StreamSourceRef {
155        &self.source
156    }
157
158    /// Returns the column projection.
159    #[must_use]
160    pub fn projection(&self) -> Option<&[usize]> {
161        self.projection.as_deref()
162    }
163
164    /// Returns the pushed-down filters.
165    #[must_use]
166    pub fn filters(&self) -> &[Expr] {
167        &self.filters
168    }
169}
170
171impl Debug for StreamingScanExec {
172    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
173        f.debug_struct("StreamingScanExec")
174            .field("source", &self.source)
175            .field("schema", &self.schema)
176            .field("projection", &self.projection)
177            .field("filters", &self.filters)
178            .field("watermark_filter", &self.watermark_filter)
179            .finish_non_exhaustive()
180    }
181}
182
183impl DisplayAs for StreamingScanExec {
184    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result {
185        match t {
186            DisplayFormatType::Default | DisplayFormatType::Verbose => {
187                write!(f, "StreamingScanExec: ")?;
188                if let Some(proj) = &self.projection {
189                    write!(f, "projection=[{proj:?}]")?;
190                } else {
191                    write!(f, "projection=[*]")?;
192                }
193                if !self.filters.is_empty() {
194                    write!(f, ", filters={:?}", self.filters)?;
195                }
196                Ok(())
197            }
198            DisplayFormatType::TreeRender => {
199                write!(f, "StreamingScanExec")
200            }
201        }
202    }
203}
204
205impl ExecutionPlan for StreamingScanExec {
206    fn name(&self) -> &'static str {
207        "StreamingScanExec"
208    }
209
210    fn as_any(&self) -> &dyn Any {
211        self
212    }
213
214    fn schema(&self) -> SchemaRef {
215        Arc::clone(&self.schema)
216    }
217
218    fn properties(&self) -> &PlanProperties {
219        &self.properties
220    }
221
222    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223        // Leaf node - no children
224        vec![]
225    }
226
227    fn with_new_children(
228        self: Arc<Self>,
229        children: Vec<Arc<dyn ExecutionPlan>>,
230    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
231        if children.is_empty() {
232            // No changes needed for leaf node
233            Ok(self)
234        } else {
235            Err(DataFusionError::Plan(
236                "StreamingScanExec cannot have children".to_string(),
237            ))
238        }
239    }
240
241    fn execute(
242        &self,
243        partition: usize,
244        _context: Arc<TaskContext>,
245    ) -> Result<SendableRecordBatchStream, DataFusionError> {
246        if partition != 0 {
247            return Err(DataFusionError::Plan(format!(
248                "StreamingScanExec only supports partition 0, got {partition}"
249            )));
250        }
251
252        let stream = self
253            .source
254            .stream(self.projection.clone(), self.filters.clone())?;
255
256        match &self.watermark_filter {
257            Some(filter) => {
258                let schema = stream.schema();
259                Ok(Box::pin(WatermarkFilterStream::new(
260                    stream,
261                    Arc::clone(filter),
262                    schema,
263                )))
264            }
265            None => Ok(stream),
266        }
267    }
268}
269
270// Required for `DataFusion` to use this execution plan
271impl datafusion::physical_plan::ExecutionPlanProperties for StreamingScanExec {
272    fn output_partitioning(&self) -> &Partitioning {
273        self.properties.output_partitioning()
274    }
275
276    fn output_ordering(&self) -> Option<&LexOrdering> {
277        self.properties.output_ordering()
278    }
279
280    fn boundedness(&self) -> Boundedness {
281        Boundedness::Unbounded {
282            requires_infinite_memory: false,
283        }
284    }
285
286    fn pipeline_behavior(&self) -> EmissionType {
287        EmissionType::Incremental
288    }
289
290    fn equivalence_properties(&self) -> &EquivalenceProperties {
291        self.properties.equivalence_properties()
292    }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::source::StreamSource;
    use arrow_schema::{DataType, Field, Schema};
    use async_trait::async_trait;

    /// Minimal `StreamSource` stub: exposes a schema and an optional
    /// declared ordering, but `stream()` always fails. Sufficient for
    /// testing plan construction and properties without producing data.
    #[derive(Debug)]
    struct MockSource {
        schema: SchemaRef,
        ordering: Option<Vec<SortColumn>>,
    }

    impl MockSource {
        fn new(schema: SchemaRef) -> Self {
            Self {
                schema,
                ordering: None,
            }
        }

        /// Builder-style helper to declare a source output ordering.
        fn with_ordering(mut self, ordering: Vec<SortColumn>) -> Self {
            self.ordering = Some(ordering);
            self
        }
    }

    #[async_trait]
    impl StreamSource for MockSource {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        // Intentionally unimplemented: these tests never execute the stream.
        fn stream(
            &self,
            _projection: Option<Vec<usize>>,
            _filters: Vec<Expr>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            Err(DataFusionError::NotImplemented("mock".to_string()))
        }

        fn output_ordering(&self) -> Option<Vec<SortColumn>> {
            self.ordering.clone()
        }
    }

    /// Three-column schema used by all tests: id (0), name (1), value (2).
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    #[test]
    fn test_scan_exec_schema() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.schema(), schema);
    }

    #[test]
    fn test_scan_exec_projection() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        let output_schema = exec.schema();
        assert_eq!(output_schema.fields().len(), 2);
        assert_eq!(output_schema.field(0).name(), "id");
        assert_eq!(output_schema.field(1).name(), "value");
    }

    #[test]
    fn test_scan_exec_properties() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        // Should be unbounded (streaming)
        assert!(matches!(exec.boundedness(), Boundedness::Unbounded { .. }));

        // Should be single partition
        let partitioning = exec.properties().output_partitioning();
        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(1)));

        // Leaf node has no children
        assert!(exec.children().is_empty());
    }

    #[test]
    fn test_scan_exec_display() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, Some(vec![0, 1]), vec![]);

        // Verify it implements DisplayAs by checking the name
        assert_eq!(exec.name(), "StreamingScanExec");
        // Debug format should contain the struct info
        let debug = format!("{exec:?}");
        assert!(debug.contains("StreamingScanExec"));
    }

    #[test]
    fn test_scan_exec_name() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.name(), "StreamingScanExec");
    }

    // --- Tier 1 ordering tests ---

    #[test]
    fn test_scan_exec_no_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        // No ordering declared -> output_ordering returns None
        assert!(exec.output_ordering().is_none());
    }

    #[test]
    fn test_scan_exec_with_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, None, vec![]);

        // Source ordering declared -> output_ordering returns Some
        let ordering = exec.output_ordering();
        assert!(ordering.is_some());
        let lex = ordering.unwrap();
        assert_eq!(lex.len(), 1);
    }

    #[test]
    fn test_scan_exec_output_ordering_returns_some() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef =
            Arc::new(MockSource::new(Arc::clone(&schema)).with_ordering(vec![
                SortColumn::ascending("id"),
                SortColumn::descending("value"),
            ]));
        let exec = StreamingScanExec::new(source, None, vec![]);

        let ordering = exec.output_ordering().unwrap();
        assert_eq!(ordering.len(), 2);
    }

    #[test]
    fn test_scan_exec_ordering_with_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        // Source ordered by "id" ascending
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        // Project only "id" and "value" (indices 0, 2)
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        // "id" is in the projection -> ordering should still be present
        let ordering = exec.output_ordering();
        assert!(ordering.is_some());
    }

    #[test]
    fn test_scan_exec_ordering_column_not_in_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        // Source ordered by "name" ascending
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("name")]),
        );
        // Project only "id" and "value" (indices 0, 2) -- "name" is NOT projected
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        // "name" is not in the projection -> ordering should be None
        assert!(exec.output_ordering().is_none());
    }

    // Cooperative scheduling tests

    #[test]
    fn test_streaming_scan_exec_scheduling_type() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        // StreamingScanExec declares NonCooperative so that DataFusion's
        // EnsureCooperative optimizer auto-wraps it with CooperativeExec.
        assert_eq!(
            exec.properties().scheduling_type,
            SchedulingType::NonCooperative,
        );
    }

    #[tokio::test]
    async fn test_cooperative_exec_wraps_streaming_scan() {
        use crate::datafusion::{
            create_streaming_context, ChannelStreamSource, StreamingTableProvider,
        };
        use arrow_schema::{DataType, Field, Schema};

        let ctx = create_streaming_context();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]));

        // Keep the sender alive so the channel source stays open while the
        // plan is built (nothing is actually streamed in this test).
        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        // Create a physical plan and verify CooperativeExec wrapping
        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();
        let plan_str = format!(
            "{}",
            datafusion::physical_plan::displayable(plan.as_ref()).indent(true)
        );
        assert!(
            plan_str.contains("CooperativeExec"),
            "Expected CooperativeExec wrapper around StreamingScanExec, got:\n{plan_str}"
        );
    }

    // Watermark filter tests

    #[test]
    fn test_streaming_scan_with_watermark_filter() {
        use super::WatermarkDynamicFilter;
        use std::sync::atomic::{AtomicI64, AtomicU64};

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let filter = Arc::new(WatermarkDynamicFilter::new(
            Arc::new(AtomicI64::new(100)),
            Arc::new(AtomicU64::new(0)),
            "id".to_string(),
        ));

        let exec =
            StreamingScanExec::new(source, None, vec![]).with_watermark_filter(Arc::clone(&filter));

        assert!(exec.watermark_filter().is_some());
        assert_eq!(exec.watermark_filter().unwrap().watermark_ms(), 100);
    }

    #[test]
    fn test_streaming_scan_watermark_filter_preserved() {
        use super::WatermarkDynamicFilter;
        use std::sync::atomic::{AtomicI64, AtomicU64};

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let filter = Arc::new(WatermarkDynamicFilter::new(
            Arc::new(AtomicI64::new(200)),
            Arc::new(AtomicU64::new(0)),
            "id".to_string(),
        ));

        let exec =
            StreamingScanExec::new(source, None, vec![]).with_watermark_filter(Arc::clone(&filter));

        // with_new_children(empty) should preserve the watermark filter
        let exec_arc: Arc<dyn ExecutionPlan> = Arc::new(exec);
        let rebuilt = exec_arc.with_new_children(vec![]).unwrap();
        let rebuilt_scan = rebuilt
            .as_any()
            .downcast_ref::<StreamingScanExec>()
            .unwrap();
        assert!(rebuilt_scan.watermark_filter().is_some());
        assert_eq!(rebuilt_scan.watermark_filter().unwrap().watermark_ms(), 200);
    }
}