use crate::{
file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
};
use std::sync::Arc;
use arrow::datatypes::Schema;
use datafusion_common::Result;
use datafusion_physical_expr::{PhysicalExpr, expressions::Column};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::ObjectStore;
#[derive(Clone)]
pub(crate) struct MockSource {
metrics: ExecutionPlanMetricsSet,
filter: Option<Arc<dyn PhysicalExpr>>,
table_schema: crate::table_schema::TableSchema,
projection: crate::projection::SplitProjection,
}
impl Default for MockSource {
fn default() -> Self {
let table_schema =
crate::table_schema::TableSchema::new(Arc::new(Schema::empty()), vec![]);
Self {
metrics: ExecutionPlanMetricsSet::new(),
filter: None,
projection: crate::projection::SplitProjection::unprojected(&table_schema),
table_schema,
}
}
}
impl MockSource {
pub fn new(table_schema: impl Into<crate::table_schema::TableSchema>) -> Self {
let table_schema = table_schema.into();
Self {
metrics: ExecutionPlanMetricsSet::new(),
filter: None,
projection: crate::projection::SplitProjection::unprojected(&table_schema),
table_schema,
}
}
pub fn with_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
self.filter = Some(filter);
self
}
}
impl FileSource for MockSource {
fn create_file_opener(
&self,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
unimplemented!()
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
self.filter.clone()
}
fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
Arc::new(Self { ..self.clone() })
}
fn metrics(&self) -> &ExecutionPlanMetricsSet {
&self.metrics
}
fn file_type(&self) -> &str {
"mock"
}
fn table_schema(&self) -> &crate::table_schema::TableSchema {
&self.table_schema
}
fn try_pushdown_projection(
&self,
projection: &datafusion_physical_plan::projection::ProjectionExprs,
) -> Result<Option<Arc<dyn FileSource>>> {
let mut source = self.clone();
let new_projection = self.projection.source.try_merge(projection)?;
let split_projection = crate::projection::SplitProjection::new(
self.table_schema.file_schema(),
&new_projection,
);
source.projection = split_projection;
Ok(Some(Arc::new(source)))
}
fn projection(
&self,
) -> Option<&datafusion_physical_plan::projection::ProjectionExprs> {
Some(&self.projection.source)
}
}
pub(crate) fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Column::new_with_schema(name, schema)?))
}