//! micromegas-analytics 0.19.0
//!
//! Analytics module of micromegas.
use super::{
    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
    partitioned_execution_plan::make_partitioned_execution_plan, reader_factory::ReaderFactory,
    view::View,
};
use crate::time::TimeRange;
use async_trait::async_trait;
use datafusion::{
    arrow::datatypes::SchemaRef,
    catalog::{Session, TableProvider},
    datasource::TableType,
    error::DataFusionError,
    logical_expr::{Expr, TableProviderFilterPushDown},
    physical_plan::ExecutionPlan,
};
use micromegas_tracing::prelude::*;
use std::{any::Any, sync::Arc};

/// A DataFusion `TableProvider` for materialized views.
#[derive(Debug)]
pub struct MaterializedView {
    // Lakehouse runtime handed to the view for just-in-time partition materialization.
    lakehouse: Arc<LakehouseContext>,
    // Factory used to build readers over the materialized partition files.
    reader_factory: Arc<ReaderFactory>,
    // The view definition: schema, identity, and JIT-update logic.
    view: Arc<dyn View>,
    // Source of the partition list matching a query (name, instance, range, schema hash).
    part_provider: Arc<dyn QueryPartitionProvider>,
    // Optional time restriction applied to both JIT updates and partition lookup;
    // `None` means the whole available range.
    query_range: Option<TimeRange>,
}

impl MaterializedView {
    /// Assembles a materialized-view table provider from its collaborators.
    ///
    /// `query_range` limits both just-in-time materialization and partition
    /// lookup; pass `None` to cover the whole available range.
    pub fn new(
        lakehouse: Arc<LakehouseContext>,
        reader_factory: Arc<ReaderFactory>,
        view: Arc<dyn View>,
        part_provider: Arc<dyn QueryPartitionProvider>,
        query_range: Option<TimeRange>,
    ) -> Self {
        Self {
            query_range,
            part_provider,
            view,
            reader_factory,
            lakehouse,
        }
    }

    /// Returns a shared handle to the underlying view definition.
    pub fn get_view(&self) -> Arc<dyn View> {
        Arc::clone(&self.view)
    }
}

#[async_trait]
impl TableProvider for MaterializedView {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the materialized files backing this view.
    fn schema(&self) -> SchemaRef {
        self.view.get_file_schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Builds the physical plan for a query over this view.
    ///
    /// Gives the view a chance to materialize partitions covering
    /// `query_range` (JIT update), fetches the matching partition list, and
    /// wraps it in a partitioned execution plan.
    #[span_fn]
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        // Ensure the backing partitions exist before we plan against them.
        self.view
            .jit_update(self.lakehouse.clone(), self.query_range)
            .await
            .map_err(|e| DataFusionError::External(e.into()))?;

        // Only partitions whose schema hash matches the view's current file
        // schema are returned, which excludes stale partitions.
        let partitions = self
            .part_provider
            .fetch(
                &self.view.get_view_set_name(),
                &self.view.get_view_instance_id(),
                self.query_range,
                self.view.get_file_schema_hash(),
            )
            .await
            // Use the imported `DataFusionError` for consistency with the
            // mapping above (was fully qualified here).
            .map_err(|e| DataFusionError::External(e.into()))?;
        trace!("MaterializedView::scan nb_partitions={}", partitions.len());

        make_partitioned_execution_plan(
            self.schema(),
            self.reader_factory.clone(),
            state,
            projection,
            filters,
            limit,
            Arc::new(partitions),
        )
    }

    /// Tell DataFusion to push filters down to the scan method
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
        // Inexact because the pruning can't handle all expressions and pruning
        // is not done at the row level -- there may be rows in returned files
        // that do not pass the filter
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }
}