use super::{partition_cache::QueryPartitionProvider, view::View};
use crate::{lakehouse::reader_factory::ReaderFactory, time::TimeRange};
use async_trait::async_trait;
use datafusion::{
arrow::datatypes::SchemaRef,
catalog::{Session, TableProvider},
common::DFSchema,
datasource::{
listing::PartitionedFile,
physical_plan::{parquet::ParquetExecBuilder, FileScanConfig},
TableType,
},
error::DataFusionError,
execution::object_store::ObjectStoreUrl,
logical_expr::{utils::conjunction, Expr, TableProviderFilterPushDown},
physical_plan::{expressions, ExecutionPlan, PhysicalExpr},
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use object_store::ObjectStore;
use std::{any::Any, sync::Arc};
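/// DataFusion `TableProvider` backed by the materialized partitions of a lakehouse view.
///
/// Scanning the table triggers a just-in-time update of the view, lists the
/// parquet partitions matching the query, and reads them through a custom
/// parquet reader factory.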
pub struct MaterializedView {
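/// Connection to the data lake backing the view.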
lake: Arc<DataLakeConnection>,
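/// Object store holding the materialized parquet partitions.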
object_store: Arc<dyn ObjectStore>,
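/// Definition of the view being queried.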
view: Arc<dyn View>,
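/// Provider used to list the partitions relevant to a query.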
part_provider: Arc<dyn QueryPartitionProvider>,
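/// Optional time range restricting the query.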
query_range: Option<TimeRange>,
}
impl MaterializedView {
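/// Creates a new `MaterializedView` over the given view and partition provider.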
pub fn new(
lake: Arc<DataLakeConnection>,
object_store: Arc<dyn ObjectStore>,
view: Arc<dyn View>,
part_provider: Arc<dyn QueryPartitionProvider>,
query_range: Option<TimeRange>,
) -> Self {
Self {
lake,
object_store,
view,
part_provider,
query_range,
}
}
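/// Returns the view backing this table provider.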
pub fn get_view(&self) -> Arc<dyn View> {
self.view.clone()
}
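/// Combines the pushed-down filters into a single physical predicate,
/// falling back to a literal `true` when no filters are provided.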
fn filters_to_predicate(
&self,
schema: SchemaRef,
state: &dyn Session,
filters: &[Expr],
) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
let df_schema = DFSchema::try_from(schema)?;
let predicate = conjunction(filters.to_vec());
let predicate = predicate
.map(|predicate| state.create_physical_expr(predicate, &df_schema))
.transpose()?
.unwrap_or_else(|| expressions::lit(true));
Ok(predicate)
}
}
#[async_trait]
impl TableProvider for MaterializedView {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.view.get_file_schema()
}
fn table_type(&self) -> TableType {
TableType::Base
}
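/// Builds the physical plan for a scan: refreshes the view just-in-time,
/// fetches the matching partitions, and wires them into a parquet exec.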
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
let predicate = self.filters_to_predicate(self.view.get_file_schema(), state, filters)?;
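// Bring the view's partitions up to date for the queried time range before scanning.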
self.view
.jit_update(self.lake.clone(), self.query_range.clone())
.await
.map_err(|e| DataFusionError::External(e.into()))?;
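// List the partitions matching the view instance, time range and schema hash.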
let mut file_group = vec![];
let partitions = self
.part_provider
.fetch(
&self.view.get_view_set_name(),
&self.view.get_view_instance_id(),
self.query_range.clone(),
self.view.get_file_schema_hash(),
)
.await
.map_err(|e| DataFusionError::External(e.into()))?;
for part in &partitions {
file_group.push(PartitionedFile::new(&part.file_path, part.file_size as u64));
}
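// Scan all partition files as a single file group; the reader factory resolves
// each file against the view's object store.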
let schema = self.schema();
let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/")?;
let file_scan_config = FileScanConfig::new(object_store_url, schema)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file_groups(vec![file_group]);
let reader_factory =
ReaderFactory::new(Arc::clone(&self.object_store), Arc::new(partitions));
Ok(ParquetExecBuilder::new(file_scan_config)
.with_predicate(predicate)
.with_parquet_file_reader_factory(Arc::new(reader_factory))
.build_arc())
}
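/// Declares all filters as inexact pushdowns: they are used to prune data at
/// scan time, and DataFusion re-applies them after the scan for correctness.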
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
}