datafusion_catalog/
streaming.rs1use std::any::Any;
21use std::sync::Arc;
22
23use arrow::datatypes::SchemaRef;
24use async_trait::async_trait;
25
26use crate::Session;
27use crate::TableProvider;
28use datafusion_common::{plan_err, Result};
29use datafusion_expr::{Expr, TableType};
30use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
31use datafusion_physical_plan::ExecutionPlan;
32use log::debug;
33
34#[derive(Debug)]
36pub struct StreamingTable {
37 schema: SchemaRef,
38 partitions: Vec<Arc<dyn PartitionStream>>,
39 infinite: bool,
40}
41
42impl StreamingTable {
43 pub fn try_new(
45 schema: SchemaRef,
46 partitions: Vec<Arc<dyn PartitionStream>>,
47 ) -> Result<Self> {
48 for x in partitions.iter() {
49 let partition_schema = x.schema();
50 if !schema.contains(partition_schema) {
51 debug!(
52 "target schema does not contain partition schema. \
53 Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
54 );
55 return plan_err!("Mismatch between schema and batches");
56 }
57 }
58
59 Ok(Self {
60 schema,
61 partitions,
62 infinite: false,
63 })
64 }
65 pub fn with_infinite_table(mut self, infinite: bool) -> Self {
67 self.infinite = infinite;
68 self
69 }
70}
71
72#[async_trait]
73impl TableProvider for StreamingTable {
74 fn as_any(&self) -> &dyn Any {
75 self
76 }
77
78 fn schema(&self) -> SchemaRef {
79 Arc::clone(&self.schema)
80 }
81
82 fn table_type(&self) -> TableType {
83 TableType::View
84 }
85
86 async fn scan(
87 &self,
88 _state: &dyn Session,
89 projection: Option<&Vec<usize>>,
90 _filters: &[Expr],
91 limit: Option<usize>,
92 ) -> Result<Arc<dyn ExecutionPlan>> {
93 Ok(Arc::new(StreamingTableExec::try_new(
94 Arc::clone(&self.schema),
95 self.partitions.clone(),
96 projection,
97 None,
98 self.infinite,
99 limit,
100 )?))
101 }
102}