use super::{
batch_update::PartitionCreationStrategy, materialized_view::MaterializedView,
partition_cache::PartitionCache,
};
use crate::{response_writer::Logger, time::TimeRange};
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{arrow::datatypes::Schema, logical_expr::Expr, prelude::*, sql::TableReference};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use std::sync::Arc;
/// Description of a single partition that can be materialized into the data lake.
///
/// Produced by [`View::make_batch_partition_spec`].
#[async_trait]
pub trait PartitionSpec: Send + Sync {
    /// Hash of the source data this partition is derived from.
    ///
    /// NOTE(review): presumably compared against the hash stored with an existing
    /// partition to decide whether it is stale — confirm against callers.
    fn get_source_data_hash(&self) -> Vec<u8>;

    /// Returns `true` when this spec describes no data.
    fn is_empty(&self) -> bool;

    /// Materializes the partition through `lake`, emitting messages via `logger`.
    async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()>;
}
/// Identifying metadata of a view instance, as returned by `get_meta` on `dyn View`.
#[derive(Clone, Debug)]
pub struct ViewMetadata {
    /// Name of the view set the instance belongs to.
    pub view_set_name: Arc<String>,
    /// Identifier of the instance within its view set.
    pub view_instance_id: Arc<String>,
    /// Hash of the file schema, as reported by `View::get_file_schema_hash`.
    pub file_schema_hash: Vec<u8>,
}
/// A view that can be materialized into partitions and queried through DataFusion.
#[async_trait]
pub trait View: std::fmt::Debug + Send + Sync {
    /// Name of the view set; `register_table` uses it verbatim as the table name.
    fn get_view_set_name(&self) -> Arc<String>;

    /// Identifier of this instance within its view set.
    fn get_view_instance_id(&self) -> Arc<String>;

    /// Builds the spec of a partition for inserts between `begin_insert` and `end_insert`.
    ///
    /// `existing_partitions` holds the partitions already known to the caller.
    /// NOTE(review): whether the range bounds are inclusive or exclusive is decided by
    /// implementors — confirm.
    async fn make_batch_partition_spec(
        &self,
        lake: Arc<DataLakeConnection>,
        existing_partitions: Arc<PartitionCache>,
        begin_insert: DateTime<Utc>,
        end_insert: DateTime<Utc>,
    ) -> Result<Arc<dyn PartitionSpec>>;

    /// Hash identifying the schema returned by `get_file_schema`.
    fn get_file_schema_hash(&self) -> Vec<u8>;

    /// Arrow schema of this view's files.
    fn get_file_schema(&self) -> Arc<Schema>;

    /// Updates the view on demand for `query_range`.
    ///
    /// NOTE(review): `None` presumably means "no time restriction" — confirm.
    async fn jit_update(
        &self,
        lake: Arc<DataLakeConnection>,
        query_range: Option<TimeRange>,
    ) -> Result<()>;

    /// Filter expressions restricting rows to the time range between `begin` and `end`.
    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>>;

    /// Name of the column holding the minimum event time.
    fn get_min_event_time_column_name(&self) -> Arc<String>;

    /// Name of the column holding the maximum event time.
    fn get_max_event_time_column_name(&self) -> Arc<String>;

    /// Registers `table` in `ctx` under the view set name.
    ///
    /// # Errors
    /// Propagates any error returned by `SessionContext::register_table`.
    async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> {
        // Construct the `Bare` variant explicitly so the name is used as-is as an
        // unqualified table name, without any identifier parsing.
        let table_ref = TableReference::Bare {
            table: self.get_view_set_name().as_str().to_owned().into(),
        };
        ctx.register_table(table_ref, Arc::new(table))?;
        Ok(())
    }

    /// SQL used when merging partitions.
    ///
    /// `{source}` is a placeholder, presumably substituted by the caller. The default
    /// selects every row unchanged.
    fn get_merge_partitions_query(&self) -> Arc<String> {
        Arc::new("SELECT * FROM {source};".to_owned())
    }

    /// Update group this view belongs to, if any.
    ///
    /// NOTE(review): group semantics are defined by the scheduling code — confirm.
    fn get_update_group(&self) -> Option<i32>;

    /// Maximum time span covered by one partition.
    ///
    /// The default ignores the strategy and always answers one day.
    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
        TimeDelta::days(1)
    }
}
impl dyn View {
    /// Gathers this view's identifying metadata into a [`ViewMetadata`].
    ///
    /// The getters are queried in order: view set name, then instance id, then
    /// file schema hash. Defined as an inherent method on `dyn View`, so it is
    /// callable through trait objects.
    pub fn get_meta(&self) -> ViewMetadata {
        let view_set_name = self.get_view_set_name();
        let view_instance_id = self.get_view_instance_id();
        let file_schema_hash = self.get_file_schema_hash();
        ViewMetadata {
            view_set_name,
            view_instance_id,
            file_schema_hash,
        }
    }
}