1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
4use crate::parquet::utils::match_attr_expr;
5use crate::parquet::utils::register_cloud_logstore_factories;
6use crate::storage::ObjectStore;
7use arrow::array::*;
8use arrow::compute;
9use arrow::datatypes::*;
10use arrow_array::Array;
11use arrow_array::RecordBatch;
12use chrono::{DateTime, Datelike, TimeZone, Utc};
13use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
14use datafusion::prelude::*;
15use datafusion::scalar::ScalarValue;
16use deltalake::operations::optimize::OptimizeType;
17use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
18use scouter_settings::ObjectStorageSettings;
19use scouter_types::sql::{TraceFilters, TraceListItem};
20use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
21use std::sync::Arc;
22use tokio::sync::oneshot;
23use tokio::sync::{mpsc, RwLock as AsyncRwLock};
24use tokio::time::{interval, Duration};
25use tracing::{error, info};
26use url::Url;
27
/// Days between 0001-01-01 (CE day 1) and the Unix epoch (1970-01-01);
/// `num_days_from_ce() - UNIX_EPOCH_DAYS` yields an Arrow `Date32` value.
const UNIX_EPOCH_DAYS: i32 = 719_163;

/// Name under which the summary Delta table is created and registered
/// with the DataFusion `SessionContext`.
const SUMMARY_TABLE_NAME: &str = "trace_summaries";

/// Control-table task id used to claim the periodic optimize/vacuum job
/// across pods.
const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";

// Column names of the summary table (see `create_summary_schema`).
const TRACE_ID_COL: &str = "trace_id";
const SERVICE_NAME_COL: &str = "service_name";
const SCOPE_NAME_COL: &str = "scope_name";
const SCOPE_VERSION_COL: &str = "scope_version";
const ROOT_OPERATION_COL: &str = "root_operation";
const START_TIME_COL: &str = "start_time";
const END_TIME_COL: &str = "end_time";
const DURATION_MS_COL: &str = "duration_ms";
const STATUS_CODE_COL: &str = "status_code";
const STATUS_MESSAGE_COL: &str = "status_message";
const SPAN_COUNT_COL: &str = "span_count";
const ERROR_COUNT_COL: &str = "error_count";
// Column of the span-level `trace_spans` table scanned during attribute
// filtering in `get_paginated_traces` (not part of the summary schema).
const SEARCH_BLOB_COL: &str = "search_blob";

// Collection-valued columns of the summary schema.
const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

// Delta partition column: Date32 derived from the trace start time.
const PARTITION_DATE_COL: &str = "partition_date";
58fn create_summary_schema() -> Schema {
61 Schema::new(vec![
62 Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
63 Field::new(
64 SERVICE_NAME_COL,
65 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
66 false,
67 ),
68 Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
69 Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
70 Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
71 Field::new(
72 START_TIME_COL,
73 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
74 false,
75 ),
76 Field::new(
77 END_TIME_COL,
78 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
79 true,
80 ),
81 Field::new(DURATION_MS_COL, DataType::Int64, true),
82 Field::new(STATUS_CODE_COL, DataType::Int32, false),
83 Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
84 Field::new(SPAN_COUNT_COL, DataType::Int64, false),
85 Field::new(ERROR_COUNT_COL, DataType::Int64, false),
86 resource_attribute_field(),
87 Field::new(
88 ENTITY_IDS_COL,
89 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
90 true,
91 ),
92 Field::new(
93 QUEUE_IDS_COL,
94 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
95 true,
96 ),
97 Field::new(PARTITION_DATE_COL, DataType::Date32, false),
98 ])
99}
100
/// Column-wise Arrow builders used to assemble a `RecordBatch` of trace
/// summaries; builder order mirrors the field order of
/// `create_summary_schema`.
struct TraceSummaryBatchBuilder {
    schema: Arc<Schema>,
    trace_id: FixedSizeBinaryBuilder,
    // Dictionary-encoded to match the schema's Dictionary(Int32, Utf8).
    service_name: StringDictionaryBuilder<Int32Type>,
    scope_name: StringBuilder,
    scope_version: StringBuilder,
    root_operation: StringBuilder,
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,
    status_code: Int32Builder,
    status_message: StringBuilder,
    span_count: Int64Builder,
    error_count: Int64Builder,
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    entity_ids: ListBuilder<StringBuilder>,
    queue_ids: ListBuilder<StringBuilder>,
    partition_date: Date32Builder,
}
122
impl TraceSummaryBatchBuilder {
    /// Creates builders pre-sized for `capacity` rows; the string byte
    /// capacities are rough per-row estimates, not hard limits.
    fn new(schema: Arc<Schema>, capacity: usize) -> Self {
        // Entry/key/value names for the map column.
        // NOTE(review): these presumably must match the names produced by
        // `resource_attribute_field()` — confirm, otherwise `finish()` would
        // fail on a schema mismatch.
        let map_field_names = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let resource_attributes = MapBuilder::new(
            Some(map_field_names),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );
        Self {
            schema,
            trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
            service_name: StringDictionaryBuilder::new(),
            scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
            scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
            root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
            start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            duration_ms: Int64Builder::with_capacity(capacity),
            status_code: Int32Builder::with_capacity(capacity),
            status_message: StringBuilder::with_capacity(capacity, capacity * 16),
            span_count: Int64Builder::with_capacity(capacity),
            error_count: Int64Builder::with_capacity(capacity),
            resource_attributes,
            entity_ids: ListBuilder::new(StringBuilder::new()),
            queue_ids: ListBuilder::new(StringBuilder::new()),
            partition_date: Date32Builder::with_capacity(capacity),
        }
    }

    /// Appends one summary record across all column builders.
    ///
    /// Empty strings (`scope_version`, `status_message`) and empty
    /// collections are stored as nulls rather than empty values.
    fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
        self.trace_id.append_value(rec.trace_id.as_bytes())?;
        self.service_name.append_value(&rec.service_name);
        self.scope_name.append_value(&rec.scope_name);
        if rec.scope_version.is_empty() {
            self.scope_version.append_null();
        } else {
            self.scope_version.append_value(&rec.scope_version);
        }
        self.root_operation.append_value(&rec.root_operation);
        self.start_time
            .append_value(rec.start_time.timestamp_micros());
        match rec.end_time {
            Some(end) => self.end_time.append_value(end.timestamp_micros()),
            None => self.end_time.append_null(),
        }
        // Duration is only derivable for traces that have an end time.
        let duration = rec
            .end_time
            .map(|end| (end - rec.start_time).num_milliseconds());
        match duration {
            Some(d) => self.duration_ms.append_value(d),
            None => self.duration_ms.append_null(),
        }
        self.status_code.append_value(rec.status_code);
        if rec.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&rec.status_message);
        }
        self.span_count.append_value(rec.span_count);
        self.error_count.append_value(rec.error_count);
        if rec.resource_attributes.is_empty() {
            // append(false) records a null map entry for this row.
            self.resource_attributes.append(false)?;
        } else {
            for attr in &rec.resource_attributes {
                self.resource_attributes.keys().append_value(&attr.key);
                // Attribute values are stored as JSON-encoded strings;
                // unserializable values degrade to the JSON literal "null".
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                self.resource_attributes.values().append_value(value_str);
            }
            self.resource_attributes.append(true)?;
        }
        if rec.entity_ids.is_empty() {
            self.entity_ids.append_null();
        } else {
            for id in &rec.entity_ids {
                self.entity_ids.values().append_value(id);
            }
            self.entity_ids.append(true);
        }
        if rec.queue_ids.is_empty() {
            self.queue_ids.append_null();
        } else {
            for id in &rec.queue_ids {
                self.queue_ids.values().append_value(id);
            }
            self.queue_ids.append(true);
        }
        // Date32 stores days since the Unix epoch; num_days_from_ce()
        // counts from 0001-01-01, hence the constant offset.
        let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);
        Ok(())
    }

    /// Finalizes all builders into a `RecordBatch`. Column order must match
    /// the field order of `self.schema` or `try_new` fails.
    fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(self.trace_id.finish()),
            Arc::new(self.service_name.finish()),
            Arc::new(self.scope_name.finish()),
            Arc::new(self.scope_version.finish()),
            Arc::new(self.root_operation.finish()),
            Arc::new(self.start_time.finish()),
            Arc::new(self.end_time.finish()),
            Arc::new(self.duration_ms.finish()),
            Arc::new(self.status_code.finish()),
            Arc::new(self.status_message.finish()),
            Arc::new(self.span_count.finish()),
            Arc::new(self.error_count.finish()),
            Arc::new(self.resource_attributes.finish()),
            Arc::new(self.entity_ids.finish()),
            Arc::new(self.queue_ids.finish()),
            Arc::new(self.partition_date.finish()),
        ];
        RecordBatch::try_new(self.schema, columns).map_err(Into::into)
    }
}
242
/// Commands handled by the actor spawned in
/// `TraceSummaryDBEngine::start_actor`. Each mutating command carries a
/// oneshot sender through which the actor reports the outcome.
pub enum SummaryTableCommand {
    /// Append a batch of summary records to the Delta table.
    Write {
        records: Vec<TraceSummaryRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Compact/Z-order the table (the actor also vacuums afterwards).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Remove unreferenced files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
259
260async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
261 let mut base = object_store.get_base_url()?;
262 let mut path = base.path().to_string();
263 if !path.ends_with('/') {
264 path.push('/');
265 }
266 path.push_str(SUMMARY_TABLE_NAME);
267 base.set_path(&path);
268 Ok(base)
269}
270
271async fn create_summary_table(
272 object_store: &ObjectStore,
273 table_url: Url,
274 schema: SchemaRef,
275) -> Result<DeltaTable, TraceEngineError> {
276 info!(
277 "Creating trace summary table [{}://.../{} ]",
278 table_url.scheme(),
279 table_url
280 .path_segments()
281 .and_then(|mut s| s.next_back())
282 .unwrap_or(SUMMARY_TABLE_NAME)
283 );
284 let store = object_store.as_dyn_object_store();
285 let table = DeltaTableBuilder::from_url(table_url.clone())?
286 .with_storage_backend(store, table_url)
287 .build()?;
288 let delta_fields = arrow_schema_to_delta(&schema);
289 table
290 .create()
291 .with_table_name(SUMMARY_TABLE_NAME)
292 .with_columns(delta_fields)
293 .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
294 .with_configuration_property(
297 TableProperty::DataSkippingStatsColumns,
298 Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
299 )
300 .await
301 .map_err(Into::into)
302}
303
304async fn build_or_create_summary_table(
305 object_store: &ObjectStore,
306 schema: SchemaRef,
307) -> Result<DeltaTable, TraceEngineError> {
308 register_cloud_logstore_factories();
309 let table_url = build_summary_url(object_store).await?;
310 info!(
311 "Loading trace summary table [{}://.../{} ]",
312 table_url.scheme(),
313 table_url
314 .path_segments()
315 .and_then(|mut s| s.next_back())
316 .unwrap_or(SUMMARY_TABLE_NAME)
317 );
318
319 let is_delta_table = if table_url.scheme() == "file" {
324 if let Ok(path) = table_url.to_file_path() {
325 if !path.exists() {
326 info!("Creating directory for summary table: {:?}", path);
327 std::fs::create_dir_all(&path)?;
328 }
329 path.join("_delta_log").exists()
330 } else {
331 false
332 }
333 } else {
334 let store = object_store.as_dyn_object_store();
335 match DeltaTableBuilder::from_url(table_url.clone()) {
336 Ok(builder) => builder
337 .with_storage_backend(store, table_url.clone())
338 .load()
339 .await
340 .is_ok(),
341 Err(_) => false,
342 }
343 };
344
345 if is_delta_table {
346 info!(
347 "Loaded existing trace summary table [{}://.../{} ]",
348 table_url.scheme(),
349 table_url
350 .path_segments()
351 .and_then(|mut s| s.next_back())
352 .unwrap_or(SUMMARY_TABLE_NAME)
353 );
354 let store = object_store.as_dyn_object_store();
355 DeltaTableBuilder::from_url(table_url.clone())?
356 .with_storage_backend(store, table_url)
357 .load()
358 .await
359 .map_err(Into::into)
360 } else {
361 info!("Summary table does not exist, creating new table");
362 create_summary_table(object_store, table_url, schema).await
363 }
364}
365
/// Owns the summary Delta table and the DataFusion context it is registered
/// in. All mutations go through the actor spawned by `start_actor`.
pub struct TraceSummaryDBEngine {
    schema: Arc<Schema>,
    table: Arc<AsyncRwLock<DeltaTable>>,
    pub ctx: Arc<SessionContext>,
    // Control table used to claim maintenance tasks across pods so only
    // one pod runs optimize/vacuum at a time.
    control: ControlTableEngine,
}
372
373impl TraceSummaryDBEngine {
374 pub async fn new(
381 storage_settings: &ObjectStorageSettings,
382 ctx: Arc<SessionContext>,
383 ) -> Result<Self, TraceEngineError> {
384 let object_store = ObjectStore::new(storage_settings)?;
385 let schema = Arc::new(create_summary_schema());
386 let delta_table = build_or_create_summary_table(&object_store, schema.clone()).await?;
387 if let Ok(provider) = delta_table.table_provider().await {
390 ctx.register_table(SUMMARY_TABLE_NAME, provider)?;
391 } else {
392 info!("Empty summary table at init — deferring SessionContext registration until first write");
393 }
394
395 let control = ControlTableEngine::new(storage_settings, get_pod_id()).await?;
396
397 Ok(TraceSummaryDBEngine {
398 schema,
399 table: Arc::new(AsyncRwLock::new(delta_table)),
400 ctx,
401 control,
402 })
403 }
404
405 fn build_batch(
406 &self,
407 records: Vec<TraceSummaryRecord>,
408 ) -> Result<RecordBatch, TraceEngineError> {
409 let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
410 for rec in &records {
411 builder.append(rec)?;
412 }
413 builder.finish()
414 }
415
416 async fn write_records(
417 &self,
418 records: Vec<TraceSummaryRecord>,
419 ) -> Result<(), TraceEngineError> {
420 let count = records.len();
421 info!("Writing {} trace summaries", count);
422 let batch = self.build_batch(records)?;
423
424 let mut table_guard = self.table.write().await;
425
426 if let Err(e) = table_guard.update_incremental(None).await {
427 info!("Summary table update skipped (new table): {}", e);
428 }
429
430 let updated_table = table_guard
431 .clone()
432 .write(vec![batch])
433 .with_save_mode(deltalake::protocol::SaveMode::Append)
434 .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
435 .await?;
436
437 self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
438 self.ctx
439 .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
440
441 *table_guard = updated_table;
442 info!("Summary table updated with {} records", count);
443 Ok(())
444 }
445
446 async fn optimize_table(&self) -> Result<(), TraceEngineError> {
447 let mut table_guard = self.table.write().await;
448 let (updated_table, _metrics) = table_guard
449 .clone()
450 .optimize()
451 .with_target_size(128 * 1024 * 1024)
452 .with_type(OptimizeType::ZOrder(vec![
453 START_TIME_COL.to_string(),
454 SERVICE_NAME_COL.to_string(),
455 ]))
456 .await?;
457
458 self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
459 self.ctx
460 .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
461 *table_guard = updated_table;
462 Ok(())
463 }
464
465 async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
466 let mut table_guard = self.table.write().await;
467 let (updated_table, _metrics) = table_guard
468 .clone()
469 .vacuum()
470 .with_retention_period(chrono::Duration::hours(retention_hours as i64))
471 .with_enforce_retention_duration(false)
472 .await?;
473
474 self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
475 self.ctx
476 .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
477 *table_guard = updated_table;
478 Ok(())
479 }
480
481 async fn try_run_optimize(&self, interval_hours: u64) {
483 match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
484 Ok(true) => match self.optimize_table().await {
485 Ok(()) => {
486 if let Err(e) = self.vacuum_table(0).await {
487 error!("Post-optimize vacuum failed: {}", e);
488 }
489
490 let _ = self
491 .control
492 .release_task(
493 TASK_SUMMARY_OPTIMIZE,
494 chrono::Duration::hours(interval_hours as i64),
495 )
496 .await;
497 }
498 Err(e) => {
499 error!("Summary optimize failed: {}", e);
500 let _ = self
501 .control
502 .release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
503 .await;
504 }
505 },
506 Ok(false) => { }
507 Err(e) => error!("Summary optimize claim check failed: {}", e),
508 }
509 }
510
511 pub fn start_actor(
512 self,
513 compaction_interval_hours: u64,
514 ) -> (
515 mpsc::Sender<SummaryTableCommand>,
516 tokio::task::JoinHandle<()>,
517 ) {
518 let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);
519
520 let handle = tokio::spawn(async move {
521 let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
523 scheduler_ticker.tick().await; loop {
526 tokio::select! {
527 Some(cmd) = rx.recv() => {
528 match cmd {
529 SummaryTableCommand::Write { records, respond_to } => {
530 let result = self.write_records(records).await;
531 if let Err(ref e) = result {
532 error!("Summary write failed: {}", e);
533 }
534 let _ = respond_to.send(result);
535 }
536 SummaryTableCommand::Optimize { respond_to } => {
537 let _ = respond_to.send(self.optimize_table().await);
539 if let Err(e) = self.vacuum_table(0).await {
541 error!("Post-optimize vacuum failed: {}", e);
542 }
543 }
544 SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
545 let _ = respond_to.send(self.vacuum_table(retention_hours).await);
546 }
547 SummaryTableCommand::Shutdown => {
548 info!("TraceSummaryDBEngine actor shutting down");
549 break;
550 }
551 }
552 }
553 _ = scheduler_ticker.tick() => {
554 self.try_run_optimize(compaction_interval_hours).await;
555 }
556 }
557 }
558 });
559
560 (tx, handle)
561 }
562}
563
/// Public facade over the summary engine actor: write and maintenance
/// commands go through `engine_tx`; reads go through `query_service`.
pub struct TraceSummaryService {
    engine_tx: mpsc::Sender<SummaryTableCommand>,
    // Join handle of the actor task; awaited during `shutdown`.
    engine_handle: tokio::task::JoinHandle<()>,
    pub query_service: TraceSummaryQueries,
}
571
572impl TraceSummaryService {
573 pub async fn new(
574 storage_settings: &ObjectStorageSettings,
575 compaction_interval_hours: u64,
576 ctx: Arc<SessionContext>,
577 ) -> Result<Self, TraceEngineError> {
578 let engine = TraceSummaryDBEngine::new(storage_settings, ctx).await?;
579 let engine_ctx = engine.ctx.clone();
580 let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours);
581
582 Ok(TraceSummaryService {
583 engine_tx,
584 engine_handle,
585 query_service: TraceSummaryQueries::new(engine_ctx),
586 })
587 }
588
589 pub async fn write_summaries(
591 &self,
592 records: Vec<TraceSummaryRecord>,
593 ) -> Result<(), TraceEngineError> {
594 let (tx, rx) = oneshot::channel();
595 self.engine_tx
596 .send(SummaryTableCommand::Write {
597 records,
598 respond_to: tx,
599 })
600 .await
601 .map_err(|_| TraceEngineError::ChannelClosed)?;
602 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
603 }
604
605 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
606 let (tx, rx) = oneshot::channel();
607 self.engine_tx
608 .send(SummaryTableCommand::Optimize { respond_to: tx })
609 .await
610 .map_err(|_| TraceEngineError::ChannelClosed)?;
611 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
612 }
613
614 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
615 let (tx, rx) = oneshot::channel();
616 self.engine_tx
617 .send(SummaryTableCommand::Vacuum {
618 retention_hours,
619 respond_to: tx,
620 })
621 .await
622 .map_err(|_| TraceEngineError::ChannelClosed)?;
623 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
624 }
625
626 pub async fn signal_shutdown(&self) {
628 info!("TraceSummaryService signaling shutdown");
629 let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
630 }
631
632 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
633 info!("TraceSummaryService shutting down");
634 self.engine_tx
635 .send(SummaryTableCommand::Shutdown)
636 .await
637 .map_err(|_| TraceEngineError::ChannelClosed)?;
638 if let Err(e) = self.engine_handle.await {
639 error!("Summary engine handle error: {}", e);
640 }
641 info!("TraceSummaryService shutdown complete");
642 Ok(())
643 }
644}
645
/// Read-side queries over the registered `trace_summaries` table.
pub struct TraceSummaryQueries {
    ctx: Arc<SessionContext>,
}
651
impl TraceSummaryQueries {
    /// `ctx` must already have the summary table registered
    /// (see `TraceSummaryDBEngine::new`).
    pub fn new(ctx: Arc<SessionContext>) -> Self {
        Self { ctx }
    }

    /// Returns one page of traces matching `filters`, plus keyset cursors
    /// for paging in both directions.
    ///
    /// Pipeline:
    /// 1. time-range pruning on the raw summary rows;
    /// 2. aggregation per `trace_id` (a trace can have multiple summary
    ///    rows, e.g. one per service/scope);
    /// 3. post-aggregation filters (service, errors, status, id lists,
    ///    cursor);
    /// 4. optional attribute filtering resolved against the span-level
    ///    `trace_spans` table;
    /// 5. sort by (start_time, trace_id) and fetch `limit + 1` rows to
    ///    detect whether another page exists.
    pub async fn get_paginated_traces(
        &self,
        filters: &TraceFilters,
    ) -> Result<TracePaginationResponse, TraceEngineError> {
        let limit = filters.limit.unwrap_or(50) as usize;
        let direction = filters.direction.as_deref().unwrap_or("next");

        use crate::parquet::tracing::queries::{date_lit, ts_lit};
        use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
        use datafusion::functions_nested::set_ops::array_distinct;

        let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;

        // Time range: the partition_date predicate enables partition
        // pruning; the start_time predicate narrows within partitions.
        if let Some(start) = filters.start_time {
            df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
            df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
        }
        if let Some(end) = filters.end_time {
            df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
            df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
        }

        // Orderings used by first_value() to pick a representative row per
        // trace: most spans / latest end wins for descriptive columns...
        let by_span_end: Vec<SortExpr> = vec![
            col(SPAN_COUNT_COL).sort(false, false),
            col(END_TIME_COL).sort(false, false),
        ];
        // ...while status_message is taken from the worst-status row.
        let by_status_span: Vec<SortExpr> = vec![
            col(STATUS_CODE_COL).sort(false, false),
            col(SPAN_COUNT_COL).sort(false, false),
        ];

        // Collapse summary rows into one row per trace. Timestamps are also
        // aggregated as Int64 microseconds so the total duration can be
        // computed below without re-casting.
        let mut df = df
            .aggregate(
                vec![col(TRACE_ID_COL)],
                vec![
                    min(col(START_TIME_COL)).alias(START_TIME_COL),
                    max(col(END_TIME_COL)).alias(END_TIME_COL),
                    max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
                    min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
                    max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
                    sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
                    sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
                    first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
                    first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
                    first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
                        .alias(SCOPE_VERSION_COL),
                    first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
                        .alias(ROOT_OPERATION_COL),
                    first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
                    first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
                        .alias(RESOURCE_ATTRIBUTES_COL),
                    array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
                    array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
                ],
            )?
            // Wall-clock trace duration in ms from the microsecond extremes.
            .with_column(
                DURATION_MS_COL,
                (col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
            )?
            // array_agg produced a list of lists; flatten and deduplicate.
            .with_column(
                ENTITY_IDS_COL,
                array_distinct(flatten(col("_entity_ids_raw"))),
            )?
            .with_column(
                QUEUE_IDS_COL,
                array_distinct(flatten(col("_queue_ids_raw"))),
            )?
            .drop_columns(&[
                "_max_end_us",
                "_min_start_us",
                "_entity_ids_raw",
                "_queue_ids_raw",
            ])?;

        // Post-aggregation filters on the per-trace frame.
        if let Some(ref svc) = filters.service_name {
            df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
        }
        match filters.has_errors {
            Some(true) => {
                df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
            }
            Some(false) => {
                df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
            }
            None => {}
        }
        if let Some(sc) = filters.status_code {
            df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
        }

        if let Some(ref uid) = filters.entity_uid {
            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(ENTITY_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        if let Some(ref uid) = filters.queue_uid {
            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(QUEUE_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        // Explicit trace-id allowlist; ids that fail hex decoding are
        // silently skipped.
        if let Some(ref ids) = filters.trace_ids {
            if !ids.is_empty() {
                let binary_ids: Vec<Expr> = ids
                    .iter()
                    .filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
                    .map(|b| lit(ScalarValue::Binary(Some(b))))
                    .collect();
                if !binary_ids.is_empty() {
                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
                }
            }
        }

        // Keyset cursor on (start_time, trace_id): strictly after/before
        // the cursor row depending on paging direction; trace_id breaks
        // ties between traces sharing the same start_time. An unparseable
        // cursor id disables the cursor filter.
        if let (Some(cursor_time), Some(ref cursor_id)) =
            (filters.cursor_start_time, &filters.cursor_trace_id)
        {
            if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
                let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
                    Some(cursor_time.timestamp_micros()),
                    Some("UTC".into()),
                ));
                let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
                let cursor_expr = if direction == "previous" {
                    col(START_TIME_COL)
                        .gt(cursor_ts.clone())
                        .or(col(START_TIME_COL)
                            .eq(cursor_ts)
                            .and(col(TRACE_ID_COL).gt(cursor_tid)))
                } else {
                    col(START_TIME_COL)
                        .lt(cursor_ts.clone())
                        .or(col(START_TIME_COL)
                            .eq(cursor_ts)
                            .and(col(TRACE_ID_COL).lt(cursor_tid)))
                };
                df = df.filter(cursor_expr)?;
            }
        }

        // Attribute filters are resolved against the span-level table:
        // collect the distinct trace_ids whose search_blob matches ANY
        // pattern (OR semantics), then restrict the summary frame to them.
        if let Some(ref attr_filters) = filters.attribute_filters {
            if !attr_filters.is_empty() {
                let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
                    TRACE_ID_COL,
                    START_TIME_COL,
                    SEARCH_BLOB_COL,
                ])?;

                // Mirror the summary time range on the span scan.
                if let Some(start) = filters.start_time {
                    spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
                        ScalarValue::TimestampMicrosecond(
                            Some(start.timestamp_micros()),
                            Some("UTC".into()),
                        ),
                    )))?;
                }
                if let Some(end) = filters.end_time {
                    spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
                        ScalarValue::TimestampMicrosecond(
                            Some(end.timestamp_micros()),
                            Some("UTC".into()),
                        ),
                    )))?;
                }

                // OR all attribute patterns into a single predicate.
                let mut attr_expr: Option<Expr> = None;
                for f in attr_filters {
                    let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
                    attr_expr = Some(match attr_expr {
                        None => cond,
                        Some(e) => e.or(cond),
                    });
                }
                if let Some(expr) = attr_expr {
                    spans_df = spans_df.filter(expr)?;
                }

                // Materialize matching trace ids, deduplicated client-side.
                let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
                let mut seen_ids: std::collections::HashSet<Vec<u8>> =
                    std::collections::HashSet::new();
                let mut binary_ids: Vec<Expr> = Vec::new();
                for batch in &span_batches {
                    if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
                        // Cast FixedSizeBinary(16) -> Binary for uniform
                        // downstream handling.
                        let casted = compute::cast(col_ref, &DataType::Binary)?;
                        let col_arr =
                            casted
                                .as_any()
                                .downcast_ref::<BinaryArray>()
                                .ok_or_else(|| {
                                    TraceEngineError::DowncastError("trace_id to BinaryArray")
                                })?;
                        for i in 0..batch.num_rows() {
                            let id_bytes = col_arr.value(i).to_vec();
                            if seen_ids.insert(id_bytes.clone()) {
                                binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
                            }
                        }
                    }
                }

                if !binary_ids.is_empty() {
                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
                } else {
                    // No span matched: force an empty result page.
                    df = df.filter(lit(false))?;
                }
            }
        }

        // "previous" pages sort ascending so the rows closest to the cursor
        // come out first; "next" sorts newest-first.
        df = if direction == "previous" {
            df.sort(vec![
                col(START_TIME_COL).sort(true, true),
                col(TRACE_ID_COL).sort(true, true),
            ])?
        } else {
            df.sort(vec![
                col(START_TIME_COL).sort(false, false),
                col(TRACE_ID_COL).sort(false, false),
            ])?
        };

        // Fetch one extra row purely to detect whether another page exists.
        df = df.limit(0, Some(limit + 1))?;

        let batches = df.collect().await?;
        let mut items = batches_to_trace_list_items(batches)?;

        let has_more = items.len() > limit;
        if has_more {
            // Drop the sentinel row; it belongs to the following page.
            items.pop();
        }

        // Derive cursors. For "previous" paging the row order is reversed
        // relative to "next", so first/last swap roles.
        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
            "next" => {
                let next_cursor = if has_more {
                    items.last().map(|item| TraceCursor {
                        start_time: item.start_time,
                        trace_id: item.trace_id.clone(),
                    })
                } else {
                    None
                };

                let previous_cursor = items.first().map(|item| TraceCursor {
                    start_time: item.start_time,
                    trace_id: item.trace_id.clone(),
                });

                (
                    has_more,
                    next_cursor,
                    // A previous page can only exist if we arrived via a
                    // cursor.
                    filters.cursor_start_time.is_some(),
                    previous_cursor,
                )
            }
            "previous" => {
                let previous_cursor = if has_more {
                    items.last().map(|item| TraceCursor {
                        start_time: item.start_time,
                        trace_id: item.trace_id.clone(),
                    })
                } else {
                    None
                };

                let next_cursor = items.first().map(|item| TraceCursor {
                    start_time: item.start_time,
                    trace_id: item.trace_id.clone(),
                });

                (
                    filters.cursor_start_time.is_some(),
                    next_cursor,
                    has_more,
                    previous_cursor,
                )
            }
            // Unknown direction: no paging information.
            _ => (false, None, false, None),
        };

        Ok(TracePaginationResponse {
            items,
            has_next,
            next_cursor,
            has_previous,
            previous_cursor,
        })
    }
}
1018
1019fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
1023 if map_array.is_null(row_idx) {
1024 return Vec::new();
1025 }
1026 let entry = map_array.value(row_idx);
1027 let struct_array = entry.as_any().downcast_ref::<StructArray>().unwrap();
1028 let keys_arr = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).unwrap();
1029 let keys = keys_arr.as_any().downcast_ref::<StringArray>().unwrap();
1030 let values_arr = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).unwrap();
1031 let values = values_arr.as_any().downcast_ref::<StringArray>().unwrap();
1032
1033 (0..struct_array.len())
1034 .map(|i| Attribute {
1035 key: keys.value(i).to_string(),
1036 value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
1037 })
1038 .collect()
1039}
1040
1041fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
1043 let Some(list) = list else {
1044 return Vec::new();
1045 };
1046 if list.is_null(row_idx) {
1047 return Vec::new();
1048 }
1049 let inner = list.value(row_idx);
1050 let str_arr = compute::cast(&inner, &DataType::Utf8)
1051 .ok()
1052 .and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
1053 match str_arr {
1054 Some(arr) => (0..arr.len())
1055 .filter(|i| !arr.is_null(*i))
1056 .map(|i| arr.value(i).to_string())
1057 .collect(),
1058 None => Vec::new(),
1059 }
1060}
1061
1062fn batches_to_trace_list_items(
1063 batches: Vec<RecordBatch>,
1064) -> Result<Vec<TraceListItem>, TraceEngineError> {
1065 let mut items = Vec::new();
1066
1067 for batch in &batches {
1068 let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
1071 TraceEngineError::UnsupportedOperation("missing trace_id column".into())
1072 })?;
1073 let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
1074 let trace_ids = trace_id_binary
1075 .as_any()
1076 .downcast_ref::<BinaryArray>()
1077 .ok_or_else(|| {
1078 TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
1079 })?;
1080
1081 let svc_arr = compute::cast(
1084 batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
1085 TraceEngineError::UnsupportedOperation("missing service_name column".into())
1086 })?,
1087 &DataType::Utf8,
1088 )?;
1089 let service_names = svc_arr
1090 .as_any()
1091 .downcast_ref::<StringArray>()
1092 .ok_or_else(|| {
1093 TraceEngineError::UnsupportedOperation(
1094 "service_name cast to StringArray failed".into(),
1095 )
1096 })?;
1097
1098 let scope_arr = compute::cast(
1099 batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
1100 TraceEngineError::UnsupportedOperation("missing scope_name column".into())
1101 })?,
1102 &DataType::Utf8,
1103 )?;
1104 let scope_names = scope_arr
1105 .as_any()
1106 .downcast_ref::<StringArray>()
1107 .ok_or_else(|| {
1108 TraceEngineError::UnsupportedOperation(
1109 "scope_name cast to StringArray failed".into(),
1110 )
1111 })?;
1112
1113 let scopev_arr = compute::cast(
1114 batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
1115 TraceEngineError::UnsupportedOperation("missing scope_version column".into())
1116 })?,
1117 &DataType::Utf8,
1118 )?;
1119 let scope_versions = scopev_arr
1120 .as_any()
1121 .downcast_ref::<StringArray>()
1122 .ok_or_else(|| {
1123 TraceEngineError::UnsupportedOperation(
1124 "scope_version cast to StringArray failed".into(),
1125 )
1126 })?;
1127
1128 let root_arr = compute::cast(
1129 batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
1130 TraceEngineError::UnsupportedOperation("missing root_operation column".into())
1131 })?,
1132 &DataType::Utf8,
1133 )?;
1134 let root_operations = root_arr
1135 .as_any()
1136 .downcast_ref::<StringArray>()
1137 .ok_or_else(|| {
1138 TraceEngineError::UnsupportedOperation(
1139 "root_operation cast to StringArray failed".into(),
1140 )
1141 })?;
1142
1143 let sm_arr = compute::cast(
1144 batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
1145 TraceEngineError::UnsupportedOperation("missing status_message column".into())
1146 })?,
1147 &DataType::Utf8,
1148 )?;
1149 let status_messages = sm_arr
1150 .as_any()
1151 .downcast_ref::<StringArray>()
1152 .ok_or_else(|| {
1153 TraceEngineError::UnsupportedOperation(
1154 "status_message cast to StringArray failed".into(),
1155 )
1156 })?;
1157
1158 let resource_attrs_map = batch
1159 .column_by_name(RESOURCE_ATTRIBUTES_COL)
1160 .and_then(|c| c.as_any().downcast_ref::<MapArray>())
1161 .ok_or_else(|| {
1162 TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
1163 })?;
1164
1165 let entity_ids_list = batch
1166 .column_by_name(ENTITY_IDS_COL)
1167 .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1168
1169 let queue_ids_list = batch
1170 .column_by_name(QUEUE_IDS_COL)
1171 .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1172
1173 let start_times = batch
1174 .column_by_name(START_TIME_COL)
1175 .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1176 .ok_or_else(|| {
1177 TraceEngineError::UnsupportedOperation("missing start_time column".into())
1178 })?;
1179
1180 let end_times = batch
1181 .column_by_name(END_TIME_COL)
1182 .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1183 .ok_or_else(|| {
1184 TraceEngineError::UnsupportedOperation("missing end_time column".into())
1185 })?;
1186
1187 let durations = batch
1188 .column_by_name(DURATION_MS_COL)
1189 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1190 .ok_or_else(|| {
1191 TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
1192 })?;
1193
1194 let status_codes = batch
1195 .column_by_name(STATUS_CODE_COL)
1196 .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1197 .ok_or_else(|| {
1198 TraceEngineError::UnsupportedOperation("missing status_code column".into())
1199 })?;
1200
1201 let span_counts = batch
1202 .column_by_name(SPAN_COUNT_COL)
1203 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1204 .ok_or_else(|| {
1205 TraceEngineError::UnsupportedOperation("missing span_count column".into())
1206 })?;
1207
1208 let error_counts = batch
1209 .column_by_name(ERROR_COUNT_COL)
1210 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1211 .ok_or_else(|| {
1212 TraceEngineError::UnsupportedOperation("missing error_count column".into())
1213 })?;
1214
1215 for i in 0..batch.num_rows() {
1216 let trace_id_hex = hex::encode(trace_ids.value(i));
1217
1218 let start_time = micros_to_datetime(start_times.value(i));
1219 let end_time = if end_times.is_null(i) {
1220 None
1221 } else {
1222 Some(micros_to_datetime(end_times.value(i)))
1223 };
1224 let duration_ms = if durations.is_null(i) {
1225 None
1226 } else {
1227 Some(durations.value(i))
1228 };
1229 let error_count = error_counts.value(i);
1230
1231 let resource_attributes = extract_map_attributes(resource_attrs_map, i);
1232
1233 let entity_ids = extract_list_strings(entity_ids_list, i);
1234 let queue_ids = extract_list_strings(queue_ids_list, i);
1235
1236 items.push(TraceListItem {
1237 trace_id: trace_id_hex,
1238 service_name: service_names.value(i).to_string(),
1239 scope_name: scope_names.value(i).to_string(),
1240 scope_version: scope_versions.value(i).to_string(),
1241 root_operation: root_operations.value(i).to_string(),
1242 start_time,
1243 end_time,
1244 duration_ms,
1245 status_code: status_codes.value(i),
1246 status_message: if status_messages.is_null(i) {
1247 None
1248 } else {
1249 Some(status_messages.value(i).to_string())
1250 },
1251 span_count: span_counts.value(i),
1252 has_errors: error_count > 0,
1253 error_count,
1254 resource_attributes,
1255 entity_ids,
1256 queue_ids,
1257 });
1258 }
1259 }
1260
1261 Ok(items)
1262}
1263
1264fn micros_to_datetime(micros: i64) -> DateTime<Utc> {
1265 let secs = micros / 1_000_000;
1266 let nanos = ((micros % 1_000_000) * 1_000) as u32;
1267 Utc.timestamp_opt(secs, nanos).unwrap()
1268}
1269
#[cfg(test)]
mod tests {
    //! Integration tests for the trace-summary write/query path. Each test
    //! wipes the local storage root, writes summaries (and sometimes spans)
    //! through the real services, sleeps to let buffered writes land, and
    //! then asserts on `get_paginated_traces` results.

    use super::*;
    use crate::storage::ObjectStore;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceFilters;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use tracing_subscriber;

    /// Initializes test logging (idempotent via `try_init`) and deletes the
    /// local storage root so each test starts against an empty table.
    /// Called at both the start and end of every test body.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Builds a shared `SessionContext` backed by a fresh `ObjectStore`
    /// for the given storage settings.
    fn make_test_ctx(storage_settings: &ObjectStorageSettings) -> Arc<SessionContext> {
        Arc::new(
            ObjectStore::new(storage_settings)
                .unwrap()
                .get_session()
                .unwrap(),
        )
    }

    /// Builds a minimal summary record: a 3-span trace starting "now" with a
    /// 200 ms duration. `status_code`/`status_message` are derived from
    /// `error_count` (2 / "Internal Server Error" when > 0, else 0 / "OK").
    fn make_summary(
        trace_id_bytes: [u8; 16],
        service_name: &str,
        error_count: i64,
        resource_attributes: Vec<Attribute>,
    ) -> TraceSummaryRecord {
        let now = Utc::now();
        TraceSummaryRecord {
            trace_id: TraceId::from_bytes(trace_id_bytes),
            service_name: service_name.to_string(),
            scope_name: "test.scope".to_string(),
            scope_version: String::new(),
            root_operation: "root_op".to_string(),
            start_time: now,
            end_time: Some(now + chrono::Duration::milliseconds(200)),
            status_code: if error_count > 0 { 2 } else { 0 },
            status_message: if error_count > 0 {
                "Internal Server Error".to_string()
            } else {
                "OK".to_string()
            },
            span_count: 3,
            error_count,
            resource_attributes,
            entity_ids: vec![],
            queue_ids: vec![],
        }
    }

    /// Writes two summaries for different services and checks that an
    /// unfiltered, time-bounded query returns both.
    #[tokio::test]
    async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
        let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // NOTE(review): sleep presumably lets a background writer flush to
        // storage before querying — confirm against the service's flush interval.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert!(
            response.items.len() >= 2,
            "Expected at least 2 items, got {}",
            response.items.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Writes one OK and one errored summary, then checks that
    /// `has_errors: Some(true)` returns only errored traces and
    /// `has_errors: Some(false)` returns only clean ones.
    #[tokio::test]
    async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
        let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
        service
            .write_summaries(vec![ok_summary, err_summary])
            .await?;
        // Allow buffered writes to land before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let base_filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        // Errors-only query: every returned item must have error_count > 0.
        let mut filters_err = base_filters.clone();
        filters_err.has_errors = Some(true);
        let errors_only = service
            .query_service
            .get_paginated_traces(&filters_err)
            .await?;
        for item in &errors_only.items {
            assert!(
                item.error_count > 0,
                "Expected error trace, got: {:?}",
                item
            );
        }
        assert!(
            !errors_only.items.is_empty(),
            "Expected at least one error trace"
        );

        // No-errors query: every returned item must have error_count == 0.
        let mut filters_ok = base_filters.clone();
        filters_ok.has_errors = Some(false);
        let no_errors = service
            .query_service
            .get_paginated_traces(&filters_ok)
            .await?;
        for item in &no_errors.items {
            assert_eq!(
                item.error_count, 0,
                "Expected non-error trace, got error_count={}",
                item.error_count
            );
        }
        assert!(
            !no_errors.items.is_empty(),
            "Expected at least one non-error trace"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Writes summaries for two services and checks that filtering by
    /// `service_name` returns only that service's traces.
    #[tokio::test]
    async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
        let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
        service.write_summaries(vec![s_alpha, s_beta]).await?;
        // Allow buffered writes to land before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: Some("alpha_service".to_string()),
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert!(
            !response.items.is_empty(),
            "Expected results for alpha_service"
        );
        for item in &response.items {
            assert_eq!(
                item.service_name, "alpha_service",
                "Expected only alpha_service items, got: {}",
                item.service_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Writes two summaries and checks that a `trace_ids` filter (hex form)
    /// returns exactly the requested trace and nothing else.
    #[tokio::test]
    async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let wanted_id = TraceId::from_bytes([7u8; 16]);
        let unwanted_id = TraceId::from_bytes([8u8; 16]);

        let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
        let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // Allow buffered writes to land before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: Some(vec![wanted_id.to_hex()]),
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            1,
            "Expected exactly 1 item from trace_ids filter"
        );
        assert_eq!(
            response.items[0].trace_id,
            wanted_id.to_hex(),
            "Returned wrong trace_id"
        );
        assert_ne!(
            response.items[0].trace_id,
            unwanted_id.to_hex(),
            "Should not have returned unwanted trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Writes 100 summaries spaced one minute apart and pages through them
    /// with limit 50: first page, then "next" via the returned cursor, then
    /// back with "previous". Checks page sizes and cursor ordering.
    #[tokio::test]
    async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let now = Utc::now();
        // 100 summaries with distinct ids and strictly decreasing start times.
        let summaries: Vec<TraceSummaryRecord> = (0u8..100)
            .map(|i| {
                let mut s = make_summary([i; 16], "svc", 0, vec![]);
                s.start_time = now - chrono::Duration::minutes(i as i64);
                s
            })
            .collect();
        service.write_summaries(summaries).await?;
        // Allow buffered writes to land before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let mut filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(2)),
            end_time: Some(now + chrono::Duration::hours(1)),
            limit: Some(50),
            ..Default::default()
        };

        // Page 1: full page plus a next_cursor.
        let first = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(first.items.len(), 50, "first page: 50 items");
        assert!(
            first.next_cursor.is_some(),
            "first page: should have next_cursor"
        );

        // Page 2: follow the next_cursor; items must not precede the cursor
        // position (results appear to be ordered by descending start_time).
        let next_cur = first.next_cursor.clone().unwrap();
        filters.cursor_start_time = Some(next_cur.start_time);
        filters.cursor_trace_id = Some(next_cur.trace_id.clone());
        filters.direction = Some("next".to_string());
        let second = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(second.items.len(), 50, "second page: 50 items");
        assert!(
            second.items[0].start_time <= next_cur.start_time,
            "second page first item must be <= cursor"
        );
        assert!(second.previous_cursor.is_some());

        // Back to page 1 via the previous_cursor.
        let prev_cur = second.previous_cursor.unwrap();
        filters.cursor_start_time = Some(prev_cur.start_time);
        filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
        filters.direction = Some("previous".to_string());
        let prev = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(prev.items.len(), 50, "previous page: 50 items");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Writes spans (one with a `component:kafka` attribute) plus matching
    /// summaries, then checks that an `attribute_filters` query returns only
    /// the trace whose span carries the attribute — exercising the
    /// summary-to-span join path.
    #[tokio::test]
    async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // Span and summary services share one SessionContext so the join
        // can see both tables.
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        let shared_ctx = span_service.ctx.clone();

        let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?;

        let now = Utc::now();
        let kafka_trace = TraceId::from_bytes([70u8; 16]);
        let plain_trace = TraceId::from_bytes([80u8; 16]);

        let kafka_span = make_span_record(
            &kafka_trace,
            SpanId::from_bytes([70u8; 8]),
            "svc",
            vec![Attribute {
                key: "component".to_string(),
                value: serde_json::Value::String("kafka".to_string()),
            }],
        );
        let plain_span =
            make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
        span_service
            .write_spans(vec![kafka_span, plain_span])
            .await?;

        let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
        kafka_summary.start_time = now;
        let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![kafka_summary, plain_summary])
            .await?;

        // Allow both services' buffered writes to land before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            attribute_filters: Some(vec!["component:kafka".to_string()]),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "attribute filter must return results"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == kafka_trace.to_hex()),
            "only kafka trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Tags one summary with a queue id, filters by `queue_uid`, and then
    /// uses the returned trace_id to look up its spans — covering the
    /// round-trip from summary filter result back to span storage.
    #[tokio::test]
    async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // Span and summary services share one SessionContext.
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        let shared_ctx = span_service.ctx.clone();

        let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?;

        let now = Utc::now();
        let queue_trace = TraceId::from_bytes([90u8; 16]);
        let plain_trace = TraceId::from_bytes([91u8; 16]);
        let target_queue_uid = "queue-record-abc123";

        let queue_span = make_span_record(
            &queue_trace,
            SpanId::from_bytes([90u8; 8]),
            "svc_queue",
            vec![],
        );
        let plain_span = make_span_record(
            &plain_trace,
            SpanId::from_bytes([91u8; 8]),
            "svc_queue",
            vec![],
        );
        // NOTE(review): spans written via write_spans_direct here (vs
        // write_spans in the attribute test) — presumably bypasses buffering;
        // confirm against TraceSpanService.
        span_service
            .write_spans_direct(vec![queue_span, plain_span])
            .await?;

        // Only the queue summary carries the target queue id.
        let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
        queue_summary.start_time = now;
        queue_summary.queue_ids = vec![target_queue_uid.to_string()];

        let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![queue_summary, plain_summary])
            .await?;

        // Allow buffered writes to land before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            queue_uid: Some(target_queue_uid.to_string()),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "queue_uid filter must return at least one result"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == queue_trace.to_hex()),
            "only the queue trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        // Second leg: the hex trace_id from the summary result must resolve
        // back to stored spans.
        let returned_trace_id =
            TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
        let spans = span_service
            .query_service
            .get_trace_spans(
                Some(returned_trace_id.as_bytes()),
                None,
                Some(&(now - chrono::Duration::hours(1))),
                Some(&(now + chrono::Duration::hours(1))),
                None,
            )
            .await?;

        assert!(
            !spans.is_empty(),
            "should find spans for the returned trace_id"
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Builds a minimal span record: a root (no parent) INTERNAL span named
    /// "op" starting "now" with a 100 ms duration and OK status.
    fn make_span_record(
        trace_id: &TraceId,
        span_id: SpanId,
        service_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: trace_id.clone(),
            span_id,
            parent_span_id: None,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: "op".to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: serde_json::Value::Null,
            output: serde_json::Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    /// Writes one summary carrying a resource attribute and verifies the
    /// attribute's key survives the write/read round-trip through storage.
    #[tokio::test]
    async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let ctx = make_test_ctx(&storage_settings);
        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;

        let attrs = vec![Attribute {
            key: "cloud.region".to_string(),
            value: serde_json::Value::String("us-east-1".to_string()),
        }];
        let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
        service.write_summaries(vec![summary]).await?;
        // Allow buffered writes to land before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
        assert_eq!(
            response.items[0].resource_attributes.len(),
            1,
            "Expected 1 resource attribute"
        );
        assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}