1use std::any::Any;
8use std::fmt::{Debug, Formatter};
9use std::sync::Arc;
10
11use arrow_schema::{SchemaRef, SortOptions};
12use datafusion::execution::{SendableRecordBatchStream, TaskContext};
13use datafusion::physical_expr::{
14 expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr,
15};
16use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
17use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
18use datafusion_common::DataFusionError;
19use datafusion_expr::Expr;
20
21use super::source::{SortColumn, StreamSourceRef};
22
/// Leaf physical operator that scans an unbounded stream produced by a [`StreamSourceRef`].
///
/// The scan exposes exactly one partition. The projection and filters are stored here
/// and passed to the source when the plan is executed.
pub struct StreamingScanExec {
    /// Source that produces the record-batch stream.
    source: StreamSourceRef,
    /// Output schema: the source schema, narrowed to `projection` when one is set.
    schema: SchemaRef,
    /// Indices of source columns to emit; `None` emits every source column.
    projection: Option<Vec<usize>>,
    /// Filter expressions passed to the source on execution.
    filters: Vec<Expr>,
    /// Cached plan properties (equivalences/ordering, partitioning, emission, boundedness).
    properties: PlanProperties,
}
46
47impl StreamingScanExec {
48 pub fn new(
65 source: StreamSourceRef,
66 projection: Option<Vec<usize>>,
67 filters: Vec<Expr>,
68 ) -> Self {
69 let source_schema = source.schema();
70 let source_ordering = source.output_ordering();
71
72 let schema = match &projection {
73 Some(indices) => {
74 let fields: Vec<_> = indices
75 .iter()
76 .map(|&i| source_schema.field(i).clone())
77 .collect();
78 Arc::new(arrow_schema::Schema::new(fields))
79 }
80 None => source_schema,
81 };
82
83 let eq_properties = Self::build_equivalence_properties(&schema, source_ordering.as_deref());
85
86 let properties = PlanProperties::new(
88 eq_properties,
89 Partitioning::UnknownPartitioning(1), EmissionType::Incremental, Boundedness::Unbounded {
92 requires_infinite_memory: false,
93 }, );
95
96 Self {
97 source,
98 schema,
99 projection,
100 filters,
101 properties,
102 }
103 }
104
105 fn build_equivalence_properties(
110 schema: &SchemaRef,
111 ordering: Option<&[SortColumn]>,
112 ) -> EquivalenceProperties {
113 let mut eq = EquivalenceProperties::new(Arc::clone(schema));
114
115 if let Some(sort_columns) = ordering {
116 let sort_exprs: Vec<PhysicalSortExpr> = sort_columns
117 .iter()
118 .filter_map(|sc| {
119 schema.index_of(&sc.name).ok().map(|idx| {
121 PhysicalSortExpr::new(
122 Arc::new(Column::new(&sc.name, idx)),
123 SortOptions {
124 descending: sc.descending,
125 nulls_first: sc.nulls_first,
126 },
127 )
128 })
129 })
130 .collect();
131
132 if !sort_exprs.is_empty() {
133 eq.add_ordering(sort_exprs);
134 }
135 }
136
137 eq
138 }
139
140 #[must_use]
142 pub fn source(&self) -> &StreamSourceRef {
143 &self.source
144 }
145
146 #[must_use]
148 pub fn projection(&self) -> Option<&[usize]> {
149 self.projection.as_deref()
150 }
151
152 #[must_use]
154 pub fn filters(&self) -> &[Expr] {
155 &self.filters
156 }
157}
158
159impl Debug for StreamingScanExec {
160 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
161 f.debug_struct("StreamingScanExec")
162 .field("source", &self.source)
163 .field("schema", &self.schema)
164 .field("projection", &self.projection)
165 .field("filters", &self.filters)
166 .finish_non_exhaustive()
167 }
168}
169
170impl DisplayAs for StreamingScanExec {
171 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result {
172 match t {
173 DisplayFormatType::Default | DisplayFormatType::Verbose => {
174 write!(f, "StreamingScanExec: ")?;
175 if let Some(proj) = &self.projection {
176 write!(f, "projection=[{proj:?}]")?;
177 } else {
178 write!(f, "projection=[*]")?;
179 }
180 if !self.filters.is_empty() {
181 write!(f, ", filters={:?}", self.filters)?;
182 }
183 Ok(())
184 }
185 DisplayFormatType::TreeRender => {
186 write!(f, "StreamingScanExec")
187 }
188 }
189 }
190}
191
192impl ExecutionPlan for StreamingScanExec {
193 fn name(&self) -> &'static str {
194 "StreamingScanExec"
195 }
196
197 fn as_any(&self) -> &dyn Any {
198 self
199 }
200
201 fn schema(&self) -> SchemaRef {
202 Arc::clone(&self.schema)
203 }
204
205 fn properties(&self) -> &PlanProperties {
206 &self.properties
207 }
208
209 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
210 vec![]
212 }
213
214 fn with_new_children(
215 self: Arc<Self>,
216 children: Vec<Arc<dyn ExecutionPlan>>,
217 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
218 if children.is_empty() {
219 Ok(self)
221 } else {
222 Err(DataFusionError::Plan(
223 "StreamingScanExec cannot have children".to_string(),
224 ))
225 }
226 }
227
228 fn execute(
229 &self,
230 partition: usize,
231 _context: Arc<TaskContext>,
232 ) -> Result<SendableRecordBatchStream, DataFusionError> {
233 if partition != 0 {
234 return Err(DataFusionError::Plan(format!(
235 "StreamingScanExec only supports partition 0, got {partition}"
236 )));
237 }
238
239 self.source
240 .stream(self.projection.clone(), self.filters.clone())
241 }
242}
243
244impl datafusion::physical_plan::ExecutionPlanProperties for StreamingScanExec {
246 fn output_partitioning(&self) -> &Partitioning {
247 self.properties.output_partitioning()
248 }
249
250 fn output_ordering(&self) -> Option<&LexOrdering> {
251 self.properties.output_ordering()
252 }
253
254 fn boundedness(&self) -> Boundedness {
255 Boundedness::Unbounded {
256 requires_infinite_memory: false,
257 }
258 }
259
260 fn pipeline_behavior(&self) -> EmissionType {
261 EmissionType::Incremental
262 }
263
264 fn equivalence_properties(&self) -> &EquivalenceProperties {
265 self.properties.equivalence_properties()
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::datafusion::source::StreamSource;
    use arrow_schema::{DataType, Field, Schema};
    use async_trait::async_trait;

    /// Minimal `StreamSource` for exercising plan construction.
    ///
    /// It only reports a schema and an optional ordering; `stream` always fails.
    #[derive(Debug)]
    struct MockSource {
        schema: SchemaRef,
        ordering: Option<Vec<SortColumn>>,
    }

    impl MockSource {
        fn new(schema: SchemaRef) -> Self {
            Self {
                schema,
                ordering: None,
            }
        }

        fn with_ordering(mut self, ordering: Vec<SortColumn>) -> Self {
            self.ordering = Some(ordering);
            self
        }
    }

    #[async_trait]
    impl StreamSource for MockSource {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn stream(
            &self,
            _projection: Option<Vec<usize>>,
            _filters: Vec<Expr>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            Err(DataFusionError::NotImplemented("mock".to_string()))
        }

        fn output_ordering(&self) -> Option<Vec<SortColumn>> {
            self.ordering.clone()
        }
    }

    /// Three-column schema: `id` (Int64), `name` (Utf8), `value` (Float64).
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Renders `exec` through `DisplayAs::fmt_as` using the given format type.
    fn render(exec: &StreamingScanExec, t: DisplayFormatType) -> String {
        struct Render<'a>(&'a StreamingScanExec, DisplayFormatType);

        impl std::fmt::Display for Render<'_> {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                self.0.fmt_as(self.1, f)
            }
        }

        Render(exec, t).to_string()
    }

    #[test]
    fn test_scan_exec_schema() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.schema(), schema);
        assert!(exec.projection().is_none());
    }

    #[test]
    fn test_scan_exec_projection() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(Arc::clone(&schema)));
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        let output_schema = exec.schema();
        assert_eq!(output_schema.fields().len(), 2);
        assert_eq!(output_schema.field(0).name(), "id");
        assert_eq!(output_schema.field(1).name(), "value");
        assert_eq!(exec.projection(), Some(&[0, 2][..]));
    }

    #[test]
    fn test_scan_exec_properties() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert!(matches!(exec.boundedness(), Boundedness::Unbounded { .. }));
        assert!(matches!(exec.pipeline_behavior(), EmissionType::Incremental));

        let partitioning = exec.properties().output_partitioning();
        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(1)));

        assert!(exec.children().is_empty());
    }

    #[test]
    fn test_scan_exec_display() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, Some(vec![0, 1]), vec![]);

        let rendered = render(&exec, DisplayFormatType::Default);
        assert!(rendered.starts_with("StreamingScanExec: projection="));
        assert!(!rendered.contains("filters="));

        let debug = format!("{exec:?}");
        assert!(debug.contains("StreamingScanExec"));
    }

    #[test]
    fn test_scan_exec_display_without_projection() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(
            render(&exec, DisplayFormatType::Default),
            "StreamingScanExec: projection=[*]"
        );
        assert_eq!(
            render(&exec, DisplayFormatType::TreeRender),
            "StreamingScanExec"
        );
    }

    #[test]
    fn test_scan_exec_name() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert_eq!(exec.name(), "StreamingScanExec");
    }

    #[test]
    fn test_scan_exec_rejects_nonzero_partition() {
        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        let result = exec.execute(1, Arc::new(TaskContext::default()));
        assert!(matches!(result, Err(DataFusionError::Plan(_))));
    }

    #[test]
    fn test_scan_exec_no_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(MockSource::new(schema));
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert!(exec.output_ordering().is_none());
    }

    #[test]
    fn test_scan_exec_with_ordering() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, None, vec![]);

        let lex = exec
            .output_ordering()
            .expect("source ordering on `id` should be exposed");
        assert_eq!(lex.len(), 1);
    }

    #[test]
    fn test_scan_exec_output_ordering_returns_some() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef =
            Arc::new(MockSource::new(Arc::clone(&schema)).with_ordering(vec![
                SortColumn::ascending("id"),
                SortColumn::descending("value"),
            ]));
        let exec = StreamingScanExec::new(source, None, vec![]);

        let ordering = exec
            .output_ordering()
            .expect("two-column source ordering should be exposed");
        assert_eq!(ordering.len(), 2);
    }

    #[test]
    fn test_scan_exec_ordering_with_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("id")]),
        );
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        assert!(exec.output_ordering().is_some());
    }

    #[test]
    fn test_scan_exec_ordering_remapped_to_projected_index() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema))
                .with_ordering(vec![SortColumn::ascending("value")]),
        );
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        let ordering = exec
            .output_ordering()
            .expect("ordering on projected column `value` should be exposed");
        assert_eq!(ordering.len(), 1);
        let sort_expr = ordering.iter().next().expect("one sort expression");
        let column = sort_expr
            .expr
            .as_any()
            .downcast_ref::<Column>()
            .expect("sort key should be a column reference");
        // `value` sits at index 2 in the source but at index 1 after projecting [0, 2].
        assert_eq!(column.name(), "value");
        assert_eq!(column.index(), 1);
    }

    #[test]
    fn test_scan_exec_ordering_column_not_in_projection() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema)).with_ordering(vec![SortColumn::ascending("name")]),
        );
        let exec = StreamingScanExec::new(source, Some(vec![0, 2]), vec![]);

        assert!(exec.output_ordering().is_none());
    }

    #[test]
    fn test_scan_exec_ordering_column_missing_from_source() {
        use datafusion::physical_plan::ExecutionPlanProperties;

        let schema = test_schema();
        let source: StreamSourceRef = Arc::new(
            MockSource::new(Arc::clone(&schema))
                .with_ordering(vec![SortColumn::ascending("missing")]),
        );
        let exec = StreamingScanExec::new(source, None, vec![]);

        assert!(exec.output_ordering().is_none());
    }
}