micromegas-analytics 0.19.0

analytics module of micromegas
Documentation
use super::{
    blocks_view::BlocksView,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    jit_partitions::{
        JitPartitionConfig, generate_stream_jit_partitions, is_jit_partition_up_to_date,
    },
    lakehouse_context::LakehouseContext,
    partition_cache::PartitionCache,
    partition_source_data::{SourceDataBlocksInMemory, hash_to_object_count},
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::ViewMaker,
};
use crate::{
    call_tree::make_call_tree,
    lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
    metadata::{find_process, find_stream},
    response_writer::ResponseWriter,
    span_table::{SpanRecordBuilder, get_spans_schema},
    time::{ConvertTicks, TimeRange, datetime_to_scalar, make_time_converter_from_db},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion::{arrow::datatypes::Schema, logical_expr::expr_fn::col};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
use micromegas_tracing::prelude::*;
use std::sync::Arc;
use uuid::Uuid;

const VIEW_SET_NAME: &str = "thread_spans";
const SCHEMA_VERSION: u8 = 0;
lazy_static::lazy_static! {
    static ref MIN_TIME_COLUMN: Arc<String> = Arc::new( String::from("begin"));
    static ref MAX_TIME_COLUMN: Arc<String> = Arc::new( String::from("end"));
}

/// A `ViewMaker` for creating `ThreadSpansView` instances.
#[derive(Debug)]
pub struct ThreadSpansViewMaker {}

impl ViewMaker for ThreadSpansViewMaker {
    fn make_view(&self, stream_id: &str) -> Result<Arc<dyn View>> {
        Ok(Arc::new(ThreadSpansView::new(stream_id)?))
    }

    fn get_schema_hash(&self) -> Vec<u8> {
        vec![SCHEMA_VERSION]
    }

    fn get_schema(&self) -> Arc<Schema> {
        Arc::new(get_spans_schema())
    }
}

/// A view of thread spans.
#[derive(Debug)]
pub struct ThreadSpansView {
    view_set_name: Arc<String>,
    view_instance_id: Arc<String>,
    stream_id: sqlx::types::Uuid,
}

impl ThreadSpansView {
    pub fn new(view_instance_id: &str) -> Result<Self> {
        if view_instance_id == "global" {
            anyhow::bail!("the global view is not implemented for thread spans");
        }

        Ok(Self {
            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
            view_instance_id: Arc::new(String::from(view_instance_id)),
            stream_id: Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?,
        })
    }
}

async fn append_call_tree(
    record_builder: &mut SpanRecordBuilder,
    convert_ticks: &ConvertTicks,
    blocks: &[BlockMetadata],
    blob_storage: Arc<BlobStorage>,
    stream: &crate::metadata::StreamMetadata,
) -> Result<()> {
    let call_tree = make_call_tree(
        blocks,
        convert_ticks.delta_ticks_to_ns(blocks[0].begin_ticks),
        convert_ticks.delta_ticks_to_ns(blocks[blocks.len() - 1].end_ticks),
        None,
        blob_storage,
        convert_ticks.clone(),
        stream,
    )
    .await
    .with_context(|| "make_call_tree")?;
    record_builder
        .append_call_tree(&call_tree)
        .with_context(|| "adding call tree to span record builder")?;
    Ok(())
}

/// Writes a partition from a set of blocks.
async fn write_partition(
    lake: Arc<DataLakeConnection>,
    view_meta: ViewMetadata,
    schema: Arc<Schema>,
    convert_ticks: &ConvertTicks,
    spec: &SourceDataBlocksInMemory,
) -> Result<()> {
    let nb_events = hash_to_object_count(&spec.block_ids_hash)? as usize;
    info!("nb_events: {nb_events}");
    let mut record_builder = SpanRecordBuilder::with_capacity(nb_events / 2);
    let mut blocks_to_process = vec![];
    let mut last_end = None;
    if spec.blocks.is_empty() {
        anyhow::bail!("empty partition spec");
    }
    // for jit partitions, we assume that the blocks were registered in order
    // since they are built based on begin_ticks, not insert_time
    let min_insert_time = spec.blocks[0].block.insert_time;
    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;

    let (tx, rx) = tokio::sync::mpsc::channel(1);
    let null_response_writer = Arc::new(ResponseWriter::new(None));
    let join_handle = spawn_with_context(write_partition_from_rows(
        lake.clone(),
        view_meta,
        schema,
        TimeRange::new(min_insert_time, max_insert_time),
        spec.block_ids_hash.clone(),
        rx,
        null_response_writer,
    ));

    for block in &spec.blocks {
        if block.block.begin_ticks == last_end.unwrap_or(block.block.begin_ticks) {
            last_end = Some(block.block.end_ticks);
            blocks_to_process.push(block.block.clone());
        } else {
            append_call_tree(
                &mut record_builder,
                convert_ticks,
                &blocks_to_process,
                lake.blob_storage.clone(),
                &block.stream,
            )
            .await?;
            last_end = Some(block.block.end_ticks);
            blocks_to_process = vec![block.block.clone()];
        }
    }
    if !blocks_to_process.is_empty() {
        append_call_tree(
            &mut record_builder,
            convert_ticks,
            &blocks_to_process,
            lake.blob_storage.clone(),
            &spec.blocks[0].stream,
        )
        .await?;
    }
    let min_time_row = convert_ticks.delta_ticks_to_time(spec.blocks[0].block.begin_ticks);
    let max_time_row =
        convert_ticks.delta_ticks_to_time(spec.blocks[spec.blocks.len() - 1].block.end_ticks);
    let rows = record_builder
        .finish()
        .with_context(|| "record_builder.finish()")?;
    info!("writing {} rows", rows.num_rows());
    tx.send(PartitionRowSet {
        rows_time_range: TimeRange::new(min_time_row, max_time_row),
        rows,
    })
    .await?;
    drop(tx);
    join_handle.await??;
    Ok(())
}
/// Rebuilds the partition if it's missing or out of date.
async fn update_partition(
    lake: Arc<DataLakeConnection>,
    view_meta: ViewMetadata,
    schema: Arc<Schema>,
    convert_ticks: &ConvertTicks,
    spec: &SourceDataBlocksInMemory,
) -> Result<()> {
    if is_jit_partition_up_to_date(&lake.db_pool, view_meta.clone(), spec).await? {
        return Ok(());
    }
    write_partition(lake, view_meta, schema, convert_ticks, spec)
        .await
        .with_context(|| "write_partition")?;

    Ok(())
}

#[async_trait]
impl View for ThreadSpansView {
    fn get_view_set_name(&self) -> Arc<String> {
        self.view_set_name.clone()
    }

    fn get_view_instance_id(&self) -> Arc<String> {
        self.view_instance_id.clone()
    }

    async fn make_batch_partition_spec(
        &self,
        _lakehouse: Arc<LakehouseContext>,
        _existing_partitions: Arc<PartitionCache>,
        _insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>> {
        anyhow::bail!("not implemented")
    }

    fn get_file_schema_hash(&self) -> Vec<u8> {
        vec![SCHEMA_VERSION]
    }

    fn get_file_schema(&self) -> Arc<Schema> {
        Arc::new(get_spans_schema())
    }

    async fn jit_update(
        &self,
        lakehouse: Arc<LakehouseContext>,
        query_range: Option<TimeRange>,
    ) -> Result<()> {
        if query_range.is_none() {
            anyhow::bail!("query range mandatory for thread spans view");
        }
        let query_range = query_range.unwrap();
        let stream = Arc::new(
            find_stream(&lakehouse.lake().db_pool, self.stream_id)
                .await
                .with_context(|| "find_stream")?,
        );
        let process = Arc::new(
            find_process(&lakehouse.lake().db_pool, &stream.process_id)
                .await
                .with_context(|| "find_process")?,
        );
        let convert_ticks =
            make_time_converter_from_db(&lakehouse.lake().db_pool, &process).await?;
        let blocks_view = BlocksView::new()?;
        let partitions = generate_stream_jit_partitions(
            &JitPartitionConfig::default(),
            lakehouse.clone(),
            &blocks_view,
            &query_range,
            stream.clone(),
            process.clone(),
        )
        .await
        .with_context(|| "generate_stream_jit_partitions")?;
        for part in &partitions {
            update_partition(
                lakehouse.lake().clone(),
                ViewMetadata {
                    view_set_name: self.get_view_set_name(),
                    view_instance_id: self.get_view_instance_id(),
                    file_schema_hash: self.get_file_schema_hash(),
                },
                self.get_file_schema(),
                &convert_ticks,
                part,
            )
            .await
            .with_context(|| "update_partition")?;
        }
        Ok(())
    }

    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
        Ok(vec![
            Expr::BinaryExpr(BinaryExpr::new(
                col("begin").into(),
                Operator::LtEq,
                Expr::Literal(datetime_to_scalar(end), None).into(),
            )),
            Expr::BinaryExpr(BinaryExpr::new(
                col("end").into(),
                Operator::GtEq,
                Expr::Literal(datetime_to_scalar(begin), None).into(),
            )),
        ])
    }

    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
        Arc::new(NamedColumnsTimeBounds::new(
            MIN_TIME_COLUMN.clone(),
            MAX_TIME_COLUMN.clone(),
        ))
    }

    fn get_update_group(&self) -> Option<i32> {
        None
    }
}