use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use datafusion::arrow::array::{Int32Array, Int64Array, RecordBatch, TimestampNanosecondArray};
use micromegas_telemetry::{
property::Property, stream_info::StreamInfo, types::block::BlockMetadata,
};
use micromegas_tracing::prelude::*;
use micromegas_transit::{UserDefinedType, uuid_utils::parse_optional_uuid};
use sqlx::Row;
use std::sync::Arc;
use uuid::Uuid;
use crate::{
arrow_properties::serialize_properties_to_jsonb,
dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
lakehouse::{
lakehouse_context::LakehouseContext, partition_cache::LivePartitionProvider,
query::make_session_context, session_configurator::NoOpSessionConfigurator,
view_factory::ViewFactory,
},
properties::properties_column_accessor::properties_column_by_name,
time::TimeRange,
};
/// Shared, reference-counted byte buffer holding properties serialized as JSONB.
///
/// In this file the buffer is produced either by `serialize_properties_to_jsonb`
/// or by reading a JSONB value out of a query result, and then wrapped in an `Arc`.
pub type SharedJsonbSerialized = Arc<Vec<u8>>;
/// Metadata describing a single process.
///
/// The fields mirror the columns selected from `processes` by `find_process` and
/// `find_process_with_latest_timing`. `properties` holds the process properties
/// already serialized as JSONB.
#[derive(Debug, Clone)]
pub struct ProcessMetadata {
/// Identifier of the process.
pub process_id: uuid::Uuid,
/// Value of the `exe` column (presumably the executable name or path; confirm with the writer).
pub exe: String,
/// Value of the `username` column.
pub username: String,
/// Value of the `realname` column.
pub realname: String,
/// Value of the `computer` column.
pub computer: String,
/// Value of the `distro` column.
pub distro: String,
/// Value of the `cpu_brand` column.
pub cpu_brand: String,
/// Value of the `tsc_frequency` column
/// (presumably the frequency of the counter behind the `*_ticks` values; confirm).
pub tsc_frequency: i64,
/// UTC timestamp read from the `start_time` column.
pub start_time: chrono::DateTime<chrono::Utc>,
/// Tick value read from the `start_ticks` column
/// (presumably sampled together with `start_time`; confirm).
pub start_ticks: i64,
/// Identifier of the parent process; `None` when the column holds no value.
pub parent_process_id: Option<uuid::Uuid>,
/// Process properties serialized as JSONB.
pub properties: SharedJsonbSerialized,
}
/// Metadata describing a single stream.
///
/// Built either from a `StreamInfo` (`StreamMetadata::from_stream_info`) or from a
/// row of the `streams` table (`stream_metadata_from_row`). `properties` holds the
/// stream properties already serialized as JSONB.
#[derive(Debug, Clone)]
pub struct StreamMetadata {
/// Identifier of the process the stream is attached to.
pub process_id: Uuid,
/// Identifier of the stream.
pub stream_id: Uuid,
/// Type descriptions stored in the `dependencies_metadata` column
/// (decoded from CBOR when loaded from a database row).
pub dependencies_metadata: Vec<UserDefinedType>,
/// Type descriptions stored in the `objects_metadata` column
/// (decoded from CBOR when loaded from a database row).
pub objects_metadata: Vec<UserDefinedType>,
/// Tags attached to the stream.
pub tags: Vec<String>,
/// Stream properties serialized as JSONB.
pub properties: SharedJsonbSerialized,
}
impl StreamMetadata {
    /// Builds a `StreamMetadata` from a `StreamInfo`.
    ///
    /// The identifiers are copied, the metadata and tag vectors are cloned, and the
    /// properties are serialized to JSONB and stored behind an `Arc`.
    ///
    /// # Errors
    /// Returns an error when the stream properties cannot be serialized to JSONB.
    pub fn from_stream_info(stream_info: &StreamInfo) -> Result<Self> {
        // Serialization is the only fallible step, so it runs before anything is cloned.
        let shared_properties = serialize_properties_to_jsonb(&stream_info.properties)
            .map(Arc::new)
            .context("serializing stream properties to JSONB")?;
        let metadata = Self {
            process_id: stream_info.process_id,
            stream_id: stream_info.stream_id,
            dependencies_metadata: stream_info.dependencies_metadata.clone(),
            objects_metadata: stream_info.objects_metadata.clone(),
            tags: stream_info.tags.clone(),
            properties: shared_properties,
        };
        Ok(metadata)
    }
}
/// Derives a display name for the thread behind `stream`.
///
/// The `thread-name` and `thread-id` string properties are looked up in the
/// stream's JSONB properties:
/// - both present: returns `"<thread-name>-<thread-id>"`;
/// - only `thread-id` present: returns the thread id;
/// - `thread-id` missing or not a string: the stream id is used in its place.
///
/// When the stream has no properties at all, the stream id is returned as is.
/// Lookup failures are treated as "property absent", so this function currently
/// always returns `Ok`.
pub fn get_thread_name_from_stream_metadata(stream: &StreamMetadata) -> Result<String> {
    use jsonb::RawJsonb;
    const THREAD_NAME_KEY: &str = "thread-name";
    const THREAD_ID_KEY: &str = "thread-id";

    let stream_id_text = || stream.stream_id.to_string();
    if stream.properties.is_empty() {
        return Ok(stream_id_text());
    }

    let jsonb = RawJsonb::new(&stream.properties);
    // Reads `key` as a string; any error or non-string value counts as absent.
    let string_property = |key: &str| -> Option<String> {
        match jsonb.get_by_name(key, false) {
            Ok(Some(value)) => match value.as_raw().as_str() {
                Ok(Some(text)) => Some(text.to_string()),
                _ => None,
            },
            _ => None,
        }
    };

    let thread_name = string_property(THREAD_NAME_KEY);
    let thread_id = string_property(THREAD_ID_KEY).unwrap_or_else(stream_id_text);
    let label = match thread_name {
        Some(name) => format!("{}-{}", name, thread_id),
        None => thread_id,
    };
    Ok(label)
}
/// Converts a database row into a `StreamMetadata`.
///
/// The row must expose the columns `dependencies_metadata` and `objects_metadata`
/// (CBOR-encoded byte buffers), `tags`, `properties`, `stream_id` and `process_id`.
/// The properties are re-serialized to JSONB before being stored.
///
/// # Errors
/// Returns an error when a column is missing or has an unexpected type, when a
/// CBOR buffer cannot be decoded, or when the properties cannot be serialized.
#[span_fn]
pub fn stream_metadata_from_row(row: &sqlx::postgres::PgRow) -> Result<StreamMetadata> {
    let dependencies_bytes: Vec<u8> = row.try_get("dependencies_metadata")?;
    let dependencies_metadata: Vec<UserDefinedType> =
        ciborium::from_reader(dependencies_bytes.as_slice())
            .context("decoding dependencies metadata")?;

    let objects_bytes: Vec<u8> = row.try_get("objects_metadata")?;
    let objects_metadata: Vec<UserDefinedType> =
        ciborium::from_reader(objects_bytes.as_slice()).context("decoding objects metadata")?;

    let tags: Vec<String> = row.try_get("tags")?;

    let raw_properties: Vec<Property> = row.try_get("properties")?;
    let property_map = micromegas_telemetry::property::into_hashmap(raw_properties);
    let properties = serialize_properties_to_jsonb(&property_map)
        .map(Arc::new)
        .context("serializing stream properties to JSONB")?;

    Ok(StreamMetadata {
        stream_id: row.try_get("stream_id")?,
        process_id: row.try_get("process_id")?,
        dependencies_metadata,
        objects_metadata,
        tags,
        properties,
    })
}
/// Loads the metadata of the stream identified by `stream_id` from the `streams` table.
///
/// # Errors
/// Returns an error when the query fails (`fetch_one` also fails when no row
/// matches) or when the row cannot be converted by `stream_metadata_from_row`.
#[span_fn]
pub async fn find_stream(
    pool: &sqlx::Pool<sqlx::Postgres>,
    stream_id: sqlx::types::Uuid,
) -> Result<StreamMetadata> {
    const SELECT_STREAM: &str =
        "SELECT stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties
FROM streams
WHERE stream_id = $1
;";
    let stream_row = sqlx::query(SELECT_STREAM)
        .bind(stream_id)
        .fetch_one(pool)
        .await
        .context("select from streams")?;
    stream_metadata_from_row(&stream_row)
}
/// Converts a database row into a `ProcessMetadata`.
///
/// The properties are read from the `process_properties` column and re-serialized
/// to JSONB; every other field is read from the column that shares its name.
///
/// # Errors
/// Returns an error when a column is missing or has an unexpected type, or when
/// the properties cannot be serialized to JSONB.
#[span_fn]
pub fn process_metadata_from_row(row: &sqlx::postgres::PgRow) -> Result<ProcessMetadata> {
    let raw_properties: Vec<Property> = row.try_get("process_properties")?;
    let property_map = micromegas_telemetry::property::into_hashmap(raw_properties);
    let properties = serialize_properties_to_jsonb(&property_map)
        .map(Arc::new)
        .context("serializing process properties to JSONB")?;

    let metadata = ProcessMetadata {
        process_id: row.try_get("process_id")?,
        exe: row.try_get("exe")?,
        username: row.try_get("username")?,
        realname: row.try_get("realname")?,
        computer: row.try_get("computer")?,
        distro: row.try_get("distro")?,
        cpu_brand: row.try_get("cpu_brand")?,
        tsc_frequency: row.try_get("tsc_frequency")?,
        start_time: row.try_get("start_time")?,
        start_ticks: row.try_get("start_ticks")?,
        parent_process_id: row.try_get("parent_process_id")?,
        properties,
    };
    Ok(metadata)
}
/// Loads the metadata of the process identified by `process_id` from the `processes` table.
///
/// The `properties` column is selected under the alias `process_properties`, which
/// is the name `process_metadata_from_row` reads.
///
/// # Errors
/// Returns an error when the query fails (`fetch_one` also fails when no row
/// matches) or when the row cannot be converted by `process_metadata_from_row`.
#[span_fn]
pub async fn find_process(
    pool: &sqlx::Pool<sqlx::Postgres>,
    process_id: &sqlx::types::Uuid,
) -> Result<ProcessMetadata> {
    const SELECT_PROCESS: &str = "SELECT process_id,
exe,
username,
realname,
computer,
distro,
cpu_brand,
tsc_frequency,
start_time,
start_ticks,
parent_process_id,
properties as process_properties
FROM processes
WHERE process_id = $1;";
    let process_row = sqlx::query(SELECT_PROCESS)
        .bind(process_id)
        .fetch_one(pool)
        .await
        .context("select from processes")?;
    process_metadata_from_row(&process_row)
}
#[span_fn]
pub async fn find_process_with_latest_timing(
lakehouse: Arc<LakehouseContext>,
view_factory: Arc<ViewFactory>,
process_id: &Uuid,
query_range: Option<TimeRange>,
) -> Result<(ProcessMetadata, i64, DateTime<Utc>)> {
let partition_provider = Arc::new(LivePartitionProvider::new(lakehouse.lake().db_pool.clone()));
let ctx = make_session_context(
lakehouse,
partition_provider,
query_range,
view_factory,
Arc::new(NoOpSessionConfigurator),
)
.await
.with_context(|| "creating DataFusion session context")?;
let sql = format!(
"SELECT process_id, exe, username, realname, computer, distro, cpu_brand,
tsc_frequency, start_time, start_ticks, parent_process_id, properties,
last_block_end_ticks, last_block_end_time
FROM processes
WHERE process_id = '{}'",
process_id
);
let df = ctx
.sql(&sql)
.await
.with_context(|| "executing SQL query for process with timing")?;
let results = df
.collect()
.await
.with_context(|| "collecting results from DataFusion")?;
if results.is_empty() || results[0].num_rows() == 0 {
anyhow::bail!("Process not found");
}
let batch = &results[0];
let process_id_column = string_column_by_name(batch, "process_id")?;
let exe_column = string_column_by_name(batch, "exe")?;
let username_column = string_column_by_name(batch, "username")?;
let realname_column = string_column_by_name(batch, "realname")?;
let computer_column = string_column_by_name(batch, "computer")?;
let distro_column = string_column_by_name(batch, "distro")?;
let cpu_brand_column = string_column_by_name(batch, "cpu_brand")?;
let tsc_frequency_column: &Int64Array = typed_column_by_name(batch, "tsc_frequency")?;
let start_time_column: &TimestampNanosecondArray = typed_column_by_name(batch, "start_time")?;
let start_ticks_column: &Int64Array = typed_column_by_name(batch, "start_ticks")?;
let last_block_end_ticks_column: &Int64Array =
typed_column_by_name(batch, "last_block_end_ticks")?;
let last_block_end_time_column: &TimestampNanosecondArray =
typed_column_by_name(batch, "last_block_end_time")?;
let parent_process_id_column = string_column_by_name(batch, "parent_process_id")?;
let parent_process_id = if parent_process_id_column.is_null(0) {
None
} else {
parse_optional_uuid(parent_process_id_column.value(0)?)?
};
let properties_accessor = properties_column_by_name(batch, "properties")
.with_context(|| "accessing properties column")?;
let properties_jsonb = Arc::new(
properties_accessor
.jsonb_value(0)
.with_context(|| "extracting JSONB from properties column")?,
);
let process_metadata = ProcessMetadata {
process_id: parse_optional_uuid(process_id_column.value(0)?)?
.ok_or_else(|| anyhow::anyhow!("process_id cannot be empty"))?,
exe: exe_column.value(0)?.to_string(),
username: username_column.value(0)?.to_string(),
realname: realname_column.value(0)?.to_string(),
computer: computer_column.value(0)?.to_string(),
distro: distro_column.value(0)?.to_string(),
cpu_brand: cpu_brand_column.value(0)?.to_string(),
tsc_frequency: tsc_frequency_column.value(0),
start_time: DateTime::from_timestamp_nanos(start_time_column.value(0)),
start_ticks: start_ticks_column.value(0),
parent_process_id,
properties: properties_jsonb,
};
let last_block_end_ticks = last_block_end_ticks_column.value(0);
let last_block_end_time = DateTime::from_timestamp_nanos(last_block_end_time_column.value(0));
Ok((process_metadata, last_block_end_ticks, last_block_end_time))
}
#[span_fn]
pub fn block_from_batch_row(rb: &RecordBatch, row: usize) -> Result<BlockMetadata> {
let block_id_column = string_column_by_name(rb, "block_id")?;
let stream_id_column = string_column_by_name(rb, "stream_id")?;
let process_id_column = string_column_by_name(rb, "process_id")?;
let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "begin_time")?;
let begin_ticks_column: &Int64Array = typed_column_by_name(rb, "begin_ticks")?;
let end_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "end_time")?;
let end_ticks_column: &Int64Array = typed_column_by_name(rb, "end_ticks")?;
let nb_objects_column: &Int32Array = typed_column_by_name(rb, "nb_objects")?;
let object_offset_column: &Int64Array = typed_column_by_name(rb, "object_offset")?;
let payload_size_column: &Int64Array = typed_column_by_name(rb, "payload_size")?;
let insert_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "insert_time")?;
Ok(BlockMetadata {
block_id: Uuid::parse_str(block_id_column.value(row)?)?,
stream_id: Uuid::parse_str(stream_id_column.value(row)?)?,
process_id: Uuid::parse_str(process_id_column.value(row)?)?,
begin_time: DateTime::from_timestamp_nanos(begin_time_column.value(row)),
end_time: DateTime::from_timestamp_nanos(end_time_column.value(row)),
begin_ticks: begin_ticks_column.value(row),
end_ticks: end_ticks_column.value(row),
nb_objects: nb_objects_column.value(row),
object_offset: object_offset_column.value(row),
payload_size: payload_size_column.value(row),
insert_time: DateTime::from_timestamp_nanos(insert_time_column.value(row)),
})
}