1use std::any::Any;
8use std::fmt::{Debug, Formatter};
9use std::sync::Arc;
10
11use arrow_schema::{SchemaRef, SortOptions};
12use datafusion::execution::{SendableRecordBatchStream, TaskContext};
13use datafusion::physical_expr::{
14 expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr,
15};
16use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType, SchedulingType};
17use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
18use datafusion_common::DataFusionError;
19use datafusion_expr::Expr;
20
21use super::source::{SortColumn, StreamSourceRef};
22use super::watermark_filter::{WatermarkDynamicFilter, WatermarkFilterStream};
23
/// Physical plan node that scans an unbounded streaming source.
///
/// Produces a single unbounded partition; optionally projects columns,
/// pushes filters down to the source, and applies a watermark-based
/// late-data filter on top of the source stream.
pub struct StreamingScanExec {
    // The underlying stream source that yields record batches.
    source: StreamSourceRef,
    // Output schema — the source schema after applying `projection`.
    schema: SchemaRef,
    // Optional column indices into the *source* schema.
    projection: Option<Vec<usize>>,
    // Filter expressions forwarded to the source on `execute`.
    filters: Vec<Expr>,
    // Cached plan properties (partitioning, ordering, boundedness, scheduling).
    properties: PlanProperties,
    // Optional dynamic watermark filter wrapped around the output stream.
    watermark_filter: Option<Arc<WatermarkDynamicFilter>>,
}
49
50impl StreamingScanExec {
51 pub fn new(
57 source: StreamSourceRef,
58 projection: Option<Vec<usize>>,
59 filters: Vec<Expr>,
60 ) -> Self {
61 let source_schema = source.schema();
62 let source_ordering = source.output_ordering();
63
64 let schema = match &projection {
65 Some(indices) => {
66 let fields: Vec<_> = indices
67 .iter()
68 .map(|&i| source_schema.field(i).clone())
69 .collect();
70 Arc::new(arrow_schema::Schema::new(fields))
71 }
72 None => source_schema,
73 };
74
75 let eq_properties = Self::build_equivalence_properties(&schema, source_ordering.as_deref());
76
77 let properties = PlanProperties::new(
81 eq_properties,
82 Partitioning::UnknownPartitioning(1),
83 EmissionType::Incremental,
84 Boundedness::Unbounded {
85 requires_infinite_memory: false,
86 },
87 )
88 .with_scheduling_type(SchedulingType::NonCooperative);
89
90 Self {
91 source,
92 schema,
93 projection,
94 filters,
95 properties,
96 watermark_filter: None,
97 }
98 }
99
100 #[must_use]
106 pub fn with_watermark_filter(mut self, filter: Arc<WatermarkDynamicFilter>) -> Self {
107 self.watermark_filter = Some(filter);
108 self
109 }
110
111 #[must_use]
113 pub fn watermark_filter(&self) -> Option<&Arc<WatermarkDynamicFilter>> {
114 self.watermark_filter.as_ref()
115 }
116
117 fn build_equivalence_properties(
122 schema: &SchemaRef,
123 ordering: Option<&[SortColumn]>,
124 ) -> EquivalenceProperties {
125 let mut eq = EquivalenceProperties::new(Arc::clone(schema));
126
127 if let Some(sort_columns) = ordering {
128 let sort_exprs: Vec<PhysicalSortExpr> = sort_columns
129 .iter()
130 .filter_map(|sc| {
131 schema.index_of(&sc.name).ok().map(|idx| {
133 PhysicalSortExpr::new(
134 Arc::new(Column::new(&sc.name, idx)),
135 SortOptions {
136 descending: sc.descending,
137 nulls_first: sc.nulls_first,
138 },
139 )
140 })
141 })
142 .collect();
143
144 if !sort_exprs.is_empty() {
145 eq.add_ordering(sort_exprs);
146 }
147 }
148
149 eq
150 }
151
152 #[must_use]
154 pub fn source(&self) -> &StreamSourceRef {
155 &self.source
156 }
157
158 #[must_use]
160 pub fn projection(&self) -> Option<&[usize]> {
161 self.projection.as_deref()
162 }
163
164 #[must_use]
166 pub fn filters(&self) -> &[Expr] {
167 &self.filters
168 }
169}
170
171impl Debug for StreamingScanExec {
172 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
173 f.debug_struct("StreamingScanExec")
174 .field("source", &self.source)
175 .field("schema", &self.schema)
176 .field("projection", &self.projection)
177 .field("filters", &self.filters)
178 .field("watermark_filter", &self.watermark_filter)
179 .finish_non_exhaustive()
180 }
181}
182
183impl DisplayAs for StreamingScanExec {
184 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result {
185 match t {
186 DisplayFormatType::Default | DisplayFormatType::Verbose => {
187 write!(f, "StreamingScanExec: ")?;
188 if let Some(proj) = &self.projection {
189 write!(f, "projection=[{proj:?}]")?;
190 } else {
191 write!(f, "projection=[*]")?;
192 }
193 if !self.filters.is_empty() {
194 write!(f, ", filters={:?}", self.filters)?;
195 }
196 Ok(())
197 }
198 DisplayFormatType::TreeRender => {
199 write!(f, "StreamingScanExec")
200 }
201 }
202 }
203}
204
205impl ExecutionPlan for StreamingScanExec {
206 fn name(&self) -> &'static str {
207 "StreamingScanExec"
208 }
209
210 fn as_any(&self) -> &dyn Any {
211 self
212 }
213
214 fn schema(&self) -> SchemaRef {
215 Arc::clone(&self.schema)
216 }
217
218 fn properties(&self) -> &PlanProperties {
219 &self.properties
220 }
221
222 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
223 vec![]
225 }
226
227 fn with_new_children(
228 self: Arc<Self>,
229 children: Vec<Arc<dyn ExecutionPlan>>,
230 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
231 if children.is_empty() {
232 Ok(self)
234 } else {
235 Err(DataFusionError::Plan(
236 "StreamingScanExec cannot have children".to_string(),
237 ))
238 }
239 }
240
241 fn execute(
242 &self,
243 partition: usize,
244 _context: Arc<TaskContext>,
245 ) -> Result<SendableRecordBatchStream, DataFusionError> {
246 if partition != 0 {
247 return Err(DataFusionError::Plan(format!(
248 "StreamingScanExec only supports partition 0, got {partition}"
249 )));
250 }
251
252 let stream = self
253 .source
254 .stream(self.projection.clone(), self.filters.clone())?;
255
256 match &self.watermark_filter {
257 Some(filter) => {
258 let schema = stream.schema();
259 Ok(Box::pin(WatermarkFilterStream::new(
260 stream,
261 Arc::clone(filter),
262 schema,
263 )))
264 }
265 None => Ok(stream),
266 }
267 }
268}
269
270impl datafusion::physical_plan::ExecutionPlanProperties for StreamingScanExec {
272 fn output_partitioning(&self) -> &Partitioning {
273 self.properties.output_partitioning()
274 }
275
276 fn output_ordering(&self) -> Option<&LexOrdering> {
277 self.properties.output_ordering()
278 }
279
280 fn boundedness(&self) -> Boundedness {
281 Boundedness::Unbounded {
282 requires_infinite_memory: false,
283 }
284 }
285
286 fn pipeline_behavior(&self) -> EmissionType {
287 EmissionType::Incremental
288 }
289
290 fn equivalence_properties(&self) -> &EquivalenceProperties {
291 self.properties.equivalence_properties()
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::source::StreamSource;
    use arrow_schema::{DataType, Field, Schema};
    use async_trait::async_trait;

    // Minimal in-memory `StreamSource` stub: reports a schema and an
    // optional declared ordering, but cannot actually produce a stream.
    #[derive(Debug)]
    struct MockSource {
        schema: SchemaRef,
        ordering: Option<Vec<SortColumn>>,
    }

    impl MockSource {
        fn new(schema: SchemaRef) -> Self {
            Self {
                schema,
                ordering: None,
            }
        }

        // Builder-style helper to declare a sort order for the mock.
        fn with_ordering(mut self, ordering: Vec<SortColumn>) -> Self {
            self.ordering = Some(ordering);
            self
        }
    }

    #[async_trait]
    impl StreamSource for MockSource {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        // Intentionally unimplemented: these tests exercise plan
        // construction and metadata only, never stream execution.
        fn stream(
            &self,
            _projection: Option<Vec<usize>>,
            _filters: Vec<Expr>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            Err(DataFusionError::NotImplemented("mock".to_string()))
        }

        fn output_ordering(&self) -> Option<Vec<SortColumn>> {
            self.ordering.clone()
        }
    }

    // Three-column schema (id, name, value) shared by most tests below.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    // Without a projection, the exec's schema equals the source schema.
    #[test]
    fn test_scan_exec_schema() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.schema(), schema);
    }

    // A projection of [0, 2] keeps only "id" and "value", in that order.
    #[test]
    fn test_scan_exec_projection() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        let output_schema = exec.schema();
        assert_eq!(output_schema.fields().len(), 2);
        assert_eq!(output_schema.field(0).name(), "id");
        assert_eq!(output_schema.field(1).name(), "value");
    }

    // Streaming scans are unbounded, single-partition leaf nodes.
    #[test]
    fn test_scan_exec_properties() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert!(matches!(exec.boundedness(), Boundedness::Unbounded { .. }));

        let partitioning = exec.properties().output_partitioning();
        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(1)));

        assert!(exec.children().is_empty());
    }

    // Debug formatting includes the node name.
    #[test]
    fn test_scan_exec_display() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, Some(vec![0, 1]), vec![]);

        assert_eq!(exec.name(), "StreamingScanExec");
        let debug = format!("{exec:?}");
        assert!(debug.contains("StreamingScanExec"));
    }

    #[test]
    fn test_scan_exec_name() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.name(), "StreamingScanExec");
    }

    // A source with no declared ordering yields no output ordering.
    #[test]
    fn test_scan_exec_no_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert!(exec.output_ordering().is_none());
    }

    // A declared single-column ordering is carried through to the plan.
    #[test]
    fn test_scan_exec_with_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, None, vec![]);

        let ordering = exec.output_ordering();
        assert!(ordering.is_some());
        let lex = ordering.unwrap();
        assert_eq!(lex.len(), 1);
    }

    // Multi-column orderings keep both columns when no projection drops them.
    #[test]
    fn test_scan_exec_output_ordering_returns_some() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef =
            Arc::new(MockSource::new(Arc::clone(&schema)).with_ordering(vec![
                SortColumn::ascending("id"),
                SortColumn::descending("value"),
            ]));
        let exec = StreamingScanExec::new(source, None, vec![]);

        let ordering = exec.output_ordering().unwrap();
        assert_eq!(ordering.len(), 2);
    }

    // Ordering survives a projection that retains the sort column ("id").
    #[test]
    fn test_scan_exec_ordering_with_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        let ordering = exec.output_ordering();
        assert!(ordering.is_some());
    }

    // Projecting away the sort column ("name") must drop the ordering.
    #[test]
    fn test_scan_exec_ordering_column_not_in_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("name")]),
        );
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        assert!(exec.output_ordering().is_none());
    }

    // `new` marks the node NonCooperative so the planner can add a
    // CooperativeExec wrapper.
    #[test]
    fn test_streaming_scan_exec_scheduling_type() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(
            exec.properties().scheduling_type,
            SchedulingType::NonCooperative,
        );
    }

    // End-to-end planning check: the optimizer should wrap the scan in a
    // CooperativeExec when building a physical plan through SQL.
    #[tokio::test]
    async fn test_cooperative_exec_wraps_streaming_scan() {
        use crate::datafusion::{
            create_streaming_context, ChannelStreamSource, StreamingTableProvider,
        };
        use arrow_schema::{DataType, Field, Schema};

        let ctx = create_streaming_context();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]));

        let source = Arc::new(ChannelStreamSource::new(Arc::clone(&schema)));
        let _sender = source.take_sender();
        let provider = StreamingTableProvider::new("events", source);
        ctx.register_table("events", Arc::new(provider)).unwrap();

        let df = ctx.sql("SELECT id FROM events").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();
        let plan_str = format!(
            "{}",
            datafusion::physical_plan::displayable(plan.as_ref()).indent(true)
        );
        assert!(
            plan_str.contains("CooperativeExec"),
            "Expected CooperativeExec wrapper around StreamingScanExec, got:\n{plan_str}"
        );
    }

    // The builder attaches the watermark filter and exposes it via getter.
    #[test]
    fn test_streaming_scan_with_watermark_filter() {
        use super::WatermarkDynamicFilter;
        use std::sync::atomic::{AtomicI64, AtomicU64};

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let filter = Arc::new(WatermarkDynamicFilter::new(
            Arc::new(AtomicI64::new(100)),
            Arc::new(AtomicU64::new(0)),
            "id".to_string(),
        ));

        let exec =
            StreamingScanExec::new(source, None, vec![]).with_watermark_filter(Arc::clone(&filter));

        assert!(exec.watermark_filter().is_some());
        assert_eq!(exec.watermark_filter().unwrap().watermark_ms(), 100);
    }

    // `with_new_children(vec![])` must return a node that still carries
    // the watermark filter (leaf rebuild preserves state).
    #[test]
    fn test_streaming_scan_watermark_filter_preserved() {
        use super::WatermarkDynamicFilter;
        use std::sync::atomic::{AtomicI64, AtomicU64};

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let filter = Arc::new(WatermarkDynamicFilter::new(
            Arc::new(AtomicI64::new(200)),
            Arc::new(AtomicU64::new(0)),
            "id".to_string(),
        ));

        let exec =
            StreamingScanExec::new(source, None, vec![]).with_watermark_filter(Arc::clone(&filter));

        let exec_arc: Arc<dyn ExecutionPlan> = Arc::new(exec);
        let rebuilt = exec_arc.with_new_children(vec![]).unwrap();
        let rebuilt_scan = rebuilt
            .as_any()
            .downcast_ref::<StreamingScanExec>()
            .unwrap();
        assert!(rebuilt_scan.watermark_filter().is_some());
        assert_eq!(rebuilt_scan.watermark_filter().unwrap().watermark_ms(), 200);
    }
}