use super::{
block_partition_spec::{BlockPartitionSpec, BlockProcessor},
blocks_view::BlocksView,
lakehouse_context::LakehouseContext,
partition_cache::{LivePartitionProvider, QueryPartitionProvider},
partition_source_data::{PartitionSourceBlock, SourceDataBlocksInMemory},
view::{View, ViewMetadata},
};
use crate::{
dfext::typed_column::get_single_row_primitive_value,
lakehouse::{partition_cache::PartitionCache, view::PartitionSpec},
metadata::{ProcessMetadata, StreamMetadata, block_from_batch_row},
time::TimeRange,
};
use crate::{
lakehouse::{partition_source_data::hash_to_object_count, query::query_partitions},
response_writer::ResponseWriter,
};
use anyhow::{Context, Result};
use chrono::DurationRound;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::arrow::datatypes::{Schema, TimestampNanosecondType};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::prelude::*;
use sqlx::Row;
use std::sync::Arc;
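/// Limits applied when generating just-in-time (JIT) partitions.
///
/// # Example (illustrative values only)
/// ```ignore
/// let config = JitPartitionConfig {
///     max_nb_objects: 1024 * 1024,
///     max_insert_time_slice: TimeDelta::minutes(15),
/// };
/// ```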
pub struct JitPartitionConfig {
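/// Maximum number of source objects accumulated into a single generated partition.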
pub max_nb_objects: i64,
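/// Maximum insert-time span covered by a single segment query over the source blocks.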
pub max_insert_time_slice: TimeDelta,
}
impl Default for JitPartitionConfig {
fn default() -> Self {
JitPartitionConfig {
max_nb_objects: 20 * 1024 * 1024,
max_insert_time_slice: TimeDelta::hours(1),
}
}
}
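/// Queries the blocks view for the insert_time range of the blocks of `stream`
/// whose event time overlaps `query_time_range`.
///
/// Returns `None` when the query yields no result.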
async fn get_insert_time_range(
lakehouse: Arc<LakehouseContext>,
blocks_view: &BlocksView,
query_time_range: &TimeRange,
stream: Arc<StreamMetadata>,
) -> Result<Option<TimeRange>> {
let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
let partitions = part_provider
.fetch(
&blocks_view.get_view_set_name(),
&blocks_view.get_view_instance_id(),
Some(*query_time_range),
blocks_view.get_file_schema_hash(),
)
.await?;
let stream_id = &stream.stream_id;
let begin_range_iso = query_time_range.begin.to_rfc3339();
let end_range_iso = query_time_range.end.to_rfc3339();
let sql = format!(
r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
FROM source
WHERE stream_id = '{stream_id}'
AND begin_time <= '{end_range_iso}'
AND end_time >= '{begin_range_iso}';"#
);
let reader_factory = lakehouse.reader_factory().clone();
let rbs = query_partitions(
lakehouse.runtime().clone(),
reader_factory,
lakehouse.lake().blob_storage.inner(),
blocks_view.get_file_schema(),
Arc::new(partitions),
&sql,
)
.await?
.collect()
.await?;
if rbs.is_empty() || rbs[0].num_rows() == 0 {
return Ok(None);
}
let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;
Ok(Some(TimeRange::new(
DateTime::from_timestamp_nanos(min_insert_time),
DateTime::from_timestamp_nanos(max_insert_time),
)))
}
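/// Lists the blocks of `stream` whose insert_time falls within `insert_time_range`
/// and groups them into partition specifications of at most `config.max_nb_objects`
/// objects each.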
pub async fn generate_stream_jit_partitions_segment(
config: &JitPartitionConfig,
lakehouse: Arc<LakehouseContext>,
blocks_view: &BlocksView,
insert_time_range: &TimeRange,
stream: Arc<StreamMetadata>,
process: Arc<ProcessMetadata>,
) -> Result<Vec<SourceDataBlocksInMemory>> {
let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
&lakehouse.lake().db_pool,
blocks_view.get_view_set_name(),
blocks_view.get_view_instance_id(),
*insert_time_range,
)
.await?;
let partitions = cache.partitions;
let stream_id = &stream.stream_id;
let begin_range_iso = insert_time_range.begin.to_rfc3339();
let end_range_iso = insert_time_range.end.to_rfc3339();
let sql = format!(
r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time
FROM source
WHERE stream_id = '{stream_id}'
AND insert_time >= '{begin_range_iso}'
AND insert_time < '{end_range_iso}'
ORDER BY insert_time, block_id;"#
);
let reader_factory = lakehouse.reader_factory().clone();
let rbs = query_partitions(
lakehouse.runtime().clone(),
reader_factory,
lakehouse.lake().blob_storage.inner(),
blocks_view.get_file_schema(),
Arc::new(partitions),
&sql,
)
.await?
.collect()
.await?;
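// Group the blocks into partition specs, flushing whenever adding a block would exceed max_nb_objects.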
let mut partitions = vec![];
let mut partition_blocks = vec![];
let mut partition_nb_objects: i64 = 0;
for rb in rbs {
for ir in 0..rb.num_rows() {
let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
let block_nb_objects = block.nb_objects as i64;
if partition_nb_objects + block_nb_objects > config.max_nb_objects
&& !partition_blocks.is_empty()
{
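// flush the current group; block_ids_hash encodes the object count (decoded later by hash_to_object_count)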
partitions.push(SourceDataBlocksInMemory {
blocks: partition_blocks,
block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
});
partition_blocks = vec![Arc::new(PartitionSourceBlock {
block,
stream: stream.clone(),
process: process.clone(),
})];
partition_nb_objects = block_nb_objects;
} else {
partition_nb_objects += block_nb_objects;
partition_blocks.push(Arc::new(PartitionSourceBlock {
block,
stream: stream.clone(),
process: process.clone(),
}));
}
}
}
if partition_nb_objects != 0 {
partitions.push(SourceDataBlocksInMemory {
blocks: partition_blocks,
block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
});
}
Ok(partitions)
}
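/// Generates the JIT partition specifications needed to answer a query on a single stream.
///
/// The stream's insert_time range is derived from `query_time_range`, aligned on
/// `config.max_insert_time_slice`, and processed one slice at a time by
/// [`generate_stream_jit_partitions_segment`].
///
/// # Example (sketch; `lakehouse`, `blocks_view`, `stream`, `process`, `begin` and `end`
/// are assumed to be available from the calling context)
/// ```ignore
/// let partitions = generate_stream_jit_partitions(
///     &JitPartitionConfig::default(),
///     lakehouse.clone(),
///     &blocks_view,
///     &TimeRange::new(begin, end),
///     stream.clone(),
///     process.clone(),
/// )
/// .await?;
/// ```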
pub async fn generate_stream_jit_partitions(
config: &JitPartitionConfig,
lakehouse: Arc<LakehouseContext>,
blocks_view: &BlocksView,
query_time_range: &TimeRange,
stream: Arc<StreamMetadata>,
process: Arc<ProcessMetadata>,
) -> Result<Vec<SourceDataBlocksInMemory>> {
let Some(insert_time_range) = get_insert_time_range(
lakehouse.clone(),
blocks_view,
query_time_range,
stream.clone(),
)
.await?
else {
return Ok(vec![]);
};
let insert_time_range = TimeRange::new(
insert_time_range
.begin
.duration_trunc(config.max_insert_time_slice)?,
insert_time_range
.end
.duration_trunc(config.max_insert_time_slice)?
+ config.max_insert_time_slice,
);
let mut begin_segment = insert_time_range.begin;
let mut end_segment = begin_segment + config.max_insert_time_slice;
let mut partitions = vec![];
while end_segment <= insert_time_range.end {
let insert_time_range = TimeRange::new(begin_segment, end_segment);
let mut segment_partitions = generate_stream_jit_partitions_segment(
config,
lakehouse.clone(),
blocks_view,
&insert_time_range,
stream.clone(),
process.clone(),
)
.await?;
partitions.append(&mut segment_partitions);
begin_segment = end_segment;
end_segment = begin_segment + config.max_insert_time_slice;
}
Ok(partitions)
}
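/// Lists the blocks of the streams of `process` tagged with `stream_tag` whose insert_time
/// falls within `insert_time_range` and groups them into partition specifications,
/// rebuilding each block's [`StreamMetadata`] from the queried columns.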
pub async fn generate_process_jit_partitions_segment(
config: &JitPartitionConfig,
lakehouse: Arc<LakehouseContext>,
blocks_view: &BlocksView,
insert_time_range: &TimeRange,
process: Arc<ProcessMetadata>,
stream_tag: &str,
) -> Result<Vec<SourceDataBlocksInMemory>> {
let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
&lakehouse.lake().db_pool,
blocks_view.get_view_set_name(),
blocks_view.get_view_instance_id(),
*insert_time_range,
)
.await?;
let partitions = cache.partitions;
let process_id = &process.process_id;
let begin_range_iso = insert_time_range.begin.to_rfc3339();
let end_range_iso = insert_time_range.end.to_rfc3339();
let sql = format!(
r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time,
"streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties"
FROM source
WHERE process_id = '{process_id}'
AND array_has( "streams.tags", '{stream_tag}' )
AND insert_time >= '{begin_range_iso}'
AND insert_time < '{end_range_iso}'
ORDER BY insert_time, block_id;"#
);
let reader_factory = lakehouse.reader_factory().clone();
let rbs = query_partitions(
lakehouse.runtime().clone(),
reader_factory,
lakehouse.lake().blob_storage.inner(),
blocks_view.get_file_schema(),
Arc::new(partitions),
&sql,
)
.await?
.collect()
.await?;
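// Same grouping as generate_stream_jit_partitions_segment, but the stream metadata is rebuilt from the queried columns.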
let mut partitions = vec![];
let mut partition_blocks = vec![];
let mut partition_nb_objects: i64 = 0;
use crate::dfext::{
string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
};
use crate::properties::properties_column_accessor::properties_column_by_name;
use datafusion::arrow::array::{BinaryArray, GenericListArray, StringArray};
use uuid::Uuid;
for rb in rbs {
for ir in 0..rb.num_rows() {
let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
let block_nb_objects = block.nb_objects as i64;
let stream_id_column = string_column_by_name(&rb, "stream_id")?;
let stream_process_id_column = string_column_by_name(&rb, "process_id")?;
let dependencies_metadata_column: &BinaryArray =
typed_column_by_name(&rb, "streams.dependencies_metadata")?;
let objects_metadata_column: &BinaryArray =
typed_column_by_name(&rb, "streams.objects_metadata")?;
let stream_tags_column: &GenericListArray<i32> =
typed_column_by_name(&rb, "streams.tags")?;
let stream_properties_accessor = properties_column_by_name(&rb, "streams.properties")?;
let stream_id =
Uuid::parse_str(stream_id_column.value(ir)).with_context(|| "parsing stream_id")?;
let stream_process_id = Uuid::parse_str(stream_process_id_column.value(ir))
.with_context(|| "parsing stream process_id")?;
let dependencies_metadata = dependencies_metadata_column.value(ir);
let objects_metadata = objects_metadata_column.value(ir);
let stream_tags = stream_tags_column
.value(ir)
.as_any()
.downcast_ref::<StringArray>()
.with_context(|| "casting stream_tags")?
.iter()
.map(|item| String::from(item.unwrap_or_default()))
.collect();
let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;
let stream = Arc::new(StreamMetadata {
stream_id,
process_id: stream_process_id,
dependencies_metadata: ciborium::from_reader(dependencies_metadata)
.with_context(|| "decoding dependencies_metadata")?,
objects_metadata: ciborium::from_reader(objects_metadata)
.with_context(|| "decoding objects_metadata")?,
tags: stream_tags,
properties: Arc::new(stream_properties_jsonb),
});
if partition_nb_objects + block_nb_objects > config.max_nb_objects
&& !partition_blocks.is_empty()
{
partitions.push(SourceDataBlocksInMemory {
blocks: partition_blocks,
block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
});
partition_blocks = vec![Arc::new(PartitionSourceBlock {
block,
stream: stream.clone(),
process: process.clone(),
})];
partition_nb_objects = block_nb_objects;
} else {
partition_nb_objects += block_nb_objects;
partition_blocks.push(Arc::new(PartitionSourceBlock {
block,
stream: stream.clone(),
process: process.clone(),
}));
}
}
}
if partition_nb_objects != 0 {
partitions.push(SourceDataBlocksInMemory {
blocks: partition_blocks,
block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
});
}
Ok(partitions)
}
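/// Generates the JIT partition specifications needed to answer a query covering all the
/// streams of `process` tagged with `stream_tag`.
///
/// The insert_time range is derived from `query_time_range`, aligned on
/// `config.max_insert_time_slice`, and processed one slice at a time by
/// [`generate_process_jit_partitions_segment`].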
pub async fn generate_process_jit_partitions(
config: &JitPartitionConfig,
lakehouse: Arc<LakehouseContext>,
blocks_view: &BlocksView,
query_time_range: &TimeRange,
process: Arc<ProcessMetadata>,
stream_tag: &str,
) -> Result<Vec<SourceDataBlocksInMemory>> {
let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
let partitions = part_provider
.fetch(
&blocks_view.get_view_set_name(),
&blocks_view.get_view_instance_id(),
Some(*query_time_range),
blocks_view.get_file_schema_hash(),
)
.await?;
let process_id = &process.process_id;
let begin_range_iso = query_time_range.begin.to_rfc3339();
let end_range_iso = query_time_range.end.to_rfc3339();
let sql = format!(
r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
FROM source
WHERE process_id = '{process_id}'
AND array_has( "streams.tags", '{stream_tag}' )
AND begin_time <= '{end_range_iso}'
AND end_time >= '{begin_range_iso}';"#
);
let reader_factory = lakehouse.reader_factory().clone();
let rbs = query_partitions(
lakehouse.runtime().clone(),
reader_factory,
lakehouse.lake().blob_storage.inner(),
blocks_view.get_file_schema(),
Arc::new(partitions),
&sql,
)
.await?
.collect()
.await?;
if rbs.is_empty() || rbs[0].num_rows() == 0 {
return Ok(vec![]);
}
let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;
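// a null aggregate (read back as zero) means no blocks matched the query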
if min_insert_time == 0 || max_insert_time == 0 {
return Ok(vec![]);
}
let insert_time_range = TimeRange::new(
DateTime::from_timestamp_nanos(min_insert_time)
.duration_trunc(config.max_insert_time_slice)?,
DateTime::from_timestamp_nanos(max_insert_time)
.duration_trunc(config.max_insert_time_slice)?
+ config.max_insert_time_slice,
);
let mut begin_segment = insert_time_range.begin;
let mut end_segment = begin_segment + config.max_insert_time_slice;
let mut partitions = vec![];
while end_segment <= insert_time_range.end {
let insert_time_range = TimeRange::new(begin_segment, end_segment);
let mut segment_partitions = generate_process_jit_partitions_segment(
config,
lakehouse.clone(),
blocks_view,
&insert_time_range,
process.clone(),
stream_tag,
)
.await?;
partitions.append(&mut segment_partitions);
begin_segment = end_segment;
end_segment = begin_segment + config.max_insert_time_slice;
}
Ok(partitions)
}
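/// Checks whether an existing partition already covers `spec`.
///
/// Returns `true` when exactly one matching partition exists in `lakehouse_partitions`,
/// its file schema hash matches the view's, and it was built from at least as many
/// source objects as `spec` requires; otherwise a new partition should be written.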
pub async fn is_jit_partition_up_to_date(
pool: &sqlx::PgPool,
view_meta: ViewMetadata,
spec: &SourceDataBlocksInMemory,
) -> Result<bool> {
let (min_insert_time, max_insert_time) =
get_part_insert_time_range(spec).with_context(|| "get_part_insert_time_range")?;
let desc = format!(
"[{}, {}] {} {}",
min_insert_time.to_rfc3339(),
max_insert_time.to_rfc3339(),
&*view_meta.view_set_name,
&*view_meta.view_instance_id,
);
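// When all blocks share the same insert_time, look for the partition with those exact bounds;
// otherwise look for a partition covering the whole insert range.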
let rows = if min_insert_time == max_insert_time {
sqlx::query(
"SELECT file_schema_hash, source_data_hash
FROM lakehouse_partitions
WHERE view_set_name = $1
AND view_instance_id = $2
AND begin_insert_time = $3
AND end_insert_time = $3
;",
)
.bind(&*view_meta.view_set_name)
.bind(&*view_meta.view_instance_id)
.bind(min_insert_time)
} else {
sqlx::query(
"SELECT file_schema_hash, source_data_hash
FROM lakehouse_partitions
WHERE view_set_name = $1
AND view_instance_id = $2
AND begin_insert_time <= $3
AND end_insert_time >= $4
;",
)
.bind(&*view_meta.view_set_name)
.bind(&*view_meta.view_instance_id)
.bind(max_insert_time)
.bind(min_insert_time)
}
.fetch_all(pool)
.await
.with_context(|| "fetching matching partitions")?;
if rows.len() != 1 {
debug!("{desc}: found {} partitions (expected 1)", rows.len());
for (i, row) in rows.iter().enumerate() {
let part_file_schema: Vec<u8> = row.try_get("file_schema_hash")?;
let part_source_data: Vec<u8> = row.try_get("source_data_hash")?;
let source_row_count = hash_to_object_count(&part_source_data)?;
debug!(
"{desc}: partition {}: file_schema_hash={:?}, source_rows={}",
i, part_file_schema, source_row_count
);
}
info!("{desc}: found {} partitions", rows.len());
return Ok(false);
}
let r = &rows[0];
let part_file_schema: Vec<u8> = r.try_get("file_schema_hash")?;
if part_file_schema != view_meta.file_schema_hash {
warn!("{desc}: found matching partition with different file schema");
return Ok(false);
}
let part_source_data: Vec<u8> = r.try_get("source_data_hash")?;
let existing_count = hash_to_object_count(&part_source_data)?;
let required_count = hash_to_object_count(&spec.block_ids_hash)?;
if existing_count < required_count {
info!("{desc}: existing partition lacks source data: creating a new partition");
return Ok(false);
}
info!("{desc}: partition up to date");
Ok(true)
}
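/// Returns the insert_time of the first and last blocks of `spec`, which are expected
/// to be sorted by insert_time.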
fn get_part_insert_time_range(
spec: &SourceDataBlocksInMemory,
) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
if spec.blocks.is_empty() {
anyhow::bail!("empty partition should not exist");
}
let min_insert_time = spec.blocks[0].block.insert_time;
let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
Ok((min_insert_time, max_insert_time))
}
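/// Writes a partition from an in-memory set of source blocks.
///
/// Builds a [`BlockPartitionSpec`] spanning the insert_time range of the blocks and writes
/// it with a null response writer (no progress is streamed back to a client).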
pub async fn write_partition_from_blocks(
lake: Arc<DataLakeConnection>,
view_metadata: ViewMetadata,
schema: Arc<Schema>,
source_data: SourceDataBlocksInMemory,
block_processor: Arc<dyn BlockProcessor>,
) -> Result<()> {
if source_data.blocks.is_empty() {
anyhow::bail!("empty partition spec");
}
let min_insert_time = source_data.blocks[0].block.insert_time;
let max_insert_time = source_data.blocks[source_data.blocks.len() - 1]
.block
.insert_time;
let block_spec = BlockPartitionSpec {
view_metadata,
schema,
insert_range: TimeRange::new(min_insert_time, max_insert_time),
source_data: Arc::new(source_data),
block_processor,
};
let null_response_writer = Arc::new(ResponseWriter::new(None));
block_spec
.write(lake, null_response_writer)
.await
.with_context(|| "block_spec.write")?;
Ok(())
}