use std::any::Any;
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Expr, TableType};
use crate::datasource::TableProvider;
use crate::execution::context::{SessionState, TaskContext};
use crate::physical_plan::streaming::StreamingTableExec;
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
/// A partition that can be converted into a [`SendableRecordBatchStream`].
///
/// Implementations are expected to produce batches matching [`Self::schema`];
/// `StreamingTable::try_new` checks that each partition's schema is contained
/// in the table schema.
pub trait PartitionStream: Send + Sync {
/// Returns the schema of the batches this partition produces.
fn schema(&self) -> &SchemaRef;
/// Returns a stream yielding this partition's record batches.
fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}
/// A [`TableProvider`] backed by a set of [`PartitionStream`]s,
/// scanned via [`StreamingTableExec`].
pub struct StreamingTable {
// Schema of the table; each partition's schema must be contained in it
// (enforced by `try_new`).
schema: SchemaRef,
// One stream source per output partition.
partitions: Vec<Arc<dyn PartitionStream>>,
}
impl StreamingTable {
    /// Attempts to create a new [`StreamingTable`] from `schema` and
    /// `partitions`.
    ///
    /// # Errors
    ///
    /// Returns a plan error if any partition's schema is not contained in
    /// `schema` (checked via `Schema::contains`).
    pub fn try_new(
        schema: SchemaRef,
        partitions: Vec<Arc<dyn PartitionStream>>,
    ) -> Result<Self> {
        // Every partition must be compatible with the table schema;
        // reject construction otherwise. The message names "partitions"
        // (not "batches") since that is what is being validated here.
        if !partitions.iter().all(|x| schema.contains(x.schema())) {
            return Err(DataFusionError::Plan(
                "Mismatch between schema and partitions".to_string(),
            ));
        }
        Ok(Self { schema, partitions })
    }
}
#[async_trait]
impl TableProvider for StreamingTable {
    /// Allows downcasting back to the concrete [`StreamingTable`].
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the table schema (cheap `Arc` bump, no data copy).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::View
    }

    /// Plans a scan of this table as a [`StreamingTableExec`] node.
    ///
    /// Projection is pushed down to the exec node; filters and limit are
    /// ignored here and left for upstream operators to apply.
    async fn scan(
        &self,
        _state: &SessionState,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let exec = StreamingTableExec::try_new(
            Arc::clone(&self.schema),
            self.partitions.clone(),
            projection,
        )?;
        Ok(Arc::new(exec))
    }
}