use crate::error::TraceEngineError;
use crate::parquet::control::{get_pod_id, ControlTableEngine};
use crate::parquet::tracing::catalog::TraceCatalogProvider;
use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
use crate::parquet::utils::{match_attr_expr, register_cloud_logstore_factories};
use crate::storage::ObjectStore;
use arrow::array::*;
use arrow::compute;
use arrow::datatypes::*;
use arrow_array::{Array, RecordBatch};
use chrono::{DateTime, Datelike, Utc};
use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use deltalake::operations::optimize::OptimizeType;
use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
use scouter_types::sql::{TraceFilters, TraceListItem};
use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, RwLock as AsyncRwLock};
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, instrument};
use url::Url;
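/// Days from 0001-01-01 (Common Era) to 1970-01-01; subtracting it converts
/// chrono's `num_days_from_ce()` into Arrow's Unix-epoch-based `Date32`.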
const UNIX_EPOCH_DAYS: i32 = 719_163;
const SUMMARY_TABLE_NAME: &str = "trace_summaries";
const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";
const TRACE_ID_COL: &str = "trace_id";
const SERVICE_NAME_COL: &str = "service_name";
const SCOPE_NAME_COL: &str = "scope_name";
const SCOPE_VERSION_COL: &str = "scope_version";
const ROOT_OPERATION_COL: &str = "root_operation";
const START_TIME_COL: &str = "start_time";
const END_TIME_COL: &str = "end_time";
const DURATION_MS_COL: &str = "duration_ms";
const STATUS_CODE_COL: &str = "status_code";
const STATUS_MESSAGE_COL: &str = "status_message";
const SPAN_COUNT_COL: &str = "span_count";
const ERROR_COUNT_COL: &str = "error_count";
const SEARCH_BLOB_COL: &str = "search_blob";
const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";
const PARTITION_DATE_COL: &str = "partition_date";
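/// Arrow schema for the summary table: one row per trace summary flush (a
/// trace may span several rows, re-aggregated at query time), with a
/// dictionary-encoded service name and a `Date32` partition column derived
/// from `start_time`.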
fn create_summary_schema() -> Schema {
Schema::new(vec![
Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
Field::new(
SERVICE_NAME_COL,
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
Field::new(
START_TIME_COL,
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
false,
),
Field::new(
END_TIME_COL,
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
true,
),
Field::new(DURATION_MS_COL, DataType::Int64, true),
Field::new(STATUS_CODE_COL, DataType::Int32, false),
Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
Field::new(SPAN_COUNT_COL, DataType::Int64, false),
Field::new(ERROR_COUNT_COL, DataType::Int64, false),
resource_attribute_field(),
Field::new(
ENTITY_IDS_COL,
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
true,
),
Field::new(
QUEUE_IDS_COL,
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
true,
),
Field::new(PARTITION_DATE_COL, DataType::Date32, false),
])
}
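/// Per-column Arrow builders for converting `TraceSummaryRecord`s into a
/// `RecordBatch` matching `create_summary_schema()`.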
struct TraceSummaryBatchBuilder {
schema: Arc<Schema>,
trace_id: FixedSizeBinaryBuilder,
service_name: StringDictionaryBuilder<Int32Type>,
scope_name: StringBuilder,
scope_version: StringBuilder,
root_operation: StringBuilder,
start_time: TimestampMicrosecondBuilder,
end_time: TimestampMicrosecondBuilder,
duration_ms: Int64Builder,
status_code: Int32Builder,
status_message: StringBuilder,
span_count: Int64Builder,
error_count: Int64Builder,
resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
entity_ids: ListBuilder<StringBuilder>,
queue_ids: ListBuilder<StringBuilder>,
partition_date: Date32Builder,
}
impl TraceSummaryBatchBuilder {
fn new(schema: Arc<Schema>, capacity: usize) -> Self {
let map_field_names = MapFieldNames {
entry: "key_value".to_string(),
key: "key".to_string(),
value: "value".to_string(),
};
let resource_attributes = MapBuilder::new(
Some(map_field_names),
StringBuilder::new(),
StringViewBuilder::new(),
);
Self {
schema,
trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
service_name: StringDictionaryBuilder::new(),
scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
duration_ms: Int64Builder::with_capacity(capacity),
status_code: Int32Builder::with_capacity(capacity),
status_message: StringBuilder::with_capacity(capacity, capacity * 16),
span_count: Int64Builder::with_capacity(capacity),
error_count: Int64Builder::with_capacity(capacity),
resource_attributes,
entity_ids: ListBuilder::new(StringBuilder::new()),
queue_ids: ListBuilder::new(StringBuilder::new()),
partition_date: Date32Builder::with_capacity(capacity),
}
}
fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
self.trace_id.append_value(rec.trace_id.as_bytes())?;
self.service_name.append_value(&rec.service_name);
self.scope_name.append_value(&rec.scope_name);
if rec.scope_version.is_empty() {
self.scope_version.append_null();
} else {
self.scope_version.append_value(&rec.scope_version);
}
self.root_operation.append_value(&rec.root_operation);
self.start_time
.append_value(rec.start_time.timestamp_micros());
match rec.end_time {
Some(end) => self.end_time.append_value(end.timestamp_micros()),
None => self.end_time.append_null(),
}
let duration = rec
.end_time
.map(|end| (end - rec.start_time).num_milliseconds().max(0));
match duration {
Some(d) => self.duration_ms.append_value(d),
None => self.duration_ms.append_null(),
}
self.status_code.append_value(rec.status_code);
if rec.status_message.is_empty() {
self.status_message.append_null();
} else {
self.status_message.append_value(&rec.status_message);
}
self.span_count.append_value(rec.span_count);
self.error_count.append_value(rec.error_count);
if rec.resource_attributes.is_empty() {
            self.resource_attributes.append(false)?;
        } else {
for attr in &rec.resource_attributes {
self.resource_attributes.keys().append_value(&attr.key);
let value_str =
serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
self.resource_attributes.values().append_value(value_str);
}
self.resource_attributes.append(true)?;
}
if rec.entity_ids.is_empty() {
self.entity_ids.append_null();
} else {
for id in &rec.entity_ids {
self.entity_ids.values().append_value(id);
}
self.entity_ids.append(true);
}
if rec.queue_ids.is_empty() {
self.queue_ids.append_null();
} else {
for id in &rec.queue_ids {
self.queue_ids.values().append_value(id);
}
self.queue_ids.append(true);
}
let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
self.partition_date.append_value(days);
Ok(())
}
fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
let columns: Vec<Arc<dyn Array>> = vec![
Arc::new(self.trace_id.finish()),
Arc::new(self.service_name.finish()),
Arc::new(self.scope_name.finish()),
Arc::new(self.scope_version.finish()),
Arc::new(self.root_operation.finish()),
Arc::new(self.start_time.finish()),
Arc::new(self.end_time.finish()),
Arc::new(self.duration_ms.finish()),
Arc::new(self.status_code.finish()),
Arc::new(self.status_message.finish()),
Arc::new(self.span_count.finish()),
Arc::new(self.error_count.finish()),
Arc::new(self.resource_attributes.finish()),
Arc::new(self.entity_ids.finish()),
Arc::new(self.queue_ids.finish()),
Arc::new(self.partition_date.finish()),
];
RecordBatch::try_new(self.schema, columns).map_err(Into::into)
}
}
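/// Commands handled by the summary engine actor; write and maintenance
/// requests carry a oneshot sender for their result.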
pub enum SummaryTableCommand {
Write {
records: Vec<TraceSummaryRecord>,
respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
},
Optimize {
respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
},
Vacuum {
retention_hours: u64,
respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
},
Shutdown,
}
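/// Appends the summary table name to the object store's base URL.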
async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
let mut base = object_store.get_base_url()?;
let mut path = base.path().to_string();
if !path.ends_with('/') {
path.push('/');
}
path.push_str(SUMMARY_TABLE_NAME);
base.set_path(&path);
Ok(base)
}
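/// Creates the Delta table, partitioned by date, with data-skipping statistics
/// on the columns queries filter and sort on.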
async fn create_summary_table(
object_store: &ObjectStore,
table_url: Url,
schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
info!(
"Creating trace summary table [{}://.../{} ]",
table_url.scheme(),
table_url
.path_segments()
.and_then(|mut s| s.next_back())
.unwrap_or(SUMMARY_TABLE_NAME)
);
let store = object_store.as_dyn_object_store();
let table = DeltaTableBuilder::from_url(table_url.clone())?
.with_storage_backend(store, table_url)
.build()?;
let delta_fields = arrow_schema_to_delta(&schema);
table
.create()
.with_table_name(SUMMARY_TABLE_NAME)
.with_columns(delta_fields)
.with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
.with_configuration_property(
TableProperty::DataSkippingStatsColumns,
Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
)
.await
.map_err(Into::into)
}
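/// Loads the summary table if a Delta log already exists at the target URL,
/// otherwise creates it (including the local directory for `file://` stores).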
async fn build_or_create_summary_table(
object_store: &ObjectStore,
schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
register_cloud_logstore_factories();
let table_url = build_summary_url(object_store).await?;
info!(
"Loading trace summary table [{}://.../{} ]",
table_url.scheme(),
table_url
.path_segments()
.and_then(|mut s| s.next_back())
.unwrap_or(SUMMARY_TABLE_NAME)
);
let is_delta_table = if table_url.scheme() == "file" {
if let Ok(path) = table_url.to_file_path() {
if !path.exists() {
info!("Creating directory for summary table: {:?}", path);
std::fs::create_dir_all(&path)?;
}
path.join("_delta_log").exists()
} else {
false
}
} else {
let store = object_store.as_dyn_object_store();
match DeltaTableBuilder::from_url(table_url.clone()) {
Ok(builder) => builder
.with_storage_backend(store, table_url.clone())
.load()
.await
.is_ok(),
Err(_) => false,
}
};
if is_delta_table {
info!(
"Loaded existing trace summary table [{}://.../{} ]",
table_url.scheme(),
table_url
.path_segments()
.and_then(|mut s| s.next_back())
.unwrap_or(SUMMARY_TABLE_NAME)
);
let store = object_store.as_dyn_object_store();
DeltaTableBuilder::from_url(table_url.clone())?
.with_storage_backend(store, table_url)
.load()
.await
.map_err(Into::into)
} else {
info!("Summary table does not exist, creating new table");
create_summary_table(object_store, table_url, schema).await
}
}
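/// Owns the Delta table handle. All writes and maintenance are serialized
/// through the actor spawned by `start_actor`; reads go through the shared
/// `SessionContext` and catalog.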
pub struct TraceSummaryDBEngine {
schema: Arc<Schema>,
table: Arc<AsyncRwLock<DeltaTable>>,
pub ctx: Arc<SessionContext>,
catalog: Arc<TraceCatalogProvider>,
control: ControlTableEngine,
}
impl TraceSummaryDBEngine {
pub async fn new(
object_store: &ObjectStore,
ctx: Arc<SessionContext>,
catalog: Arc<TraceCatalogProvider>,
) -> Result<Self, TraceEngineError> {
let schema = Arc::new(create_summary_schema());
let delta_table = build_or_create_summary_table(object_store, schema.clone()).await?;
if let Ok(provider) = delta_table.table_provider().await {
catalog.swap(SUMMARY_TABLE_NAME, provider);
} else {
info!("Empty summary table at init — deferring catalog registration until first write");
}
let control = ControlTableEngine::new(object_store, get_pod_id()).await?;
Ok(TraceSummaryDBEngine {
schema,
table: Arc::new(AsyncRwLock::new(delta_table)),
ctx,
catalog,
control,
})
}
fn build_batch(
&self,
records: Vec<TraceSummaryRecord>,
) -> Result<RecordBatch, TraceEngineError> {
let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
for rec in &records {
builder.append(rec)?;
}
builder.finish()
}
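    /// Appends a batch of summaries to the Delta table, then swaps the updated
    /// provider into the catalog so readers see the new version immediately.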
async fn write_records(
&self,
records: Vec<TraceSummaryRecord>,
) -> Result<(), TraceEngineError> {
let count = records.len();
info!("Writing {} trace summaries", count);
let batch = self.build_batch(records)?;
let mut table_guard = self.table.write().await;
let current_table = table_guard.clone();
let updated_table = current_table
.write(vec![batch])
.with_save_mode(deltalake::protocol::SaveMode::Append)
.with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
.await?;
let new_provider = updated_table.table_provider().await?;
self.catalog.swap(SUMMARY_TABLE_NAME, new_provider);
updated_table.update_datafusion_session(&self.ctx.state())?;
*table_guard = updated_table;
info!("Summary table updated with {} records", count);
Ok(())
}
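    /// Compacts the table into ~128 MiB files, Z-ordered on (start_time,
    /// service_name) to speed up time-range and per-service scans.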
async fn optimize_table(&self) -> Result<(), TraceEngineError> {
let mut table_guard = self.table.write().await;
let (updated_table, _metrics) = table_guard
.clone()
.optimize()
.with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
.with_type(OptimizeType::ZOrder(vec![
START_TIME_COL.to_string(),
SERVICE_NAME_COL.to_string(),
]))
.await?;
self.catalog
.swap(SUMMARY_TABLE_NAME, updated_table.table_provider().await?);
updated_table.update_datafusion_session(&self.ctx.state())?;
*table_guard = updated_table;
Ok(())
}
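    /// Vacuums files older than `retention_hours`; retention enforcement is
    /// disabled so short (even zero-hour) retention periods are permitted.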
async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
let mut table_guard = self.table.write().await;
let (updated_table, _metrics) = table_guard
.clone()
.vacuum()
.with_retention_period(chrono::Duration::hours(retention_hours as i64))
.with_enforce_retention_duration(false)
.await?;
self.catalog
.swap(SUMMARY_TABLE_NAME, updated_table.table_provider().await?);
updated_table.update_datafusion_session(&self.ctx.state())?;
*table_guard = updated_table;
Ok(())
}
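    /// Incrementally syncs the local table handle with the Delta log so this
    /// pod sees commits made by other writers; a no-op when already current.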
async fn refresh_table(&self) -> Result<(), TraceEngineError> {
let mut table_guard = self.table.write().await;
let current_version = table_guard.version();
let mut refreshed = table_guard.clone();
match refreshed.update_incremental(None).await {
Ok(_) => {
if refreshed.version() > current_version {
info!(
"Summary table refreshed: v{:?} → v{:?}",
current_version,
refreshed.version()
);
let new_provider = refreshed.table_provider().await?;
self.catalog.swap(SUMMARY_TABLE_NAME, new_provider);
refreshed.update_datafusion_session(&self.ctx.state())?;
*table_guard = refreshed;
}
}
Err(e) => {
debug!("Summary table refresh skipped: {}", e);
}
}
Ok(())
}
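    /// Runs optimize (plus a post-optimize vacuum) only if this pod wins the
    /// distributed task claim, then releases the claim until the next interval.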
async fn try_run_optimize(&self, interval_hours: u64) {
match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
Ok(true) => match self.optimize_table().await {
Ok(()) => {
if let Err(e) = self.vacuum_table(0).await {
error!("Post-optimize vacuum failed: {}", e);
}
let _ = self
.control
.release_task(
TASK_SUMMARY_OPTIMIZE,
chrono::Duration::hours(interval_hours as i64),
)
.await;
}
Err(e) => {
error!("Summary optimize failed: {}", e);
let _ = self
.control
.release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
.await;
}
},
            Ok(false) => {} // claim not acquired: another pod holds it or it isn't due yet
Err(e) => error!("Summary optimize claim check failed: {}", e),
}
}
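    /// Spawns the actor loop: serves `SummaryTableCommand`s and drives periodic
    /// optimize claims and table refreshes from their own tickers.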
#[instrument(skip_all, name = "summary_engine_actor")]
pub fn start_actor(
self,
compaction_interval_hours: u64,
refresh_interval_secs: u64,
) -> (
mpsc::Sender<SummaryTableCommand>,
tokio::task::JoinHandle<()>,
) {
let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);
let handle = tokio::spawn(async move {
info!(refresh_interval_secs, "TraceSummaryDBEngine actor started");
            // Interval timers fire immediately on their first tick; consume it so
            // maintenance and refresh don't run the moment the actor starts.
            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
            scheduler_ticker.tick().await;
            let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs.max(1)));
            refresh_ticker.tick().await;
loop {
tokio::select! {
Some(cmd) = rx.recv() => {
match cmd {
SummaryTableCommand::Write { records, respond_to } => {
let result = self.write_records(records).await;
if let Err(ref e) = result {
error!("Summary write failed: {}", e);
}
let _ = respond_to.send(result);
}
SummaryTableCommand::Optimize { respond_to } => {
let _ = respond_to.send(self.optimize_table().await);
if let Err(e) = self.vacuum_table(0).await {
error!("Post-optimize vacuum failed: {}", e);
}
}
SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
let _ = respond_to.send(self.vacuum_table(retention_hours).await);
}
SummaryTableCommand::Shutdown => {
info!("TraceSummaryDBEngine actor shutting down");
break;
}
}
}
_ = scheduler_ticker.tick() => {
self.try_run_optimize(compaction_interval_hours).await;
}
_ = refresh_ticker.tick() => {
if let Err(e) = self.refresh_table().await {
error!("Summary table refresh failed: {}", e);
}
}
}
}
});
(tx, handle)
}
}
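/// Public handle for the summary engine: forwards commands to the actor over a
/// bounded channel and exposes reads via `query_service`.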
pub struct TraceSummaryService {
engine_tx: mpsc::Sender<SummaryTableCommand>,
engine_handle: tokio::task::JoinHandle<()>,
pub query_service: TraceSummaryQueries,
}
impl TraceSummaryService {
pub async fn new(
object_store: &ObjectStore,
compaction_interval_hours: u64,
ctx: Arc<SessionContext>,
catalog: Arc<TraceCatalogProvider>,
refresh_interval_secs: u64,
) -> Result<Self, TraceEngineError> {
let engine = TraceSummaryDBEngine::new(object_store, ctx, catalog).await?;
let engine_ctx = engine.ctx.clone();
let (engine_tx, engine_handle) =
engine.start_actor(compaction_interval_hours, refresh_interval_secs);
Ok(TraceSummaryService {
engine_tx,
engine_handle,
query_service: TraceSummaryQueries::new(engine_ctx),
})
}
pub async fn write_summaries(
&self,
records: Vec<TraceSummaryRecord>,
) -> Result<(), TraceEngineError> {
let (tx, rx) = oneshot::channel();
self.engine_tx
.send(SummaryTableCommand::Write {
records,
respond_to: tx,
})
.await
.map_err(|_| TraceEngineError::ChannelClosed)?;
rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
}
pub async fn optimize(&self) -> Result<(), TraceEngineError> {
let (tx, rx) = oneshot::channel();
self.engine_tx
.send(SummaryTableCommand::Optimize { respond_to: tx })
.await
.map_err(|_| TraceEngineError::ChannelClosed)?;
rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
}
pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
let (tx, rx) = oneshot::channel();
self.engine_tx
.send(SummaryTableCommand::Vacuum {
retention_hours,
respond_to: tx,
})
.await
.map_err(|_| TraceEngineError::ChannelClosed)?;
rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
}
pub async fn signal_shutdown(&self) {
info!("TraceSummaryService signaling shutdown");
let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
}
pub async fn shutdown(self) -> Result<(), TraceEngineError> {
info!("TraceSummaryService shutting down");
self.engine_tx
.send(SummaryTableCommand::Shutdown)
.await
.map_err(|_| TraceEngineError::ChannelClosed)?;
if let Err(e) = self.engine_handle.await {
error!("Summary engine handle error: {}", e);
}
info!("TraceSummaryService shutdown complete");
Ok(())
}
}
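/// Read-side queries over the registered summary (and span) tables.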
pub struct TraceSummaryQueries {
ctx: Arc<SessionContext>,
}
impl TraceSummaryQueries {
pub fn new(ctx: Arc<SessionContext>) -> Self {
Self { ctx }
}
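    /// Cursor-paginated trace listing. Partial summaries sharing a trace_id are
    /// re-aggregated into one row before filters, keyset cursors, and the limit
    /// are applied.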
pub async fn get_paginated_traces(
&self,
filters: &TraceFilters,
) -> Result<TracePaginationResponse, TraceEngineError> {
let limit = filters.limit.unwrap_or(50) as usize;
let direction = filters.direction.as_deref().unwrap_or("next");
use crate::parquet::tracing::queries::{date_lit, ts_lit};
use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
use datafusion::functions_nested::set_ops::array_distinct;
let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;
if let Some(start) = filters.start_time {
df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
}
if let Some(end) = filters.end_time {
df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
}
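        // Orderings for first_value: descriptive fields come from the partial
        // summary with the most spans (latest end breaking ties); the status
        // message comes from the row with the worst status code.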
let by_span_end: Vec<SortExpr> = vec![
col(SPAN_COUNT_COL).sort(false, false),
col(END_TIME_COL).sort(false, false),
];
let by_status_span: Vec<SortExpr> = vec![
col(STATUS_CODE_COL).sort(false, false),
col(SPAN_COUNT_COL).sort(false, false),
];
let mut df = df
.aggregate(
vec![col(TRACE_ID_COL)],
vec![
min(col(START_TIME_COL)).alias(START_TIME_COL),
max(col(END_TIME_COL)).alias(END_TIME_COL),
max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
.alias(SCOPE_VERSION_COL),
first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
.alias(ROOT_OPERATION_COL),
first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
.alias(RESOURCE_ATTRIBUTES_COL),
array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
],
)?
.with_column(
DURATION_MS_COL,
(col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
)?
.with_column(
ENTITY_IDS_COL,
array_distinct(flatten(col("_entity_ids_raw"))),
)?
.with_column(
QUEUE_IDS_COL,
array_distinct(flatten(col("_queue_ids_raw"))),
)?
.drop_columns(&[
"_max_end_us",
"_min_start_us",
"_entity_ids_raw",
"_queue_ids_raw",
])?;
if let Some(ref svc) = filters.service_name {
df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
}
match filters.has_errors {
Some(true) => {
df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
}
Some(false) => {
df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
}
None => {}
}
if let Some(sc) = filters.status_code {
df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
}
if let Some(ref uid) = filters.entity_uid {
df = df.filter(datafusion::functions_nested::expr_fn::array_has(
col(ENTITY_IDS_COL),
lit(uid.as_str()),
))?;
}
if let Some(ref uid) = filters.queue_uid {
df = df.filter(datafusion::functions_nested::expr_fn::array_has(
col(QUEUE_IDS_COL),
lit(uid.as_str()),
))?;
}
if let Some(ref ids) = filters.trace_ids {
if !ids.is_empty() {
let binary_ids: Vec<Expr> = ids
.iter()
.filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
.map(|b| lit(ScalarValue::Binary(Some(b))))
.collect();
if !binary_ids.is_empty() {
df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
}
}
}
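        // Keyset pagination over (start_time, trace_id): "next" continues down
        // the descending ordering, "previous" walks back up it.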
if let (Some(cursor_time), Some(ref cursor_id)) =
(filters.cursor_start_time, &filters.cursor_trace_id)
{
if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
Some(cursor_time.timestamp_micros()),
Some("UTC".into()),
));
let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
let cursor_expr = if direction == "previous" {
col(START_TIME_COL)
.gt(cursor_ts.clone())
.or(col(START_TIME_COL)
.eq(cursor_ts)
.and(col(TRACE_ID_COL).gt(cursor_tid)))
} else {
col(START_TIME_COL)
.lt(cursor_ts.clone())
.or(col(START_TIME_COL)
.eq(cursor_ts)
.and(col(TRACE_ID_COL).lt(cursor_tid)))
};
df = df.filter(cursor_expr)?;
}
}
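        // Attribute filters can't be evaluated against the summary table, so
        // resolve matching trace_ids from the span table's search blob first and
        // filter the summaries on that id set (an empty set matches nothing).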
if let Some(ref attr_filters) = filters.attribute_filters {
if !attr_filters.is_empty() {
let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
TRACE_ID_COL,
START_TIME_COL,
SEARCH_BLOB_COL,
])?;
if let Some(start) = filters.start_time {
spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
ScalarValue::TimestampMicrosecond(
Some(start.timestamp_micros()),
Some("UTC".into()),
),
)))?;
}
if let Some(end) = filters.end_time {
spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
ScalarValue::TimestampMicrosecond(
Some(end.timestamp_micros()),
Some("UTC".into()),
),
)))?;
}
let mut attr_expr: Option<Expr> = None;
for f in attr_filters {
let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
attr_expr = Some(match attr_expr {
None => cond,
Some(e) => e.or(cond),
});
}
if let Some(expr) = attr_expr {
spans_df = spans_df.filter(expr)?;
}
let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
let mut seen_ids: std::collections::HashSet<Vec<u8>> =
std::collections::HashSet::new();
let mut binary_ids: Vec<Expr> = Vec::new();
for batch in &span_batches {
if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
let casted = compute::cast(col_ref, &DataType::Binary)?;
let col_arr =
casted
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| {
TraceEngineError::DowncastError("trace_id to BinaryArray")
})?;
for i in 0..batch.num_rows() {
let id_bytes = col_arr.value(i).to_vec();
if seen_ids.insert(id_bytes.clone()) {
binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
}
}
}
}
if !binary_ids.is_empty() {
df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
} else {
df = df.filter(lit(false))?;
}
}
}
df = if direction == "previous" {
df.sort(vec![
col(START_TIME_COL).sort(true, true),
col(TRACE_ID_COL).sort(true, true),
])?
} else {
df.sort(vec![
col(START_TIME_COL).sort(false, false),
col(TRACE_ID_COL).sort(false, false),
])?
};
        // Fetch one extra row so we can tell whether another page exists.
        df = df.limit(0, Some(limit + 1))?;
let batches = df.collect().await?;
let mut items = batches_to_trace_list_items(batches)?;
let has_more = items.len() > limit;
if has_more {
            items.pop();
        }
let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
"next" => {
let next_cursor = if has_more {
items.last().map(|item| TraceCursor {
start_time: item.start_time,
trace_id: item.trace_id.clone(),
})
} else {
None
};
let previous_cursor = items.first().map(|item| TraceCursor {
start_time: item.start_time,
trace_id: item.trace_id.clone(),
});
(
has_more,
next_cursor,
filters.cursor_start_time.is_some(),
previous_cursor,
)
}
"previous" => {
let previous_cursor = if has_more {
items.last().map(|item| TraceCursor {
start_time: item.start_time,
trace_id: item.trace_id.clone(),
})
} else {
None
};
let next_cursor = items.first().map(|item| TraceCursor {
start_time: item.start_time,
trace_id: item.trace_id.clone(),
});
(
filters.cursor_start_time.is_some(),
next_cursor,
has_more,
previous_cursor,
)
}
_ => (false, None, false, None),
};
Ok(TracePaginationResponse {
items,
has_next,
next_cursor,
has_previous,
previous_cursor,
})
}
}
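/// Converts one row of a MapArray (string keys, JSON-encoded values) back into
/// `Attribute`s; logs a warning and returns an empty Vec on any shape mismatch.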
fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
if map_array.is_null(row_idx) {
return Vec::new();
}
let entry = map_array.value(row_idx);
let Some(struct_array) = entry.as_any().downcast_ref::<StructArray>() else {
tracing::warn!("extract_map_attributes: failed to downcast to StructArray");
return Vec::new();
};
let Some(keys_arr) = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).ok()
else {
tracing::warn!("extract_map_attributes: failed to cast keys to Utf8");
return Vec::new();
};
let Some(keys) = keys_arr.as_any().downcast_ref::<StringArray>() else {
tracing::warn!("extract_map_attributes: failed to downcast keys to StringArray");
return Vec::new();
};
let Some(values_arr) = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).ok()
else {
tracing::warn!("extract_map_attributes: failed to cast values to Utf8");
return Vec::new();
};
let Some(values) = values_arr.as_any().downcast_ref::<StringArray>() else {
tracing::warn!("extract_map_attributes: failed to downcast values to StringArray");
return Vec::new();
};
(0..struct_array.len())
.map(|i| Attribute {
key: keys.value(i).to_string(),
value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
})
.collect()
}
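/// Collects the non-null strings from one row of an optional ListArray.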
fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
let Some(list) = list else {
return Vec::new();
};
if list.is_null(row_idx) {
return Vec::new();
}
let inner = list.value(row_idx);
let str_arr = compute::cast(&inner, &DataType::Utf8)
.ok()
.and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
match str_arr {
Some(arr) => (0..arr.len())
.filter(|i| !arr.is_null(*i))
.map(|i| arr.value(i).to_string())
.collect(),
None => Vec::new(),
}
}
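/// Converts query result batches into `TraceListItem`s, casting dictionary and
/// view columns to plain binary/UTF-8 arrays before downcasting.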
fn batches_to_trace_list_items(
batches: Vec<RecordBatch>,
) -> Result<Vec<TraceListItem>, TraceEngineError> {
let mut items = Vec::new();
for batch in &batches {
let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing trace_id column".into())
})?;
let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
let trace_ids = trace_id_binary
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
})?;
let svc_arr = compute::cast(
batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing service_name column".into())
})?,
&DataType::Utf8,
)?;
let service_names = svc_arr
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation(
"service_name cast to StringArray failed".into(),
)
})?;
let scope_arr = compute::cast(
batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing scope_name column".into())
})?,
&DataType::Utf8,
)?;
let scope_names = scope_arr
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation(
"scope_name cast to StringArray failed".into(),
)
})?;
let scopev_arr = compute::cast(
batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing scope_version column".into())
})?,
&DataType::Utf8,
)?;
let scope_versions = scopev_arr
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation(
"scope_version cast to StringArray failed".into(),
)
})?;
let root_arr = compute::cast(
batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing root_operation column".into())
})?,
&DataType::Utf8,
)?;
let root_operations = root_arr
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation(
"root_operation cast to StringArray failed".into(),
)
})?;
let sm_arr = compute::cast(
batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing status_message column".into())
})?,
&DataType::Utf8,
)?;
let status_messages = sm_arr
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation(
"status_message cast to StringArray failed".into(),
)
})?;
let resource_attrs_map = batch
.column_by_name(RESOURCE_ATTRIBUTES_COL)
.and_then(|c| c.as_any().downcast_ref::<MapArray>())
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
})?;
let entity_ids_list = batch
.column_by_name(ENTITY_IDS_COL)
.and_then(|c| c.as_any().downcast_ref::<ListArray>());
let queue_ids_list = batch
.column_by_name(QUEUE_IDS_COL)
.and_then(|c| c.as_any().downcast_ref::<ListArray>());
let start_times = batch
.column_by_name(START_TIME_COL)
.and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing start_time column".into())
})?;
let end_times = batch
.column_by_name(END_TIME_COL)
.and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing end_time column".into())
})?;
let durations = batch
.column_by_name(DURATION_MS_COL)
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
})?;
let status_codes = batch
.column_by_name(STATUS_CODE_COL)
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing status_code column".into())
})?;
let span_counts = batch
.column_by_name(SPAN_COUNT_COL)
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing span_count column".into())
})?;
let error_counts = batch
.column_by_name(ERROR_COUNT_COL)
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.ok_or_else(|| {
TraceEngineError::UnsupportedOperation("missing error_count column".into())
})?;
for i in 0..batch.num_rows() {
let trace_id_hex = hex::encode(trace_ids.value(i));
let start_time = micros_to_datetime(start_times.value(i))?;
let end_time = if end_times.is_null(i) {
None
} else {
Some(micros_to_datetime(end_times.value(i))?)
};
let duration_ms = if durations.is_null(i) {
None
} else {
Some(durations.value(i))
};
let error_count = error_counts.value(i);
let resource_attributes = extract_map_attributes(resource_attrs_map, i);
let entity_ids = extract_list_strings(entity_ids_list, i);
let queue_ids = extract_list_strings(queue_ids_list, i);
items.push(TraceListItem {
trace_id: trace_id_hex,
service_name: service_names.value(i).to_string(),
scope_name: scope_names.value(i).to_string(),
scope_version: scope_versions.value(i).to_string(),
root_operation: root_operations.value(i).to_string(),
start_time,
end_time,
duration_ms,
status_code: status_codes.value(i),
status_message: if status_messages.is_null(i) {
None
} else {
Some(status_messages.value(i).to_string())
},
span_count: span_counts.value(i),
has_errors: error_count > 0,
error_count,
resource_attributes,
entity_ids,
queue_ids,
});
}
}
Ok(items)
}
fn micros_to_datetime(micros: i64) -> Result<DateTime<Utc>, TraceEngineError> {
DateTime::from_timestamp_micros(micros).ok_or(TraceEngineError::InvalidTimestamp(
"out-of-range microsecond timestamp",
))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::ObjectStore;
use scouter_settings::ObjectStorageSettings;
use scouter_types::sql::TraceFilters;
use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    // Idempotently initializes test logging and removes any leftover storage dir.
    fn cleanup() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.try_init();
let storage_settings = ObjectStorageSettings::default();
let current_dir = std::env::current_dir().unwrap();
let storage_path = current_dir.join(storage_settings.storage_root());
if storage_path.exists() {
let _ = std::fs::remove_dir_all(storage_path);
}
}
fn make_test_object_store(storage_settings: &ObjectStorageSettings) -> ObjectStore {
ObjectStore::new(storage_settings).unwrap()
}
fn make_test_ctx(object_store: &ObjectStore) -> Arc<SessionContext> {
Arc::new(
object_store
.get_session_with_catalog(
crate::parquet::tracing::engine::TRACE_CATALOG_NAME,
"default",
)
.unwrap(),
)
}
fn make_test_catalog(ctx: &Arc<SessionContext>) -> Arc<TraceCatalogProvider> {
use datafusion::catalog::CatalogProvider;
let catalog = Arc::new(TraceCatalogProvider::new());
ctx.register_catalog(
crate::parquet::tracing::engine::TRACE_CATALOG_NAME,
Arc::clone(&catalog) as Arc<dyn CatalogProvider>,
);
catalog
}
fn make_summary(
trace_id_bytes: [u8; 16],
service_name: &str,
error_count: i64,
resource_attributes: Vec<Attribute>,
) -> TraceSummaryRecord {
let now = Utc::now();
TraceSummaryRecord {
trace_id: TraceId::from_bytes(trace_id_bytes),
service_name: service_name.to_string(),
scope_name: "test.scope".to_string(),
scope_version: String::new(),
root_operation: "root_op".to_string(),
start_time: now,
end_time: Some(now + chrono::Duration::milliseconds(200)),
status_code: if error_count > 0 { 2 } else { 0 },
status_message: if error_count > 0 {
"Internal Server Error".to_string()
} else {
"OK".to_string()
},
span_count: 3,
error_count,
resource_attributes,
entity_ids: vec![],
queue_ids: vec![],
}
}
#[tokio::test]
async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
cleanup();
let storage_settings = ObjectStorageSettings::default();
let object_store = make_test_object_store(&storage_settings);
let ctx = make_test_ctx(&object_store);
let catalog = make_test_catalog(&ctx);
let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
service.write_summaries(vec![s1, s2]).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let start = Utc::now() - chrono::Duration::hours(1);
let end = Utc::now() + chrono::Duration::hours(1);
let filters = TraceFilters {
service_name: None,
has_errors: None,
status_code: None,
start_time: Some(start),
end_time: Some(end),
limit: Some(25),
cursor_start_time: None,
cursor_trace_id: None,
direction: None,
attribute_filters: None,
trace_ids: None,
entity_uid: None,
queue_uid: None,
};
let response = service.query_service.get_paginated_traces(&filters).await?;
assert!(
response.items.len() >= 2,
"Expected at least 2 items, got {}",
response.items.len()
);
service.shutdown().await?;
cleanup();
Ok(())
}
#[tokio::test]
async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
cleanup();
let storage_settings = ObjectStorageSettings::default();
let object_store = make_test_object_store(&storage_settings);
let ctx = make_test_ctx(&object_store);
let catalog = make_test_catalog(&ctx);
let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
service
.write_summaries(vec![ok_summary, err_summary])
.await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let start = Utc::now() - chrono::Duration::hours(1);
let end = Utc::now() + chrono::Duration::hours(1);
let base_filters = TraceFilters {
service_name: None,
has_errors: None,
status_code: None,
start_time: Some(start),
end_time: Some(end),
limit: Some(25),
cursor_start_time: None,
cursor_trace_id: None,
direction: None,
attribute_filters: None,
trace_ids: None,
entity_uid: None,
queue_uid: None,
};
let mut filters_err = base_filters.clone();
filters_err.has_errors = Some(true);
let errors_only = service
.query_service
.get_paginated_traces(&filters_err)
.await?;
for item in &errors_only.items {
assert!(
item.error_count > 0,
"Expected error trace, got: {:?}",
item
);
}
assert!(
!errors_only.items.is_empty(),
"Expected at least one error trace"
);
let mut filters_ok = base_filters.clone();
filters_ok.has_errors = Some(false);
let no_errors = service
.query_service
.get_paginated_traces(&filters_ok)
.await?;
for item in &no_errors.items {
assert_eq!(
item.error_count, 0,
"Expected non-error trace, got error_count={}",
item.error_count
);
}
assert!(
!no_errors.items.is_empty(),
"Expected at least one non-error trace"
);
service.shutdown().await?;
cleanup();
Ok(())
}
#[tokio::test]
async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
cleanup();
let storage_settings = ObjectStorageSettings::default();
let object_store = make_test_object_store(&storage_settings);
let ctx = make_test_ctx(&object_store);
let catalog = make_test_catalog(&ctx);
let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
service.write_summaries(vec![s_alpha, s_beta]).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let start = Utc::now() - chrono::Duration::hours(1);
let end = Utc::now() + chrono::Duration::hours(1);
let filters = TraceFilters {
service_name: Some("alpha_service".to_string()),
has_errors: None,
status_code: None,
start_time: Some(start),
end_time: Some(end),
limit: Some(25),
cursor_start_time: None,
cursor_trace_id: None,
direction: None,
attribute_filters: None,
trace_ids: None,
entity_uid: None,
queue_uid: None,
};
let response = service.query_service.get_paginated_traces(&filters).await?;
assert!(
!response.items.is_empty(),
"Expected results for alpha_service"
);
for item in &response.items {
assert_eq!(
item.service_name, "alpha_service",
"Expected only alpha_service items, got: {}",
item.service_name
);
}
service.shutdown().await?;
cleanup();
Ok(())
}
#[tokio::test]
async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
cleanup();
let storage_settings = ObjectStorageSettings::default();
let object_store = make_test_object_store(&storage_settings);
let ctx = make_test_ctx(&object_store);
let catalog = make_test_catalog(&ctx);
let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
let wanted_id = TraceId::from_bytes([7u8; 16]);
let unwanted_id = TraceId::from_bytes([8u8; 16]);
let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
service.write_summaries(vec![s1, s2]).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let start = Utc::now() - chrono::Duration::hours(1);
let end = Utc::now() + chrono::Duration::hours(1);
let filters = TraceFilters {
service_name: None,
has_errors: None,
status_code: None,
start_time: Some(start),
end_time: Some(end),
limit: Some(25),
cursor_start_time: None,
cursor_trace_id: None,
direction: None,
attribute_filters: None,
trace_ids: Some(vec![wanted_id.to_hex()]),
entity_uid: None,
queue_uid: None,
};
let response = service.query_service.get_paginated_traces(&filters).await?;
assert_eq!(
response.items.len(),
1,
"Expected exactly 1 item from trace_ids filter"
);
assert_eq!(
response.items[0].trace_id,
wanted_id.to_hex(),
"Returned wrong trace_id"
);
assert_ne!(
response.items[0].trace_id,
unwanted_id.to_hex(),
"Should not have returned unwanted trace_id"
);
service.shutdown().await?;
cleanup();
Ok(())
}
#[tokio::test]
async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
cleanup();
let storage_settings = ObjectStorageSettings::default();
let object_store = make_test_object_store(&storage_settings);
let ctx = make_test_ctx(&object_store);
let catalog = make_test_catalog(&ctx);
let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
let now = Utc::now();
let summaries: Vec<TraceSummaryRecord> = (0u8..100)
.map(|i| {
let mut s = make_summary([i; 16], "svc", 0, vec![]);
s.start_time = now - chrono::Duration::minutes(i as i64);
s
})
.collect();
service.write_summaries(summaries).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let mut filters = TraceFilters {
start_time: Some(now - chrono::Duration::hours(2)),
end_time: Some(now + chrono::Duration::hours(1)),
limit: Some(50),
..Default::default()
};
let first = service.query_service.get_paginated_traces(&filters).await?;
assert_eq!(first.items.len(), 50, "first page: 50 items");
assert!(
first.next_cursor.is_some(),
"first page: should have next_cursor"
);
let next_cur = first.next_cursor.clone().unwrap();
filters.cursor_start_time = Some(next_cur.start_time);
filters.cursor_trace_id = Some(next_cur.trace_id.clone());
filters.direction = Some("next".to_string());
let second = service.query_service.get_paginated_traces(&filters).await?;
assert_eq!(second.items.len(), 50, "second page: 50 items");
assert!(
second.items[0].start_time <= next_cur.start_time,
"second page first item must be <= cursor"
);
assert!(second.previous_cursor.is_some());
let prev_cur = second.previous_cursor.unwrap();
filters.cursor_start_time = Some(prev_cur.start_time);
filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
filters.direction = Some("previous".to_string());
let prev = service.query_service.get_paginated_traces(&filters).await?;
assert_eq!(prev.items.len(), 50, "previous page: 50 items");
service.shutdown().await?;
cleanup();
Ok(())
}
#[tokio::test]
async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
use crate::parquet::tracing::service::TraceSpanService;
cleanup();
let storage_settings = ObjectStorageSettings::default();
let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
let shared_ctx = span_service.ctx.clone();
let summary_service = TraceSummaryService::new(
&span_service.object_store,
24,
shared_ctx,
span_service.catalog.clone(),
10,
)
.await?;
let now = Utc::now();
let kafka_trace = TraceId::from_bytes([70u8; 16]);
let plain_trace = TraceId::from_bytes([80u8; 16]);
let kafka_span = make_span_record(
&kafka_trace,
SpanId::from_bytes([70u8; 8]),
"svc",
vec![Attribute {
key: "component".to_string(),
value: serde_json::Value::String("kafka".to_string()),
}],
);
let plain_span =
make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
span_service
.write_spans(vec![kafka_span, plain_span])
.await?;
let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
kafka_summary.start_time = now;
let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
plain_summary.start_time = now;
summary_service
.write_summaries(vec![kafka_summary, plain_summary])
.await?;
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
let filters = TraceFilters {
start_time: Some(now - chrono::Duration::hours(1)),
end_time: Some(now + chrono::Duration::hours(1)),
attribute_filters: Some(vec!["component:kafka".to_string()]),
limit: Some(25),
..Default::default()
};
let response = summary_service
.query_service
.get_paginated_traces(&filters)
.await?;
assert!(
!response.items.is_empty(),
"attribute filter must return results"
);
assert!(
response
.items
.iter()
.all(|i| i.trace_id == kafka_trace.to_hex()),
"only kafka trace should appear; got {:?}",
response
.items
.iter()
.map(|i| &i.trace_id)
.collect::<Vec<_>>()
);
span_service.shutdown().await?;
summary_service.shutdown().await?;
cleanup();
Ok(())
}
#[tokio::test]
async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
use crate::parquet::tracing::service::TraceSpanService;
cleanup();
let storage_settings = ObjectStorageSettings::default();
let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
let shared_ctx = span_service.ctx.clone();
let summary_service = TraceSummaryService::new(
&span_service.object_store,
24,
shared_ctx,
span_service.catalog.clone(),
10,
)
.await?;
let now = Utc::now();
let queue_trace = TraceId::from_bytes([90u8; 16]);
let plain_trace = TraceId::from_bytes([91u8; 16]);
let target_queue_uid = "queue-record-abc123";
let queue_span = make_span_record(
&queue_trace,
SpanId::from_bytes([90u8; 8]),
"svc_queue",
vec![],
);
let plain_span = make_span_record(
&plain_trace,
SpanId::from_bytes([91u8; 8]),
"svc_queue",
vec![],
);
span_service
.write_spans_direct(vec![queue_span, plain_span])
.await?;
let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
queue_summary.start_time = now;
queue_summary.queue_ids = vec![target_queue_uid.to_string()];
let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
plain_summary.start_time = now;
summary_service
.write_summaries(vec![queue_summary, plain_summary])
.await?;
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
let filters = TraceFilters {
start_time: Some(now - chrono::Duration::hours(1)),
end_time: Some(now + chrono::Duration::hours(1)),
queue_uid: Some(target_queue_uid.to_string()),
limit: Some(25),
..Default::default()
};
let response = summary_service
.query_service
.get_paginated_traces(&filters)
.await?;
assert!(
!response.items.is_empty(),
"queue_uid filter must return at least one result"
);
assert!(
response
.items
.iter()
.all(|i| i.trace_id == queue_trace.to_hex()),
"only the queue trace should appear; got {:?}",
response
.items
.iter()
.map(|i| &i.trace_id)
.collect::<Vec<_>>()
);
let returned_trace_id =
TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
let spans = span_service
.query_service
.get_trace_spans(
Some(returned_trace_id.as_bytes()),
None,
Some(&(now - chrono::Duration::hours(1))),
Some(&(now + chrono::Duration::hours(1))),
None,
)
.await?;
assert!(
!spans.is_empty(),
"should find spans for the returned trace_id"
);
span_service.shutdown().await?;
summary_service.shutdown().await?;
cleanup();
Ok(())
}
fn make_span_record(
trace_id: &TraceId,
span_id: SpanId,
service_name: &str,
attributes: Vec<Attribute>,
) -> TraceSpanRecord {
let now = Utc::now();
TraceSpanRecord {
created_at: now,
trace_id: *trace_id,
span_id,
parent_span_id: None,
flags: 1,
trace_state: String::new(),
scope_name: "test.scope".to_string(),
scope_version: None,
span_name: "op".to_string(),
span_kind: "INTERNAL".to_string(),
start_time: now,
end_time: now + chrono::Duration::milliseconds(100),
duration_ms: 100,
status_code: 0,
status_message: "OK".to_string(),
attributes,
events: vec![],
links: vec![],
label: None,
input: serde_json::Value::Null,
output: serde_json::Value::Null,
service_name: service_name.to_string(),
resource_attributes: vec![],
}
}
#[tokio::test]
async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
cleanup();
let storage_settings = ObjectStorageSettings::default();
let object_store = make_test_object_store(&storage_settings);
let ctx = make_test_ctx(&object_store);
let catalog = make_test_catalog(&ctx);
let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
let attrs = vec![Attribute {
key: "cloud.region".to_string(),
value: serde_json::Value::String("us-east-1".to_string()),
}];
let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
service.write_summaries(vec![summary]).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let start = Utc::now() - chrono::Duration::hours(1);
let end = Utc::now() + chrono::Duration::hours(1);
let filters = TraceFilters {
service_name: None,
has_errors: None,
status_code: None,
start_time: Some(start),
end_time: Some(end),
limit: Some(25),
cursor_start_time: None,
cursor_trace_id: None,
direction: None,
attribute_filters: None,
trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
entity_uid: None,
queue_uid: None,
};
let response = service.query_service.get_paginated_traces(&filters).await?;
assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
assert_eq!(
response.items[0].resource_attributes.len(),
1,
"Expected 1 resource attribute"
);
assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");
service.shutdown().await?;
cleanup();
Ok(())
}
#[tokio::test]
async fn test_summary_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError>
{
cleanup();
let storage_settings = ObjectStorageSettings::default();
let object_store = make_test_object_store(&storage_settings);
let ctx = make_test_ctx(&object_store);
let catalog = make_test_catalog(&ctx);
let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
let start = Utc::now() - chrono::Duration::hours(1);
let end = Utc::now() + chrono::Duration::hours(1);
let filters = TraceFilters {
start_time: Some(start),
end_time: Some(end),
limit: Some(100),
..Default::default()
};
let s1 = make_summary([0xA0; 16], "svc_vis", 0, vec![]);
let s2 = make_summary([0xA1; 16], "svc_vis", 0, vec![]);
service.write_summaries(vec![s1, s2]).await?;
let response = service.query_service.get_paginated_traces(&filters).await?;
assert_eq!(
response.items.len(),
2,
"After write #1: expected 2 items, got {}",
response.items.len()
);
let s3 = make_summary([0xA2; 16], "svc_vis", 0, vec![]);
let s4 = make_summary([0xA3; 16], "svc_vis", 0, vec![]);
service.write_summaries(vec![s3, s4]).await?;
let response = service.query_service.get_paginated_traces(&filters).await?;
assert_eq!(
response.items.len(),
4,
"After write #2: expected 4 items, got {} (stale snapshot?)",
response.items.len()
);
let s5 = make_summary([0xA4; 16], "svc_vis", 0, vec![]);
let s6 = make_summary([0xA5; 16], "svc_vis", 0, vec![]);
service.write_summaries(vec![s5, s6]).await?;
let response = service.query_service.get_paginated_traces(&filters).await?;
assert_eq!(
response.items.len(),
6,
"After write #3: expected 6 items, got {} (stale snapshot?)",
response.items.len()
);
service.shutdown().await?;
cleanup();
Ok(())
}
#[tokio::test]
async fn test_distributed_refresh() -> Result<(), TraceEngineError> {
use crate::parquet::tracing::service::TraceSpanService;
let storage_settings = ObjectStorageSettings {
storage_uri: "./scouter_storage_summary_dist".to_string(),
..ObjectStorageSettings::default()
};
let current_dir = std::env::current_dir().unwrap();
let storage_path = current_dir.join(storage_settings.storage_root());
if storage_path.exists() {
let _ = std::fs::remove_dir_all(&storage_path);
}
let writer_spans = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
let writer = TraceSummaryService::new(
&writer_spans.object_store,
24,
writer_spans.ctx.clone(),
writer_spans.catalog.clone(),
10,
)
.await?;
let reader_spans = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
let reader = TraceSummaryService::new(
&reader_spans.object_store,
24,
reader_spans.ctx.clone(),
reader_spans.catalog.clone(),
1,
)
.await?;
let summary = make_summary([0xDD_u8; 16], "distributed-svc", 0, vec![]);
writer.write_summaries(vec![summary]).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let start = Utc::now() - chrono::Duration::hours(1);
let end = Utc::now() + chrono::Duration::hours(1);
let filters = TraceFilters {
service_name: Some("distributed-svc".to_string()),
has_errors: None,
status_code: None,
start_time: Some(start),
end_time: Some(end),
limit: Some(25),
cursor_start_time: None,
cursor_trace_id: None,
direction: None,
attribute_filters: None,
trace_ids: None,
entity_uid: None,
queue_uid: None,
};
let response = reader.query_service.get_paginated_traces(&filters).await?;
assert!(
!response.items.is_empty(),
"Reader pod should see summaries written by writer pod after refresh"
);
writer.shutdown().await?;
reader.shutdown().await?;
writer_spans.shutdown().await?;
reader_spans.shutdown().await?;
if storage_path.exists() {
let _ = std::fs::remove_dir_all(&storage_path);
}
Ok(())
}
}