// laminar_sql/datafusion/exec.rs

1//! Streaming scan execution plan for `DataFusion`
2//!
3//! This module provides `StreamingScanExec`, a `DataFusion` execution plan
4//! that reads from a `StreamSource`. It serves as the leaf node in query
5//! plans for streaming data.
6
7use std::any::Any;
8use std::fmt::{Debug, Formatter};
9use std::sync::Arc;
10
11use arrow_schema::{SchemaRef, SortOptions};
12use datafusion::execution::{SendableRecordBatchStream, TaskContext};
13use datafusion::physical_expr::{
14    expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr,
15};
16use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
17use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
18use datafusion_common::DataFusionError;
19use datafusion_expr::Expr;
20
21use super::source::{SortColumn, StreamSourceRef};
22
23/// A `DataFusion` execution plan that scans from a streaming source.
24///
25/// This is a leaf node in the query plan tree that pulls data from
26/// a `StreamSource` implementation. It handles projection and filter
27/// pushdown to the source when supported.
28///
29/// # Properties
30///
31/// - Single partition (streaming sources are typically not partitioned)
32/// - Unbounded execution mode (streaming)
33/// - No inherent ordering (unless specified by source)
/// A `DataFusion` execution plan that scans from a streaming source.
///
/// This is a leaf node in the query plan tree that pulls data from
/// a `StreamSource` implementation. It handles projection and filter
/// pushdown to the source when supported.
///
/// # Properties
///
/// - Single partition (streaming sources are typically not partitioned)
/// - Unbounded execution mode (streaming)
/// - No inherent ordering (unless specified by source)
pub struct StreamingScanExec {
    /// The streaming source record batches are pulled from
    source: StreamSourceRef,
    /// Output schema, after any projection has been applied
    schema: SchemaRef,
    /// Indices of projected columns (None = all columns)
    projection: Option<Vec<usize>>,
    /// Filter expressions pushed down to the source
    filters: Vec<Expr>,
    /// Plan properties computed once at construction and cached
    properties: PlanProperties,
}
46
47impl StreamingScanExec {
48    /// Creates a new streaming scan execution plan.
49    ///
50    /// # Arguments
51    ///
52    /// * `source` - The streaming source to read from
53    /// * `projection` - Optional column projection indices
54    /// * `filters` - Filters to push down to the source
55    ///
56    /// # Returns
57    ///
58    /// A new `StreamingScanExec` instance.
59    /// Creates a new streaming scan execution plan.
60    ///
61    /// If the source declares an `output_ordering`, the plan's
62    /// `EquivalenceProperties` will include it so `DataFusion` can elide
63    /// `SortExec` for matching ORDER BY queries.
64    pub fn new(
65        source: StreamSourceRef,
66        projection: Option<Vec<usize>>,
67        filters: Vec<Expr>,
68    ) -> Self {
69        let source_schema = source.schema();
70        let source_ordering = source.output_ordering();
71
72        let schema = match &projection {
73            Some(indices) => {
74                let fields: Vec<_> = indices
75                    .iter()
76                    .map(|&i| source_schema.field(i).clone())
77                    .collect();
78                Arc::new(arrow_schema::Schema::new(fields))
79            }
80            None => source_schema,
81        };
82
83        // Build equivalence properties, optionally with source ordering
84        let eq_properties = Self::build_equivalence_properties(&schema, source_ordering.as_deref());
85
86        // Build plan properties for an unbounded streaming source
87        let properties = PlanProperties::new(
88            eq_properties,
89            Partitioning::UnknownPartitioning(1), // Single partition for streaming
90            EmissionType::Incremental,            // Streaming emits incrementally
91            Boundedness::Unbounded {
92                requires_infinite_memory: false,
93            }, // Streaming is unbounded
94        );
95
96        Self {
97            source,
98            schema,
99            projection,
100            filters,
101            properties,
102        }
103    }
104
105    /// Builds `EquivalenceProperties` with optional source ordering.
106    ///
107    /// Converts `SortColumn` declarations into `DataFusion` `PhysicalSortExpr`
108    /// entries. Only columns present in the output schema are included.
109    fn build_equivalence_properties(
110        schema: &SchemaRef,
111        ordering: Option<&[SortColumn]>,
112    ) -> EquivalenceProperties {
113        let mut eq = EquivalenceProperties::new(Arc::clone(schema));
114
115        if let Some(sort_columns) = ordering {
116            let sort_exprs: Vec<PhysicalSortExpr> = sort_columns
117                .iter()
118                .filter_map(|sc| {
119                    // Find column index in the output schema
120                    schema.index_of(&sc.name).ok().map(|idx| {
121                        PhysicalSortExpr::new(
122                            Arc::new(Column::new(&sc.name, idx)),
123                            SortOptions {
124                                descending: sc.descending,
125                                nulls_first: sc.nulls_first,
126                            },
127                        )
128                    })
129                })
130                .collect();
131
132            if !sort_exprs.is_empty() {
133                eq.add_ordering(sort_exprs);
134            }
135        }
136
137        eq
138    }
139
140    /// Returns the streaming source.
141    #[must_use]
142    pub fn source(&self) -> &StreamSourceRef {
143        &self.source
144    }
145
146    /// Returns the column projection.
147    #[must_use]
148    pub fn projection(&self) -> Option<&[usize]> {
149        self.projection.as_deref()
150    }
151
152    /// Returns the pushed-down filters.
153    #[must_use]
154    pub fn filters(&self) -> &[Expr] {
155        &self.filters
156    }
157}
158
159impl Debug for StreamingScanExec {
160    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
161        f.debug_struct("StreamingScanExec")
162            .field("source", &self.source)
163            .field("schema", &self.schema)
164            .field("projection", &self.projection)
165            .field("filters", &self.filters)
166            .finish_non_exhaustive()
167    }
168}
169
170impl DisplayAs for StreamingScanExec {
171    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result {
172        match t {
173            DisplayFormatType::Default | DisplayFormatType::Verbose => {
174                write!(f, "StreamingScanExec: ")?;
175                if let Some(proj) = &self.projection {
176                    write!(f, "projection=[{proj:?}]")?;
177                } else {
178                    write!(f, "projection=[*]")?;
179                }
180                if !self.filters.is_empty() {
181                    write!(f, ", filters={:?}", self.filters)?;
182                }
183                Ok(())
184            }
185            DisplayFormatType::TreeRender => {
186                write!(f, "StreamingScanExec")
187            }
188        }
189    }
190}
191
192impl ExecutionPlan for StreamingScanExec {
193    fn name(&self) -> &'static str {
194        "StreamingScanExec"
195    }
196
197    fn as_any(&self) -> &dyn Any {
198        self
199    }
200
201    fn schema(&self) -> SchemaRef {
202        Arc::clone(&self.schema)
203    }
204
205    fn properties(&self) -> &PlanProperties {
206        &self.properties
207    }
208
209    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
210        // Leaf node - no children
211        vec![]
212    }
213
214    fn with_new_children(
215        self: Arc<Self>,
216        children: Vec<Arc<dyn ExecutionPlan>>,
217    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
218        if children.is_empty() {
219            // No changes needed for leaf node
220            Ok(self)
221        } else {
222            Err(DataFusionError::Plan(
223                "StreamingScanExec cannot have children".to_string(),
224            ))
225        }
226    }
227
228    fn execute(
229        &self,
230        partition: usize,
231        _context: Arc<TaskContext>,
232    ) -> Result<SendableRecordBatchStream, DataFusionError> {
233        if partition != 0 {
234            return Err(DataFusionError::Plan(format!(
235                "StreamingScanExec only supports partition 0, got {partition}"
236            )));
237        }
238
239        self.source
240            .stream(self.projection.clone(), self.filters.clone())
241    }
242}
243
244// Required for `DataFusion` to use this execution plan
245impl datafusion::physical_plan::ExecutionPlanProperties for StreamingScanExec {
246    fn output_partitioning(&self) -> &Partitioning {
247        self.properties.output_partitioning()
248    }
249
250    fn output_ordering(&self) -> Option<&LexOrdering> {
251        self.properties.output_ordering()
252    }
253
254    fn boundedness(&self) -> Boundedness {
255        Boundedness::Unbounded {
256            requires_infinite_memory: false,
257        }
258    }
259
260    fn pipeline_behavior(&self) -> EmissionType {
261        EmissionType::Incremental
262    }
263
264    fn equivalence_properties(&self) -> &EquivalenceProperties {
265        self.properties.equivalence_properties()
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::source::StreamSource;
    use arrow_schema::{DataType, Field, Schema};
    use async_trait::async_trait;
    use datafusion::physical_plan::ExecutionPlanProperties;

    /// Source stub: `stream` always fails; only the schema and ordering
    /// metadata matter for these tests.
    #[derive(Debug)]
    struct MockSource {
        schema: SchemaRef,
        ordering: Option<Vec<SortColumn>>,
    }

    impl MockSource {
        fn new(schema: SchemaRef) -> Self {
            Self {
                schema,
                ordering: None,
            }
        }

        fn with_ordering(mut self, ordering: Vec<SortColumn>) -> Self {
            self.ordering = Some(ordering);
            self
        }
    }

    #[async_trait]
    impl StreamSource for MockSource {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn stream(
            &self,
            _projection: Option<Vec<usize>>,
            _filters: Vec<Expr>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            Err(DataFusionError::NotImplemented("mock".to_string()))
        }

        fn output_ordering(&self) -> Option<Vec<SortColumn>> {
            self.ordering.clone()
        }
    }

    /// Three-column schema shared by every test: id, name, value.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Wraps a plain `MockSource` (no ordering) as a `StreamSourceRef`.
    fn mock_source(schema: SchemaRef) -> StreamSourceRef {
        Arc::new(MockSource::new(schema))
    }

    #[test]
    fn test_scan_exec_schema() {
        let schema = test_schema();
        let exec = StreamingScanExec::new(mock_source(Arc::clone(&schema)), None, vec![]);

        assert_eq!(exec.schema(), schema);
    }

    #[test]
    fn test_scan_exec_projection() {
        let exec = StreamingScanExec::new(mock_source(test_schema()), Some(vec![0, 2]), vec![]);

        let projected = exec.schema();
        assert_eq!(projected.fields().len(), 2);
        assert_eq!(projected.field(0).name(), "id");
        assert_eq!(projected.field(1).name(), "value");
    }

    #[test]
    fn test_scan_exec_properties() {
        let exec = StreamingScanExec::new(mock_source(test_schema()), None, vec![]);

        // Streaming scans are unbounded...
        assert!(matches!(exec.boundedness(), Boundedness::Unbounded { .. }));

        // ...expose a single partition...
        assert!(matches!(
            exec.properties().output_partitioning(),
            Partitioning::UnknownPartitioning(1)
        ));

        // ...and are leaf nodes.
        assert!(exec.children().is_empty());
    }

    #[test]
    fn test_scan_exec_display() {
        let exec = StreamingScanExec::new(mock_source(test_schema()), Some(vec![0, 1]), vec![]);

        // Verify it implements DisplayAs by checking the name
        assert_eq!(exec.name(), "StreamingScanExec");
        // Debug format should contain the struct info
        assert!(format!("{exec:?}").contains("StreamingScanExec"));
    }

    #[test]
    fn test_scan_exec_name() {
        let exec = StreamingScanExec::new(mock_source(test_schema()), None, vec![]);

        assert_eq!(exec.name(), "StreamingScanExec");
    }

    // --- Tier 1 ordering tests ---

    #[test]
    fn test_scan_exec_no_ordering() {
        let exec = StreamingScanExec::new(mock_source(test_schema()), None, vec![]);

        // No ordering declared -> output_ordering returns None
        assert!(exec.output_ordering().is_none());
    }

    #[test]
    fn test_scan_exec_with_ordering() {
        let source: StreamSourceRef = Arc::new(
            MockSource::new(test_schema()).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, None, vec![]);

        // Source ordering declared -> output_ordering returns Some
        let lex = exec
            .output_ordering()
            .expect("ordering should be propagated from the source");
        assert_eq!(lex.len(), 1);
    }

    #[test]
    fn test_scan_exec_output_ordering_returns_some() {
        let ordering = vec![
            SortColumn::ascending("id"),
            SortColumn::descending("value"),
        ];
        let source: StreamSourceRef =
            Arc::new(MockSource::new(test_schema()).with_ordering(ordering));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.output_ordering().unwrap().len(), 2);
    }

    #[test]
    fn test_scan_exec_ordering_with_projection() {
        // Source ordered by "id" ascending
        let source: StreamSourceRef = Arc::new(
            MockSource::new(test_schema()).with_ordering(vec![SortColumn::ascending("id")]),
        );
        // Project only "id" and "value" (indices 0, 2)
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        // "id" survives the projection -> ordering should still be present
        assert!(exec.output_ordering().is_some());
    }

    #[test]
    fn test_scan_exec_ordering_column_not_in_projection() {
        // Source ordered by "name" ascending
        let source: StreamSourceRef = Arc::new(
            MockSource::new(test_schema()).with_ordering(vec![SortColumn::ascending("name")]),
        );
        // Project only "id" and "value" (indices 0, 2) -- "name" is dropped
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        // Ordering column was projected away -> no ordering advertised
        assert!(exec.output_ordering().is_none());
    }
}
464}