use super::batch_update::materialize_partition_range;
use super::lakehouse_context::LakehouseContext;
use super::partition_cache::PartitionCache;
use super::view_factory::ViewFactory;
use crate::dfext::expressions::exp_to_i64;
use crate::dfext::expressions::exp_to_string;
use crate::dfext::expressions::exp_to_timestamp;
use crate::dfext::log_stream_table_provider::LogStreamTableProvider;
use crate::dfext::task_log_exec_plan::TaskLogExecPlan;
use crate::response_writer::LogSender;
use crate::response_writer::Logger;
use crate::time::TimeRange;
use anyhow::Context;
use chrono::TimeDelta;
use datafusion::catalog::TableFunctionImpl;
use datafusion::catalog::TableProvider;
use datafusion::common::plan_err;
use datafusion::prelude::Expr;
use micromegas_tracing::prelude::*;
use std::sync::Arc;
/// Table function that materializes partitions for a named view over a time
/// range, streaming its progress log back to the caller as a table.
///
/// Registered with DataFusion through [`TableFunctionImpl`]; see the `call`
/// implementation below for the expected SQL arguments.
#[derive(Debug)]
pub struct MaterializePartitionsTableFunction {
    // Shared lakehouse context giving access to the data lake (db pool, storage).
    lakehouse: Arc<LakehouseContext>,
    // Factory used to resolve a view set name into a concrete global view.
    view_factory: Arc<ViewFactory>,
}
impl MaterializePartitionsTableFunction {
pub fn new(lakehouse: Arc<LakehouseContext>, view_factory: Arc<ViewFactory>) -> Self {
Self {
lakehouse,
view_factory,
}
}
}
/// Materializes partitions for the view named `view_name` over `insert_range`,
/// cutting partitions of duration `partition_time_delta` and streaming
/// progress messages to `logger`.
///
/// # Errors
/// Fails when the view name cannot be resolved, when fetching the existing
/// partition cache fails, or when the materialization itself fails.
#[span_fn]
async fn materialize_partitions_impl(
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    view_name: &str,
    insert_range: TimeRange,
    partition_time_delta: TimeDelta,
    logger: Arc<dyn Logger>,
) -> anyhow::Result<()> {
    // Resolve the view first so an unknown name fails fast, before any DB work.
    let target_view = view_factory
        .get_global_view(view_name)
        .with_context(|| format!("can't find view {view_name}"))?;
    // Fetch every partition (across all views) overlapping the insert range;
    // the materializer uses this cache to decide what already exists.
    let partition_cache =
        PartitionCache::fetch_overlapping_insert_range(&lakehouse.lake().db_pool, insert_range)
            .await?;
    materialize_partition_range(
        Arc::new(partition_cache),
        lakehouse,
        target_view,
        insert_range,
        partition_time_delta,
        logger,
    )
    .await?;
    Ok(())
}
impl TableFunctionImpl for MaterializePartitionsTableFunction {
    /// Parses the SQL arguments, spawns the materialization task, and returns
    /// a table provider that streams the task's log entries.
    ///
    /// Expected arguments, in order:
    /// 1. `view_set_name`: String
    /// 2. `begin`: UTC timestamp in nanoseconds
    /// 3. `end`: UTC timestamp in nanoseconds
    /// 4. `partition_delta`: number of seconds (i64)
    fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
        let Some(view_set_name) = args.first().map(exp_to_string).transpose()? else {
            return plan_err!("Missing first argument, expected view_set_name: String");
        };
        // NOTE: ordinals in the messages below match the argument position
        // (args.get(1) is the 2nd argument, etc.).
        let Some(begin) = args.get(1).map(exp_to_timestamp).transpose()? else {
            return plan_err!("Missing 2nd argument, expected a UTC nanoseconds timestamp");
        };
        let Some(end) = args.get(2).map(exp_to_timestamp).transpose()? else {
            return plan_err!("Missing 3rd argument, expected a UTC nanoseconds timestamp");
        };
        let Some(delta) = args.get(3).map(exp_to_i64).transpose()? else {
            return plan_err!("Missing 4th argument, expected a number of seconds(i64)");
        };
        let lakehouse = self.lakehouse.clone();
        let view_factory = self.view_factory.clone();
        // The spawner is invoked lazily by the exec plan; it launches the
        // materialization task and hands back the receiving end of the log
        // channel so results stream as they are produced.
        let spawner = move || {
            let (tx, rx) = tokio::sync::mpsc::channel(100);
            let logger = Arc::new(LogSender::new(tx));
            spawn_with_context(async move {
                if let Err(e) = materialize_partitions_impl(
                    lakehouse,
                    view_factory,
                    &view_set_name,
                    TimeRange::new(begin, end),
                    TimeDelta::seconds(delta),
                    logger.clone(),
                )
                .await
                .with_context(|| "materialize_partitions_impl")
                {
                    // Report the failure both to the client (via the log
                    // stream) and to the server's own log; ignore send errors
                    // since the client may have disconnected already.
                    let msg = format!("{e:?}");
                    let _ = logger.write_log_entry(msg.clone()).await;
                    error!("{msg}");
                }
            });
            rx
        };
        Ok(Arc::new(LogStreamTableProvider {
            log_stream: Arc::new(TaskLogExecPlan::new(Box::new(spawner))),
        }))
    }
}