1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
4use crate::parquet::utils::match_attr_expr;
5use crate::parquet::utils::register_cloud_logstore_factories;
6use crate::storage::ObjectStore;
7use arrow::array::*;
8use arrow::compute;
9use arrow::datatypes::*;
10use arrow_array::Array;
11use arrow_array::RecordBatch;
12use chrono::{DateTime, Datelike, Utc};
13use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
14use datafusion::prelude::*;
15use datafusion::scalar::ScalarValue;
16use deltalake::operations::optimize::OptimizeType;
17use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
18use scouter_types::sql::{TraceFilters, TraceListItem};
19use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
20use std::sync::Arc;
21use tokio::sync::oneshot;
22use tokio::sync::{mpsc, RwLock as AsyncRwLock};
23use tokio::time::{interval, Duration};
24use tracing::{error, info};
25use url::Url;
26
/// Days from 0001-01-01 (Common Era) to the Unix epoch (1970-01-01).
/// Used to convert chrono's `num_days_from_ce()` into Arrow `Date32` days.
const UNIX_EPOCH_DAYS: i32 = 719_163;

/// Delta table directory name, also used as the DataFusion table name.
const SUMMARY_TABLE_NAME: &str = "trace_summaries";

/// Control-table task id that coordinates optimize runs across pods.
const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";

// Column names of the summary table (order defined by `create_summary_schema`).
const TRACE_ID_COL: &str = "trace_id";
const SERVICE_NAME_COL: &str = "service_name";
const SCOPE_NAME_COL: &str = "scope_name";
const SCOPE_VERSION_COL: &str = "scope_version";
const ROOT_OPERATION_COL: &str = "root_operation";
const START_TIME_COL: &str = "start_time";
const END_TIME_COL: &str = "end_time";
const DURATION_MS_COL: &str = "duration_ms";
const STATUS_CODE_COL: &str = "status_code";
const STATUS_MESSAGE_COL: &str = "status_message";
const SPAN_COUNT_COL: &str = "span_count";
const ERROR_COUNT_COL: &str = "error_count";
// Column on the span table queried for attribute filtering.
const SEARCH_BLOB_COL: &str = "search_blob";

// Collection-valued columns.
const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

// Partition column (Date32, derived from each record's start_time).
const PARTITION_DATE_COL: &str = "partition_date";
56
57fn create_summary_schema() -> Schema {
60 Schema::new(vec![
61 Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
62 Field::new(
63 SERVICE_NAME_COL,
64 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
65 false,
66 ),
67 Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
68 Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
69 Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
70 Field::new(
71 START_TIME_COL,
72 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
73 false,
74 ),
75 Field::new(
76 END_TIME_COL,
77 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
78 true,
79 ),
80 Field::new(DURATION_MS_COL, DataType::Int64, true),
81 Field::new(STATUS_CODE_COL, DataType::Int32, false),
82 Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
83 Field::new(SPAN_COUNT_COL, DataType::Int64, false),
84 Field::new(ERROR_COUNT_COL, DataType::Int64, false),
85 resource_attribute_field(),
86 Field::new(
87 ENTITY_IDS_COL,
88 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
89 true,
90 ),
91 Field::new(
92 QUEUE_IDS_COL,
93 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
94 true,
95 ),
96 Field::new(PARTITION_DATE_COL, DataType::Date32, false),
97 ])
98}
99
/// Columnar builders, one per summary-table column, used to assemble a
/// `RecordBatch` from `TraceSummaryRecord`s. Field order must stay in sync
/// with `create_summary_schema`.
struct TraceSummaryBatchBuilder {
    // Schema the finished batch is validated against.
    schema: Arc<Schema>,
    trace_id: FixedSizeBinaryBuilder,
    // Dictionary-encoded to match the schema's Dictionary(Int32, Utf8) field.
    service_name: StringDictionaryBuilder<Int32Type>,
    scope_name: StringBuilder,
    scope_version: StringBuilder,
    root_operation: StringBuilder,
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,
    status_code: Int32Builder,
    status_message: StringBuilder,
    span_count: Int64Builder,
    error_count: Int64Builder,
    // Map of attribute key -> JSON-encoded value string.
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    entity_ids: ListBuilder<StringBuilder>,
    queue_ids: ListBuilder<StringBuilder>,
    partition_date: Date32Builder,
}
121
impl TraceSummaryBatchBuilder {
    /// Create builders sized for `capacity` rows. The byte-capacity hints
    /// (e.g. `capacity * 16`) are heuristics for typical string lengths.
    fn new(schema: Arc<Schema>, capacity: usize) -> Self {
        // Entry/key/value names must match `resource_attribute_field()` in
        // the schema so the map layouts agree.
        let map_field_names = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let resource_attributes = MapBuilder::new(
            Some(map_field_names),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );
        Self {
            schema,
            trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
            service_name: StringDictionaryBuilder::new(),
            scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
            scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
            root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
            start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            duration_ms: Int64Builder::with_capacity(capacity),
            status_code: Int32Builder::with_capacity(capacity),
            status_message: StringBuilder::with_capacity(capacity, capacity * 16),
            span_count: Int64Builder::with_capacity(capacity),
            error_count: Int64Builder::with_capacity(capacity),
            resource_attributes,
            entity_ids: ListBuilder::new(StringBuilder::new()),
            queue_ids: ListBuilder::new(StringBuilder::new()),
            partition_date: Date32Builder::with_capacity(capacity),
        }
    }

    /// Append one record as a row across all column builders. Empty strings
    /// and empty collections are stored as NULL rather than empty values.
    fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
        self.trace_id.append_value(rec.trace_id.as_bytes())?;
        self.service_name.append_value(&rec.service_name);
        self.scope_name.append_value(&rec.scope_name);
        if rec.scope_version.is_empty() {
            self.scope_version.append_null();
        } else {
            self.scope_version.append_value(&rec.scope_version);
        }
        self.root_operation.append_value(&rec.root_operation);
        self.start_time
            .append_value(rec.start_time.timestamp_micros());
        match rec.end_time {
            Some(end) => self.end_time.append_value(end.timestamp_micros()),
            None => self.end_time.append_null(),
        }
        // Duration is only known once the trace ended; clamp to zero in case
        // clock skew produced end < start.
        let duration = rec
            .end_time
            .map(|end| (end - rec.start_time).num_milliseconds().max(0));
        match duration {
            Some(d) => self.duration_ms.append_value(d),
            None => self.duration_ms.append_null(),
        }
        self.status_code.append_value(rec.status_code);
        if rec.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&rec.status_message);
        }
        self.span_count.append_value(rec.span_count);
        self.error_count.append_value(rec.error_count);
        if rec.resource_attributes.is_empty() {
            // append(false) records a NULL map for this row.
            self.resource_attributes.append(false)?;
        } else {
            // Attribute values are JSON-encoded into the map's string values;
            // serialization failure degrades to the literal "null".
            for attr in &rec.resource_attributes {
                self.resource_attributes.keys().append_value(&attr.key);
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                self.resource_attributes.values().append_value(value_str);
            }
            self.resource_attributes.append(true)?;
        }
        if rec.entity_ids.is_empty() {
            self.entity_ids.append_null();
        } else {
            for id in &rec.entity_ids {
                self.entity_ids.values().append_value(id);
            }
            self.entity_ids.append(true);
        }
        if rec.queue_ids.is_empty() {
            self.queue_ids.append_null();
        } else {
            for id in &rec.queue_ids {
                self.queue_ids.values().append_value(id);
            }
            self.queue_ids.append(true);
        }
        // Date32 counts days since the Unix epoch; chrono counts from CE.
        let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);
        Ok(())
    }

    /// Finalize all builders into a `RecordBatch`. Column order must match
    /// the schema produced by `create_summary_schema`.
    fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(self.trace_id.finish()),
            Arc::new(self.service_name.finish()),
            Arc::new(self.scope_name.finish()),
            Arc::new(self.scope_version.finish()),
            Arc::new(self.root_operation.finish()),
            Arc::new(self.start_time.finish()),
            Arc::new(self.end_time.finish()),
            Arc::new(self.duration_ms.finish()),
            Arc::new(self.status_code.finish()),
            Arc::new(self.status_message.finish()),
            Arc::new(self.span_count.finish()),
            Arc::new(self.error_count.finish()),
            Arc::new(self.resource_attributes.finish()),
            Arc::new(self.entity_ids.finish()),
            Arc::new(self.queue_ids.finish()),
            Arc::new(self.partition_date.finish()),
        ];
        RecordBatch::try_new(self.schema, columns).map_err(Into::into)
    }
}
241
/// Commands accepted by the summary-table actor spawned in
/// `TraceSummaryDBEngine::start_actor`.
pub enum SummaryTableCommand {
    /// Append a batch of summary records; the write result is sent back on
    /// `respond_to`.
    Write {
        records: Vec<TraceSummaryRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Compact/Z-order the table (a best-effort vacuum follows).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Delete unreferenced files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
258
259async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
260 let mut base = object_store.get_base_url()?;
261 let mut path = base.path().to_string();
262 if !path.ends_with('/') {
263 path.push('/');
264 }
265 path.push_str(SUMMARY_TABLE_NAME);
266 base.set_path(&path);
267 Ok(base)
268}
269
/// Create a brand-new Delta table for trace summaries at `table_url`,
/// partitioned by `partition_date`, with data-skipping statistics enabled on
/// the columns queries commonly filter by.
async fn create_summary_table(
    object_store: &ObjectStore,
    table_url: Url,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    // Log only the last path segment to avoid leaking full bucket paths.
    info!(
        "Creating trace summary table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(SUMMARY_TABLE_NAME)
    );
    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;
    let delta_fields = arrow_schema_to_delta(&schema);
    table
        .create()
        .with_table_name(SUMMARY_TABLE_NAME)
        .with_columns(delta_fields)
        .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
        // Per-file min/max stats on these columns let queries prune files by
        // time range, service, status, and counts.
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
        )
        .await
        .map_err(Into::into)
}
302
303async fn build_or_create_summary_table(
304 object_store: &ObjectStore,
305 schema: SchemaRef,
306) -> Result<DeltaTable, TraceEngineError> {
307 register_cloud_logstore_factories();
308 let table_url = build_summary_url(object_store).await?;
309 info!(
310 "Loading trace summary table [{}://.../{} ]",
311 table_url.scheme(),
312 table_url
313 .path_segments()
314 .and_then(|mut s| s.next_back())
315 .unwrap_or(SUMMARY_TABLE_NAME)
316 );
317
318 let is_delta_table = if table_url.scheme() == "file" {
323 if let Ok(path) = table_url.to_file_path() {
324 if !path.exists() {
325 info!("Creating directory for summary table: {:?}", path);
326 std::fs::create_dir_all(&path)?;
327 }
328 path.join("_delta_log").exists()
329 } else {
330 false
331 }
332 } else {
333 let store = object_store.as_dyn_object_store();
334 match DeltaTableBuilder::from_url(table_url.clone()) {
335 Ok(builder) => builder
336 .with_storage_backend(store, table_url.clone())
337 .load()
338 .await
339 .is_ok(),
340 Err(_) => false,
341 }
342 };
343
344 if is_delta_table {
345 info!(
346 "Loaded existing trace summary table [{}://.../{} ]",
347 table_url.scheme(),
348 table_url
349 .path_segments()
350 .and_then(|mut s| s.next_back())
351 .unwrap_or(SUMMARY_TABLE_NAME)
352 );
353 let store = object_store.as_dyn_object_store();
354 DeltaTableBuilder::from_url(table_url.clone())?
355 .with_storage_backend(store, table_url)
356 .load()
357 .await
358 .map_err(Into::into)
359 } else {
360 info!("Summary table does not exist, creating new table");
361 create_summary_table(object_store, table_url, schema).await
362 }
363}
364
/// Owner of the summary Delta table: performs writes and maintenance, and
/// keeps the shared DataFusion `SessionContext` registration current.
pub struct TraceSummaryDBEngine {
    // Arrow schema all written batches must match.
    schema: Arc<Schema>,
    // Current table version; write-locked while committing/swapping.
    table: Arc<AsyncRwLock<DeltaTable>>,
    pub ctx: Arc<SessionContext>,
    // Cross-pod task coordination (optimize claims).
    control: ControlTableEngine,
}
371
impl TraceSummaryDBEngine {
    /// Load or create the summary Delta table, register it with the shared
    /// DataFusion context, and initialize the control-table coordinator.
    pub async fn new(
        object_store: &ObjectStore,
        ctx: Arc<SessionContext>,
    ) -> Result<Self, TraceEngineError> {
        let schema = Arc::new(create_summary_schema());
        let delta_table = build_or_create_summary_table(object_store, schema.clone()).await?;
        // A brand-new table may not yield a provider yet; `write_records`
        // re-registers after every write, so the first write repairs this.
        if let Ok(provider) = delta_table.table_provider().await {
            ctx.register_table(SUMMARY_TABLE_NAME, provider)?;
        } else {
            info!("Empty summary table at init — deferring SessionContext registration until first write");
        }

        let control = ControlTableEngine::new(object_store, get_pod_id()).await?;

        Ok(TraceSummaryDBEngine {
            schema,
            table: Arc::new(AsyncRwLock::new(delta_table)),
            ctx,
            control,
        })
    }

    /// Assemble `records` into one Arrow `RecordBatch` matching `self.schema`.
    fn build_batch(
        &self,
        records: Vec<TraceSummaryRecord>,
    ) -> Result<RecordBatch, TraceEngineError> {
        let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
        for rec in &records {
            builder.append(rec)?;
        }
        builder.finish()
    }

    /// Append `records` to the Delta table and refresh the DataFusion
    /// registration so subsequent queries see the new table version.
    async fn write_records(
        &self,
        records: Vec<TraceSummaryRecord>,
    ) -> Result<(), TraceEngineError> {
        let count = records.len();
        info!("Writing {} trace summaries", count);
        let batch = self.build_batch(records)?;

        // Hold the write lock across commit + re-registration so concurrent
        // maintenance operations cannot interleave table swaps.
        let mut table_guard = self.table.write().await;
        // Delta operations consume the table, so operate on a clone and swap
        // the updated version back under the lock afterwards.
        let current_table = table_guard.clone();
        let updated_table = current_table
            .write(vec![batch])
            .with_save_mode(deltalake::protocol::SaveMode::Append)
            .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
            .await?;

        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
        self.ctx
            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;
        info!("Summary table updated with {} records", count);
        Ok(())
    }

    /// Compact small files (target ~128 MiB) and Z-order by start time and
    /// service name, then refresh the DataFusion registration.
    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let (updated_table, _metrics) = table_guard
            .clone()
            .optimize()
            .with_target_size(128 * 1024 * 1024)
            .with_type(OptimizeType::ZOrder(vec![
                START_TIME_COL.to_string(),
                SERVICE_NAME_COL.to_string(),
            ]))
            .await?;

        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
        self.ctx
            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.ctx.state())?;
        *table_guard = updated_table;
        Ok(())
    }

    /// Delete unreferenced files older than `retention_hours`. Retention
    /// enforcement is disabled so short retentions (e.g. 0h right after an
    /// optimize) are permitted.
    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let (updated_table, _metrics) = table_guard
            .clone()
            .vacuum()
            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
            .with_enforce_retention_duration(false)
            .await?;

        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
        self.ctx
            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
        updated_table.update_datafusion_session(&self.ctx.state())?;
        *table_guard = updated_table;
        Ok(())
    }

    /// Run optimize (plus a follow-up vacuum) if this pod wins the claim in
    /// the control table. On success the claim is released for
    /// `interval_hours`; on failure it is released immediately so another
    /// pod can retry.
    async fn try_run_optimize(&self, interval_hours: u64) {
        match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
            Ok(true) => match self.optimize_table().await {
                Ok(()) => {
                    // Vacuum is best-effort: log but don't fail the cycle.
                    if let Err(e) = self.vacuum_table(0).await {
                        error!("Post-optimize vacuum failed: {}", e);
                    }

                    let _ = self
                        .control
                        .release_task(
                            TASK_SUMMARY_OPTIMIZE,
                            chrono::Duration::hours(interval_hours as i64),
                        )
                        .await;
                }
                Err(e) => {
                    error!("Summary optimize failed: {}", e);
                    let _ = self
                        .control
                        .release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
                        .await;
                }
            },
            // Another pod holds the claim — nothing to do this tick.
            Ok(false) => { }
            Err(e) => error!("Summary optimize claim check failed: {}", e),
        }
    }

    /// Consume the engine and run it as an actor on a spawned task. Returns
    /// the command channel and the join handle; send
    /// `SummaryTableCommand::Shutdown` to stop the loop.
    pub fn start_actor(
        self,
        compaction_interval_hours: u64,
    ) -> (
        mpsc::Sender<SummaryTableCommand>,
        tokio::task::JoinHandle<()>,
    ) {
        let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);

        let handle = tokio::spawn(async move {
            // Check the optimize claim every 5 minutes; consume the ticker's
            // immediate first tick so optimize doesn't run at startup.
            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
            scheduler_ticker.tick().await;
            // NOTE(review): if all senders drop without sending Shutdown,
            // `rx.recv()` yields None, that select arm becomes disabled, and
            // the loop ticks forever — confirm this is intentional.
            loop {
                tokio::select! {
                    Some(cmd) = rx.recv() => {
                        match cmd {
                            SummaryTableCommand::Write { records, respond_to } => {
                                let result = self.write_records(records).await;
                                if let Err(ref e) = result {
                                    error!("Summary write failed: {}", e);
                                }
                                let _ = respond_to.send(result);
                            }
                            SummaryTableCommand::Optimize { respond_to } => {
                                // Reply with the optimize result; the
                                // follow-up vacuum is best-effort and its
                                // error is only logged.
                                let _ = respond_to.send(self.optimize_table().await);
                                if let Err(e) = self.vacuum_table(0).await {
                                    error!("Post-optimize vacuum failed: {}", e);
                                }
                            }
                            SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
                            }
                            SummaryTableCommand::Shutdown => {
                                info!("TraceSummaryDBEngine actor shutting down");
                                break;
                            }
                        }
                    }
                    _ = scheduler_ticker.tick() => {
                        self.try_run_optimize(compaction_interval_hours).await;
                    }
                }
            }
        });

        (tx, handle)
    }
}
568
/// Public facade over the summary engine actor: serializes writes and
/// maintenance through a command channel and exposes read queries via
/// `query_service`.
pub struct TraceSummaryService {
    // Command channel into the actor spawned by `start_actor`.
    engine_tx: mpsc::Sender<SummaryTableCommand>,
    // Join handle for the actor task; awaited during `shutdown`.
    engine_handle: tokio::task::JoinHandle<()>,
    pub query_service: TraceSummaryQueries,
}
576
577impl TraceSummaryService {
578 pub async fn new(
579 object_store: &ObjectStore,
580 compaction_interval_hours: u64,
581 ctx: Arc<SessionContext>,
582 ) -> Result<Self, TraceEngineError> {
583 let engine = TraceSummaryDBEngine::new(object_store, ctx).await?;
584 let engine_ctx = engine.ctx.clone();
585 let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours);
586
587 Ok(TraceSummaryService {
588 engine_tx,
589 engine_handle,
590 query_service: TraceSummaryQueries::new(engine_ctx),
591 })
592 }
593
594 pub async fn write_summaries(
596 &self,
597 records: Vec<TraceSummaryRecord>,
598 ) -> Result<(), TraceEngineError> {
599 let (tx, rx) = oneshot::channel();
600 self.engine_tx
601 .send(SummaryTableCommand::Write {
602 records,
603 respond_to: tx,
604 })
605 .await
606 .map_err(|_| TraceEngineError::ChannelClosed)?;
607 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
608 }
609
610 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
611 let (tx, rx) = oneshot::channel();
612 self.engine_tx
613 .send(SummaryTableCommand::Optimize { respond_to: tx })
614 .await
615 .map_err(|_| TraceEngineError::ChannelClosed)?;
616 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
617 }
618
619 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
620 let (tx, rx) = oneshot::channel();
621 self.engine_tx
622 .send(SummaryTableCommand::Vacuum {
623 retention_hours,
624 respond_to: tx,
625 })
626 .await
627 .map_err(|_| TraceEngineError::ChannelClosed)?;
628 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
629 }
630
631 pub async fn signal_shutdown(&self) {
633 info!("TraceSummaryService signaling shutdown");
634 let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
635 }
636
637 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
638 info!("TraceSummaryService shutting down");
639 self.engine_tx
640 .send(SummaryTableCommand::Shutdown)
641 .await
642 .map_err(|_| TraceEngineError::ChannelClosed)?;
643 if let Err(e) = self.engine_handle.await {
644 error!("Summary engine handle error: {}", e);
645 }
646 info!("TraceSummaryService shutdown complete");
647 Ok(())
648 }
649}
650
/// Read-only query layer over the DataFusion session that has the summary
/// (and span) tables registered.
pub struct TraceSummaryQueries {
    ctx: Arc<SessionContext>,
}
656
impl TraceSummaryQueries {
    /// Wrap an existing session; assumes the summary table is (or will be)
    /// registered under `SUMMARY_TABLE_NAME`.
    pub fn new(ctx: Arc<SessionContext>) -> Self {
        Self { ctx }
    }

    /// Cursor-paginated trace listing.
    ///
    /// Rows in the summary table are per-scope partial summaries; this
    /// aggregates them by `trace_id`, applies the requested filters, and
    /// returns one page of at most `limit` items plus next/previous cursors.
    /// `direction` is "next" (default, newest first) or "previous".
    pub async fn get_paginated_traces(
        &self,
        filters: &TraceFilters,
    ) -> Result<TracePaginationResponse, TraceEngineError> {
        let limit = filters.limit.unwrap_or(50) as usize;
        let direction = filters.direction.as_deref().unwrap_or("next");

        use crate::parquet::tracing::queries::{date_lit, ts_lit};
        use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
        use datafusion::functions_nested::set_ops::array_distinct;

        let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;

        // Time-range filters: the partition_date predicate enables partition
        // pruning; the start_time predicate narrows within partitions.
        if let Some(start) = filters.start_time {
            df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
            df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
        }
        if let Some(end) = filters.end_time {
            df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
            df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
        }

        // Orderings used to pick a representative row per trace: highest
        // span count (then latest end) for descriptive fields; worst status
        // (then span count) for the status message.
        let by_span_end: Vec<SortExpr> = vec![
            col(SPAN_COUNT_COL).sort(false, false),
            col(END_TIME_COL).sort(false, false),
        ];
        let by_status_span: Vec<SortExpr> = vec![
            col(STATUS_CODE_COL).sort(false, false),
            col(SPAN_COUNT_COL).sort(false, false),
        ];

        // Collapse per-scope partial summaries into one row per trace.
        let mut df = df
            .aggregate(
                vec![col(TRACE_ID_COL)],
                vec![
                    min(col(START_TIME_COL)).alias(START_TIME_COL),
                    max(col(END_TIME_COL)).alias(END_TIME_COL),
                    // Integer copies of the time bounds for duration math.
                    max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
                    min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
                    max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
                    sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
                    sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
                    first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
                    first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
                    first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
                        .alias(SCOPE_VERSION_COL),
                    first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
                        .alias(ROOT_OPERATION_COL),
                    first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
                    first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
                        .alias(RESOURCE_ATTRIBUTES_COL),
                    array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
                    array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
                ],
            )?
            // duration = (max end - min start) in microseconds, scaled to ms.
            .with_column(
                DURATION_MS_COL,
                (col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
            )?
            // array_agg produced a list of lists: flatten, then deduplicate.
            .with_column(
                ENTITY_IDS_COL,
                array_distinct(flatten(col("_entity_ids_raw"))),
            )?
            .with_column(
                QUEUE_IDS_COL,
                array_distinct(flatten(col("_queue_ids_raw"))),
            )?
            // Drop the aggregation scratch columns.
            .drop_columns(&[
                "_max_end_us",
                "_min_start_us",
                "_entity_ids_raw",
                "_queue_ids_raw",
            ])?;

        // Post-aggregation filters operate on trace-level values.
        if let Some(ref svc) = filters.service_name {
            df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
        }
        match filters.has_errors {
            Some(true) => {
                df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
            }
            Some(false) => {
                df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
            }
            None => {}
        }
        if let Some(sc) = filters.status_code {
            df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
        }

        if let Some(ref uid) = filters.entity_uid {
            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(ENTITY_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        if let Some(ref uid) = filters.queue_uid {
            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(QUEUE_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        // Explicit trace-id filter; hex ids that fail to parse are skipped.
        if let Some(ref ids) = filters.trace_ids {
            if !ids.is_empty() {
                let binary_ids: Vec<Expr> = ids
                    .iter()
                    .filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
                    .map(|b| lit(ScalarValue::Binary(Some(b))))
                    .collect();
                if !binary_ids.is_empty() {
                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
                }
            }
        }

        // Keyset pagination on (start_time, trace_id): rows strictly beyond
        // the cursor, with trace_id breaking ties on equal timestamps. An
        // unparseable cursor id silently disables the cursor filter.
        if let (Some(cursor_time), Some(ref cursor_id)) =
            (filters.cursor_start_time, &filters.cursor_trace_id)
        {
            if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
                let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
                    Some(cursor_time.timestamp_micros()),
                    Some("UTC".into()),
                ));
                let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
                let cursor_expr = if direction == "previous" {
                    col(START_TIME_COL)
                        .gt(cursor_ts.clone())
                        .or(col(START_TIME_COL)
                            .eq(cursor_ts)
                            .and(col(TRACE_ID_COL).gt(cursor_tid)))
                } else {
                    col(START_TIME_COL)
                        .lt(cursor_ts.clone())
                        .or(col(START_TIME_COL)
                            .eq(cursor_ts)
                            .and(col(TRACE_ID_COL).lt(cursor_tid)))
                };
                df = df.filter(cursor_expr)?;
            }
        }

        // Attribute filters live on the span table: collect matching trace
        // ids there, then restrict the summary result to that id set.
        if let Some(ref attr_filters) = filters.attribute_filters {
            if !attr_filters.is_empty() {
                let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
                    TRACE_ID_COL,
                    START_TIME_COL,
                    SEARCH_BLOB_COL,
                ])?;

                // Mirror the summary-side time window on the span scan.
                if let Some(start) = filters.start_time {
                    spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
                        ScalarValue::TimestampMicrosecond(
                            Some(start.timestamp_micros()),
                            Some("UTC".into()),
                        ),
                    )))?;
                }
                if let Some(end) = filters.end_time {
                    spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
                        ScalarValue::TimestampMicrosecond(
                            Some(end.timestamp_micros()),
                            Some("UTC".into()),
                        ),
                    )))?;
                }

                // OR all attribute patterns together against the search blob.
                let mut attr_expr: Option<Expr> = None;
                for f in attr_filters {
                    let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
                    attr_expr = Some(match attr_expr {
                        None => cond,
                        Some(e) => e.or(cond),
                    });
                }
                if let Some(expr) = attr_expr {
                    spans_df = spans_df.filter(expr)?;
                }

                // Materialize the matching trace ids, deduplicating locally.
                let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
                let mut seen_ids: std::collections::HashSet<Vec<u8>> =
                    std::collections::HashSet::new();
                let mut binary_ids: Vec<Expr> = Vec::new();
                for batch in &span_batches {
                    if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
                        // Cast normalizes the column's physical encoding to
                        // plain Binary before downcasting.
                        let casted = compute::cast(col_ref, &DataType::Binary)?;
                        let col_arr =
                            casted
                                .as_any()
                                .downcast_ref::<BinaryArray>()
                                .ok_or_else(|| {
                                    TraceEngineError::DowncastError("trace_id to BinaryArray")
                                })?;
                        for i in 0..batch.num_rows() {
                            let id_bytes = col_arr.value(i).to_vec();
                            if seen_ids.insert(id_bytes.clone()) {
                                binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
                            }
                        }
                    }
                }

                if !binary_ids.is_empty() {
                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
                } else {
                    // No spans matched the attribute filters: empty result.
                    df = df.filter(lit(false))?;
                }
            }
        }

        // "previous" pages ascend so the keyset walks backwards; "next"
        // descends (newest first). Null placement follows the direction.
        df = if direction == "previous" {
            df.sort(vec![
                col(START_TIME_COL).sort(true, true),
                col(TRACE_ID_COL).sort(true, true),
            ])?
        } else {
            df.sort(vec![
                col(START_TIME_COL).sort(false, false),
                col(TRACE_ID_COL).sort(false, false),
            ])?
        };

        // Fetch one extra row to detect whether another page exists.
        df = df.limit(0, Some(limit + 1))?;

        let batches = df.collect().await?;
        let mut items = batches_to_trace_list_items(batches)?;

        let has_more = items.len() > limit;
        if has_more {
            // Drop the sentinel row; it belongs to the following page.
            items.pop();
        }

        // Derive cursors from the page edges for the requested direction.
        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
            "next" => {
                let next_cursor = if has_more {
                    items.last().map(|item| TraceCursor {
                        start_time: item.start_time,
                        trace_id: item.trace_id.clone(),
                    })
                } else {
                    None
                };

                let previous_cursor = items.first().map(|item| TraceCursor {
                    start_time: item.start_time,
                    trace_id: item.trace_id.clone(),
                });

                (
                    has_more,
                    next_cursor,
                    // A previous page exists iff we navigated via a cursor.
                    filters.cursor_start_time.is_some(),
                    previous_cursor,
                )
            }
            "previous" => {
                let previous_cursor = if has_more {
                    items.last().map(|item| TraceCursor {
                        start_time: item.start_time,
                        trace_id: item.trace_id.clone(),
                    })
                } else {
                    None
                };

                let next_cursor = items.first().map(|item| TraceCursor {
                    start_time: item.start_time,
                    trace_id: item.trace_id.clone(),
                });

                (
                    filters.cursor_start_time.is_some(),
                    next_cursor,
                    has_more,
                    previous_cursor,
                )
            }
            // Unknown direction: return the page with no cursors.
            _ => (false, None, false, None),
        };

        Ok(TracePaginationResponse {
            items,
            has_next,
            next_cursor,
            has_previous,
            previous_cursor,
        })
    }
}
1023
1024fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
1028 if map_array.is_null(row_idx) {
1029 return Vec::new();
1030 }
1031 let entry = map_array.value(row_idx);
1032 let Some(struct_array) = entry.as_any().downcast_ref::<StructArray>() else {
1033 tracing::warn!("extract_map_attributes: failed to downcast to StructArray");
1034 return Vec::new();
1035 };
1036 let Some(keys_arr) = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).ok()
1037 else {
1038 tracing::warn!("extract_map_attributes: failed to cast keys to Utf8");
1039 return Vec::new();
1040 };
1041 let Some(keys) = keys_arr.as_any().downcast_ref::<StringArray>() else {
1042 tracing::warn!("extract_map_attributes: failed to downcast keys to StringArray");
1043 return Vec::new();
1044 };
1045 let Some(values_arr) = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).ok()
1046 else {
1047 tracing::warn!("extract_map_attributes: failed to cast values to Utf8");
1048 return Vec::new();
1049 };
1050 let Some(values) = values_arr.as_any().downcast_ref::<StringArray>() else {
1051 tracing::warn!("extract_map_attributes: failed to downcast values to StringArray");
1052 return Vec::new();
1053 };
1054
1055 (0..struct_array.len())
1056 .map(|i| Attribute {
1057 key: keys.value(i).to_string(),
1058 value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
1059 })
1060 .collect()
1061}
1062
1063fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
1065 let Some(list) = list else {
1066 return Vec::new();
1067 };
1068 if list.is_null(row_idx) {
1069 return Vec::new();
1070 }
1071 let inner = list.value(row_idx);
1072 let str_arr = compute::cast(&inner, &DataType::Utf8)
1073 .ok()
1074 .and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
1075 match str_arr {
1076 Some(arr) => (0..arr.len())
1077 .filter(|i| !arr.is_null(*i))
1078 .map(|i| arr.value(i).to_string())
1079 .collect(),
1080 None => Vec::new(),
1081 }
1082}
1083
1084fn batches_to_trace_list_items(
1085 batches: Vec<RecordBatch>,
1086) -> Result<Vec<TraceListItem>, TraceEngineError> {
1087 let mut items = Vec::new();
1088
1089 for batch in &batches {
1090 let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
1093 TraceEngineError::UnsupportedOperation("missing trace_id column".into())
1094 })?;
1095 let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
1096 let trace_ids = trace_id_binary
1097 .as_any()
1098 .downcast_ref::<BinaryArray>()
1099 .ok_or_else(|| {
1100 TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
1101 })?;
1102
1103 let svc_arr = compute::cast(
1106 batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
1107 TraceEngineError::UnsupportedOperation("missing service_name column".into())
1108 })?,
1109 &DataType::Utf8,
1110 )?;
1111 let service_names = svc_arr
1112 .as_any()
1113 .downcast_ref::<StringArray>()
1114 .ok_or_else(|| {
1115 TraceEngineError::UnsupportedOperation(
1116 "service_name cast to StringArray failed".into(),
1117 )
1118 })?;
1119
1120 let scope_arr = compute::cast(
1121 batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
1122 TraceEngineError::UnsupportedOperation("missing scope_name column".into())
1123 })?,
1124 &DataType::Utf8,
1125 )?;
1126 let scope_names = scope_arr
1127 .as_any()
1128 .downcast_ref::<StringArray>()
1129 .ok_or_else(|| {
1130 TraceEngineError::UnsupportedOperation(
1131 "scope_name cast to StringArray failed".into(),
1132 )
1133 })?;
1134
1135 let scopev_arr = compute::cast(
1136 batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
1137 TraceEngineError::UnsupportedOperation("missing scope_version column".into())
1138 })?,
1139 &DataType::Utf8,
1140 )?;
1141 let scope_versions = scopev_arr
1142 .as_any()
1143 .downcast_ref::<StringArray>()
1144 .ok_or_else(|| {
1145 TraceEngineError::UnsupportedOperation(
1146 "scope_version cast to StringArray failed".into(),
1147 )
1148 })?;
1149
1150 let root_arr = compute::cast(
1151 batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
1152 TraceEngineError::UnsupportedOperation("missing root_operation column".into())
1153 })?,
1154 &DataType::Utf8,
1155 )?;
1156 let root_operations = root_arr
1157 .as_any()
1158 .downcast_ref::<StringArray>()
1159 .ok_or_else(|| {
1160 TraceEngineError::UnsupportedOperation(
1161 "root_operation cast to StringArray failed".into(),
1162 )
1163 })?;
1164
1165 let sm_arr = compute::cast(
1166 batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
1167 TraceEngineError::UnsupportedOperation("missing status_message column".into())
1168 })?,
1169 &DataType::Utf8,
1170 )?;
1171 let status_messages = sm_arr
1172 .as_any()
1173 .downcast_ref::<StringArray>()
1174 .ok_or_else(|| {
1175 TraceEngineError::UnsupportedOperation(
1176 "status_message cast to StringArray failed".into(),
1177 )
1178 })?;
1179
1180 let resource_attrs_map = batch
1181 .column_by_name(RESOURCE_ATTRIBUTES_COL)
1182 .and_then(|c| c.as_any().downcast_ref::<MapArray>())
1183 .ok_or_else(|| {
1184 TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
1185 })?;
1186
1187 let entity_ids_list = batch
1188 .column_by_name(ENTITY_IDS_COL)
1189 .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1190
1191 let queue_ids_list = batch
1192 .column_by_name(QUEUE_IDS_COL)
1193 .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1194
1195 let start_times = batch
1196 .column_by_name(START_TIME_COL)
1197 .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1198 .ok_or_else(|| {
1199 TraceEngineError::UnsupportedOperation("missing start_time column".into())
1200 })?;
1201
1202 let end_times = batch
1203 .column_by_name(END_TIME_COL)
1204 .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1205 .ok_or_else(|| {
1206 TraceEngineError::UnsupportedOperation("missing end_time column".into())
1207 })?;
1208
1209 let durations = batch
1210 .column_by_name(DURATION_MS_COL)
1211 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1212 .ok_or_else(|| {
1213 TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
1214 })?;
1215
1216 let status_codes = batch
1217 .column_by_name(STATUS_CODE_COL)
1218 .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1219 .ok_or_else(|| {
1220 TraceEngineError::UnsupportedOperation("missing status_code column".into())
1221 })?;
1222
1223 let span_counts = batch
1224 .column_by_name(SPAN_COUNT_COL)
1225 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1226 .ok_or_else(|| {
1227 TraceEngineError::UnsupportedOperation("missing span_count column".into())
1228 })?;
1229
1230 let error_counts = batch
1231 .column_by_name(ERROR_COUNT_COL)
1232 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1233 .ok_or_else(|| {
1234 TraceEngineError::UnsupportedOperation("missing error_count column".into())
1235 })?;
1236
1237 for i in 0..batch.num_rows() {
1238 let trace_id_hex = hex::encode(trace_ids.value(i));
1239
1240 let start_time = micros_to_datetime(start_times.value(i))?;
1241 let end_time = if end_times.is_null(i) {
1242 None
1243 } else {
1244 Some(micros_to_datetime(end_times.value(i))?)
1245 };
1246 let duration_ms = if durations.is_null(i) {
1247 None
1248 } else {
1249 Some(durations.value(i))
1250 };
1251 let error_count = error_counts.value(i);
1252
1253 let resource_attributes = extract_map_attributes(resource_attrs_map, i);
1254
1255 let entity_ids = extract_list_strings(entity_ids_list, i);
1256 let queue_ids = extract_list_strings(queue_ids_list, i);
1257
1258 items.push(TraceListItem {
1259 trace_id: trace_id_hex,
1260 service_name: service_names.value(i).to_string(),
1261 scope_name: scope_names.value(i).to_string(),
1262 scope_version: scope_versions.value(i).to_string(),
1263 root_operation: root_operations.value(i).to_string(),
1264 start_time,
1265 end_time,
1266 duration_ms,
1267 status_code: status_codes.value(i),
1268 status_message: if status_messages.is_null(i) {
1269 None
1270 } else {
1271 Some(status_messages.value(i).to_string())
1272 },
1273 span_count: span_counts.value(i),
1274 has_errors: error_count > 0,
1275 error_count,
1276 resource_attributes,
1277 entity_ids,
1278 queue_ids,
1279 });
1280 }
1281 }
1282
1283 Ok(items)
1284}
1285
1286fn micros_to_datetime(micros: i64) -> Result<DateTime<Utc>, TraceEngineError> {
1287 DateTime::from_timestamp_micros(micros).ok_or(TraceEngineError::InvalidTimestamp(
1288 "out-of-range microsecond timestamp",
1289 ))
1290}
1291
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::ObjectStore;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceFilters;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use tracing_subscriber;

    // Initializes test logging (idempotent via try_init) and removes any
    // leftover local storage root so each test starts from an empty table.
    // Called at both the start and end of every test.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            let _ = std::fs::remove_dir_all(storage_path);
        }
    }

    // Builds an ObjectStore backed by the provided (default/local) settings.
    fn make_test_object_store(storage_settings: &ObjectStorageSettings) -> ObjectStore {
        ObjectStore::new(storage_settings).unwrap()
    }

    // Wraps the store's DataFusion session in an Arc so it can be shared
    // between services.
    fn make_test_ctx(object_store: &ObjectStore) -> Arc<SessionContext> {
        Arc::new(object_store.get_session().unwrap())
    }

    // Builds a TraceSummaryRecord fixture. status_code/status_message are
    // derived from error_count (2/"Internal Server Error" on errors,
    // 0/"OK" otherwise); start_time defaults to now and end_time to
    // now + 200ms — callers override fields as needed.
    fn make_summary(
        trace_id_bytes: [u8; 16],
        service_name: &str,
        error_count: i64,
        resource_attributes: Vec<Attribute>,
    ) -> TraceSummaryRecord {
        let now = Utc::now();
        TraceSummaryRecord {
            trace_id: TraceId::from_bytes(trace_id_bytes),
            service_name: service_name.to_string(),
            scope_name: "test.scope".to_string(),
            scope_version: String::new(),
            root_operation: "root_op".to_string(),
            start_time: now,
            end_time: Some(now + chrono::Duration::milliseconds(200)),
            status_code: if error_count > 0 { 2 } else { 0 },
            status_message: if error_count > 0 {
                "Internal Server Error".to_string()
            } else {
                "OK".to_string()
            },
            span_count: 3,
            error_count,
            resource_attributes,
            entity_ids: vec![],
            queue_ids: vec![],
        }
    }

    // Smoke test: two written summaries are visible through basic time-window
    // pagination.
    #[tokio::test]
    async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;

        let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
        let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // Wait for the writes to become queryable — NOTE(review): assumes an
        // asynchronous persistence path; confirm the required latency.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert!(
            response.items.len() >= 2,
            "Expected at least 2 items, got {}",
            response.items.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    // Verifies has_errors=Some(true) returns only error traces and
    // has_errors=Some(false) returns only non-error traces.
    #[tokio::test]
    async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;

        let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
        let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
        service
            .write_summaries(vec![ok_summary, err_summary])
            .await?;
        // Wait for the writes to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let base_filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        // Errors-only query: every item must have error_count > 0.
        let mut filters_err = base_filters.clone();
        filters_err.has_errors = Some(true);
        let errors_only = service
            .query_service
            .get_paginated_traces(&filters_err)
            .await?;
        for item in &errors_only.items {
            assert!(
                item.error_count > 0,
                "Expected error trace, got: {:?}",
                item
            );
        }
        assert!(
            !errors_only.items.is_empty(),
            "Expected at least one error trace"
        );

        // No-errors query: every item must have error_count == 0.
        let mut filters_ok = base_filters.clone();
        filters_ok.has_errors = Some(false);
        let no_errors = service
            .query_service
            .get_paginated_traces(&filters_ok)
            .await?;
        for item in &no_errors.items {
            assert_eq!(
                item.error_count, 0,
                "Expected non-error trace, got error_count={}",
                item.error_count
            );
        }
        assert!(
            !no_errors.items.is_empty(),
            "Expected at least one non-error trace"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    // Verifies the service_name filter returns only traces from the requested
    // service.
    #[tokio::test]
    async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;

        let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
        let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
        service.write_summaries(vec![s_alpha, s_beta]).await?;
        // Wait for the writes to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: Some("alpha_service".to_string()),
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert!(
            !response.items.is_empty(),
            "Expected results for alpha_service"
        );
        for item in &response.items {
            assert_eq!(
                item.service_name, "alpha_service",
                "Expected only alpha_service items, got: {}",
                item.service_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    // Verifies the trace_ids filter returns exactly the requested trace and
    // excludes others.
    #[tokio::test]
    async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;

        let wanted_id = TraceId::from_bytes([7u8; 16]);
        let unwanted_id = TraceId::from_bytes([8u8; 16]);

        let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
        let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // Wait for the writes to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: Some(vec![wanted_id.to_hex()]),
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            1,
            "Expected exactly 1 item from trace_ids filter"
        );
        assert_eq!(
            response.items[0].trace_id,
            wanted_id.to_hex(),
            "Returned wrong trace_id"
        );
        assert_ne!(
            response.items[0].trace_id,
            unwanted_id.to_hex(),
            "Should not have returned unwanted trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    // Exercises forward and backward cursor pagination over 100 summaries with
    // strictly decreasing start times (one per minute).
    #[tokio::test]
    async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;

        let now = Utc::now();
        let summaries: Vec<TraceSummaryRecord> = (0u8..100)
            .map(|i| {
                let mut s = make_summary([i; 16], "svc", 0, vec![]);
                s.start_time = now - chrono::Duration::minutes(i as i64);
                s
            })
            .collect();
        service.write_summaries(summaries).await?;
        // Wait for the writes to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let mut filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(2)),
            end_time: Some(now + chrono::Duration::hours(1)),
            limit: Some(50),
            ..Default::default()
        };

        // Page 1: full page plus a cursor pointing at page 2.
        let first = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(first.items.len(), 50, "first page: 50 items");
        assert!(
            first.next_cursor.is_some(),
            "first page: should have next_cursor"
        );

        // Page 2 via "next": items must not precede the cursor position
        // (results are ordered by descending start_time).
        let next_cur = first.next_cursor.clone().unwrap();
        filters.cursor_start_time = Some(next_cur.start_time);
        filters.cursor_trace_id = Some(next_cur.trace_id.clone());
        filters.direction = Some("next".to_string());
        let second = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(second.items.len(), 50, "second page: 50 items");
        assert!(
            second.items[0].start_time <= next_cur.start_time,
            "second page first item must be <= cursor"
        );
        assert!(second.previous_cursor.is_some());

        // Back to page 1 via "previous".
        let prev_cur = second.previous_cursor.unwrap();
        filters.cursor_start_time = Some(prev_cur.start_time);
        filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
        filters.direction = Some("previous".to_string());
        let prev = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(prev.items.len(), 50, "previous page: 50 items");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    // Verifies attribute_filters are applied by joining summaries against the
    // span table: only the trace whose span carries component=kafka matches.
    // The span and summary services share one SessionContext.
    #[tokio::test]
    async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        let shared_ctx = span_service.ctx.clone();

        let summary_service =
            TraceSummaryService::new(&span_service.object_store, 24, shared_ctx).await?;

        let now = Utc::now();
        let kafka_trace = TraceId::from_bytes([70u8; 16]);
        let plain_trace = TraceId::from_bytes([80u8; 16]);

        // One span tagged component=kafka, one without attributes.
        let kafka_span = make_span_record(
            &kafka_trace,
            SpanId::from_bytes([70u8; 8]),
            "svc",
            vec![Attribute {
                key: "component".to_string(),
                value: serde_json::Value::String("kafka".to_string()),
            }],
        );
        let plain_span =
            make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
        span_service
            .write_spans(vec![kafka_span, plain_span])
            .await?;

        let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
        kafka_summary.start_time = now;
        let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![kafka_summary, plain_summary])
            .await?;

        // Wait for both span and summary writes to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            attribute_filters: Some(vec!["component:kafka".to_string()]),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "attribute filter must return results"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == kafka_trace.to_hex()),
            "only kafka trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }

    // Verifies queue_uid filtering on summaries and that the returned trace_id
    // round-trips (hex -> bytes) into a successful span lookup.
    #[tokio::test]
    async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        let shared_ctx = span_service.ctx.clone();

        let summary_service =
            TraceSummaryService::new(&span_service.object_store, 24, shared_ctx).await?;

        let now = Utc::now();
        let queue_trace = TraceId::from_bytes([90u8; 16]);
        let plain_trace = TraceId::from_bytes([91u8; 16]);
        let target_queue_uid = "queue-record-abc123";

        let queue_span = make_span_record(
            &queue_trace,
            SpanId::from_bytes([90u8; 8]),
            "svc_queue",
            vec![],
        );
        let plain_span = make_span_record(
            &plain_trace,
            SpanId::from_bytes([91u8; 8]),
            "svc_queue",
            vec![],
        );
        span_service
            .write_spans_direct(vec![queue_span, plain_span])
            .await?;

        // Only the first summary carries the target queue id.
        let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
        queue_summary.start_time = now;
        queue_summary.queue_ids = vec![target_queue_uid.to_string()];

        let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![queue_summary, plain_summary])
            .await?;

        // Wait for both span and summary writes to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            queue_uid: Some(target_queue_uid.to_string()),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "queue_uid filter must return at least one result"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == queue_trace.to_hex()),
            "only the queue trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        // The hex trace_id from the summary response must locate the spans
        // written for the same trace.
        let returned_trace_id =
            TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
        let spans = span_service
            .query_service
            .get_trace_spans(
                Some(returned_trace_id.as_bytes()),
                None,
                Some(&(now - chrono::Duration::hours(1))),
                Some(&(now + chrono::Duration::hours(1))),
                None,
            )
            .await?;

        assert!(
            !spans.is_empty(),
            "should find spans for the returned trace_id"
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }

    // Builds a minimal 100ms INTERNAL span fixture with the given identity,
    // service name, and attributes.
    fn make_span_record(
        trace_id: &TraceId,
        span_id: SpanId,
        service_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: *trace_id,
            span_id,
            parent_span_id: None,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: "op".to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: serde_json::Value::Null,
            output: serde_json::Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    // Verifies resource attributes written on a summary survive the
    // write/read round trip (key preserved through the Arrow map column).
    #[tokio::test]
    async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;

        let attrs = vec![Attribute {
            key: "cloud.region".to_string(),
            value: serde_json::Value::String("us-east-1".to_string()),
        }];
        let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
        service.write_summaries(vec![summary]).await?;
        // Wait for the write to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
        assert_eq!(
            response.items[0].resource_attributes.len(),
            1,
            "Expected 1 resource attribute"
        );
        assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    // Regression test for snapshot staleness: unlike the other tests, this one
    // deliberately does NOT sleep between write and query — each successive
    // write must be immediately visible to the query service.
    #[tokio::test]
    async fn test_summary_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError>
    {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(100),
            ..Default::default()
        };

        // Write #1: two rows, query immediately.
        let s1 = make_summary([0xA0; 16], "svc_vis", 0, vec![]);
        let s2 = make_summary([0xA1; 16], "svc_vis", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            2,
            "After write #1: expected 2 items, got {}",
            response.items.len()
        );

        // Write #2: two more rows; the query must not serve a stale snapshot.
        let s3 = make_summary([0xA2; 16], "svc_vis", 0, vec![]);
        let s4 = make_summary([0xA3; 16], "svc_vis", 0, vec![]);
        service.write_summaries(vec![s3, s4]).await?;

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            4,
            "After write #2: expected 4 items, got {} (stale snapshot?)",
            response.items.len()
        );

        // Write #3: same check a third time.
        let s5 = make_summary([0xA4; 16], "svc_vis", 0, vec![]);
        let s6 = make_summary([0xA5; 16], "svc_vis", 0, vec![]);
        service.write_summaries(vec![s5, s6]).await?;

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            6,
            "After write #3: expected 6 items, got {} (stale snapshot?)",
            response.items.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}