1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
4use crate::parquet::utils::match_attr_expr;
5use crate::parquet::utils::register_cloud_logstore_factories;
6use crate::storage::ObjectStore;
7use arrow::array::*;
8use arrow::compute;
9use arrow::datatypes::*;
10use arrow_array::Array;
11use arrow_array::RecordBatch;
12use chrono::{DateTime, Datelike, TimeZone, Utc};
13use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
14use datafusion::prelude::*;
15use datafusion::scalar::ScalarValue;
16use deltalake::operations::optimize::OptimizeType;
17use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
18use scouter_settings::ObjectStorageSettings;
19use scouter_types::sql::{TraceFilters, TraceListItem};
20use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
21use std::sync::Arc;
22use tokio::sync::oneshot;
23use tokio::sync::{mpsc, RwLock as AsyncRwLock};
24use tokio::time::{interval, Duration};
25use tracing::{error, info};
26use url::Url;
27
/// Days from 0001-01-01 (as counted by `chrono::Datelike::num_days_from_ce`)
/// to the Unix epoch (1970-01-01); subtracting it converts a CE day count
/// into an Arrow `Date32` value (days since epoch).
const UNIX_EPOCH_DAYS: i32 = 719_163;

/// Delta table name; also the name the table is registered under in the
/// DataFusion `SessionContext`.
const SUMMARY_TABLE_NAME: &str = "trace_summaries";

/// Control-table task id used to coordinate optimize runs across pods.
const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";

// Column names of the summary table. Keep in sync with `create_summary_schema`
// and with the positional column order in `TraceSummaryBatchBuilder::finish`.
const TRACE_ID_COL: &str = "trace_id";
const SERVICE_NAME_COL: &str = "service_name";
const SCOPE_NAME_COL: &str = "scope_name";
const SCOPE_VERSION_COL: &str = "scope_version";
const ROOT_OPERATION_COL: &str = "root_operation";
const START_TIME_COL: &str = "start_time";
const END_TIME_COL: &str = "end_time";
const DURATION_MS_COL: &str = "duration_ms";
const STATUS_CODE_COL: &str = "status_code";
const STATUS_MESSAGE_COL: &str = "status_message";
const SPAN_COUNT_COL: &str = "span_count";
const ERROR_COUNT_COL: &str = "error_count";
/// Span-level column read from the separate `trace_spans` table when applying
/// attribute filters (see `get_paginated_traces`); not a summary-table column.
const SEARCH_BLOB_COL: &str = "search_blob";

// Collection-valued columns.
const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

/// Delta partition column, derived from the date part of `start_time`.
const PARTITION_DATE_COL: &str = "partition_date";
58fn create_summary_schema() -> Schema {
61 Schema::new(vec![
62 Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
63 Field::new(
64 SERVICE_NAME_COL,
65 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
66 false,
67 ),
68 Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
69 Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
70 Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
71 Field::new(
72 START_TIME_COL,
73 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
74 false,
75 ),
76 Field::new(
77 END_TIME_COL,
78 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
79 true,
80 ),
81 Field::new(DURATION_MS_COL, DataType::Int64, true),
82 Field::new(STATUS_CODE_COL, DataType::Int32, false),
83 Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
84 Field::new(SPAN_COUNT_COL, DataType::Int64, false),
85 Field::new(ERROR_COUNT_COL, DataType::Int64, false),
86 resource_attribute_field(),
87 Field::new(
88 ENTITY_IDS_COL,
89 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
90 true,
91 ),
92 Field::new(
93 QUEUE_IDS_COL,
94 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
95 true,
96 ),
97 Field::new(PARTITION_DATE_COL, DataType::Date32, false),
98 ])
99}
100
/// Column-wise builder that converts `TraceSummaryRecord`s into a single
/// Arrow `RecordBatch` matching `create_summary_schema`.
///
/// One Arrow array builder per schema column; `finish` drains them in
/// schema order.
struct TraceSummaryBatchBuilder {
    // Target schema (shared with the engine); validated in `finish`.
    schema: Arc<Schema>,
    // 16-byte binary trace id.
    trace_id: FixedSizeBinaryBuilder,
    // Dictionary-encoded to match the schema's Dictionary(Int32, Utf8) column.
    service_name: StringDictionaryBuilder<Int32Type>,
    scope_name: StringBuilder,
    scope_version: StringBuilder,
    root_operation: StringBuilder,
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,
    status_code: Int32Builder,
    status_message: StringBuilder,
    span_count: Int64Builder,
    error_count: Int64Builder,
    // Map of attribute key -> JSON-serialized value string.
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    entity_ids: ListBuilder<StringBuilder>,
    queue_ids: ListBuilder<StringBuilder>,
    // Days since Unix epoch, derived from start_time (Delta partition column).
    partition_date: Date32Builder,
}
122
impl TraceSummaryBatchBuilder {
    /// Create a builder pre-sized for `capacity` rows.
    ///
    /// The map entry/key/value field names ("key_value"/"key"/"value") are
    /// chosen so the produced `MapArray` layout matches
    /// `resource_attribute_field()` in the table schema.
    fn new(schema: Arc<Schema>, capacity: usize) -> Self {
        let map_field_names = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let resource_attributes = MapBuilder::new(
            Some(map_field_names),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );
        Self {
            schema,
            trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
            service_name: StringDictionaryBuilder::new(),
            // String builders are sized with rough per-row byte estimates.
            scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
            scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
            root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
            start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            duration_ms: Int64Builder::with_capacity(capacity),
            status_code: Int32Builder::with_capacity(capacity),
            status_message: StringBuilder::with_capacity(capacity, capacity * 16),
            span_count: Int64Builder::with_capacity(capacity),
            error_count: Int64Builder::with_capacity(capacity),
            resource_attributes,
            entity_ids: ListBuilder::new(StringBuilder::new()),
            queue_ids: ListBuilder::new(StringBuilder::new()),
            partition_date: Date32Builder::with_capacity(capacity),
        }
    }

    /// Append one record as a row across every column builder.
    ///
    /// Empty strings for `scope_version`/`status_message` are stored as NULL.
    /// `duration_ms` is NULL for traces with no `end_time` yet. Empty
    /// collections are stored as NULL entries (`append(false)`/`append_null`)
    /// rather than empty lists/maps.
    ///
    /// Errors only from the fixed-size binary builder (trace id must be 16
    /// bytes) and the map builder.
    fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
        self.trace_id.append_value(rec.trace_id.as_bytes())?;
        self.service_name.append_value(&rec.service_name);
        self.scope_name.append_value(&rec.scope_name);
        if rec.scope_version.is_empty() {
            self.scope_version.append_null();
        } else {
            self.scope_version.append_value(&rec.scope_version);
        }
        self.root_operation.append_value(&rec.root_operation);
        self.start_time
            .append_value(rec.start_time.timestamp_micros());
        match rec.end_time {
            Some(end) => self.end_time.append_value(end.timestamp_micros()),
            None => self.end_time.append_null(),
        }
        // Duration is derivable only once the trace has ended.
        let duration = rec
            .end_time
            .map(|end| (end - rec.start_time).num_milliseconds());
        match duration {
            Some(d) => self.duration_ms.append_value(d),
            None => self.duration_ms.append_null(),
        }
        self.status_code.append_value(rec.status_code);
        if rec.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&rec.status_message);
        }
        self.span_count.append_value(rec.span_count);
        self.error_count.append_value(rec.error_count);
        if rec.resource_attributes.is_empty() {
            // append(false) records a NULL map entry for this row.
            self.resource_attributes.append(false)?;
        } else {
            for attr in &rec.resource_attributes {
                self.resource_attributes.keys().append_value(&attr.key);
                // Values are stored as JSON text; unserializable values fall
                // back to the literal string "null".
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                self.resource_attributes.values().append_value(value_str);
            }
            self.resource_attributes.append(true)?;
        }
        if rec.entity_ids.is_empty() {
            self.entity_ids.append_null();
        } else {
            for id in &rec.entity_ids {
                self.entity_ids.values().append_value(id);
            }
            self.entity_ids.append(true);
        }
        if rec.queue_ids.is_empty() {
            self.queue_ids.append_null();
        } else {
            for id in &rec.queue_ids {
                self.queue_ids.values().append_value(id);
            }
            self.queue_ids.append(true);
        }
        // Date32 = days since Unix epoch; num_days_from_ce counts from
        // 0001-01-01, hence the UNIX_EPOCH_DAYS offset.
        let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);
        Ok(())
    }

    /// Finalize all builders into a `RecordBatch`.
    ///
    /// Column order here must match `create_summary_schema` exactly —
    /// `RecordBatch::try_new` pairs arrays with schema fields positionally.
    fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(self.trace_id.finish()),
            Arc::new(self.service_name.finish()),
            Arc::new(self.scope_name.finish()),
            Arc::new(self.scope_version.finish()),
            Arc::new(self.root_operation.finish()),
            Arc::new(self.start_time.finish()),
            Arc::new(self.end_time.finish()),
            Arc::new(self.duration_ms.finish()),
            Arc::new(self.status_code.finish()),
            Arc::new(self.status_message.finish()),
            Arc::new(self.span_count.finish()),
            Arc::new(self.error_count.finish()),
            Arc::new(self.resource_attributes.finish()),
            Arc::new(self.entity_ids.finish()),
            Arc::new(self.queue_ids.finish()),
            Arc::new(self.partition_date.finish()),
        ];
        RecordBatch::try_new(self.schema, columns).map_err(Into::into)
    }
}
242
/// Commands accepted by the `TraceSummaryDBEngine` actor task.
///
/// Each mutating command carries a `oneshot` sender so the caller can await
/// the operation's result; `Shutdown` terminates the actor loop.
pub enum SummaryTableCommand {
    /// Append a batch of summary records to the Delta table.
    Write {
        records: Vec<TraceSummaryRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Run table compaction (Z-order optimize).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Delete unreferenced files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
259
260async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
261 let mut base = object_store.get_base_url()?;
262 let mut path = base.path().to_string();
263 if !path.ends_with('/') {
264 path.push('/');
265 }
266 path.push_str(SUMMARY_TABLE_NAME);
267 base.set_path(&path);
268 Ok(base)
269}
270
/// Create the summary Delta table at `table_url`.
///
/// The table is partitioned by `partition_date` and configured with
/// data-skipping statistics on the columns the paginated query filters and
/// sorts on. The log line truncates the URL to its last path segment to
/// avoid leaking credentials embedded in storage URLs.
async fn create_summary_table(
    object_store: &ObjectStore,
    table_url: Url,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    info!(
        "Creating trace summary table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(SUMMARY_TABLE_NAME)
    );
    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;
    // Translate the Arrow schema into Delta field definitions.
    let delta_fields = arrow_schema_to_delta(&schema);
    table
        .create()
        .with_table_name(SUMMARY_TABLE_NAME)
        .with_columns(delta_fields)
        .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
        )
        .await
        .map_err(Into::into)
}
303
/// Load the summary Delta table if it already exists, otherwise create it.
///
/// Existence detection is scheme-dependent:
/// - `file://` URLs check for a `_delta_log` directory on disk (creating the
///   table directory first if missing);
/// - remote stores probe by attempting a full table load, and load again on
///   success. NOTE(review): the second load is redundant work and a small
///   TOCTOU window — acceptable here since this runs once at startup.
async fn build_or_create_summary_table(
    object_store: &ObjectStore,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    // Ensure s3://, gs://, az:// etc. logstore handlers are registered before
    // any DeltaTableBuilder::from_url call.
    register_cloud_logstore_factories();
    let table_url = build_summary_url(object_store).await?;
    info!(
        "Loading trace summary table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(SUMMARY_TABLE_NAME)
    );

    let is_delta_table = if table_url.scheme() == "file" {
        if let Ok(path) = table_url.to_file_path() {
            if !path.exists() {
                info!("Creating directory for summary table: {:?}", path);
                std::fs::create_dir_all(&path)?;
            }
            // A directory without a _delta_log is not yet a Delta table.
            path.join("_delta_log").exists()
        } else {
            false
        }
    } else {
        let store = object_store.as_dyn_object_store();
        match DeltaTableBuilder::from_url(table_url.clone()) {
            Ok(builder) => builder
                .with_storage_backend(store, table_url.clone())
                .load()
                .await
                .is_ok(),
            Err(_) => false,
        }
    };

    if is_delta_table {
        info!(
            "Loaded existing trace summary table [{}://.../{} ]",
            table_url.scheme(),
            table_url
                .path_segments()
                .and_then(|mut s| s.next_back())
                .unwrap_or(SUMMARY_TABLE_NAME)
        );
        let store = object_store.as_dyn_object_store();
        DeltaTableBuilder::from_url(table_url.clone())?
            .with_storage_backend(store, table_url)
            .load()
            .await
            .map_err(Into::into)
    } else {
        info!("Summary table does not exist, creating new table");
        create_summary_table(object_store, table_url, schema).await
    }
}
365
/// Owns the summary Delta table and performs all mutations against it.
///
/// Intended to run as a single actor task (see `start_actor`); the table is
/// behind an async `RwLock` so the actor can swap in updated snapshots after
/// writes/optimize/vacuum while queries go through the shared
/// `SessionContext` registration.
pub struct TraceSummaryDBEngine {
    // Arrow schema used to build write batches.
    schema: Arc<Schema>,
    // Current Delta table snapshot; replaced after each mutating operation.
    table: Arc<AsyncRwLock<DeltaTable>>,
    // Shared DataFusion context; the table is (re-)registered here.
    pub ctx: Arc<SessionContext>,
    // Cross-pod task coordination (optimize claiming).
    control: ControlTableEngine,
}
372
373impl TraceSummaryDBEngine {
374 pub async fn new(
381 storage_settings: &ObjectStorageSettings,
382 ctx: Arc<SessionContext>,
383 ) -> Result<Self, TraceEngineError> {
384 let object_store = ObjectStore::new(storage_settings)?;
385 let schema = Arc::new(create_summary_schema());
386 let delta_table = build_or_create_summary_table(&object_store, schema.clone()).await?;
387 if let Ok(provider) = delta_table.table_provider().await {
390 ctx.register_table(SUMMARY_TABLE_NAME, provider)?;
391 } else {
392 info!("Empty summary table at init — deferring SessionContext registration until first write");
393 }
394
395 let control = ControlTableEngine::new(storage_settings, get_pod_id()).await?;
396
397 Ok(TraceSummaryDBEngine {
398 schema,
399 table: Arc::new(AsyncRwLock::new(delta_table)),
400 ctx,
401 control,
402 })
403 }
404
405 fn build_batch(
406 &self,
407 records: Vec<TraceSummaryRecord>,
408 ) -> Result<RecordBatch, TraceEngineError> {
409 let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
410 for rec in &records {
411 builder.append(rec)?;
412 }
413 builder.finish()
414 }
415
416 async fn write_records(
417 &self,
418 records: Vec<TraceSummaryRecord>,
419 ) -> Result<(), TraceEngineError> {
420 let count = records.len();
421 info!("Writing {} trace summaries", count);
422 let batch = self.build_batch(records)?;
423
424 let mut table_guard = self.table.write().await;
425
426 if let Err(e) = table_guard.update_incremental(None).await {
427 info!("Summary table update skipped (new table): {}", e);
428 }
429
430 let updated_table = table_guard
431 .clone()
432 .write(vec![batch])
433 .with_save_mode(deltalake::protocol::SaveMode::Append)
434 .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
435 .await?;
436
437 self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
438 self.ctx
439 .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
440
441 *table_guard = updated_table;
442 info!("Summary table updated with {} records", count);
443 Ok(())
444 }
445
446 async fn optimize_table(&self) -> Result<(), TraceEngineError> {
447 let mut table_guard = self.table.write().await;
448 let (updated_table, _metrics) = table_guard
449 .clone()
450 .optimize()
451 .with_target_size(128 * 1024 * 1024)
452 .with_type(OptimizeType::ZOrder(vec![
453 START_TIME_COL.to_string(),
454 SERVICE_NAME_COL.to_string(),
455 ]))
456 .await?;
457
458 self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
459 self.ctx
460 .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
461 *table_guard = updated_table;
462 Ok(())
463 }
464
465 async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
466 let mut table_guard = self.table.write().await;
467 let (updated_table, _metrics) = table_guard
468 .clone()
469 .vacuum()
470 .with_retention_period(chrono::Duration::hours(retention_hours as i64))
471 .with_enforce_retention_duration(false)
472 .await?;
473
474 self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
475 self.ctx
476 .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
477 *table_guard = updated_table;
478 Ok(())
479 }
480
481 async fn try_run_optimize(&self, interval_hours: u64) {
483 match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
484 Ok(true) => match self.optimize_table().await {
485 Ok(()) => {
486 let _ = self
487 .control
488 .release_task(
489 TASK_SUMMARY_OPTIMIZE,
490 chrono::Duration::hours(interval_hours as i64),
491 )
492 .await;
493 }
494 Err(e) => {
495 error!("Summary optimize failed: {}", e);
496 let _ = self
497 .control
498 .release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
499 .await;
500 }
501 },
502 Ok(false) => { }
503 Err(e) => error!("Summary optimize claim check failed: {}", e),
504 }
505 }
506
507 pub fn start_actor(
508 self,
509 compaction_interval_hours: u64,
510 ) -> (
511 mpsc::Sender<SummaryTableCommand>,
512 tokio::task::JoinHandle<()>,
513 ) {
514 let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);
515
516 let handle = tokio::spawn(async move {
517 let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
519 scheduler_ticker.tick().await; loop {
522 tokio::select! {
523 Some(cmd) = rx.recv() => {
524 match cmd {
525 SummaryTableCommand::Write { records, respond_to } => {
526 let result = self.write_records(records).await;
527 if let Err(ref e) = result {
528 error!("Summary write failed: {}", e);
529 }
530 let _ = respond_to.send(result);
531 }
532 SummaryTableCommand::Optimize { respond_to } => {
533 let _ = respond_to.send(self.optimize_table().await);
535 }
536 SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
537 let _ = respond_to.send(self.vacuum_table(retention_hours).await);
538 }
539 SummaryTableCommand::Shutdown => {
540 info!("TraceSummaryDBEngine actor shutting down");
541 break;
542 }
543 }
544 }
545 _ = scheduler_ticker.tick() => {
546 self.try_run_optimize(compaction_interval_hours).await;
547 }
548 }
549 }
550 });
551
552 (tx, handle)
553 }
554}
555
/// Public facade over the summary engine actor: forwards commands over the
/// mpsc channel and exposes read queries via `query_service`.
pub struct TraceSummaryService {
    // Command channel into the engine actor.
    engine_tx: mpsc::Sender<SummaryTableCommand>,
    // Handle of the spawned actor task; awaited during `shutdown`.
    engine_handle: tokio::task::JoinHandle<()>,
    // Read-side queries against the same SessionContext the engine registers into.
    pub query_service: TraceSummaryQueries,
}
563
564impl TraceSummaryService {
565 pub async fn new(
566 storage_settings: &ObjectStorageSettings,
567 compaction_interval_hours: u64,
568 ctx: Arc<SessionContext>,
569 ) -> Result<Self, TraceEngineError> {
570 let engine = TraceSummaryDBEngine::new(storage_settings, ctx).await?;
571 let engine_ctx = engine.ctx.clone();
572 let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours);
573
574 Ok(TraceSummaryService {
575 engine_tx,
576 engine_handle,
577 query_service: TraceSummaryQueries::new(engine_ctx),
578 })
579 }
580
581 pub async fn write_summaries(
583 &self,
584 records: Vec<TraceSummaryRecord>,
585 ) -> Result<(), TraceEngineError> {
586 let (tx, rx) = oneshot::channel();
587 self.engine_tx
588 .send(SummaryTableCommand::Write {
589 records,
590 respond_to: tx,
591 })
592 .await
593 .map_err(|_| TraceEngineError::ChannelClosed)?;
594 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
595 }
596
597 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
598 let (tx, rx) = oneshot::channel();
599 self.engine_tx
600 .send(SummaryTableCommand::Optimize { respond_to: tx })
601 .await
602 .map_err(|_| TraceEngineError::ChannelClosed)?;
603 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
604 }
605
606 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
607 let (tx, rx) = oneshot::channel();
608 self.engine_tx
609 .send(SummaryTableCommand::Vacuum {
610 retention_hours,
611 respond_to: tx,
612 })
613 .await
614 .map_err(|_| TraceEngineError::ChannelClosed)?;
615 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
616 }
617
618 pub async fn signal_shutdown(&self) {
620 info!("TraceSummaryService signaling shutdown");
621 let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
622 }
623
624 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
625 info!("TraceSummaryService shutting down");
626 self.engine_tx
627 .send(SummaryTableCommand::Shutdown)
628 .await
629 .map_err(|_| TraceEngineError::ChannelClosed)?;
630 if let Err(e) = self.engine_handle.await {
631 error!("Summary engine handle error: {}", e);
632 }
633 info!("TraceSummaryService shutdown complete");
634 Ok(())
635 }
636}
637
/// Read-only query API over the DataFusion context that the summary engine
/// keeps registered with the latest table snapshot.
pub struct TraceSummaryQueries {
    ctx: Arc<SessionContext>,
}
643
impl TraceSummaryQueries {
    /// Wrap an existing `SessionContext`; the summary table is expected to be
    /// (or become) registered under `SUMMARY_TABLE_NAME`.
    pub fn new(ctx: Arc<SessionContext>) -> Self {
        Self { ctx }
    }

    /// Cursor-paginated trace listing over the summary table.
    ///
    /// Pipeline:
    /// 1. time-range filters (partition_date for pruning + start_time for precision);
    /// 2. aggregate per trace_id — a trace may span multiple summary rows, so
    ///    representative scalar columns are picked with ordered `first_value`s
    ///    and counts are summed;
    /// 3. post-aggregation filters (service, errors, status, entity/queue
    ///    membership, explicit trace ids, keyset cursor);
    /// 4. optional attribute filters resolved against the `trace_spans` table
    ///    into an IN-list of trace ids;
    /// 5. keyset sort on (start_time, trace_id) and `limit + 1` fetch to
    ///    detect whether more pages exist.
    ///
    /// `direction` is "next" (default, newest-first) or "previous".
    pub async fn get_paginated_traces(
        &self,
        filters: &TraceFilters,
    ) -> Result<TracePaginationResponse, TraceEngineError> {
        let limit = filters.limit.unwrap_or(50) as usize;
        let direction = filters.direction.as_deref().unwrap_or("next");

        use crate::parquet::tracing::queries::{date_lit, ts_lit};
        use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
        use datafusion::functions_nested::set_ops::array_distinct;

        let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;

        // Partition_date bounds enable Delta partition pruning; start_time
        // bounds give row-level precision inside the boundary partitions.
        if let Some(start) = filters.start_time {
            df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
            df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
        }
        if let Some(end) = filters.end_time {
            df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
            df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
        }

        // Order used to choose the "representative" row per trace: the row
        // with the most spans (ties broken by latest end_time).
        let by_span_end: Vec<SortExpr> = vec![
            col(SPAN_COUNT_COL).sort(false, false),
            col(END_TIME_COL).sort(false, false),
        ];
        // For status_message: prefer the row with the highest status code
        // (i.e. an error message when one exists).
        let by_status_span: Vec<SortExpr> = vec![
            col(STATUS_CODE_COL).sort(false, false),
            col(SPAN_COUNT_COL).sort(false, false),
        ];

        let mut df = df
            .aggregate(
                vec![col(TRACE_ID_COL)],
                vec![
                    min(col(START_TIME_COL)).alias(START_TIME_COL),
                    max(col(END_TIME_COL)).alias(END_TIME_COL),
                    // Int64-cast copies of the extremes so duration can be
                    // computed by integer arithmetic below.
                    max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
                    min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
                    max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
                    sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
                    sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
                    first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
                    first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
                    first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
                        .alias(SCOPE_VERSION_COL),
                    first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
                        .alias(ROOT_OPERATION_COL),
                    first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
                    first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
                        .alias(RESOURCE_ATTRIBUTES_COL),
                    array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
                    array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
                ],
            )?
            // duration in ms from the microsecond extremes.
            .with_column(
                DURATION_MS_COL,
                (col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
            )?
            // array_agg of list columns yields list-of-lists; flatten and dedupe.
            .with_column(
                ENTITY_IDS_COL,
                array_distinct(flatten(col("_entity_ids_raw"))),
            )?
            .with_column(
                QUEUE_IDS_COL,
                array_distinct(flatten(col("_queue_ids_raw"))),
            )?
            .drop_columns(&[
                "_max_end_us",
                "_min_start_us",
                "_entity_ids_raw",
                "_queue_ids_raw",
            ])?;

        // Post-aggregation filters (operate on per-trace values).
        if let Some(ref svc) = filters.service_name {
            df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
        }
        match filters.has_errors {
            Some(true) => {
                df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
            }
            Some(false) => {
                df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
            }
            None => {}
        }
        if let Some(sc) = filters.status_code {
            df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
        }

        if let Some(ref uid) = filters.entity_uid {
            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(ENTITY_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        if let Some(ref uid) = filters.queue_uid {
            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(QUEUE_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        // Explicit trace id allow-list; hex ids that fail to parse are
        // silently skipped.
        if let Some(ref ids) = filters.trace_ids {
            if !ids.is_empty() {
                let binary_ids: Vec<Expr> = ids
                    .iter()
                    .filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
                    .map(|b| lit(ScalarValue::Binary(Some(b))))
                    .collect();
                if !binary_ids.is_empty() {
                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
                }
            }
        }

        // Keyset (cursor) pagination on (start_time, trace_id); trace_id
        // breaks ties between traces with identical start times. An
        // unparseable cursor id silently disables the cursor filter.
        if let (Some(cursor_time), Some(ref cursor_id)) =
            (filters.cursor_start_time, &filters.cursor_trace_id)
        {
            if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
                let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
                    Some(cursor_time.timestamp_micros()),
                    Some("UTC".into()),
                ));
                let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
                let cursor_expr = if direction == "previous" {
                    col(START_TIME_COL)
                        .gt(cursor_ts.clone())
                        .or(col(START_TIME_COL)
                            .eq(cursor_ts)
                            .and(col(TRACE_ID_COL).gt(cursor_tid)))
                } else {
                    col(START_TIME_COL)
                        .lt(cursor_ts.clone())
                        .or(col(START_TIME_COL)
                            .eq(cursor_ts)
                            .and(col(TRACE_ID_COL).lt(cursor_tid)))
                };
                df = df.filter(cursor_expr)?;
            }
        }

        // Attribute filters match against span-level search blobs, so they
        // are resolved on the trace_spans table first and folded back in as a
        // trace-id IN-list. Multiple filters are OR-ed together.
        if let Some(ref attr_filters) = filters.attribute_filters {
            if !attr_filters.is_empty() {
                let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
                    TRACE_ID_COL,
                    START_TIME_COL,
                    SEARCH_BLOB_COL,
                ])?;

                // Mirror the outer time window to bound the span scan.
                if let Some(start) = filters.start_time {
                    spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
                        ScalarValue::TimestampMicrosecond(
                            Some(start.timestamp_micros()),
                            Some("UTC".into()),
                        ),
                    )))?;
                }
                if let Some(end) = filters.end_time {
                    spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
                        ScalarValue::TimestampMicrosecond(
                            Some(end.timestamp_micros()),
                            Some("UTC".into()),
                        ),
                    )))?;
                }

                let mut attr_expr: Option<Expr> = None;
                for f in attr_filters {
                    let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
                    attr_expr = Some(match attr_expr {
                        None => cond,
                        Some(e) => e.or(cond),
                    });
                }
                if let Some(expr) = attr_expr {
                    spans_df = spans_df.filter(expr)?;
                }

                // Collect matching trace ids, deduplicated client-side.
                // NOTE(review): unbounded — a very broad filter could pull a
                // large id set into memory; confirm expected cardinality.
                let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
                let mut seen_ids: std::collections::HashSet<Vec<u8>> =
                    std::collections::HashSet::new();
                let mut binary_ids: Vec<Expr> = Vec::new();
                for batch in &span_batches {
                    if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
                        let casted = compute::cast(col_ref, &DataType::Binary)?;
                        let col_arr =
                            casted
                                .as_any()
                                .downcast_ref::<BinaryArray>()
                                .ok_or_else(|| {
                                    TraceEngineError::DowncastError("trace_id to BinaryArray")
                                })?;
                        for i in 0..batch.num_rows() {
                            let id_bytes = col_arr.value(i).to_vec();
                            if seen_ids.insert(id_bytes.clone()) {
                                binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
                            }
                        }
                    }
                }

                if !binary_ids.is_empty() {
                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
                } else {
                    // No span matched: the whole result set is empty.
                    df = df.filter(lit(false))?;
                }
            }
        }

        // "next" pages newest-first (DESC); "previous" flips to ASC so the
        // keyset filter above selects the adjacent earlier page.
        // NOTE(review): "previous" results come back in ascending order —
        // presumably the caller re-orders for display; confirm.
        df = if direction == "previous" {
            df.sort(vec![
                col(START_TIME_COL).sort(true, true),
                col(TRACE_ID_COL).sort(true, true),
            ])?
        } else {
            df.sort(vec![
                col(START_TIME_COL).sort(false, false),
                col(TRACE_ID_COL).sort(false, false),
            ])?
        };

        // Fetch one extra row purely to detect whether another page exists.
        df = df.limit(0, Some(limit + 1))?;

        let batches = df.collect().await?;
        let mut items = batches_to_trace_list_items(batches)?;

        let has_more = items.len() > limit;
        if has_more {
            items.pop();
        }

        // Derive the four pagination fields. For the reverse direction the
        // roles of first/last items swap.
        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
            "next" => {
                let next_cursor = if has_more {
                    items.last().map(|item| TraceCursor {
                        start_time: item.start_time,
                        trace_id: item.trace_id.clone(),
                    })
                } else {
                    None
                };

                let previous_cursor = items.first().map(|item| TraceCursor {
                    start_time: item.start_time,
                    trace_id: item.trace_id.clone(),
                });

                (
                    has_more,
                    next_cursor,
                    // If we arrived here via a cursor, a previous page exists.
                    filters.cursor_start_time.is_some(),
                    previous_cursor,
                )
            }
            "previous" => {
                let previous_cursor = if has_more {
                    items.last().map(|item| TraceCursor {
                        start_time: item.start_time,
                        trace_id: item.trace_id.clone(),
                    })
                } else {
                    None
                };

                let next_cursor = items.first().map(|item| TraceCursor {
                    start_time: item.start_time,
                    trace_id: item.trace_id.clone(),
                });

                (
                    filters.cursor_start_time.is_some(),
                    next_cursor,
                    has_more,
                    previous_cursor,
                )
            }
            _ => (false, None, false, None),
        };

        Ok(TracePaginationResponse {
            items,
            has_next,
            next_cursor,
            has_previous,
            previous_cursor,
        })
    }
}
1010
1011fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
1015 if map_array.is_null(row_idx) {
1016 return Vec::new();
1017 }
1018 let entry = map_array.value(row_idx);
1019 let struct_array = entry.as_any().downcast_ref::<StructArray>().unwrap();
1020 let keys_arr = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).unwrap();
1021 let keys = keys_arr.as_any().downcast_ref::<StringArray>().unwrap();
1022 let values_arr = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).unwrap();
1023 let values = values_arr.as_any().downcast_ref::<StringArray>().unwrap();
1024
1025 (0..struct_array.len())
1026 .map(|i| Attribute {
1027 key: keys.value(i).to_string(),
1028 value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
1029 })
1030 .collect()
1031}
1032
1033fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
1035 let Some(list) = list else {
1036 return Vec::new();
1037 };
1038 if list.is_null(row_idx) {
1039 return Vec::new();
1040 }
1041 let inner = list.value(row_idx);
1042 let str_arr = compute::cast(&inner, &DataType::Utf8)
1043 .ok()
1044 .and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
1045 match str_arr {
1046 Some(arr) => (0..arr.len())
1047 .filter(|i| !arr.is_null(*i))
1048 .map(|i| arr.value(i).to_string())
1049 .collect(),
1050 None => Vec::new(),
1051 }
1052}
1053
1054fn batches_to_trace_list_items(
1055 batches: Vec<RecordBatch>,
1056) -> Result<Vec<TraceListItem>, TraceEngineError> {
1057 let mut items = Vec::new();
1058
1059 for batch in &batches {
1060 let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
1063 TraceEngineError::UnsupportedOperation("missing trace_id column".into())
1064 })?;
1065 let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
1066 let trace_ids = trace_id_binary
1067 .as_any()
1068 .downcast_ref::<BinaryArray>()
1069 .ok_or_else(|| {
1070 TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
1071 })?;
1072
1073 let svc_arr = compute::cast(
1076 batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
1077 TraceEngineError::UnsupportedOperation("missing service_name column".into())
1078 })?,
1079 &DataType::Utf8,
1080 )?;
1081 let service_names = svc_arr
1082 .as_any()
1083 .downcast_ref::<StringArray>()
1084 .ok_or_else(|| {
1085 TraceEngineError::UnsupportedOperation(
1086 "service_name cast to StringArray failed".into(),
1087 )
1088 })?;
1089
1090 let scope_arr = compute::cast(
1091 batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
1092 TraceEngineError::UnsupportedOperation("missing scope_name column".into())
1093 })?,
1094 &DataType::Utf8,
1095 )?;
1096 let scope_names = scope_arr
1097 .as_any()
1098 .downcast_ref::<StringArray>()
1099 .ok_or_else(|| {
1100 TraceEngineError::UnsupportedOperation(
1101 "scope_name cast to StringArray failed".into(),
1102 )
1103 })?;
1104
1105 let scopev_arr = compute::cast(
1106 batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
1107 TraceEngineError::UnsupportedOperation("missing scope_version column".into())
1108 })?,
1109 &DataType::Utf8,
1110 )?;
1111 let scope_versions = scopev_arr
1112 .as_any()
1113 .downcast_ref::<StringArray>()
1114 .ok_or_else(|| {
1115 TraceEngineError::UnsupportedOperation(
1116 "scope_version cast to StringArray failed".into(),
1117 )
1118 })?;
1119
1120 let root_arr = compute::cast(
1121 batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
1122 TraceEngineError::UnsupportedOperation("missing root_operation column".into())
1123 })?,
1124 &DataType::Utf8,
1125 )?;
1126 let root_operations = root_arr
1127 .as_any()
1128 .downcast_ref::<StringArray>()
1129 .ok_or_else(|| {
1130 TraceEngineError::UnsupportedOperation(
1131 "root_operation cast to StringArray failed".into(),
1132 )
1133 })?;
1134
1135 let sm_arr = compute::cast(
1136 batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
1137 TraceEngineError::UnsupportedOperation("missing status_message column".into())
1138 })?,
1139 &DataType::Utf8,
1140 )?;
1141 let status_messages = sm_arr
1142 .as_any()
1143 .downcast_ref::<StringArray>()
1144 .ok_or_else(|| {
1145 TraceEngineError::UnsupportedOperation(
1146 "status_message cast to StringArray failed".into(),
1147 )
1148 })?;
1149
1150 let resource_attrs_map = batch
1151 .column_by_name(RESOURCE_ATTRIBUTES_COL)
1152 .and_then(|c| c.as_any().downcast_ref::<MapArray>())
1153 .ok_or_else(|| {
1154 TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
1155 })?;
1156
1157 let entity_ids_list = batch
1158 .column_by_name(ENTITY_IDS_COL)
1159 .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1160
1161 let queue_ids_list = batch
1162 .column_by_name(QUEUE_IDS_COL)
1163 .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1164
1165 let start_times = batch
1166 .column_by_name(START_TIME_COL)
1167 .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1168 .ok_or_else(|| {
1169 TraceEngineError::UnsupportedOperation("missing start_time column".into())
1170 })?;
1171
1172 let end_times = batch
1173 .column_by_name(END_TIME_COL)
1174 .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1175 .ok_or_else(|| {
1176 TraceEngineError::UnsupportedOperation("missing end_time column".into())
1177 })?;
1178
1179 let durations = batch
1180 .column_by_name(DURATION_MS_COL)
1181 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1182 .ok_or_else(|| {
1183 TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
1184 })?;
1185
1186 let status_codes = batch
1187 .column_by_name(STATUS_CODE_COL)
1188 .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1189 .ok_or_else(|| {
1190 TraceEngineError::UnsupportedOperation("missing status_code column".into())
1191 })?;
1192
1193 let span_counts = batch
1194 .column_by_name(SPAN_COUNT_COL)
1195 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1196 .ok_or_else(|| {
1197 TraceEngineError::UnsupportedOperation("missing span_count column".into())
1198 })?;
1199
1200 let error_counts = batch
1201 .column_by_name(ERROR_COUNT_COL)
1202 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1203 .ok_or_else(|| {
1204 TraceEngineError::UnsupportedOperation("missing error_count column".into())
1205 })?;
1206
1207 for i in 0..batch.num_rows() {
1208 let trace_id_hex = hex::encode(trace_ids.value(i));
1209
1210 let start_time = micros_to_datetime(start_times.value(i));
1211 let end_time = if end_times.is_null(i) {
1212 None
1213 } else {
1214 Some(micros_to_datetime(end_times.value(i)))
1215 };
1216 let duration_ms = if durations.is_null(i) {
1217 None
1218 } else {
1219 Some(durations.value(i))
1220 };
1221 let error_count = error_counts.value(i);
1222
1223 let resource_attributes = extract_map_attributes(resource_attrs_map, i);
1224
1225 let entity_ids = extract_list_strings(entity_ids_list, i);
1226 let queue_ids = extract_list_strings(queue_ids_list, i);
1227
1228 items.push(TraceListItem {
1229 trace_id: trace_id_hex,
1230 service_name: service_names.value(i).to_string(),
1231 scope_name: scope_names.value(i).to_string(),
1232 scope_version: scope_versions.value(i).to_string(),
1233 root_operation: root_operations.value(i).to_string(),
1234 start_time,
1235 end_time,
1236 duration_ms,
1237 status_code: status_codes.value(i),
1238 status_message: if status_messages.is_null(i) {
1239 None
1240 } else {
1241 Some(status_messages.value(i).to_string())
1242 },
1243 span_count: span_counts.value(i),
1244 has_errors: error_count > 0,
1245 error_count,
1246 resource_attributes,
1247 entity_ids,
1248 queue_ids,
1249 });
1250 }
1251 }
1252
1253 Ok(items)
1254}
1255
1256fn micros_to_datetime(micros: i64) -> DateTime<Utc> {
1257 let secs = micros / 1_000_000;
1258 let nanos = ((micros % 1_000_000) * 1_000) as u32;
1259 Utc.timestamp_opt(secs, nanos).unwrap()
1260}
1261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::ObjectStore;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceFilters;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use tracing_subscriber;

    /// Test setup/teardown helper: initializes logging (idempotent via
    /// `try_init`) and deletes any leftover local storage directory from a
    /// previous run so each test starts from an empty Delta table.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Builds a fresh DataFusion `SessionContext` backed by the test
    /// object store, wrapped in `Arc` for sharing with the services.
    fn make_test_ctx(storage_settings: &ObjectStorageSettings) -> Arc<SessionContext> {
        Arc::new(
            ObjectStore::new(storage_settings)
                .unwrap()
                .get_session()
                .unwrap(),
        )
    }

    /// Constructs a `TraceSummaryRecord` fixture.
    ///
    /// `error_count > 0` also flips the status to an error shape
    /// (status_code 2, "Internal Server Error") so error-filter tests can
    /// distinguish records by a single argument.
    fn make_summary(
        trace_id_bytes: [u8; 16],
        service_name: &str,
        error_count: i64,
        resource_attributes: Vec<Attribute>,
    ) -> TraceSummaryRecord {
        let now = Utc::now();
        TraceSummaryRecord {
            trace_id: TraceId::from_bytes(trace_id_bytes),
            service_name: service_name.to_string(),
            scope_name: "test.scope".to_string(),
            scope_version: String::new(),
            root_operation: "root_op".to_string(),
            start_time: now,
            // Fixed 200ms trace duration.
            end_time: Some(now + chrono::Duration::milliseconds(200)),
            status_code: if error_count > 0 { 2 } else { 0 },
            status_message: if error_count > 0 {
                "Internal Server Error".to_string()
            } else {
                "OK".to_string()
            },
            span_count: 3,
            error_count,
            resource_attributes,
            entity_ids: vec![],
            queue_ids: vec![],
        }
    }

    /// Smoke test: two summaries written through the service are visible
    /// via paginated query with only a time-window filter.
    #[tokio::test]
    async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
        let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // NOTE(review): sleep presumably allows a background writer task to
        // flush to the Delta table before querying — confirm flush interval.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        // Query a wide window around "now" so both records qualify.
        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert!(
            response.items.len() >= 2,
            "Expected at least 2 items, got {}",
            response.items.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verifies `has_errors` filtering in both directions: `Some(true)`
    /// returns only error traces, `Some(false)` returns only clean ones.
    #[tokio::test]
    async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        // One clean summary, one with error_count = 2.
        let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
        let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
        service
            .write_summaries(vec![ok_summary, err_summary])
            .await?;
        // NOTE(review): presumably waits for a background flush — confirm.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let base_filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        // has_errors = true: every returned item must have errors.
        let mut filters_err = base_filters.clone();
        filters_err.has_errors = Some(true);
        let errors_only = service
            .query_service
            .get_paginated_traces(&filters_err)
            .await?;
        for item in &errors_only.items {
            assert!(
                item.error_count > 0,
                "Expected error trace, got: {:?}",
                item
            );
        }
        assert!(
            !errors_only.items.is_empty(),
            "Expected at least one error trace"
        );

        // has_errors = false: every returned item must be error-free.
        let mut filters_ok = base_filters.clone();
        filters_ok.has_errors = Some(false);
        let no_errors = service
            .query_service
            .get_paginated_traces(&filters_ok)
            .await?;
        for item in &no_errors.items {
            assert_eq!(
                item.error_count, 0,
                "Expected non-error trace, got error_count={}",
                item.error_count
            );
        }
        assert!(
            !no_errors.items.is_empty(),
            "Expected at least one non-error trace"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verifies that filtering on `service_name` returns only traces from
    /// the requested service.
    #[tokio::test]
    async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
        let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
        service.write_summaries(vec![s_alpha, s_beta]).await?;
        // NOTE(review): presumably waits for a background flush — confirm.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: Some("alpha_service".to_string()),
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert!(
            !response.items.is_empty(),
            "Expected results for alpha_service"
        );
        for item in &response.items {
            assert_eq!(
                item.service_name, "alpha_service",
                "Expected only alpha_service items, got: {}",
                item.service_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verifies that an explicit `trace_ids` filter returns exactly the
    /// requested trace and excludes all others.
    #[tokio::test]
    async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let wanted_id = TraceId::from_bytes([7u8; 16]);
        let unwanted_id = TraceId::from_bytes([8u8; 16]);

        let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
        let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // NOTE(review): presumably waits for a background flush — confirm.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            // Filter uses the hex-encoded form, matching the query API.
            trace_ids: Some(vec![wanted_id.to_hex()]),
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            1,
            "Expected exactly 1 item from trace_ids filter"
        );
        assert_eq!(
            response.items[0].trace_id,
            wanted_id.to_hex(),
            "Returned wrong trace_id"
        );
        assert_ne!(
            response.items[0].trace_id,
            unwanted_id.to_hex(),
            "Should not have returned unwanted trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Exercises cursor-based pagination over 100 records with page size
    /// 50: first page + next_cursor, second page via "next", then back
    /// via "previous".
    #[tokio::test]
    async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        // 100 summaries spaced one minute apart going back in time, so
        // start_time ordering (and thus cursor ordering) is deterministic.
        let now = Utc::now();
        let summaries: Vec<TraceSummaryRecord> = (0u8..100)
            .map(|i| {
                let mut s = make_summary([i; 16], "svc", 0, vec![]);
                s.start_time = now - chrono::Duration::minutes(i as i64);
                s
            })
            .collect();
        service.write_summaries(summaries).await?;
        // NOTE(review): presumably waits for a background flush — confirm.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let mut filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(2)),
            end_time: Some(now + chrono::Duration::hours(1)),
            limit: Some(50),
            ..Default::default()
        };

        // Page 1: full page and a cursor pointing at the next page.
        let first = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(first.items.len(), 50, "first page: 50 items");
        assert!(
            first.next_cursor.is_some(),
            "first page: should have next_cursor"
        );

        // Page 2: follow next_cursor forward; items must not be newer
        // than the cursor position.
        let next_cur = first.next_cursor.clone().unwrap();
        filters.cursor_start_time = Some(next_cur.start_time);
        filters.cursor_trace_id = Some(next_cur.trace_id.clone());
        filters.direction = Some("next".to_string());
        let second = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(second.items.len(), 50, "second page: 50 items");
        assert!(
            second.items[0].start_time <= next_cur.start_time,
            "second page first item must be <= cursor"
        );
        assert!(second.previous_cursor.is_some());

        // Back to page 1 via previous_cursor.
        let prev_cur = second.previous_cursor.unwrap();
        filters.cursor_start_time = Some(prev_cur.start_time);
        filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
        filters.direction = Some("previous".to_string());
        let prev = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(prev.items.len(), 50, "previous page: 50 items");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verifies span-attribute filtering: summaries are joined against
    /// the span table (shared SessionContext), so only the trace whose
    /// span carries `component=kafka` should be returned.
    #[tokio::test]
    async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // Span service and summary service must share one SessionContext
        // so the attribute join can see both tables.
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        let shared_ctx = span_service.ctx.clone();

        let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?;

        let now = Utc::now();
        let kafka_trace = TraceId::from_bytes([70u8; 16]);
        let plain_trace = TraceId::from_bytes([80u8; 16]);

        // Only the kafka trace's span carries the target attribute.
        let kafka_span = make_span_record(
            &kafka_trace,
            SpanId::from_bytes([70u8; 8]),
            "svc",
            vec![Attribute {
                key: "component".to_string(),
                value: serde_json::Value::String("kafka".to_string()),
            }],
        );
        let plain_span =
            make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
        span_service
            .write_spans(vec![kafka_span, plain_span])
            .await?;

        let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
        kafka_summary.start_time = now;
        let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![kafka_summary, plain_summary])
            .await?;

        // NOTE(review): longer sleep since two services must flush — confirm.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            // "key:value" syntax for span attribute matching.
            attribute_filters: Some(vec!["component:kafka".to_string()]),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "attribute filter must return results"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == kafka_trace.to_hex()),
            "only kafka trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verifies `queue_uid` filtering on summaries, then round-trips the
    /// returned hex trace_id back to bytes and looks up its spans in the
    /// span service.
    #[tokio::test]
    async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // Shared context, as in the attribute-join test.
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        let shared_ctx = span_service.ctx.clone();

        let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?;

        let now = Utc::now();
        let queue_trace = TraceId::from_bytes([90u8; 16]);
        let plain_trace = TraceId::from_bytes([91u8; 16]);
        let target_queue_uid = "queue-record-abc123";

        let queue_span = make_span_record(
            &queue_trace,
            SpanId::from_bytes([90u8; 8]),
            "svc_queue",
            vec![],
        );
        let plain_span = make_span_record(
            &plain_trace,
            SpanId::from_bytes([91u8; 8]),
            "svc_queue",
            vec![],
        );
        // NOTE(review): uses write_spans_direct here (vs write_spans in the
        // attribute test) — presumably bypasses the buffering path; confirm.
        span_service
            .write_spans_direct(vec![queue_span, plain_span])
            .await?;

        // Only the queue summary carries the target queue_uid.
        let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
        queue_summary.start_time = now;
        queue_summary.queue_ids = vec![target_queue_uid.to_string()];

        let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![queue_summary, plain_summary])
            .await?;

        // NOTE(review): presumably waits for both services to flush — confirm.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            queue_uid: Some(target_queue_uid.to_string()),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "queue_uid filter must return at least one result"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == queue_trace.to_hex()),
            "only the queue trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        // Round-trip: hex trace_id from the summary response must resolve
        // to real spans in the span table.
        let returned_trace_id =
            TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
        let spans = span_service
            .query_service
            .get_trace_spans(
                Some(returned_trace_id.as_bytes()),
                None,
                Some(&(now - chrono::Duration::hours(1))),
                Some(&(now + chrono::Duration::hours(1))),
                None,
            )
            .await?;

        assert!(
            !spans.is_empty(),
            "should find spans for the returned trace_id"
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Constructs a minimal root `TraceSpanRecord` fixture (no parent,
    /// 100ms duration, OK status) for the join/lookup tests above.
    fn make_span_record(
        trace_id: &TraceId,
        span_id: SpanId,
        service_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: trace_id.clone(),
            span_id,
            parent_span_id: None,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: "op".to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: serde_json::Value::Null,
            output: serde_json::Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    /// Verifies resource attributes survive the write → Delta → query
    /// round trip (key preserved on the returned item).
    #[tokio::test]
    async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let attrs = vec![Attribute {
            key: "cloud.region".to_string(),
            value: serde_json::Value::String("us-east-1".to_string()),
        }];
        let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
        service.write_summaries(vec![summary]).await?;
        // NOTE(review): presumably waits for a background flush — confirm.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
        assert_eq!(
            response.items[0].resource_attributes.len(),
            1,
            "Expected 1 resource attribute"
        );
        assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}