use super::{
lakehouse_context::LakehouseContext,
partition::Partition,
partition_cache::PartitionCache,
partition_source_data::hash_to_object_count,
partitioned_table_provider::PartitionedTableProvider,
query::make_session_context,
session_configurator::SessionConfigurator,
view::View,
view_factory::ViewFactory,
write_partition::{PartitionRowSet, write_partition_from_rows},
};
use crate::{response_writer::Logger, time::TimeRange};
use anyhow::{Context, Result};
use async_trait::async_trait;
use datafusion::{
arrow::datatypes::Schema, execution::SendableRecordBatchStream, prelude::*, sql::TableReference,
};
use futures::stream::StreamExt;
use micromegas_tracing::prelude::*;
use std::fmt::Debug;
use std::sync::Arc;
use xxhash_rust::xxh32::xxh32;
/// Strategy for producing the merged rows of a set of partitions.
///
/// Implementations receive the partitions selected for merging and return a
/// stream of record batches containing the merged data; the caller is
/// responsible for writing that stream out as a new partition.
#[async_trait]
pub trait PartitionMerger: Send + Sync + Debug {
/// Executes the merge and returns the merged rows as a record batch stream.
///
/// * `lakehouse` - shared lakehouse services (reader factory, runtime, ...)
/// * `partitions_to_merge` - the partitions whose rows should be merged
/// * `partitions_all_views` - partition metadata for all views, so the merge
///   query can reference other views if needed
/// * `insert_range` - insert-time range covered by the partitions being merged
async fn execute_merge_query(
&self,
lakehouse: Arc<LakehouseContext>,
partitions_to_merge: Arc<Vec<Partition>>,
partitions_all_views: Arc<PartitionCache>,
insert_range: TimeRange,
) -> Result<SendableRecordBatchStream>;
}
/// A [`PartitionMerger`] that merges partitions by running a SQL query over
/// them (the partitions are exposed to the query as a table named `source`).
#[derive(Debug)]
pub struct QueryMerger {
// Resolves the other views the merge query may reference.
view_factory: Arc<ViewFactory>,
// Hook used to customize the DataFusion session before running the query.
session_configurator: Arc<dyn SessionConfigurator>,
// Schema of the partition files being merged (schema of the `source` table).
file_schema: Arc<Schema>,
// SQL text executed to produce the merged rows.
query: Arc<String>,
}
impl QueryMerger {
pub fn new(
view_factory: Arc<ViewFactory>,
session_configurator: Arc<dyn SessionConfigurator>,
file_schema: Arc<Schema>,
query: Arc<String>,
) -> Self {
Self {
view_factory,
session_configurator,
file_schema,
query,
}
}
}
#[async_trait]
impl PartitionMerger for QueryMerger {
    /// Runs the configured merge query over `partitions_to_merge` and returns
    /// the resulting record batch stream.
    ///
    /// The partitions are registered as a table named `source`; the session is
    /// built with access to all views so the query may also reference them.
    async fn execute_merge_query(
        &self,
        lakehouse: Arc<LakehouseContext>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<SendableRecordBatchStream> {
        let reader_factory = lakehouse.reader_factory().clone();
        // Session scoped to the insert range, with the view factory and any
        // caller-provided session customization applied.
        let ctx = make_session_context(
            lakehouse.clone(),
            partitions_all_views,
            Some(insert_range),
            self.view_factory.clone(),
            self.session_configurator.clone(),
        )
        .await?;
        // Expose the partitions being merged to the query as table `source`.
        let source_table = Arc::new(PartitionedTableProvider::new(
            self.file_schema.clone(),
            reader_factory,
            partitions_to_merge,
        ));
        let source_ref = TableReference::Bare {
            table: "source".into(),
        };
        ctx.register_table(source_ref, source_table)?;
        let merged_df = ctx.sql(&self.query).await?;
        merged_df
            .execute_stream()
            .await
            .with_context(|| "merged_df.execute_stream")
    }
}
/// Computes aggregate statistics over a set of partitions: the total file
/// size and a combined source-data hash.
///
/// Hash combination: an 8-byte `source_data_hash` is interpreted as an object
/// count and added to the accumulator; any other length is folded in with
/// `xxh32`, seeded by the accumulator truncated to `u32`.
///
/// Returns an error if any partition's file schema hash differs from the
/// view's current file schema hash.
fn partition_set_stats(
    view: Arc<dyn View>,
    filtered_partitions: &[Partition],
) -> Result<(i64, i64)> {
    let expected_schema_hash = view.get_file_schema_hash();
    let mut total_size: i64 = 0;
    let mut combined_hash: i64 = 0;
    for partition in filtered_partitions {
        // Reject partitions written with an older/different file schema.
        if partition.view_metadata.file_schema_hash != expected_schema_hash {
            anyhow::bail!(
                "incompatible file schema with [{},{}]",
                partition.begin_insert_time().to_rfc3339(),
                partition.end_insert_time().to_rfc3339()
            );
        }
        combined_hash = if partition.source_data_hash.len() == std::mem::size_of::<i64>() {
            combined_hash + hash_to_object_count(&partition.source_data_hash)?
        } else {
            xxh32(&partition.source_data_hash, combined_hash as u32).into()
        };
        total_size += partition.file_size;
    }
    Ok((total_size, combined_hash))
}
/// Merges the partitions of `view` that fall inside `insert_range` into a
/// single new partition.
///
/// The merged rows come from `view.merge_partitions` as a record batch
/// stream; each batch is forwarded over a bounded channel to a spawned task
/// that writes the new partition with `write_partition_from_rows`.
///
/// Returns `Ok(())` without writing anything when fewer than two matching
/// partitions are found, or if any partition's file schema is incompatible
/// with the view's current schema (reported as an error by
/// `partition_set_stats`).
pub async fn create_merged_partition(
partitions_to_merge: Arc<PartitionCache>,
partitions_all_views: Arc<PartitionCache>,
lakehouse: Arc<LakehouseContext>,
view: Arc<dyn View>,
insert_range: TimeRange,
logger: Arc<dyn Logger>,
) -> Result<()> {
let view_set_name = &view.get_view_set_name();
let view_instance_id = &view.get_view_instance_id();
// Human-readable label used in log messages below.
let desc = format!(
"[{}, {}] {view_set_name} {view_instance_id}",
insert_range.begin.to_rfc3339(),
insert_range.end.to_rfc3339()
);
// Keep only this view's partitions that lie entirely within the range.
let mut filtered_partitions = partitions_to_merge
.filter_inside_range(view_set_name, view_instance_id, insert_range)
.partitions;
// The caller is expected to pass an already-filtered cache; warn if not.
if filtered_partitions.len() != partitions_to_merge.len() {
warn!("partitions_to_merge was not filtered properly");
}
// Nothing to do with 0 or 1 partitions.
if filtered_partitions.len() < 2 {
logger
.write_log_entry(format!("{desc}: not enough partitions to merge"))
.await
.with_context(|| "writing log")?;
return Ok(());
}
// Total size + combined source hash; also validates schema compatibility.
let (sum_size, source_hash) = partition_set_stats(view.clone(), &filtered_partitions)
.with_context(|| "partition_set_stats")?;
logger
.write_log_entry(format!(
"{desc}: merging {} partitions sum_size={sum_size}",
filtered_partitions.len()
))
.await
.with_context(|| "write_log_entry")?;
filtered_partitions.sort_by_key(|p| p.begin_insert_time());
let mut merged_stream = view
.merge_partitions(
lakehouse.clone(),
Arc::new(filtered_partitions),
partitions_all_views,
insert_range,
)
.await
.with_context(|| "view.merge_partitions")?;
// Bounded channel (capacity 1) feeding the writer task below.
let (tx, rx) = tokio::sync::mpsc::channel(1);
let view_copy = view.clone();
let lake = lakehouse.lake().clone();
// Write the new partition in a background task while this task drives the
// merge stream; `source_hash` becomes the new partition's source_data_hash.
let join_handle = spawn_with_context(async move {
let res = write_partition_from_rows(
lake,
view_copy.get_meta(),
view_copy.get_file_schema(),
insert_range,
source_hash.to_le_bytes().to_vec(),
rx,
logger.clone(),
)
.await;
if let Err(e) = &res {
error!("{e:?}");
}
res
});
let compute_time_bounds = view.get_time_bounds();
// Throwaway session used only to wrap each batch for time-bound computation.
let ctx =
SessionContext::new_with_config_rt(SessionConfig::default(), lakehouse.runtime().clone());
while let Some(rb_res) = merged_stream.next().await {
let rb = rb_res.with_context(|| "receiving record_batch from stream")?;
// Event-time range of this batch, per the view's time-bound strategy.
let event_time_range = compute_time_bounds
.get_time_bounds(ctx.read_batch(rb.clone()).with_context(|| "read_batch")?)
.await?;
tx.send(PartitionRowSet::new(event_time_range, rb))
.await
.with_context(|| "sending partition row set")?;
}
// Close the channel so the writer task sees end-of-input and finishes.
drop(tx);
// Propagate both join errors and the writer's own Result.
join_handle.await??;
Ok(())
}