use super::{
lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
partitioned_execution_plan::make_partitioned_execution_plan, reader_factory::ReaderFactory,
view::View,
};
use crate::time::TimeRange;
use async_trait::async_trait;
use datafusion::{
arrow::datatypes::SchemaRef,
catalog::{Session, TableProvider},
datasource::TableType,
error::DataFusionError,
logical_expr::{Expr, TableProviderFilterPushDown},
physical_plan::ExecutionPlan,
};
use micromegas_tracing::prelude::*;
use std::{any::Any, sync::Arc};
/// DataFusion table provider serving a lakehouse view from its materialized partitions.
///
/// `scan` triggers a just-in-time update of the view, fetches the partitions
/// matching the query range, and builds a partitioned execution plan over them.
#[derive(Debug)]
pub struct MaterializedView {
    /// Lakehouse context handed to `View::jit_update` when a scan begins.
    lakehouse: Arc<LakehouseContext>,
    /// Reader factory forwarded to `make_partitioned_execution_plan` to open partition data.
    reader_factory: Arc<ReaderFactory>,
    /// The view definition: supplies schema, identity and the jit-update logic.
    view: Arc<dyn View>,
    /// Source of the partition list for a (view, instance, range, schema-hash) query.
    part_provider: Arc<dyn QueryPartitionProvider>,
    /// Optional time restriction applied to both jit updates and partition fetching.
    query_range: Option<TimeRange>,
}
impl MaterializedView {
    /// Assembles a materialized-view table provider from its collaborators.
    ///
    /// * `lakehouse` - context used for just-in-time view updates during scans
    /// * `reader_factory` - opens readers over materialized partition data
    /// * `view` - the view definition being served
    /// * `part_provider` - resolves which partitions match a query
    /// * `query_range` - optional time restriction for updates and partition lookup
    pub fn new(
        lakehouse: Arc<LakehouseContext>,
        reader_factory: Arc<ReaderFactory>,
        view: Arc<dyn View>,
        part_provider: Arc<dyn QueryPartitionProvider>,
        query_range: Option<TimeRange>,
    ) -> Self {
        Self {
            query_range,
            part_provider,
            view,
            reader_factory,
            lakehouse,
        }
    }

    /// Returns a shared handle to the underlying view definition.
    pub fn get_view(&self) -> Arc<dyn View> {
        Arc::clone(&self.view)
    }
}
#[async_trait]
impl TableProvider for MaterializedView {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Rows served by this table use the view's file schema.
    fn schema(&self) -> SchemaRef {
        self.view.get_file_schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Builds the physical plan for a query over this view.
    ///
    /// First runs a just-in-time update of the view over `query_range`, then
    /// fetches the partitions matching the view's identity, range and current
    /// schema hash, and finally wraps them in a partitioned execution plan
    /// honoring `projection`, `filters` and `limit`.
    ///
    /// # Errors
    /// Update and fetch failures are surfaced as `DataFusionError::External`.
    #[span_fn]
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        // Ensure the materialized partitions are current for the requested range.
        self.view
            .jit_update(self.lakehouse.clone(), self.query_range)
            .await
            .map_err(|e| DataFusionError::External(e.into()))?;
        // Passing the schema hash restricts the fetch to partitions written
        // with the view's current schema.
        let partitions = self
            .part_provider
            .fetch(
                &self.view.get_view_set_name(),
                &self.view.get_view_instance_id(),
                self.query_range,
                self.view.get_file_schema_hash(),
            )
            .await
            // Use the imported alias, consistent with the jit_update error path above.
            .map_err(|e| DataFusionError::External(e.into()))?;
        trace!("MaterializedView::scan nb_partitions={}", partitions.len());
        make_partitioned_execution_plan(
            self.schema(),
            self.reader_factory.clone(),
            state,
            projection,
            filters,
            limit,
            Arc::new(partitions),
        )
    }

    /// Declares every filter as `Inexact`: pushdown may prune work during the
    /// scan, but DataFusion still re-applies the filters afterwards, so this
    /// is always safe.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }
}