use super::{
blocks_view::BlocksView,
dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
jit_partitions::{
JitPartitionConfig, generate_stream_jit_partitions, is_jit_partition_up_to_date,
},
lakehouse_context::LakehouseContext,
partition_cache::PartitionCache,
partition_source_data::{SourceDataBlocksInMemory, hash_to_object_count},
view::{PartitionSpec, View, ViewMetadata},
view_factory::ViewMaker,
};
use crate::{
call_tree::make_call_tree,
lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
metadata::{find_process, find_stream},
response_writer::ResponseWriter,
span_table::{SpanRecordBuilder, get_spans_schema},
time::{ConvertTicks, TimeRange, datetime_to_scalar, make_time_converter_from_db},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion::{arrow::datatypes::Schema, logical_expr::expr_fn::col};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
use micromegas_tracing::prelude::*;
use std::sync::Arc;
use uuid::Uuid;
/// Name of the view set all thread-spans view instances belong to.
const VIEW_SET_NAME: &str = "thread_spans";
/// Single-byte schema revision; doubles as the schema hash (see `get_schema_hash`).
const SCHEMA_VERSION: u8 = 0;
lazy_static::lazy_static! {
    // Column names used to compute the time bounds of a thread-spans dataframe
    // (consumed by `get_time_bounds` below).
    static ref MIN_TIME_COLUMN: Arc<String> = Arc::new( String::from("begin"));
    static ref MAX_TIME_COLUMN: Arc<String> = Arc::new( String::from("end"));
}
/// Factory producing [`ThreadSpansView`] instances, one per telemetry stream.
#[derive(Debug)]
pub struct ThreadSpansViewMaker {}

impl ViewMaker for ThreadSpansViewMaker {
    /// Builds a thread-spans view for the stream identified by `stream_id`.
    fn make_view(&self, stream_id: &str) -> Result<Arc<dyn View>> {
        let view = ThreadSpansView::new(stream_id)?;
        Ok(Arc::new(view))
    }

    /// Single-byte hash identifying the current schema revision.
    fn get_schema_hash(&self) -> Vec<u8> {
        Vec::from([SCHEMA_VERSION])
    }

    /// Arrow schema shared by all thread-spans views.
    fn get_schema(&self) -> Arc<Schema> {
        Arc::new(get_spans_schema())
    }
}
/// Lakehouse view exposing the call spans of a single thread (one stream).
#[derive(Debug)]
pub struct ThreadSpansView {
    /// Always [`VIEW_SET_NAME`] ("thread_spans").
    view_set_name: Arc<String>,
    /// The stream id as a string; "global" is rejected by `new`.
    view_instance_id: Arc<String>,
    /// Parsed form of `view_instance_id`.
    stream_id: sqlx::types::Uuid,
}
impl ThreadSpansView {
    /// Creates a view over the thread spans of a single stream.
    ///
    /// `view_instance_id` must be the stream's UUID; the aggregated
    /// "global" instance is not supported for thread spans.
    ///
    /// # Errors
    /// Fails if `view_instance_id` is "global" or is not a valid UUID.
    pub fn new(view_instance_id: &str) -> Result<Self> {
        if view_instance_id == "global" {
            anyhow::bail!("the global view is not implemented for thread spans");
        }
        Ok(Self {
            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
            view_instance_id: Arc::new(String::from(view_instance_id)),
            // Include the offending input in the error: the bare
            // "Uuid::parse_str" context gave no clue what failed to parse.
            stream_id: Uuid::parse_str(view_instance_id).with_context(|| {
                format!("parsing view_instance_id {view_instance_id:?} as a stream Uuid")
            })?,
        })
    }
}
/// Builds the call tree covering `blocks` and appends its spans to `record_builder`.
///
/// The tree's time range spans from the first block's begin tick to the last
/// block's end tick; callers pass time-contiguous runs of blocks from a single
/// stream. An empty slice is a no-op.
///
/// # Errors
/// Fails if the call tree cannot be built or its spans cannot be appended.
async fn append_call_tree(
    record_builder: &mut SpanRecordBuilder,
    convert_ticks: &ConvertTicks,
    blocks: &[BlockMetadata],
    blob_storage: Arc<BlobStorage>,
    stream: &crate::metadata::StreamMetadata,
) -> Result<()> {
    // Guard: indexing blocks[0] / blocks[len - 1] below would panic on an
    // empty slice. There is nothing to append, so return early.
    if blocks.is_empty() {
        return Ok(());
    }
    let call_tree = make_call_tree(
        blocks,
        convert_ticks.delta_ticks_to_ns(blocks[0].begin_ticks),
        convert_ticks.delta_ticks_to_ns(blocks[blocks.len() - 1].end_ticks),
        None,
        blob_storage,
        convert_ticks.clone(),
        stream,
    )
    .await
    .with_context(|| "make_call_tree")?;
    record_builder
        .append_call_tree(&call_tree)
        .with_context(|| "adding call tree to span record builder")?;
    Ok(())
}
/// Builds span rows from the source blocks in `spec` and writes them out as a
/// single lakehouse partition.
///
/// Blocks are grouped into time-contiguous runs; each run is turned into one
/// call tree whose spans are appended to a single record batch, which is then
/// streamed to `write_partition_from_rows` through a channel.
///
/// # Errors
/// Fails when `spec` is empty, when a call tree cannot be built, or when the
/// partition-writing task fails.
async fn write_partition(
    lake: Arc<DataLakeConnection>,
    view_meta: ViewMetadata,
    schema: Arc<Schema>,
    convert_ticks: &ConvertTicks,
    spec: &SourceDataBlocksInMemory,
) -> Result<()> {
    let nb_events = hash_to_object_count(&spec.block_ids_hash)? as usize;
    info!("nb_events: {nb_events}");
    // Capacity heuristic: assumes roughly one span per two events (begin/end
    // pairs) -- TODO confirm against SpanRecordBuilder's accounting.
    let mut record_builder = SpanRecordBuilder::with_capacity(nb_events / 2);
    // Current run of time-contiguous blocks, flushed as one call tree.
    let mut blocks_to_process = vec![];
    // End tick of the last block added to the run; None before the first block.
    let mut last_end = None;
    if spec.blocks.is_empty() {
        anyhow::bail!("empty partition spec");
    }
    // NOTE(review): first/last indexing assumes `spec.blocks` is ordered by
    // insert time -- verify against the partition-spec producer.
    let min_insert_time = spec.blocks[0].block.insert_time;
    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
    // Capacity-1 channel: the writer task consumes row sets as they are produced.
    let (tx, rx) = tokio::sync::mpsc::channel(1);
    // JIT writes have no client connection to report progress to.
    let null_response_writer = Arc::new(ResponseWriter::new(None));
    // Spawn the writer task up front so it is ready to receive the rows sent below.
    let join_handle = spawn_with_context(write_partition_from_rows(
        lake.clone(),
        view_meta,
        schema,
        TimeRange::new(min_insert_time, max_insert_time),
        spec.block_ids_hash.clone(),
        rx,
        null_response_writer,
    ));
    for block in &spec.blocks {
        // A block extends the current run when its begin tick equals the
        // previous block's end tick; the first block always starts a run
        // (unwrap_or makes the comparison trivially true when last_end is None).
        if block.block.begin_ticks == last_end.unwrap_or(block.block.begin_ticks) {
            last_end = Some(block.block.end_ticks);
            blocks_to_process.push(block.block.clone());
        } else {
            // Gap detected: flush the current run as one call tree, then start
            // a new run with this block.
            append_call_tree(
                &mut record_builder,
                convert_ticks,
                &blocks_to_process,
                lake.blob_storage.clone(),
                &block.stream,
            )
            .await?;
            last_end = Some(block.block.end_ticks);
            blocks_to_process = vec![block.block.clone()];
        }
    }
    // Flush the final run (always non-empty here since spec.blocks is non-empty).
    if !blocks_to_process.is_empty() {
        append_call_tree(
            &mut record_builder,
            convert_ticks,
            &blocks_to_process,
            lake.blob_storage.clone(),
            &spec.blocks[0].stream,
        )
        .await?;
    }
    // Row time range in absolute time, converted from first/last block ticks.
    let min_time_row = convert_ticks.delta_ticks_to_time(spec.blocks[0].block.begin_ticks);
    let max_time_row =
        convert_ticks.delta_ticks_to_time(spec.blocks[spec.blocks.len() - 1].block.end_ticks);
    let rows = record_builder
        .finish()
        .with_context(|| "record_builder.finish()")?;
    info!("writing {} rows", rows.num_rows());
    tx.send(PartitionRowSet {
        rows_time_range: TimeRange::new(min_time_row, max_time_row),
        rows,
    })
    .await?;
    // Close the channel so the writer task's receive loop can terminate.
    drop(tx);
    // Double `?`: propagate both join errors and the writer's own Result.
    join_handle.await??;
    Ok(())
}
/// Writes the partition described by `spec` unless an up-to-date version
/// of it already exists in the lakehouse.
async fn update_partition(
    lake: Arc<DataLakeConnection>,
    view_meta: ViewMetadata,
    schema: Arc<Schema>,
    convert_ticks: &ConvertTicks,
    spec: &SourceDataBlocksInMemory,
) -> Result<()> {
    // Skip the expensive write when the stored partition already matches the spec.
    let up_to_date = is_jit_partition_up_to_date(&lake.db_pool, view_meta.clone(), spec).await?;
    if up_to_date {
        return Ok(());
    }
    write_partition(lake, view_meta, schema, convert_ticks, spec)
        .await
        .with_context(|| "write_partition")
}
#[async_trait]
impl View for ThreadSpansView {
fn get_view_set_name(&self) -> Arc<String> {
self.view_set_name.clone()
}
fn get_view_instance_id(&self) -> Arc<String> {
self.view_instance_id.clone()
}
async fn make_batch_partition_spec(
&self,
_lakehouse: Arc<LakehouseContext>,
_existing_partitions: Arc<PartitionCache>,
_insert_range: TimeRange,
) -> Result<Arc<dyn PartitionSpec>> {
anyhow::bail!("not implemented")
}
fn get_file_schema_hash(&self) -> Vec<u8> {
vec![SCHEMA_VERSION]
}
fn get_file_schema(&self) -> Arc<Schema> {
Arc::new(get_spans_schema())
}
async fn jit_update(
&self,
lakehouse: Arc<LakehouseContext>,
query_range: Option<TimeRange>,
) -> Result<()> {
if query_range.is_none() {
anyhow::bail!("query range mandatory for thread spans view");
}
let query_range = query_range.unwrap();
let stream = Arc::new(
find_stream(&lakehouse.lake().db_pool, self.stream_id)
.await
.with_context(|| "find_stream")?,
);
let process = Arc::new(
find_process(&lakehouse.lake().db_pool, &stream.process_id)
.await
.with_context(|| "find_process")?,
);
let convert_ticks =
make_time_converter_from_db(&lakehouse.lake().db_pool, &process).await?;
let blocks_view = BlocksView::new()?;
let partitions = generate_stream_jit_partitions(
&JitPartitionConfig::default(),
lakehouse.clone(),
&blocks_view,
&query_range,
stream.clone(),
process.clone(),
)
.await
.with_context(|| "generate_stream_jit_partitions")?;
for part in &partitions {
update_partition(
lakehouse.lake().clone(),
ViewMetadata {
view_set_name: self.get_view_set_name(),
view_instance_id: self.get_view_instance_id(),
file_schema_hash: self.get_file_schema_hash(),
},
self.get_file_schema(),
&convert_ticks,
part,
)
.await
.with_context(|| "update_partition")?;
}
Ok(())
}
fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
Ok(vec![
Expr::BinaryExpr(BinaryExpr::new(
col("begin").into(),
Operator::LtEq,
Expr::Literal(datetime_to_scalar(end), None).into(),
)),
Expr::BinaryExpr(BinaryExpr::new(
col("end").into(),
Operator::GtEq,
Expr::Literal(datetime_to_scalar(begin), None).into(),
)),
])
}
fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
Arc::new(NamedColumnsTimeBounds::new(
MIN_TIME_COLUMN.clone(),
MAX_TIME_COLUMN.clone(),
))
}
fn get_update_group(&self) -> Option<i32> {
None
}
}