1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::catalog::TraceCatalogProvider;
4use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
5use crate::parquet::utils::match_attr_expr;
6use crate::parquet::utils::register_cloud_logstore_factories;
7use crate::storage::ObjectStore;
8use arrow::array::*;
9use arrow::compute;
10use arrow::datatypes::*;
11use arrow_array::Array;
12use arrow_array::RecordBatch;
13use chrono::{DateTime, Datelike, Utc};
14use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
15use datafusion::prelude::*;
16use datafusion::scalar::ScalarValue;
17use deltalake::operations::optimize::OptimizeType;
18use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
19use scouter_types::sql::{TraceFilters, TraceListItem};
20use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
21use std::sync::Arc;
22use tokio::sync::oneshot;
23use tokio::sync::{mpsc, RwLock as AsyncRwLock};
24use tokio::time::{interval, Duration};
25use tracing::{debug, error, info, instrument};
26use url::Url;
27
/// Days from the Common Era origin (0001-01-01) to the Unix epoch
/// (1970-01-01); used to convert `num_days_from_ce()` into Arrow `Date32`.
const UNIX_EPOCH_DAYS: i32 = 719_163;

/// Name under which the summary Delta table is registered in the catalog.
const SUMMARY_TABLE_NAME: &str = "trace_summaries";

/// Control-table task key used to claim the periodic optimize/vacuum job.
const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";

// Column names of the summary table; must match `create_summary_schema`.
const TRACE_ID_COL: &str = "trace_id";
const SERVICE_NAME_COL: &str = "service_name";
const SCOPE_NAME_COL: &str = "scope_name";
const SCOPE_VERSION_COL: &str = "scope_version";
const ROOT_OPERATION_COL: &str = "root_operation";
const START_TIME_COL: &str = "start_time";
const END_TIME_COL: &str = "end_time";
const DURATION_MS_COL: &str = "duration_ms";
const STATUS_CODE_COL: &str = "status_code";
const STATUS_MESSAGE_COL: &str = "status_message";
const SPAN_COUNT_COL: &str = "span_count";
const ERROR_COUNT_COL: &str = "error_count";
// Column of the spans table searched by attribute filters (not part of the
// summary schema itself).
const SEARCH_BLOB_COL: &str = "search_blob";

const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

/// Date32 partition column of the Delta table.
const PARTITION_DATE_COL: &str = "partition_date";
57
58fn create_summary_schema() -> Schema {
61 Schema::new(vec![
62 Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
63 Field::new(
64 SERVICE_NAME_COL,
65 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
66 false,
67 ),
68 Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
69 Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
70 Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
71 Field::new(
72 START_TIME_COL,
73 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
74 false,
75 ),
76 Field::new(
77 END_TIME_COL,
78 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
79 true,
80 ),
81 Field::new(DURATION_MS_COL, DataType::Int64, true),
82 Field::new(STATUS_CODE_COL, DataType::Int32, false),
83 Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
84 Field::new(SPAN_COUNT_COL, DataType::Int64, false),
85 Field::new(ERROR_COUNT_COL, DataType::Int64, false),
86 resource_attribute_field(),
87 Field::new(
88 ENTITY_IDS_COL,
89 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
90 true,
91 ),
92 Field::new(
93 QUEUE_IDS_COL,
94 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
95 true,
96 ),
97 Field::new(PARTITION_DATE_COL, DataType::Date32, false),
98 ])
99}
100
/// Per-column Arrow builders used to assemble one `RecordBatch` of trace
/// summaries; field order mirrors the schema from `create_summary_schema`.
struct TraceSummaryBatchBuilder {
    schema: Arc<Schema>,
    trace_id: FixedSizeBinaryBuilder,
    // Dictionary-encoded: service names repeat heavily across traces.
    service_name: StringDictionaryBuilder<Int32Type>,
    scope_name: StringBuilder,
    scope_version: StringBuilder,
    root_operation: StringBuilder,
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,
    status_code: Int32Builder,
    status_message: StringBuilder,
    span_count: Int64Builder,
    error_count: Int64Builder,
    // Map of attribute key -> JSON-encoded value string.
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    entity_ids: ListBuilder<StringBuilder>,
    queue_ids: ListBuilder<StringBuilder>,
    // Days since the Unix epoch (Date32); the table's partition column.
    partition_date: Date32Builder,
}
122
123impl TraceSummaryBatchBuilder {
124 fn new(schema: Arc<Schema>, capacity: usize) -> Self {
125 let map_field_names = MapFieldNames {
126 entry: "key_value".to_string(),
127 key: "key".to_string(),
128 value: "value".to_string(),
129 };
130 let resource_attributes = MapBuilder::new(
131 Some(map_field_names),
132 StringBuilder::new(),
133 StringViewBuilder::new(),
134 );
135 Self {
136 schema,
137 trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
138 service_name: StringDictionaryBuilder::new(),
139 scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
140 scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
141 root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
142 start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
143 end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
144 duration_ms: Int64Builder::with_capacity(capacity),
145 status_code: Int32Builder::with_capacity(capacity),
146 status_message: StringBuilder::with_capacity(capacity, capacity * 16),
147 span_count: Int64Builder::with_capacity(capacity),
148 error_count: Int64Builder::with_capacity(capacity),
149 resource_attributes,
150 entity_ids: ListBuilder::new(StringBuilder::new()),
151 queue_ids: ListBuilder::new(StringBuilder::new()),
152 partition_date: Date32Builder::with_capacity(capacity),
153 }
154 }
155
156 fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
157 self.trace_id.append_value(rec.trace_id.as_bytes())?;
158 self.service_name.append_value(&rec.service_name);
159 self.scope_name.append_value(&rec.scope_name);
160 if rec.scope_version.is_empty() {
161 self.scope_version.append_null();
162 } else {
163 self.scope_version.append_value(&rec.scope_version);
164 }
165 self.root_operation.append_value(&rec.root_operation);
166 self.start_time
167 .append_value(rec.start_time.timestamp_micros());
168 match rec.end_time {
169 Some(end) => self.end_time.append_value(end.timestamp_micros()),
170 None => self.end_time.append_null(),
171 }
172 let duration = rec
173 .end_time
174 .map(|end| (end - rec.start_time).num_milliseconds().max(0));
175 match duration {
176 Some(d) => self.duration_ms.append_value(d),
177 None => self.duration_ms.append_null(),
178 }
179 self.status_code.append_value(rec.status_code);
180 if rec.status_message.is_empty() {
181 self.status_message.append_null();
182 } else {
183 self.status_message.append_value(&rec.status_message);
184 }
185 self.span_count.append_value(rec.span_count);
186 self.error_count.append_value(rec.error_count);
187 if rec.resource_attributes.is_empty() {
188 self.resource_attributes.append(false)?; } else {
190 for attr in &rec.resource_attributes {
191 self.resource_attributes.keys().append_value(&attr.key);
192 let value_str =
193 serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
194 self.resource_attributes.values().append_value(value_str);
195 }
196 self.resource_attributes.append(true)?;
197 }
198 if rec.entity_ids.is_empty() {
199 self.entity_ids.append_null();
200 } else {
201 for id in &rec.entity_ids {
202 self.entity_ids.values().append_value(id);
203 }
204 self.entity_ids.append(true);
205 }
206 if rec.queue_ids.is_empty() {
207 self.queue_ids.append_null();
208 } else {
209 for id in &rec.queue_ids {
210 self.queue_ids.values().append_value(id);
211 }
212 self.queue_ids.append(true);
213 }
214 let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
216 self.partition_date.append_value(days);
217 Ok(())
218 }
219
220 fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
221 let columns: Vec<Arc<dyn Array>> = vec![
222 Arc::new(self.trace_id.finish()),
223 Arc::new(self.service_name.finish()),
224 Arc::new(self.scope_name.finish()),
225 Arc::new(self.scope_version.finish()),
226 Arc::new(self.root_operation.finish()),
227 Arc::new(self.start_time.finish()),
228 Arc::new(self.end_time.finish()),
229 Arc::new(self.duration_ms.finish()),
230 Arc::new(self.status_code.finish()),
231 Arc::new(self.status_message.finish()),
232 Arc::new(self.span_count.finish()),
233 Arc::new(self.error_count.finish()),
234 Arc::new(self.resource_attributes.finish()),
235 Arc::new(self.entity_ids.finish()),
236 Arc::new(self.queue_ids.finish()),
237 Arc::new(self.partition_date.finish()),
238 ];
239 RecordBatch::try_new(self.schema, columns).map_err(Into::into)
240 }
241}
242
/// Commands accepted by the summary-table actor spawned in
/// `TraceSummaryDBEngine::start_actor`; each fallible command carries a
/// oneshot channel for its result.
pub enum SummaryTableCommand {
    /// Append a batch of summary records to the Delta table.
    Write {
        records: Vec<TraceSummaryRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Compact/Z-order the table on demand.
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Delete unreferenced files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
259
260async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
261 let mut base = object_store.get_base_url()?;
262 let mut path = base.path().to_string();
263 if !path.ends_with('/') {
264 path.push('/');
265 }
266 path.push_str(SUMMARY_TABLE_NAME);
267 base.set_path(&path);
268 Ok(base)
269}
270
/// Creates a brand-new summary Delta table at `table_url`, partitioned by
/// `partition_date`, with data-skipping statistics enabled on the columns
/// queries filter and sort by.
async fn create_summary_table(
    object_store: &ObjectStore,
    table_url: Url,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    info!(
        "Creating trace summary table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(SUMMARY_TABLE_NAME)
    );
    let store = object_store.as_dyn_object_store();
    // Build an (unloaded) table handle bound to our storage backend; the
    // create() call below commits the initial table version.
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;
    let delta_fields = arrow_schema_to_delta(&schema);
    table
        .create()
        .with_table_name(SUMMARY_TABLE_NAME)
        .with_columns(delta_fields)
        .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
        )
        .await
        .map_err(Into::into)
}
303
304async fn build_or_create_summary_table(
305 object_store: &ObjectStore,
306 schema: SchemaRef,
307) -> Result<DeltaTable, TraceEngineError> {
308 register_cloud_logstore_factories();
309 let table_url = build_summary_url(object_store).await?;
310 info!(
311 "Loading trace summary table [{}://.../{} ]",
312 table_url.scheme(),
313 table_url
314 .path_segments()
315 .and_then(|mut s| s.next_back())
316 .unwrap_or(SUMMARY_TABLE_NAME)
317 );
318
319 let is_delta_table = if table_url.scheme() == "file" {
324 if let Ok(path) = table_url.to_file_path() {
325 if !path.exists() {
326 info!("Creating directory for summary table: {:?}", path);
327 std::fs::create_dir_all(&path)?;
328 }
329 path.join("_delta_log").exists()
330 } else {
331 false
332 }
333 } else {
334 let store = object_store.as_dyn_object_store();
335 match DeltaTableBuilder::from_url(table_url.clone()) {
336 Ok(builder) => builder
337 .with_storage_backend(store, table_url.clone())
338 .load()
339 .await
340 .is_ok(),
341 Err(_) => false,
342 }
343 };
344
345 if is_delta_table {
346 info!(
347 "Loaded existing trace summary table [{}://.../{} ]",
348 table_url.scheme(),
349 table_url
350 .path_segments()
351 .and_then(|mut s| s.next_back())
352 .unwrap_or(SUMMARY_TABLE_NAME)
353 );
354 let store = object_store.as_dyn_object_store();
355 DeltaTableBuilder::from_url(table_url.clone())?
356 .with_storage_backend(store, table_url)
357 .load()
358 .await
359 .map_err(Into::into)
360 } else {
361 info!("Summary table does not exist, creating new table");
362 create_summary_table(object_store, table_url, schema).await
363 }
364}
365
/// Owns the summary Delta table plus the DataFusion session/catalog handles
/// needed to write, maintain, and refresh it; driven by the actor loop
/// spawned in `start_actor`.
pub struct TraceSummaryDBEngine {
    schema: Arc<Schema>,
    // Write-locked so commits and maintenance swap the table atomically.
    table: Arc<AsyncRwLock<DeltaTable>>,
    pub ctx: Arc<SessionContext>,
    catalog: Arc<TraceCatalogProvider>,
    // Cross-pod coordination for claiming scheduled maintenance tasks.
    control: ControlTableEngine,
}
375
impl TraceSummaryDBEngine {
    /// Builds the engine: loads (or creates) the summary table, registers
    /// its provider with the shared catalog, and initializes the control
    /// engine used for cross-pod task claims.
    pub async fn new(
        object_store: &ObjectStore,
        ctx: Arc<SessionContext>,
        catalog: Arc<TraceCatalogProvider>,
    ) -> Result<Self, TraceEngineError> {
        let schema = Arc::new(create_summary_schema());
        let delta_table = build_or_create_summary_table(object_store, schema.clone()).await?;
        // A freshly created table may not yield a provider yet; registration
        // is then deferred until the first successful write.
        if let Ok(provider) = delta_table.table_provider().await {
            catalog.swap(SUMMARY_TABLE_NAME, provider);
        } else {
            info!("Empty summary table at init — deferring catalog registration until first write");
        }

        let control = ControlTableEngine::new(object_store, get_pod_id()).await?;

        Ok(TraceSummaryDBEngine {
            schema,
            table: Arc::new(AsyncRwLock::new(delta_table)),
            ctx,
            catalog,
            control,
        })
    }

    /// Converts a batch of summary records into one Arrow `RecordBatch`.
    fn build_batch(
        &self,
        records: Vec<TraceSummaryRecord>,
    ) -> Result<RecordBatch, TraceEngineError> {
        let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
        for rec in &records {
            builder.append(rec)?;
        }
        builder.finish()
    }

    /// Appends `records` to the Delta table, then republishes the updated
    /// provider/session so queries observe the new table version.
    async fn write_records(
        &self,
        records: Vec<TraceSummaryRecord>,
    ) -> Result<(), TraceEngineError> {
        let count = records.len();
        info!("Writing {} trace summaries", count);
        let batch = self.build_batch(records)?;

        // Hold the write lock across the whole commit + swap so concurrent
        // maintenance tasks cannot interleave with the table replacement.
        let mut table_guard = self.table.write().await;
        let current_table = table_guard.clone();
        let updated_table = current_table
            .write(vec![batch])
            .with_save_mode(deltalake::protocol::SaveMode::Append)
            .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
            .await?;

        let new_provider = updated_table.table_provider().await?;
        self.catalog.swap(SUMMARY_TABLE_NAME, new_provider);
        updated_table.update_datafusion_session(&self.ctx.state())?;

        *table_guard = updated_table;
        info!("Summary table updated with {} records", count);
        Ok(())
    }

    /// Compacts the table into ~128 MiB files, Z-ordered by start time and
    /// service name, then republishes the provider.
    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let (updated_table, _metrics) = table_guard
            .clone()
            .optimize()
            .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
            .with_type(OptimizeType::ZOrder(vec![
                START_TIME_COL.to_string(),
                SERVICE_NAME_COL.to_string(),
            ]))
            .await?;

        self.catalog
            .swap(SUMMARY_TABLE_NAME, updated_table.table_provider().await?);
        updated_table.update_datafusion_session(&self.ctx.state())?;
        *table_guard = updated_table;
        Ok(())
    }

    /// Removes unreferenced files older than `retention_hours`.
    /// Retention enforcement is disabled, so values below the Delta default
    /// (e.g. 0 after optimize) are allowed.
    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let (updated_table, _metrics) = table_guard
            .clone()
            .vacuum()
            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
            .with_enforce_retention_duration(false)
            .await?;

        self.catalog
            .swap(SUMMARY_TABLE_NAME, updated_table.table_provider().await?);
        updated_table.update_datafusion_session(&self.ctx.state())?;
        *table_guard = updated_table;
        Ok(())
    }

    /// Picks up table versions committed by other writers; only swaps the
    /// provider when the version actually advanced. Refresh failures are
    /// logged at debug level and ignored (best effort).
    async fn refresh_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let current_version = table_guard.version();

        let mut refreshed = table_guard.clone();
        match refreshed.update_incremental(None).await {
            Ok(_) => {
                if refreshed.version() > current_version {
                    info!(
                        "Summary table refreshed: v{:?} → v{:?}",
                        current_version,
                        refreshed.version()
                    );
                    let new_provider = refreshed.table_provider().await?;
                    self.catalog.swap(SUMMARY_TABLE_NAME, new_provider);
                    refreshed.update_datafusion_session(&self.ctx.state())?;
                    *table_guard = refreshed;
                }
            }
            Err(e) => {
                debug!("Summary table refresh skipped: {}", e);
            }
        }
        Ok(())
    }

    /// Runs the scheduled optimize+vacuum if this pod wins the control-table
    /// claim; releases the claim afterwards (with backoff on failure).
    async fn try_run_optimize(&self, interval_hours: u64) {
        match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
            Ok(true) => match self.optimize_table().await {
                Ok(()) => {
                    // Vacuum with zero retention immediately reclaims the
                    // files superseded by the optimize commit.
                    if let Err(e) = self.vacuum_table(0).await {
                        error!("Post-optimize vacuum failed: {}", e);
                    }

                    let _ = self
                        .control
                        .release_task(
                            TASK_SUMMARY_OPTIMIZE,
                            chrono::Duration::hours(interval_hours as i64),
                        )
                        .await;
                }
                Err(e) => {
                    error!("Summary optimize failed: {}", e);
                    let _ = self
                        .control
                        .release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
                        .await;
                }
            },
            // Another pod currently holds the claim — nothing to do.
            Ok(false) => { }
            Err(e) => error!("Summary optimize claim check failed: {}", e),
        }
    }

    /// Consumes the engine and spawns its actor loop; returns the command
    /// channel and join handle. The loop serves commands plus two timers:
    /// a 5-minute maintenance scheduler and a periodic table refresh.
    #[instrument(skip_all, name = "summary_engine_actor")]
    pub fn start_actor(
        self,
        compaction_interval_hours: u64,
        refresh_interval_secs: u64,
    ) -> (
        mpsc::Sender<SummaryTableCommand>,
        tokio::task::JoinHandle<()>,
    ) {
        let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);

        let handle = tokio::spawn(async move {
            info!(refresh_interval_secs, "TraceSummaryDBEngine actor started");

            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
            // Consume the immediate first tick so work starts one interval in.
            scheduler_ticker.tick().await;
            let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs.max(1)));
            refresh_ticker.tick().await;
            // NOTE(review): if all senders drop without sending Shutdown, the
            // `Some(cmd)` branch is disabled and the loop keeps ticking
            // forever — confirm a Shutdown is always sent before drop.
            loop {
                tokio::select! {
                    Some(cmd) = rx.recv() => {
                        match cmd {
                            SummaryTableCommand::Write { records, respond_to } => {
                                let result = self.write_records(records).await;
                                if let Err(ref e) = result {
                                    error!("Summary write failed: {}", e);
                                }
                                let _ = respond_to.send(result);
                            }
                            SummaryTableCommand::Optimize { respond_to } => {
                                // Respond with the optimize result first; the
                                // follow-up vacuum is best effort.
                                let _ = respond_to.send(self.optimize_table().await);
                                if let Err(e) = self.vacuum_table(0).await {
                                    error!("Post-optimize vacuum failed: {}", e);
                                }
                            }
                            SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
                            }
                            SummaryTableCommand::Shutdown => {
                                info!("TraceSummaryDBEngine actor shutting down");
                                break;
                            }
                        }
                    }
                    _ = scheduler_ticker.tick() => {
                        self.try_run_optimize(compaction_interval_hours).await;
                    }
                    _ = refresh_ticker.tick() => {
                        if let Err(e) = self.refresh_table().await {
                            error!("Summary table refresh failed: {}", e);
                        }
                    }
                }
            }
        });

        (tx, handle)
    }
}
620
/// Public facade over the summary engine actor: forwards write/maintenance
/// commands across a channel and exposes the read-side query service.
pub struct TraceSummaryService {
    engine_tx: mpsc::Sender<SummaryTableCommand>,
    engine_handle: tokio::task::JoinHandle<()>,
    pub query_service: TraceSummaryQueries,
}
628
629impl TraceSummaryService {
630 pub async fn new(
631 object_store: &ObjectStore,
632 compaction_interval_hours: u64,
633 ctx: Arc<SessionContext>,
634 catalog: Arc<TraceCatalogProvider>,
635 refresh_interval_secs: u64,
636 ) -> Result<Self, TraceEngineError> {
637 let engine = TraceSummaryDBEngine::new(object_store, ctx, catalog).await?;
638 let engine_ctx = engine.ctx.clone();
639 let (engine_tx, engine_handle) =
640 engine.start_actor(compaction_interval_hours, refresh_interval_secs);
641
642 Ok(TraceSummaryService {
643 engine_tx,
644 engine_handle,
645 query_service: TraceSummaryQueries::new(engine_ctx),
646 })
647 }
648
649 pub async fn write_summaries(
651 &self,
652 records: Vec<TraceSummaryRecord>,
653 ) -> Result<(), TraceEngineError> {
654 let (tx, rx) = oneshot::channel();
655 self.engine_tx
656 .send(SummaryTableCommand::Write {
657 records,
658 respond_to: tx,
659 })
660 .await
661 .map_err(|_| TraceEngineError::ChannelClosed)?;
662 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
663 }
664
665 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
666 let (tx, rx) = oneshot::channel();
667 self.engine_tx
668 .send(SummaryTableCommand::Optimize { respond_to: tx })
669 .await
670 .map_err(|_| TraceEngineError::ChannelClosed)?;
671 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
672 }
673
674 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
675 let (tx, rx) = oneshot::channel();
676 self.engine_tx
677 .send(SummaryTableCommand::Vacuum {
678 retention_hours,
679 respond_to: tx,
680 })
681 .await
682 .map_err(|_| TraceEngineError::ChannelClosed)?;
683 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
684 }
685
686 pub async fn signal_shutdown(&self) {
688 info!("TraceSummaryService signaling shutdown");
689 let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
690 }
691
692 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
693 info!("TraceSummaryService shutting down");
694 self.engine_tx
695 .send(SummaryTableCommand::Shutdown)
696 .await
697 .map_err(|_| TraceEngineError::ChannelClosed)?;
698 if let Err(e) = self.engine_handle.await {
699 error!("Summary engine handle error: {}", e);
700 }
701 info!("TraceSummaryService shutdown complete");
702 Ok(())
703 }
704}
705
/// Read-only query helpers over the registered `trace_summaries` table.
pub struct TraceSummaryQueries {
    ctx: Arc<SessionContext>,
}
711
impl TraceSummaryQueries {
    pub fn new(ctx: Arc<SessionContext>) -> Self {
        Self { ctx }
    }

    /// Cursor-paginated trace listing.
    ///
    /// Rows (one per service/scope contribution to a trace) are aggregated
    /// into one row per trace id, filtered, then keyset-paginated on
    /// (start_time, trace_id). `limit + 1` rows are fetched to detect
    /// whether more pages exist in the travel direction.
    pub async fn get_paginated_traces(
        &self,
        filters: &TraceFilters,
    ) -> Result<TracePaginationResponse, TraceEngineError> {
        let limit = filters.limit.unwrap_or(50) as usize;
        // "next" (default) pages toward older traces; "previous" toward newer.
        let direction = filters.direction.as_deref().unwrap_or("next");

        use crate::parquet::tracing::queries::{date_lit, ts_lit};
        use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
        use datafusion::functions_nested::set_ops::array_distinct;

        let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;

        // Time-range filters applied pre-aggregation; the partition_date
        // predicate enables Delta partition pruning.
        if let Some(start) = filters.start_time {
            df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
            df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
        }
        if let Some(end) = filters.end_time {
            df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
            df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
        }

        // Tie-break orderings for first_value: pick the row with the most
        // spans / latest end as the trace's representative...
        let by_span_end: Vec<SortExpr> = vec![
            col(SPAN_COUNT_COL).sort(false, false),
            col(END_TIME_COL).sort(false, false),
        ];
        // ...and the row with the worst status for the status message.
        let by_status_span: Vec<SortExpr> = vec![
            col(STATUS_CODE_COL).sort(false, false),
            col(SPAN_COUNT_COL).sort(false, false),
        ];

        // Collapse per-service rows into one row per trace id.
        let mut df = df
            .aggregate(
                vec![col(TRACE_ID_COL)],
                vec![
                    min(col(START_TIME_COL)).alias(START_TIME_COL),
                    max(col(END_TIME_COL)).alias(END_TIME_COL),
                    // Duration is recomputed from the aggregated extremes
                    // (cast to Int64 microseconds for arithmetic).
                    max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
                    min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
                    max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
                    sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
                    sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
                    first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
                    first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
                    first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
                        .alias(SCOPE_VERSION_COL),
                    first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
                        .alias(ROOT_OPERATION_COL),
                    first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
                    first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
                        .alias(RESOURCE_ATTRIBUTES_COL),
                    array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
                    array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
                ],
            )?
            .with_column(
                DURATION_MS_COL,
                (col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
            )?
            // array_agg of lists yields a list-of-lists: flatten + dedupe.
            .with_column(
                ENTITY_IDS_COL,
                array_distinct(flatten(col("_entity_ids_raw"))),
            )?
            .with_column(
                QUEUE_IDS_COL,
                array_distinct(flatten(col("_queue_ids_raw"))),
            )?
            .drop_columns(&[
                "_max_end_us",
                "_min_start_us",
                "_entity_ids_raw",
                "_queue_ids_raw",
            ])?;

        // Post-aggregation filters operate on the per-trace row.
        if let Some(ref svc) = filters.service_name {
            df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
        }
        match filters.has_errors {
            Some(true) => {
                df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
            }
            Some(false) => {
                df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
            }
            None => {}
        }
        if let Some(sc) = filters.status_code {
            df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
        }

        if let Some(ref uid) = filters.entity_uid {
            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(ENTITY_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        if let Some(ref uid) = filters.queue_uid {
            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
                col(QUEUE_IDS_COL),
                lit(uid.as_str()),
            ))?;
        }

        // Explicit trace-id allowlist; invalid hex ids are silently skipped.
        if let Some(ref ids) = filters.trace_ids {
            if !ids.is_empty() {
                let binary_ids: Vec<Expr> = ids
                    .iter()
                    .filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
                    .map(|b| lit(ScalarValue::Binary(Some(b))))
                    .collect();
                if !binary_ids.is_empty() {
                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
                }
            }
        }

        // Keyset cursor on (start_time, trace_id): strictly before the
        // cursor for "next", strictly after for "previous". An unparsable
        // cursor id silently disables the cursor filter.
        if let (Some(cursor_time), Some(ref cursor_id)) =
            (filters.cursor_start_time, &filters.cursor_trace_id)
        {
            if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
                let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
                    Some(cursor_time.timestamp_micros()),
                    Some("UTC".into()),
                ));
                let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
                let cursor_expr = if direction == "previous" {
                    col(START_TIME_COL)
                        .gt(cursor_ts.clone())
                        .or(col(START_TIME_COL)
                            .eq(cursor_ts)
                            .and(col(TRACE_ID_COL).gt(cursor_tid)))
                } else {
                    col(START_TIME_COL)
                        .lt(cursor_ts.clone())
                        .or(col(START_TIME_COL)
                            .eq(cursor_ts)
                            .and(col(TRACE_ID_COL).lt(cursor_tid)))
                };
                df = df.filter(cursor_expr)?;
            }
        }

        // Attribute filters match against the spans table's search blob and
        // restrict the summary rows to the matching trace ids.
        if let Some(ref attr_filters) = filters.attribute_filters {
            if !attr_filters.is_empty() {
                let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
                    TRACE_ID_COL,
                    START_TIME_COL,
                    SEARCH_BLOB_COL,
                ])?;

                // Mirror the summary time range so the span scan stays small.
                if let Some(start) = filters.start_time {
                    spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
                        ScalarValue::TimestampMicrosecond(
                            Some(start.timestamp_micros()),
                            Some("UTC".into()),
                        ),
                    )))?;
                }
                if let Some(end) = filters.end_time {
                    spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
                        ScalarValue::TimestampMicrosecond(
                            Some(end.timestamp_micros()),
                            Some("UTC".into()),
                        ),
                    )))?;
                }

                // Multiple attribute filters are OR-ed together.
                let mut attr_expr: Option<Expr> = None;
                for f in attr_filters {
                    let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
                    attr_expr = Some(match attr_expr {
                        None => cond,
                        Some(e) => e.or(cond),
                    });
                }
                if let Some(expr) = attr_expr {
                    spans_df = spans_df.filter(expr)?;
                }

                // Materialize the distinct matching trace ids client-side.
                let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
                let mut seen_ids: std::collections::HashSet<Vec<u8>> =
                    std::collections::HashSet::new();
                let mut binary_ids: Vec<Expr> = Vec::new();
                for batch in &span_batches {
                    if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
                        // FixedSizeBinary -> Binary so one downcast handles it.
                        let casted = compute::cast(col_ref, &DataType::Binary)?;
                        let col_arr =
                            casted
                                .as_any()
                                .downcast_ref::<BinaryArray>()
                                .ok_or_else(|| {
                                    TraceEngineError::DowncastError("trace_id to BinaryArray")
                                })?;
                        for i in 0..batch.num_rows() {
                            let id_bytes = col_arr.value(i).to_vec();
                            if seen_ids.insert(id_bytes.clone()) {
                                binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
                            }
                        }
                    }
                }

                if !binary_ids.is_empty() {
                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
                } else {
                    // No spans matched: force an empty result set.
                    df = df.filter(lit(false))?;
                }
            }
        }

        // "previous" pages are fetched in ascending order and re-read from
        // the front below; "next" pages descend (newest first).
        df = if direction == "previous" {
            df.sort(vec![
                col(START_TIME_COL).sort(true, true),
                col(TRACE_ID_COL).sort(true, true),
            ])?
        } else {
            df.sort(vec![
                col(START_TIME_COL).sort(false, false),
                col(TRACE_ID_COL).sort(false, false),
            ])?
        };

        // One extra row detects whether another page exists.
        df = df.limit(0, Some(limit + 1))?;

        let batches = df.collect().await?;
        let mut items = batches_to_trace_list_items(batches)?;

        let has_more = items.len() > limit;
        if has_more {
            items.pop();
        }

        // Build next/previous cursors from the page edges. The cursor on
        // the opposite side of the travel direction is only meaningful if a
        // cursor was supplied (i.e. this is not the first page).
        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
            "next" => {
                let next_cursor = if has_more {
                    items.last().map(|item| TraceCursor {
                        start_time: item.start_time,
                        trace_id: item.trace_id.clone(),
                    })
                } else {
                    None
                };

                let previous_cursor = items.first().map(|item| TraceCursor {
                    start_time: item.start_time,
                    trace_id: item.trace_id.clone(),
                });

                (
                    has_more,
                    next_cursor,
                    filters.cursor_start_time.is_some(),
                    previous_cursor,
                )
            }
            "previous" => {
                let previous_cursor = if has_more {
                    items.last().map(|item| TraceCursor {
                        start_time: item.start_time,
                        trace_id: item.trace_id.clone(),
                    })
                } else {
                    None
                };

                let next_cursor = items.first().map(|item| TraceCursor {
                    start_time: item.start_time,
                    trace_id: item.trace_id.clone(),
                });

                (
                    filters.cursor_start_time.is_some(),
                    next_cursor,
                    has_more,
                    previous_cursor,
                )
            }
            _ => (false, None, false, None),
        };

        Ok(TracePaginationResponse {
            items,
            has_next,
            next_cursor,
            has_previous,
            previous_cursor,
        })
    }
}
1078
1079fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
1083 if map_array.is_null(row_idx) {
1084 return Vec::new();
1085 }
1086 let entry = map_array.value(row_idx);
1087 let Some(struct_array) = entry.as_any().downcast_ref::<StructArray>() else {
1088 tracing::warn!("extract_map_attributes: failed to downcast to StructArray");
1089 return Vec::new();
1090 };
1091 let Some(keys_arr) = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).ok()
1092 else {
1093 tracing::warn!("extract_map_attributes: failed to cast keys to Utf8");
1094 return Vec::new();
1095 };
1096 let Some(keys) = keys_arr.as_any().downcast_ref::<StringArray>() else {
1097 tracing::warn!("extract_map_attributes: failed to downcast keys to StringArray");
1098 return Vec::new();
1099 };
1100 let Some(values_arr) = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).ok()
1101 else {
1102 tracing::warn!("extract_map_attributes: failed to cast values to Utf8");
1103 return Vec::new();
1104 };
1105 let Some(values) = values_arr.as_any().downcast_ref::<StringArray>() else {
1106 tracing::warn!("extract_map_attributes: failed to downcast values to StringArray");
1107 return Vec::new();
1108 };
1109
1110 (0..struct_array.len())
1111 .map(|i| Attribute {
1112 key: keys.value(i).to_string(),
1113 value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
1114 })
1115 .collect()
1116}
1117
1118fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
1120 let Some(list) = list else {
1121 return Vec::new();
1122 };
1123 if list.is_null(row_idx) {
1124 return Vec::new();
1125 }
1126 let inner = list.value(row_idx);
1127 let str_arr = compute::cast(&inner, &DataType::Utf8)
1128 .ok()
1129 .and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
1130 match str_arr {
1131 Some(arr) => (0..arr.len())
1132 .filter(|i| !arr.is_null(*i))
1133 .map(|i| arr.value(i).to_string())
1134 .collect(),
1135 None => Vec::new(),
1136 }
1137}
1138
1139fn batches_to_trace_list_items(
1140 batches: Vec<RecordBatch>,
1141) -> Result<Vec<TraceListItem>, TraceEngineError> {
1142 let mut items = Vec::new();
1143
1144 for batch in &batches {
1145 let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
1148 TraceEngineError::UnsupportedOperation("missing trace_id column".into())
1149 })?;
1150 let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
1151 let trace_ids = trace_id_binary
1152 .as_any()
1153 .downcast_ref::<BinaryArray>()
1154 .ok_or_else(|| {
1155 TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
1156 })?;
1157
1158 let svc_arr = compute::cast(
1161 batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
1162 TraceEngineError::UnsupportedOperation("missing service_name column".into())
1163 })?,
1164 &DataType::Utf8,
1165 )?;
1166 let service_names = svc_arr
1167 .as_any()
1168 .downcast_ref::<StringArray>()
1169 .ok_or_else(|| {
1170 TraceEngineError::UnsupportedOperation(
1171 "service_name cast to StringArray failed".into(),
1172 )
1173 })?;
1174
1175 let scope_arr = compute::cast(
1176 batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
1177 TraceEngineError::UnsupportedOperation("missing scope_name column".into())
1178 })?,
1179 &DataType::Utf8,
1180 )?;
1181 let scope_names = scope_arr
1182 .as_any()
1183 .downcast_ref::<StringArray>()
1184 .ok_or_else(|| {
1185 TraceEngineError::UnsupportedOperation(
1186 "scope_name cast to StringArray failed".into(),
1187 )
1188 })?;
1189
1190 let scopev_arr = compute::cast(
1191 batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
1192 TraceEngineError::UnsupportedOperation("missing scope_version column".into())
1193 })?,
1194 &DataType::Utf8,
1195 )?;
1196 let scope_versions = scopev_arr
1197 .as_any()
1198 .downcast_ref::<StringArray>()
1199 .ok_or_else(|| {
1200 TraceEngineError::UnsupportedOperation(
1201 "scope_version cast to StringArray failed".into(),
1202 )
1203 })?;
1204
1205 let root_arr = compute::cast(
1206 batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
1207 TraceEngineError::UnsupportedOperation("missing root_operation column".into())
1208 })?,
1209 &DataType::Utf8,
1210 )?;
1211 let root_operations = root_arr
1212 .as_any()
1213 .downcast_ref::<StringArray>()
1214 .ok_or_else(|| {
1215 TraceEngineError::UnsupportedOperation(
1216 "root_operation cast to StringArray failed".into(),
1217 )
1218 })?;
1219
1220 let sm_arr = compute::cast(
1221 batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
1222 TraceEngineError::UnsupportedOperation("missing status_message column".into())
1223 })?,
1224 &DataType::Utf8,
1225 )?;
1226 let status_messages = sm_arr
1227 .as_any()
1228 .downcast_ref::<StringArray>()
1229 .ok_or_else(|| {
1230 TraceEngineError::UnsupportedOperation(
1231 "status_message cast to StringArray failed".into(),
1232 )
1233 })?;
1234
1235 let resource_attrs_map = batch
1236 .column_by_name(RESOURCE_ATTRIBUTES_COL)
1237 .and_then(|c| c.as_any().downcast_ref::<MapArray>())
1238 .ok_or_else(|| {
1239 TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
1240 })?;
1241
1242 let entity_ids_list = batch
1243 .column_by_name(ENTITY_IDS_COL)
1244 .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1245
1246 let queue_ids_list = batch
1247 .column_by_name(QUEUE_IDS_COL)
1248 .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1249
1250 let start_times = batch
1251 .column_by_name(START_TIME_COL)
1252 .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1253 .ok_or_else(|| {
1254 TraceEngineError::UnsupportedOperation("missing start_time column".into())
1255 })?;
1256
1257 let end_times = batch
1258 .column_by_name(END_TIME_COL)
1259 .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1260 .ok_or_else(|| {
1261 TraceEngineError::UnsupportedOperation("missing end_time column".into())
1262 })?;
1263
1264 let durations = batch
1265 .column_by_name(DURATION_MS_COL)
1266 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1267 .ok_or_else(|| {
1268 TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
1269 })?;
1270
1271 let status_codes = batch
1272 .column_by_name(STATUS_CODE_COL)
1273 .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1274 .ok_or_else(|| {
1275 TraceEngineError::UnsupportedOperation("missing status_code column".into())
1276 })?;
1277
1278 let span_counts = batch
1279 .column_by_name(SPAN_COUNT_COL)
1280 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1281 .ok_or_else(|| {
1282 TraceEngineError::UnsupportedOperation("missing span_count column".into())
1283 })?;
1284
1285 let error_counts = batch
1286 .column_by_name(ERROR_COUNT_COL)
1287 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1288 .ok_or_else(|| {
1289 TraceEngineError::UnsupportedOperation("missing error_count column".into())
1290 })?;
1291
1292 for i in 0..batch.num_rows() {
1293 let trace_id_hex = hex::encode(trace_ids.value(i));
1294
1295 let start_time = micros_to_datetime(start_times.value(i))?;
1296 let end_time = if end_times.is_null(i) {
1297 None
1298 } else {
1299 Some(micros_to_datetime(end_times.value(i))?)
1300 };
1301 let duration_ms = if durations.is_null(i) {
1302 None
1303 } else {
1304 Some(durations.value(i))
1305 };
1306 let error_count = error_counts.value(i);
1307
1308 let resource_attributes = extract_map_attributes(resource_attrs_map, i);
1309
1310 let entity_ids = extract_list_strings(entity_ids_list, i);
1311 let queue_ids = extract_list_strings(queue_ids_list, i);
1312
1313 items.push(TraceListItem {
1314 trace_id: trace_id_hex,
1315 service_name: service_names.value(i).to_string(),
1316 scope_name: scope_names.value(i).to_string(),
1317 scope_version: scope_versions.value(i).to_string(),
1318 root_operation: root_operations.value(i).to_string(),
1319 start_time,
1320 end_time,
1321 duration_ms,
1322 status_code: status_codes.value(i),
1323 status_message: if status_messages.is_null(i) {
1324 None
1325 } else {
1326 Some(status_messages.value(i).to_string())
1327 },
1328 span_count: span_counts.value(i),
1329 has_errors: error_count > 0,
1330 error_count,
1331 resource_attributes,
1332 entity_ids,
1333 queue_ids,
1334 });
1335 }
1336 }
1337
1338 Ok(items)
1339}
1340
1341fn micros_to_datetime(micros: i64) -> Result<DateTime<Utc>, TraceEngineError> {
1342 DateTime::from_timestamp_micros(micros).ok_or(TraceEngineError::InvalidTimestamp(
1343 "out-of-range microsecond timestamp",
1344 ))
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349 use super::*;
1350 use crate::storage::ObjectStore;
1351 use scouter_settings::ObjectStorageSettings;
1352 use scouter_types::sql::TraceFilters;
1353 use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
1354 use tracing_subscriber;
1355
1356 fn cleanup() {
1357 let _ = tracing_subscriber::fmt()
1358 .with_max_level(tracing::Level::INFO)
1359 .try_init();
1360
1361 let storage_settings = ObjectStorageSettings::default();
1362 let current_dir = std::env::current_dir().unwrap();
1363 let storage_path = current_dir.join(storage_settings.storage_root());
1364 if storage_path.exists() {
1365 let _ = std::fs::remove_dir_all(storage_path);
1366 }
1367 }
1368
    /// Builds an `ObjectStore` from the given settings; panics on failure
    /// (acceptable in tests).
    fn make_test_object_store(storage_settings: &ObjectStorageSettings) -> ObjectStore {
        ObjectStore::new(storage_settings).unwrap()
    }
1372
    /// Creates a DataFusion session bound to the trace catalog name (from the
    /// engine module) with the "default" schema; panics on failure.
    fn make_test_ctx(object_store: &ObjectStore) -> Arc<SessionContext> {
        Arc::new(
            object_store
                .get_session_with_catalog(
                    crate::parquet::tracing::engine::TRACE_CATALOG_NAME,
                    "default",
                )
                .unwrap(),
        )
    }
1385
1386 fn make_test_catalog(ctx: &Arc<SessionContext>) -> Arc<TraceCatalogProvider> {
1389 use datafusion::catalog::CatalogProvider;
1390 let catalog = Arc::new(TraceCatalogProvider::new());
1391 ctx.register_catalog(
1392 crate::parquet::tracing::engine::TRACE_CATALOG_NAME,
1393 Arc::clone(&catalog) as Arc<dyn CatalogProvider>,
1394 );
1395 catalog
1396 }
1397
1398 fn make_summary(
1399 trace_id_bytes: [u8; 16],
1400 service_name: &str,
1401 error_count: i64,
1402 resource_attributes: Vec<Attribute>,
1403 ) -> TraceSummaryRecord {
1404 let now = Utc::now();
1405 TraceSummaryRecord {
1406 trace_id: TraceId::from_bytes(trace_id_bytes),
1407 service_name: service_name.to_string(),
1408 scope_name: "test.scope".to_string(),
1409 scope_version: String::new(),
1410 root_operation: "root_op".to_string(),
1411 start_time: now,
1412 end_time: Some(now + chrono::Duration::milliseconds(200)),
1413 status_code: if error_count > 0 { 2 } else { 0 },
1414 status_message: if error_count > 0 {
1415 "Internal Server Error".to_string()
1416 } else {
1417 "OK".to_string()
1418 },
1419 span_count: 3,
1420 error_count,
1421 resource_attributes,
1422 entity_ids: vec![],
1423 queue_ids: vec![],
1424 }
1425 }
1426
    /// Smoke test: two written summaries become visible through
    /// `get_paginated_traces` within a wide time window.
    #[tokio::test]
    async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let catalog = make_test_catalog(&ctx);
        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;

        let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
        let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // Give the (presumably asynchronous) write/refresh path time to land
        // the data before querying — TODO confirm against service internals.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        // ">= 2" rather than "== 2": other tests sharing the storage root may
        // have contributed additional rows.
        assert!(
            response.items.len() >= 2,
            "Expected at least 2 items, got {}",
            response.items.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
1472
    /// Verifies the `has_errors` filter: `Some(true)` returns only traces
    /// with `error_count > 0`, `Some(false)` only error-free traces.
    #[tokio::test]
    async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let catalog = make_test_catalog(&ctx);
        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;

        // One clean trace and one with two errors.
        let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
        let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
        service
            .write_summaries(vec![ok_summary, err_summary])
            .await?;
        // Wait for the write to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let base_filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        // Case 1: errors only.
        let mut filters_err = base_filters.clone();
        filters_err.has_errors = Some(true);
        let errors_only = service
            .query_service
            .get_paginated_traces(&filters_err)
            .await?;
        for item in &errors_only.items {
            assert!(
                item.error_count > 0,
                "Expected error trace, got: {:?}",
                item
            );
        }
        assert!(
            !errors_only.items.is_empty(),
            "Expected at least one error trace"
        );

        // Case 2: non-errors only.
        let mut filters_ok = base_filters.clone();
        filters_ok.has_errors = Some(false);
        let no_errors = service
            .query_service
            .get_paginated_traces(&filters_ok)
            .await?;
        for item in &no_errors.items {
            assert_eq!(
                item.error_count, 0,
                "Expected non-error trace, got error_count={}",
                item.error_count
            );
        }
        assert!(
            !no_errors.items.is_empty(),
            "Expected at least one non-error trace"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
1552
    /// Verifies the `service_name` filter returns only matching services.
    #[tokio::test]
    async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let catalog = make_test_catalog(&ctx);
        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;

        let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
        let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
        service.write_summaries(vec![s_alpha, s_beta]).await?;
        // Wait for the write to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: Some("alpha_service".to_string()),
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert!(
            !response.items.is_empty(),
            "Expected results for alpha_service"
        );
        // beta_service rows must be filtered out.
        for item in &response.items {
            assert_eq!(
                item.service_name, "alpha_service",
                "Expected only alpha_service items, got: {}",
                item.service_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
1604
    /// Verifies the `trace_ids` filter restricts results to exactly the
    /// requested hex-encoded trace id.
    #[tokio::test]
    async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let catalog = make_test_catalog(&ctx);
        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;

        let wanted_id = TraceId::from_bytes([7u8; 16]);
        let unwanted_id = TraceId::from_bytes([8u8; 16]);

        let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
        let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // Wait for the write to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: Some(vec![wanted_id.to_hex()]),
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            1,
            "Expected exactly 1 item from trace_ids filter"
        );
        assert_eq!(
            response.items[0].trace_id,
            wanted_id.to_hex(),
            "Returned wrong trace_id"
        );
        assert_ne!(
            response.items[0].trace_id,
            unwanted_id.to_hex(),
            "Should not have returned unwanted trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
1663
    /// Exercises cursor pagination over 100 summaries spaced one minute
    /// apart: first page, "next" page via `next_cursor`, then "previous"
    /// page via `previous_cursor`, each expected to hold 50 items.
    #[tokio::test]
    async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let catalog = make_test_catalog(&ctx);
        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;

        // 100 summaries with strictly decreasing start times (i minutes ago).
        let now = Utc::now();
        let summaries: Vec<TraceSummaryRecord> = (0u8..100)
            .map(|i| {
                let mut s = make_summary([i; 16], "svc", 0, vec![]);
                s.start_time = now - chrono::Duration::minutes(i as i64);
                s
            })
            .collect();
        service.write_summaries(summaries).await?;
        // Wait for the write to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let mut filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(2)),
            end_time: Some(now + chrono::Duration::hours(1)),
            limit: Some(50),
            ..Default::default()
        };

        // Page 1.
        let first = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(first.items.len(), 50, "first page: 50 items");
        assert!(
            first.next_cursor.is_some(),
            "first page: should have next_cursor"
        );

        // Page 2 via next cursor; ordering appears to be start_time descending
        // — TODO confirm against the query service's sort order.
        let next_cur = first.next_cursor.clone().unwrap();
        filters.cursor_start_time = Some(next_cur.start_time);
        filters.cursor_trace_id = Some(next_cur.trace_id.clone());
        filters.direction = Some("next".to_string());
        let second = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(second.items.len(), 50, "second page: 50 items");
        assert!(
            second.items[0].start_time <= next_cur.start_time,
            "second page first item must be <= cursor"
        );
        assert!(second.previous_cursor.is_some());

        // Back to page 1 via previous cursor.
        let prev_cur = second.previous_cursor.unwrap();
        filters.cursor_start_time = Some(prev_cur.start_time);
        filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
        filters.direction = Some("previous".to_string());
        let prev = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(prev.items.len(), 50, "previous page: 50 items");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
1725
    /// Verifies `attribute_filters` ("key:value") selects only traces whose
    /// spans carry the matching attribute — spans and summaries are written
    /// through separate services sharing one session context, so the filter
    /// must join across both tables.
    #[tokio::test]
    async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // Span service owns the context/catalog; the summary service reuses them.
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        let shared_ctx = span_service.ctx.clone();

        let summary_service = TraceSummaryService::new(
            &span_service.object_store,
            24,
            shared_ctx,
            span_service.catalog.clone(),
            10,
        )
        .await?;

        let now = Utc::now();
        let kafka_trace = TraceId::from_bytes([70u8; 16]);
        let plain_trace = TraceId::from_bytes([80u8; 16]);

        // Only the kafka trace's span carries component=kafka.
        let kafka_span = make_span_record(
            &kafka_trace,
            SpanId::from_bytes([70u8; 8]),
            "svc",
            vec![Attribute {
                key: "component".to_string(),
                value: serde_json::Value::String("kafka".to_string()),
            }],
        );
        let plain_span =
            make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
        span_service
            .write_spans(vec![kafka_span, plain_span])
            .await?;

        let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
        kafka_summary.start_time = now;
        let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![kafka_summary, plain_summary])
            .await?;

        // Wait for both span and summary writes to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            attribute_filters: Some(vec!["component:kafka".to_string()]),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "attribute filter must return results"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == kafka_trace.to_hex()),
            "only kafka trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }
1812
    /// Verifies the `queue_uid` filter selects only summaries whose
    /// `queue_ids` contain the uid, and that the returned trace id can then
    /// be used to look up its spans through the span query service.
    #[tokio::test]
    async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // Span service owns the context/catalog; the summary service reuses them.
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        let shared_ctx = span_service.ctx.clone();

        let summary_service = TraceSummaryService::new(
            &span_service.object_store,
            24,
            shared_ctx,
            span_service.catalog.clone(),
            10,
        )
        .await?;

        let now = Utc::now();
        let queue_trace = TraceId::from_bytes([90u8; 16]);
        let plain_trace = TraceId::from_bytes([91u8; 16]);
        let target_queue_uid = "queue-record-abc123";

        let queue_span = make_span_record(
            &queue_trace,
            SpanId::from_bytes([90u8; 8]),
            "svc_queue",
            vec![],
        );
        let plain_span = make_span_record(
            &plain_trace,
            SpanId::from_bytes([91u8; 8]),
            "svc_queue",
            vec![],
        );
        span_service
            .write_spans_direct(vec![queue_span, plain_span])
            .await?;

        // Only the queue trace's summary carries the target queue uid.
        let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
        queue_summary.start_time = now;
        queue_summary.queue_ids = vec![target_queue_uid.to_string()];

        let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![queue_summary, plain_summary])
            .await?;

        // Wait for both span and summary writes to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            queue_uid: Some(target_queue_uid.to_string()),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "queue_uid filter must return at least one result"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == queue_trace.to_hex()),
            "only the queue trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        // Round-trip: the hex trace id from the summary must resolve back to
        // raw bytes usable by the span lookup.
        let returned_trace_id =
            TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
        let spans = span_service
            .query_service
            .get_trace_spans(
                Some(returned_trace_id.as_bytes()),
                None,
                Some(&(now - chrono::Duration::hours(1))),
                Some(&(now + chrono::Duration::hours(1))),
                None,
            )
            .await?;

        assert!(
            !spans.is_empty(),
            "should find spans for the returned trace_id"
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }
1928
    /// Builds a minimal root-span fixture: INTERNAL kind, 100 ms duration,
    /// status 0/"OK", no parent, no events/links.
    fn make_span_record(
        trace_id: &TraceId,
        span_id: SpanId,
        service_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: *trace_id,
            span_id,
            parent_span_id: None,
            // NOTE(review): presumably the W3C "sampled" trace flag — confirm.
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: "op".to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: serde_json::Value::Null,
            output: serde_json::Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }
1963
    /// Round-trip test: a resource attribute written on a summary survives
    /// storage and comes back on the queried `TraceListItem`.
    #[tokio::test]
    async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let catalog = make_test_catalog(&ctx);
        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;

        let attrs = vec![Attribute {
            key: "cloud.region".to_string(),
            value: serde_json::Value::String("us-east-1".to_string()),
        }];
        let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
        service.write_summaries(vec![summary]).await?;
        // Wait for the write to become queryable.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            // Pin to this test's trace so parallel writes don't interfere.
            trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
            entity_uid: None,
            queue_uid: None,
        };

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
        assert_eq!(
            response.items[0].resource_attributes.len(),
            1,
            "Expected 1 resource attribute"
        );
        assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
2014
    /// Guards against stale table snapshots: after each of three successive
    /// writes, an immediate query (no sleep) must reflect the cumulative row
    /// count (2, 4, 6).
    #[tokio::test]
    async fn test_summary_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError>
    {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let catalog = make_test_catalog(&ctx);
        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(100),
            ..Default::default()
        };

        // Write #1: rows 1-2 visible immediately.
        let s1 = make_summary([0xA0; 16], "svc_vis", 0, vec![]);
        let s2 = make_summary([0xA1; 16], "svc_vis", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            2,
            "After write #1: expected 2 items, got {}",
            response.items.len()
        );

        // Write #2: rows 3-4 visible immediately.
        let s3 = make_summary([0xA2; 16], "svc_vis", 0, vec![]);
        let s4 = make_summary([0xA3; 16], "svc_vis", 0, vec![]);
        service.write_summaries(vec![s3, s4]).await?;

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            4,
            "After write #2: expected 4 items, got {} (stale snapshot?)",
            response.items.len()
        );

        // Write #3: rows 5-6 visible immediately.
        let s5 = make_summary([0xA4; 16], "svc_vis", 0, vec![]);
        let s6 = make_summary([0xA5; 16], "svc_vis", 0, vec![]);
        service.write_summaries(vec![s5, s6]).await?;

        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert_eq!(
            response.items.len(),
            6,
            "After write #3: expected 6 items, got {} (stale snapshot?)",
            response.items.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
2082
    /// Simulates two pods sharing one storage root: a "writer" service stack
    /// writes a summary and a separately constructed "reader" stack (refresh
    /// interval 1) must observe it after its refresh cycle. Uses a dedicated
    /// storage directory to avoid interference from the other tests.
    #[tokio::test]
    async fn test_distributed_refresh() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        let storage_settings = ObjectStorageSettings {
            storage_uri: "./scouter_storage_summary_dist".to_string(),
            ..ObjectStorageSettings::default()
        };
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            let _ = std::fs::remove_dir_all(&storage_path);
        }

        // Writer pod: its own span service, context, and catalog.
        let writer_spans = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        let writer = TraceSummaryService::new(
            &writer_spans.object_store,
            24,
            writer_spans.ctx.clone(),
            writer_spans.catalog.clone(),
            10,
        )
        .await?;

        // Reader pod: fully separate stack over the same storage; last arg 1
        // presumably shortens its refresh interval — TODO confirm signature.
        let reader_spans = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        let reader = TraceSummaryService::new(
            &reader_spans.object_store,
            24,
            reader_spans.ctx.clone(),
            reader_spans.catalog.clone(),
            1,
        )
        .await?;

        let summary = make_summary([0xDD_u8; 16], "distributed-svc", 0, vec![]);
        writer.write_summaries(vec![summary]).await?;

        // Allow the reader's refresh cycle to pick up the writer's commit.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: Some("distributed-svc".to_string()),
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        let response = reader.query_service.get_paginated_traces(&filters).await?;
        assert!(
            !response.items.is_empty(),
            "Reader pod should see summaries written by writer pod after refresh"
        );

        writer.shutdown().await?;
        reader.shutdown().await?;
        writer_spans.shutdown().await?;
        reader_spans.shutdown().await?;
        if storage_path.exists() {
            let _ = std::fs::remove_dir_all(&storage_path);
        }
        Ok(())
    }
2174}