// micromegas_analytics/lakehouse/merge.rs
use super::{
    partition::Partition,
    partition_cache::PartitionCache,
    partition_source_data::hash_to_object_count,
    partitioned_table_provider::PartitionedTableProvider,
    query::make_session_context,
    session_configurator::SessionConfigurator,
    view::View,
    view_factory::ViewFactory,
    write_partition::{PartitionRowSet, write_partition_from_rows},
};
use crate::{response_writer::Logger, time::TimeRange};
use anyhow::{Context, Result};
use async_trait::async_trait;
use datafusion::{
    arrow::datatypes::Schema,
    execution::{SendableRecordBatchStream, runtime_env::RuntimeEnv},
    prelude::*,
    sql::TableReference,
};
use futures::stream::StreamExt;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::{error, warn};
use std::fmt::Debug;
use std::sync::Arc;
use xxhash_rust::xxh32::xxh32;

/// Merges a set of partitions into a single stream of record batches.
#[async_trait]
pub trait PartitionMerger: Send + Sync + Debug {
    /// Executes the merge query over `partitions_to_merge`, with access to the
    /// partitions of all views through `partitions_all_views`.
    async fn execute_merge_query(
        &self,
        lake: Arc<DataLakeConnection>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
    ) -> Result<SendableRecordBatchStream>;
}

/// A `PartitionMerger` that runs a SQL query over the partitions to merge,
/// which are exposed to the query as a table named `source`.
#[derive(Debug)]
pub struct QueryMerger {
    runtime: Arc<RuntimeEnv>,
    view_factory: Arc<ViewFactory>,
    session_configurator: Arc<dyn SessionConfigurator>,
    file_schema: Arc<Schema>,
    query: Arc<String>,
}

impl QueryMerger {
    /// Creates a new `QueryMerger` from the execution runtime, the view factory,
    /// the schema of the source partitions and the merge query to run.
    pub fn new(
        runtime: Arc<RuntimeEnv>,
        view_factory: Arc<ViewFactory>,
        session_configurator: Arc<dyn SessionConfigurator>,
        file_schema: Arc<Schema>,
        query: Arc<String>,
    ) -> Self {
        Self {
            runtime,
            view_factory,
            session_configurator,
            file_schema,
            query,
        }
    }
}
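
// Sketch (illustrative, not part of the crate): constructing a `QueryMerger`
// whose merge query simply re-reads every row from the `source` table that
// `execute_merge_query` registers below. The helper name and the query text are
// assumptions for the example; in practice each view supplies its own merge query.
#[allow(dead_code)]
fn passthrough_merger_example(
    runtime: Arc<RuntimeEnv>,
    view_factory: Arc<ViewFactory>,
    session_configurator: Arc<dyn SessionConfigurator>,
    file_schema: Arc<Schema>,
) -> QueryMerger {
    QueryMerger::new(
        runtime,
        view_factory,
        session_configurator,
        file_schema,
        Arc::new("SELECT * FROM source".to_string()),
    )
}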

#[async_trait]
impl PartitionMerger for QueryMerger {
    async fn execute_merge_query(
        &self,
        lake: Arc<DataLakeConnection>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
    ) -> Result<SendableRecordBatchStream> {
        let ctx = make_session_context(
            self.runtime.clone(),
            lake.clone(),
            partitions_all_views,
            None,
            self.view_factory.clone(),
            self.session_configurator.clone(),
        )
        .await?;
        // Expose the partitions to merge as a table named `source`.
        let src_table = PartitionedTableProvider::new(
            self.file_schema.clone(),
            lake.blob_storage.inner(),
            partitions_to_merge,
            lake.db_pool.clone(),
        );
        ctx.register_table(
            TableReference::Bare {
                table: "source".into(),
            },
            Arc::new(src_table),
        )?;

        // Run the merge query and stream its results back to the caller.
        ctx.sql(&self.query)
            .await?
            .execute_stream()
            .await
            .with_context(|| "merged_df.execute_stream")
    }
}
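
// Sketch (illustrative, not part of the crate): driving a merger through the
// `PartitionMerger` trait and counting the merged rows. The function name is
// hypothetical; `merger`, `lake`, `partitions` and `cache` are assumed to be
// provided by the caller.
#[allow(dead_code)]
async fn run_merger_example(
    merger: &dyn PartitionMerger,
    lake: Arc<DataLakeConnection>,
    partitions: Arc<Vec<Partition>>,
    cache: Arc<PartitionCache>,
) -> Result<u64> {
    let mut stream = merger.execute_merge_query(lake, partitions, cache).await?;
    let mut num_rows: u64 = 0;
    // Drain the record batch stream produced by the merge query.
    while let Some(batch) = stream.next().await {
        num_rows += batch?.num_rows() as u64;
    }
    Ok(num_rows)
}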

/// Computes the total file size and the combined source data hash of a set of
/// partitions, failing if any partition's file schema differs from the view's
/// current file schema.
fn partition_set_stats(
    view: Arc<dyn View>,
    filtered_partitions: &[Partition],
) -> Result<(i64, i64)> {
    let mut sum_size: i64 = 0;
    let mut source_hash: i64 = 0;
    let latest_file_schema_hash = view.get_file_schema_hash();
    for p in filtered_partitions {
        // An 8-byte source_data_hash encodes an object count: accumulate it.
        // Otherwise, chain an xxh32 over the raw hash bytes.
        source_hash = if p.source_data_hash.len() == std::mem::size_of::<i64>() {
            source_hash + hash_to_object_count(&p.source_data_hash)?
        } else {
            xxh32(&p.source_data_hash, source_hash as u32).into()
        };

        sum_size += p.file_size;

        if p.view_metadata.file_schema_hash != latest_file_schema_hash {
            anyhow::bail!(
                "incompatible file schema with [{},{}]",
                p.begin_insert_time().to_rfc3339(),
                p.end_insert_time().to_rfc3339()
            );
        }
    }
    Ok((sum_size, source_hash))
}

/// Merges the partitions of `view` whose insert time falls inside `insert_range`
/// into a single new partition and writes it back to the lake.
pub async fn create_merged_partition(
    partitions_to_merge: Arc<PartitionCache>,
    partitions_all_views: Arc<PartitionCache>,
    runtime: Arc<RuntimeEnv>,
    lake: Arc<DataLakeConnection>,
    view: Arc<dyn View>,
    insert_range: TimeRange,
    logger: Arc<dyn Logger>,
) -> Result<()> {
    let view_set_name = &view.get_view_set_name();
    let view_instance_id = &view.get_view_instance_id();
    let desc = format!(
        "[{}, {}] {view_set_name} {view_instance_id}",
        insert_range.begin.to_rfc3339(),
        insert_range.end.to_rfc3339()
    );
    let mut filtered_partitions = partitions_to_merge
        .filter_inside_range(view_set_name, view_instance_id, insert_range)
        .partitions;
    if filtered_partitions.len() != partitions_to_merge.len() {
        warn!("partitions_to_merge was not filtered properly");
    }
    if filtered_partitions.len() < 2 {
        logger
            .write_log_entry(format!("{desc}: not enough partitions to merge"))
            .await
            .with_context(|| "writing log")?;
        return Ok(());
    }
    let (sum_size, source_hash) = partition_set_stats(view.clone(), &filtered_partitions)
        .with_context(|| "partition_set_stats")?;
    logger
        .write_log_entry(format!(
            "{desc}: merging {} partitions sum_size={sum_size}",
            filtered_partitions.len()
        ))
        .await
        .with_context(|| "write_log_entry")?;
    filtered_partitions.sort_by_key(|p| p.begin_insert_time());
    let mut merged_stream = view
        .merge_partitions(
            runtime.clone(),
            lake.clone(),
            Arc::new(filtered_partitions),
            partitions_all_views,
        )
        .await
        .with_context(|| "view.merge_partitions")?;
    // Write the merged rows from a separate task: the writer consumes row sets
    // from the channel while this task keeps draining the merge query stream.
    let (tx, rx) = tokio::sync::mpsc::channel(1);
    let view_copy = view.clone();
    let join_handle = tokio::spawn(async move {
        let res = write_partition_from_rows(
            lake.clone(),
            view_copy.get_meta(),
            view_copy.get_file_schema(),
            insert_range,
            source_hash.to_le_bytes().to_vec(),
            rx,
            logger.clone(),
        )
        .await;
        if let Err(e) = &res {
            error!("{e:?}");
        }
        res
    });
    let compute_time_bounds = view.get_time_bounds();
    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
    while let Some(rb_res) = merged_stream.next().await {
        let rb = rb_res.with_context(|| "receiving record_batch from stream")?;
        // Compute the event time range of each batch before handing it to the writer.
        let event_time_range = compute_time_bounds
            .get_time_bounds(ctx.read_batch(rb.clone()).with_context(|| "read_batch")?)
            .await?;
        tx.send(PartitionRowSet::new(event_time_range, rb))
            .await
            .with_context(|| "sending partition row set")?;
    }
    // Close the channel so the writer task can finish, then propagate its result.
    drop(tx);
    join_handle.await??;
    Ok(())
}