use super::{
batch_update::PartitionCreationStrategy,
materialized_view::MaterializedView,
merge::{PartitionMerger, QueryMerger},
partition::Partition,
partition_cache::PartitionCache,
view_factory::ViewFactory,
};
use crate::{response_writer::Logger, time::TimeRange};
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
arrow::datatypes::Schema,
execution::{SendableRecordBatchStream, runtime_env::RuntimeEnv},
logical_expr::Expr,
prelude::*,
sql::TableReference,
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use std::fmt::Debug;
use std::sync::Arc;
/// Specification of the work required to materialize a single partition.
///
/// Instances are produced by [`View::make_batch_partition_spec`] and executed
/// via [`PartitionSpec::write`].
#[async_trait]
pub trait PartitionSpec: Send + Sync + Debug {
    /// Returns `true` when this spec would produce no data.
    fn is_empty(&self) -> bool;
    /// Hash of the source data behind this spec.
    /// NOTE(review): presumably compared against previously written partitions
    /// to detect staleness — confirm against callers.
    fn get_source_data_hash(&self) -> Vec<u8>;
    /// Materializes the partition into the data lake, reporting progress and
    /// diagnostics through `logger`.
    async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()>;
}
/// Identifying metadata for a view: the view set it belongs to, the specific
/// instance within that set, and the hash of its file schema.
#[derive(Debug, Clone)]
pub struct ViewMetadata {
    /// Name of the view set (shared by all instances of the same view type).
    pub view_set_name: Arc<String>,
    /// Identifier of this particular view instance within the set.
    pub view_instance_id: Arc<String>,
    /// Hash of the file schema, as returned by [`View::get_file_schema_hash`].
    pub file_schema_hash: Vec<u8>,
}
/// A view over lake data that can be materialized into partitions and queried
/// through datafusion.
///
/// Implementations identify themselves by a view-set name plus an instance id,
/// expose their file schema (and its hash), and know how to build batch
/// partition specs, update themselves just-in-time for a query, and merge
/// existing partitions.
#[async_trait]
pub trait View: std::fmt::Debug + Send + Sync {
    /// Name of the view set this view belongs to.
    fn get_view_set_name(&self) -> Arc<String>;
    /// Identifier of this view instance within its set.
    fn get_view_instance_id(&self) -> Arc<String>;
    /// Builds a [`PartitionSpec`] describing the work needed to materialize
    /// the data falling in `insert_range`, given the partitions that already
    /// exist.
    async fn make_batch_partition_spec(
        &self,
        runtime: Arc<RuntimeEnv>,
        lake: Arc<DataLakeConnection>,
        existing_partitions: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>>;
    /// Hash of the file schema; changes when the schema changes.
    fn get_file_schema_hash(&self) -> Vec<u8>;
    /// Arrow schema of the files written for this view.
    fn get_file_schema(&self) -> Arc<Schema>;
    /// Updates the view's materialized data on demand for the given query
    /// range (just-in-time materialization).
    async fn jit_update(
        &self,
        lake: Arc<DataLakeConnection>,
        query_range: Option<TimeRange>,
    ) -> Result<()>;
    /// Builds datafusion filter expressions restricting results to the
    /// `[begin, end]` time window.
    fn make_time_filter(&self, _begin: DateTime<Utc>, _end: DateTime<Utc>) -> Result<Vec<Expr>>;
    /// Column holding the minimum event time of a row group/partition.
    fn get_min_event_time_column_name(&self) -> Arc<String>;
    /// Column holding the maximum event time of a row group/partition.
    fn get_max_event_time_column_name(&self) -> Arc<String>;
    /// Registers `table` in `ctx` under this view's set name so it can be
    /// referenced from SQL.
    async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> {
        let table_ref = TableReference::Bare {
            table: self.get_view_set_name().to_string().into(),
        };
        ctx.register_table(table_ref, Arc::new(table))?;
        Ok(())
    }
    /// Merges `partitions_to_merge` into a single record-batch stream.
    ///
    /// The default implementation runs a pass-through `SELECT * FROM source;`
    /// query over the partitions using a [`QueryMerger`] with an empty view
    /// factory; views needing custom merge logic can override this.
    async fn merge_partitions(
        &self,
        runtime: Arc<RuntimeEnv>,
        lake: Arc<DataLakeConnection>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
    ) -> Result<SendableRecordBatchStream> {
        let query = Arc::new("SELECT * FROM source;".to_string());
        let view_factory = Arc::new(ViewFactory::new(vec![]));
        QueryMerger::new(runtime, view_factory, self.get_file_schema(), query)
            .execute_merge_query(lake, partitions_to_merge, partitions_all_views)
            .await
    }
    /// Group used to schedule batch updates; `None` when the view is not part
    /// of any update group.
    fn get_update_group(&self) -> Option<i32>;
    /// Maximum time span a single partition may cover for the given creation
    /// strategy. Defaults to one day.
    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
        TimeDelta::days(1)
    }
}
impl dyn View {
    /// Bundles this view's identifying fields into a [`ViewMetadata`] value.
    pub fn get_meta(&self) -> ViewMetadata {
        let view_set_name = self.get_view_set_name();
        let view_instance_id = self.get_view_instance_id();
        let file_schema_hash = self.get_file_schema_hash();
        ViewMetadata {
            view_set_name,
            view_instance_id,
            file_schema_hash,
        }
    }
}