Skip to main content

scouter_dataframe/parquet/tracing/
summary.rs

1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
4use crate::parquet::utils::match_attr_expr;
5use crate::parquet::utils::register_cloud_logstore_factories;
6use crate::storage::ObjectStore;
7use arrow::array::*;
8use arrow::compute;
9use arrow::datatypes::*;
10use arrow_array::Array;
11use arrow_array::RecordBatch;
12use chrono::{DateTime, Datelike, TimeZone, Utc};
13use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
14use datafusion::prelude::*;
15use datafusion::scalar::ScalarValue;
16use deltalake::operations::optimize::OptimizeType;
17use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
18use scouter_settings::ObjectStorageSettings;
19use scouter_types::sql::{TraceFilters, TraceListItem};
20use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
21use std::sync::Arc;
22use tokio::sync::oneshot;
23use tokio::sync::{mpsc, RwLock as AsyncRwLock};
24use tokio::time::{interval, Duration};
25use tracing::{error, info};
26use url::Url;
27
/// Days from CE epoch to Unix epoch (1970-01-01).
/// Equivalent to `NaiveDate::from_ymd_opt(1970, 1, 1).unwrap().num_days_from_ce()`.
///
/// Subtracted from `num_days_from_ce()` to turn a calendar date into the
/// "days since Unix epoch" value stored in the `Date32` partition column.
const UNIX_EPOCH_DAYS: i32 = 719_163;

/// Name of the Delta table. Also used as the final URL path segment of the
/// table location and as the table name registered in the `SessionContext`.
const SUMMARY_TABLE_NAME: &str = "trace_summaries";

/// Control table task name for summary compaction coordination.
const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";

// ── Column name constants ────────────────────────────────────────────────────
// Names of the summary-table columns. They are shared by the schema
// definition, the batch builder and the query code so the three cannot drift.
const TRACE_ID_COL: &str = "trace_id";
const SERVICE_NAME_COL: &str = "service_name";
const SCOPE_NAME_COL: &str = "scope_name";
const SCOPE_VERSION_COL: &str = "scope_version";
const ROOT_OPERATION_COL: &str = "root_operation";
const START_TIME_COL: &str = "start_time";
const END_TIME_COL: &str = "end_time";
const DURATION_MS_COL: &str = "duration_ms";
const STATUS_CODE_COL: &str = "status_code";
const STATUS_MESSAGE_COL: &str = "status_message";
const SPAN_COUNT_COL: &str = "span_count";
const ERROR_COUNT_COL: &str = "error_count";
// NOTE(review): `search_blob` is not part of the schema built by
// `create_summary_schema`; its use is not visible in this section — confirm.
const SEARCH_BLOB_COL: &str = "search_blob";

// Nested (map / list) columns.
const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

// Delta partition column (`Date32`), derived from `start_time` on write.
const PARTITION_DATE_COL: &str = "partition_date";
57
58// ── Schema ───────────────────────────────────────────────────────────────────
59
60fn create_summary_schema() -> Schema {
61    Schema::new(vec![
62        Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
63        Field::new(
64            SERVICE_NAME_COL,
65            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
66            false,
67        ),
68        Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
69        Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
70        Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
71        Field::new(
72            START_TIME_COL,
73            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
74            false,
75        ),
76        Field::new(
77            END_TIME_COL,
78            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
79            true,
80        ),
81        Field::new(DURATION_MS_COL, DataType::Int64, true),
82        Field::new(STATUS_CODE_COL, DataType::Int32, false),
83        Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
84        Field::new(SPAN_COUNT_COL, DataType::Int64, false),
85        Field::new(ERROR_COUNT_COL, DataType::Int64, false),
86        resource_attribute_field(),
87        Field::new(
88            ENTITY_IDS_COL,
89            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
90            true,
91        ),
92        Field::new(
93            QUEUE_IDS_COL,
94            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
95            true,
96        ),
97        Field::new(PARTITION_DATE_COL, DataType::Date32, false),
98    ])
99}
100
101// ── BatchBuilder ─────────────────────────────────────────────────────────────
102
/// Column-wise accumulator that turns `TraceSummaryRecord`s into one Arrow
/// `RecordBatch`.
///
/// There is one builder per schema column; `append` pushes exactly one value
/// (or null) into each of them, and `finish` assembles the arrays in schema
/// order.
struct TraceSummaryBatchBuilder {
    // Schema handed to `RecordBatch::try_new` when the batch is finished.
    schema: Arc<Schema>,
    // 16-byte fixed-size binary trace identifier.
    trace_id: FixedSizeBinaryBuilder,
    // Dictionary-encoded service name.
    service_name: StringDictionaryBuilder<Int32Type>,
    scope_name: StringBuilder,
    // Null when the record's scope version is empty.
    scope_version: StringBuilder,
    root_operation: StringBuilder,
    // Microsecond timestamps tagged with the "UTC" timezone.
    start_time: TimestampMicrosecondBuilder,
    // Null when the record has no end time.
    end_time: TimestampMicrosecondBuilder,
    // `end_time - start_time` in milliseconds; null when `end_time` is null.
    duration_ms: Int64Builder,
    status_code: Int32Builder,
    // Null when the record's status message is empty.
    status_message: StringBuilder,
    span_count: Int64Builder,
    error_count: Int64Builder,
    // Attribute key -> JSON-serialized attribute value; null map when empty.
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    // Null list when the record has no entity ids.
    entity_ids: ListBuilder<StringBuilder>,
    // Null list when the record has no queue ids.
    queue_ids: ListBuilder<StringBuilder>,
    // Days since the Unix epoch, derived from `start_time`.
    partition_date: Date32Builder,
}
122
123impl TraceSummaryBatchBuilder {
124    fn new(schema: Arc<Schema>, capacity: usize) -> Self {
125        let map_field_names = MapFieldNames {
126            entry: "key_value".to_string(),
127            key: "key".to_string(),
128            value: "value".to_string(),
129        };
130        let resource_attributes = MapBuilder::new(
131            Some(map_field_names),
132            StringBuilder::new(),
133            StringViewBuilder::new(),
134        );
135        Self {
136            schema,
137            trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
138            service_name: StringDictionaryBuilder::new(),
139            scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
140            scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
141            root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
142            start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
143            end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
144            duration_ms: Int64Builder::with_capacity(capacity),
145            status_code: Int32Builder::with_capacity(capacity),
146            status_message: StringBuilder::with_capacity(capacity, capacity * 16),
147            span_count: Int64Builder::with_capacity(capacity),
148            error_count: Int64Builder::with_capacity(capacity),
149            resource_attributes,
150            entity_ids: ListBuilder::new(StringBuilder::new()),
151            queue_ids: ListBuilder::new(StringBuilder::new()),
152            partition_date: Date32Builder::with_capacity(capacity),
153        }
154    }
155
156    fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
157        self.trace_id.append_value(rec.trace_id.as_bytes())?;
158        self.service_name.append_value(&rec.service_name);
159        self.scope_name.append_value(&rec.scope_name);
160        if rec.scope_version.is_empty() {
161            self.scope_version.append_null();
162        } else {
163            self.scope_version.append_value(&rec.scope_version);
164        }
165        self.root_operation.append_value(&rec.root_operation);
166        self.start_time
167            .append_value(rec.start_time.timestamp_micros());
168        match rec.end_time {
169            Some(end) => self.end_time.append_value(end.timestamp_micros()),
170            None => self.end_time.append_null(),
171        }
172        let duration = rec
173            .end_time
174            .map(|end| (end - rec.start_time).num_milliseconds());
175        match duration {
176            Some(d) => self.duration_ms.append_value(d),
177            None => self.duration_ms.append_null(),
178        }
179        self.status_code.append_value(rec.status_code);
180        if rec.status_message.is_empty() {
181            self.status_message.append_null();
182        } else {
183            self.status_message.append_value(&rec.status_message);
184        }
185        self.span_count.append_value(rec.span_count);
186        self.error_count.append_value(rec.error_count);
187        if rec.resource_attributes.is_empty() {
188            self.resource_attributes.append(false)?; // null map
189        } else {
190            for attr in &rec.resource_attributes {
191                self.resource_attributes.keys().append_value(&attr.key);
192                let value_str =
193                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
194                self.resource_attributes.values().append_value(value_str);
195            }
196            self.resource_attributes.append(true)?;
197        }
198        if rec.entity_ids.is_empty() {
199            self.entity_ids.append_null();
200        } else {
201            for id in &rec.entity_ids {
202                self.entity_ids.values().append_value(id);
203            }
204            self.entity_ids.append(true);
205        }
206        if rec.queue_ids.is_empty() {
207            self.queue_ids.append_null();
208        } else {
209            for id in &rec.queue_ids {
210                self.queue_ids.values().append_value(id);
211            }
212            self.queue_ids.append(true);
213        }
214        // Partition key — days since Unix epoch, derived from start_time
215        let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
216        self.partition_date.append_value(days);
217        Ok(())
218    }
219
220    fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
221        let columns: Vec<Arc<dyn Array>> = vec![
222            Arc::new(self.trace_id.finish()),
223            Arc::new(self.service_name.finish()),
224            Arc::new(self.scope_name.finish()),
225            Arc::new(self.scope_version.finish()),
226            Arc::new(self.root_operation.finish()),
227            Arc::new(self.start_time.finish()),
228            Arc::new(self.end_time.finish()),
229            Arc::new(self.duration_ms.finish()),
230            Arc::new(self.status_code.finish()),
231            Arc::new(self.status_message.finish()),
232            Arc::new(self.span_count.finish()),
233            Arc::new(self.error_count.finish()),
234            Arc::new(self.resource_attributes.finish()),
235            Arc::new(self.entity_ids.finish()),
236            Arc::new(self.queue_ids.finish()),
237            Arc::new(self.partition_date.finish()),
238        ];
239        RecordBatch::try_new(self.schema, columns).map_err(Into::into)
240    }
241}
242
243// ── TableCommand ─────────────────────────────────────────────────────────────
244
/// Commands accepted by the actor task spawned in
/// `TraceSummaryDBEngine::start_actor`.
///
/// Every variant except `Shutdown` carries a oneshot sender on which the actor
/// reports the outcome of the operation.
pub enum SummaryTableCommand {
    /// Append `records` to the summary table.
    Write {
        records: Vec<TraceSummaryRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Run table optimization immediately (not gated by the control table).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Vacuum the table using a retention period of `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
259
260async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
261    let mut base = object_store.get_base_url()?;
262    let mut path = base.path().to_string();
263    if !path.ends_with('/') {
264        path.push('/');
265    }
266    path.push_str(SUMMARY_TABLE_NAME);
267    base.set_path(&path);
268    Ok(base)
269}
270
271async fn create_summary_table(
272    object_store: &ObjectStore,
273    table_url: Url,
274    schema: SchemaRef,
275) -> Result<DeltaTable, TraceEngineError> {
276    info!(
277        "Creating trace summary table [{}://.../{} ]",
278        table_url.scheme(),
279        table_url
280            .path_segments()
281            .and_then(|mut s| s.next_back())
282            .unwrap_or(SUMMARY_TABLE_NAME)
283    );
284    let store = object_store.as_dyn_object_store();
285    let table = DeltaTableBuilder::from_url(table_url.clone())?
286        .with_storage_backend(store, table_url)
287        .build()?;
288    let delta_fields = arrow_schema_to_delta(&schema);
289    table
290        .create()
291        .with_table_name(SUMMARY_TABLE_NAME)
292        .with_columns(delta_fields)
293        .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
294        // Only collect min/max statistics for non-binary columns.
295        // trace_id (FixedSizeBinary) has no meaningful ordering for file-level pruning.
296        .with_configuration_property(
297            TableProperty::DataSkippingStatsColumns,
298            Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
299        )
300        .await
301        .map_err(Into::into)
302}
303
304async fn build_or_create_summary_table(
305    object_store: &ObjectStore,
306    schema: SchemaRef,
307) -> Result<DeltaTable, TraceEngineError> {
308    register_cloud_logstore_factories();
309    let table_url = build_summary_url(object_store).await?;
310    info!(
311        "Loading trace summary table [{}://.../{} ]",
312        table_url.scheme(),
313        table_url
314            .path_segments()
315            .and_then(|mut s| s.next_back())
316            .unwrap_or(SUMMARY_TABLE_NAME)
317    );
318
319    // Check whether a Delta log actually exists. For local tables, check the
320    // filesystem directly. For remote tables, attempt a full load with the
321    // explicit storage backend — required for S3/GCS/Azure where Delta Lake
322    // cannot infer the object store from the URL scheme alone.
323    let is_delta_table = if table_url.scheme() == "file" {
324        if let Ok(path) = table_url.to_file_path() {
325            if !path.exists() {
326                info!("Creating directory for summary table: {:?}", path);
327                std::fs::create_dir_all(&path)?;
328            }
329            path.join("_delta_log").exists()
330        } else {
331            false
332        }
333    } else {
334        let store = object_store.as_dyn_object_store();
335        match DeltaTableBuilder::from_url(table_url.clone()) {
336            Ok(builder) => builder
337                .with_storage_backend(store, table_url.clone())
338                .load()
339                .await
340                .is_ok(),
341            Err(_) => false,
342        }
343    };
344
345    if is_delta_table {
346        info!(
347            "Loaded existing trace summary table [{}://.../{} ]",
348            table_url.scheme(),
349            table_url
350                .path_segments()
351                .and_then(|mut s| s.next_back())
352                .unwrap_or(SUMMARY_TABLE_NAME)
353        );
354        let store = object_store.as_dyn_object_store();
355        DeltaTableBuilder::from_url(table_url.clone())?
356            .with_storage_backend(store, table_url)
357            .load()
358            .await
359            .map_err(Into::into)
360    } else {
361        info!("Summary table does not exist, creating new table");
362        create_summary_table(object_store, table_url, schema).await
363    }
364}
365
/// Owner of the `trace_summaries` Delta table.
///
/// Performs writes, optimization and vacuum against the table and keeps the
/// table registration in the shared `SessionContext` up to date after each
/// operation.
pub struct TraceSummaryDBEngine {
    // Arrow schema used to build record batches.
    schema: Arc<Schema>,
    // Current table state; replaced after every successful write/optimize/vacuum.
    table: Arc<AsyncRwLock<DeltaTable>>,
    /// Session context in which the table is registered under `trace_summaries`.
    pub ctx: Arc<SessionContext>,
    // Control table handle used to claim/release the scheduled optimize task.
    control: ControlTableEngine,
}
372
373impl TraceSummaryDBEngine {
374    /// Create a new `TraceSummaryDBEngine` using the provided shared `SessionContext`.
375    ///
376    /// The caller is responsible for passing a `SessionContext` that already has the object-store
377    /// backend configured (e.g. the one from `TraceSpanDBEngine`). This ensures both
378    /// `trace_spans` and `trace_summaries` live in the same context and can participate in
379    /// JOIN queries.
380    pub async fn new(
381        storage_settings: &ObjectStorageSettings,
382        ctx: Arc<SessionContext>,
383    ) -> Result<Self, TraceEngineError> {
384        let object_store = ObjectStore::new(storage_settings)?;
385        let schema = Arc::new(create_summary_schema());
386        let delta_table = build_or_create_summary_table(&object_store, schema.clone()).await?;
387        // A freshly-created table has no committed Parquet files yet — table_provider()
388        // returns an error in that case. Defer registration until the first write.
389        if let Ok(provider) = delta_table.table_provider().await {
390            ctx.register_table(SUMMARY_TABLE_NAME, provider)?;
391        } else {
392            info!("Empty summary table at init — deferring SessionContext registration until first write");
393        }
394
395        let control = ControlTableEngine::new(storage_settings, get_pod_id()).await?;
396
397        Ok(TraceSummaryDBEngine {
398            schema,
399            table: Arc::new(AsyncRwLock::new(delta_table)),
400            ctx,
401            control,
402        })
403    }
404
405    fn build_batch(
406        &self,
407        records: Vec<TraceSummaryRecord>,
408    ) -> Result<RecordBatch, TraceEngineError> {
409        let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
410        for rec in &records {
411            builder.append(rec)?;
412        }
413        builder.finish()
414    }
415
416    async fn write_records(
417        &self,
418        records: Vec<TraceSummaryRecord>,
419    ) -> Result<(), TraceEngineError> {
420        let count = records.len();
421        info!("Writing {} trace summaries", count);
422        let batch = self.build_batch(records)?;
423
424        let mut table_guard = self.table.write().await;
425
426        if let Err(e) = table_guard.update_incremental(None).await {
427            info!("Summary table update skipped (new table): {}", e);
428        }
429
430        let updated_table = table_guard
431            .clone()
432            .write(vec![batch])
433            .with_save_mode(deltalake::protocol::SaveMode::Append)
434            .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
435            .await?;
436
437        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
438        self.ctx
439            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
440
441        *table_guard = updated_table;
442        info!("Summary table updated with {} records", count);
443        Ok(())
444    }
445
446    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
447        let mut table_guard = self.table.write().await;
448        let (updated_table, _metrics) = table_guard
449            .clone()
450            .optimize()
451            .with_target_size(128 * 1024 * 1024)
452            .with_type(OptimizeType::ZOrder(vec![
453                START_TIME_COL.to_string(),
454                SERVICE_NAME_COL.to_string(),
455            ]))
456            .await?;
457
458        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
459        self.ctx
460            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
461        *table_guard = updated_table;
462        Ok(())
463    }
464
465    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
466        let mut table_guard = self.table.write().await;
467        let (updated_table, _metrics) = table_guard
468            .clone()
469            .vacuum()
470            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
471            .with_enforce_retention_duration(false)
472            .await?;
473
474        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
475        self.ctx
476            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
477        *table_guard = updated_table;
478        Ok(())
479    }
480
481    /// Try to claim and run the summary optimize task via the control table.
482    async fn try_run_optimize(&self, interval_hours: u64) {
483        match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
484            Ok(true) => match self.optimize_table().await {
485                Ok(()) => {
486                    let _ = self
487                        .control
488                        .release_task(
489                            TASK_SUMMARY_OPTIMIZE,
490                            chrono::Duration::hours(interval_hours as i64),
491                        )
492                        .await;
493                }
494                Err(e) => {
495                    error!("Summary optimize failed: {}", e);
496                    let _ = self
497                        .control
498                        .release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
499                        .await;
500                }
501            },
502            Ok(false) => { /* not due or another pod owns it */ }
503            Err(e) => error!("Summary optimize claim check failed: {}", e),
504        }
505    }
506
507    pub fn start_actor(
508        self,
509        compaction_interval_hours: u64,
510    ) -> (
511        mpsc::Sender<SummaryTableCommand>,
512        tokio::task::JoinHandle<()>,
513    ) {
514        let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);
515
516        let handle = tokio::spawn(async move {
517            // Poll every 5 minutes — the actual schedule is in the control table.
518            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
519            scheduler_ticker.tick().await; // skip immediate tick
520
521            loop {
522                tokio::select! {
523                    Some(cmd) = rx.recv() => {
524                        match cmd {
525                            SummaryTableCommand::Write { records, respond_to } => {
526                                let result = self.write_records(records).await;
527                                if let Err(ref e) = result {
528                                    error!("Summary write failed: {}", e);
529                                }
530                                let _ = respond_to.send(result);
531                            }
532                            SummaryTableCommand::Optimize { respond_to } => {
533                                // Direct admin request — bypass control table
534                                let _ = respond_to.send(self.optimize_table().await);
535                            }
536                            SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
537                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
538                            }
539                            SummaryTableCommand::Shutdown => {
540                                info!("TraceSummaryDBEngine actor shutting down");
541                                break;
542                            }
543                        }
544                    }
545                    _ = scheduler_ticker.tick() => {
546                        self.try_run_optimize(compaction_interval_hours).await;
547                    }
548                }
549            }
550        });
551
552        (tx, handle)
553    }
554}
555
556// ── Service ──────────────────────────────────────────────────────────────────
557
/// Front end of the summary table: forwards write/maintenance requests to the
/// engine actor and exposes the read-side query helper.
pub struct TraceSummaryService {
    // Command channel to the engine actor.
    engine_tx: mpsc::Sender<SummaryTableCommand>,
    // Join handle of the actor task; awaited in `shutdown`.
    engine_handle: tokio::task::JoinHandle<()>,
    /// Query helper sharing the engine's `SessionContext`.
    pub query_service: TraceSummaryQueries,
}
563
564impl TraceSummaryService {
565    pub async fn new(
566        storage_settings: &ObjectStorageSettings,
567        compaction_interval_hours: u64,
568        ctx: Arc<SessionContext>,
569    ) -> Result<Self, TraceEngineError> {
570        let engine = TraceSummaryDBEngine::new(storage_settings, ctx).await?;
571        let engine_ctx = engine.ctx.clone();
572        let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours);
573
574        Ok(TraceSummaryService {
575            engine_tx,
576            engine_handle,
577            query_service: TraceSummaryQueries::new(engine_ctx),
578        })
579    }
580
581    /// Write a batch of `TraceSummaryRecord`s to the Delta Lake summary table.
582    pub async fn write_summaries(
583        &self,
584        records: Vec<TraceSummaryRecord>,
585    ) -> Result<(), TraceEngineError> {
586        let (tx, rx) = oneshot::channel();
587        self.engine_tx
588            .send(SummaryTableCommand::Write {
589                records,
590                respond_to: tx,
591            })
592            .await
593            .map_err(|_| TraceEngineError::ChannelClosed)?;
594        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
595    }
596
597    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
598        let (tx, rx) = oneshot::channel();
599        self.engine_tx
600            .send(SummaryTableCommand::Optimize { respond_to: tx })
601            .await
602            .map_err(|_| TraceEngineError::ChannelClosed)?;
603        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
604    }
605
606    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
607        let (tx, rx) = oneshot::channel();
608        self.engine_tx
609            .send(SummaryTableCommand::Vacuum {
610                retention_hours,
611                respond_to: tx,
612            })
613            .await
614            .map_err(|_| TraceEngineError::ChannelClosed)?;
615        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
616    }
617
618    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSummaryService>`.
619    pub async fn signal_shutdown(&self) {
620        info!("TraceSummaryService signaling shutdown");
621        let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
622    }
623
624    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
625        info!("TraceSummaryService shutting down");
626        self.engine_tx
627            .send(SummaryTableCommand::Shutdown)
628            .await
629            .map_err(|_| TraceEngineError::ChannelClosed)?;
630        if let Err(e) = self.engine_handle.await {
631            error!("Summary engine handle error: {}", e);
632        }
633        info!("TraceSummaryService shutdown complete");
634        Ok(())
635    }
636}
637
638// ── Queries ──────────────────────────────────────────────────────────────────
639
/// Read-side helper that runs queries against the `trace_summaries` table
/// registered in the shared `SessionContext`.
pub struct TraceSummaryQueries {
    // Session context in which the summary table is looked up by name.
    ctx: Arc<SessionContext>,
}
643
644impl TraceSummaryQueries {
645    pub fn new(ctx: Arc<SessionContext>) -> Self {
646        Self { ctx }
647    }
648
649    /// Get paginated traces from the Delta Lake summary table.
650    ///
651    /// The first step is a `GROUP BY trace_id` dedup query that merges any duplicate
652    /// rows (from late-arriving spans) using the same rules as `TraceAggregator`:
653    ///   - `SUM` for span/error counts, `MIN`/`MAX` for times, `MAX` for status_code
654    ///   - `FIRST_VALUE` ordered by `span_count DESC` for string fields
655    ///   - `array_distinct(flatten(array_agg(...)))` for entity/queue ID lists (full union)
656    ///
657    ///   Time filters are pushed into the SQL WHERE clause for partition pruning.
658    ///
659    ///   Secondary filters (service, errors, cursor) apply to the deduplicated DataFrame.
660    pub async fn get_paginated_traces(
661        &self,
662        filters: &TraceFilters,
663    ) -> Result<TracePaginationResponse, TraceEngineError> {
664        let limit = filters.limit.unwrap_or(50) as usize;
665        let direction = filters.direction.as_deref().unwrap_or("next");
666
667        // ── Dedup: time-filtered GROUP BY trace_id (DataFrame API) ───────────
668        use crate::parquet::tracing::queries::{date_lit, ts_lit};
669        use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
670        use datafusion::functions_nested::set_ops::array_distinct;
671
672        let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;
673
674        // ① Partition date — directory-level pruning (skips entire partition folders)
675        // ② start_time     — row-group-level pruning within matched files
676        if let Some(start) = filters.start_time {
677            df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
678            df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
679        }
680        if let Some(end) = filters.end_time {
681            df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
682            df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
683        }
684
685        // ORDER BY specs for FIRST_VALUE aggregates
686        // span_count DESC NULLS LAST, end_time DESC NULLS LAST
687        let by_span_end: Vec<SortExpr> = vec![
688            col(SPAN_COUNT_COL).sort(false, false),
689            col(END_TIME_COL).sort(false, false),
690        ];
691        // status_code DESC, span_count DESC
692        let by_status_span: Vec<SortExpr> = vec![
693            col(STATUS_CODE_COL).sort(false, false),
694            col(SPAN_COUNT_COL).sort(false, false),
695        ];
696
697        // Phase 1: aggregate
698        // _max_end_us / _min_start_us are hidden Int64 columns used to compute
699        // duration_ms post-aggregation (arithmetic across two aggregate exprs
700        // cannot be expressed in a single aggregate slot).
701        //
702        // entity_ids / queue_ids: array_agg without FILTER is intentional.
703        // array_flatten treats NULL outer-list elements as empty and skips them,
704        // giving identical results to FILTER (WHERE IS NOT NULL) for the GROUP BY
705        // case. Unlike the original SQL, this produces [] rather than NULL when
706        // ALL rows have null IDs — the safer outcome for downstream deserialization.
707        let mut df = df
708            .aggregate(
709                vec![col(TRACE_ID_COL)],
710                vec![
711                    min(col(START_TIME_COL)).alias(START_TIME_COL),
712                    max(col(END_TIME_COL)).alias(END_TIME_COL),
713                    max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
714                    min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
715                    max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
716                    sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
717                    sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
718                    first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
719                    first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
720                    first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
721                        .alias(SCOPE_VERSION_COL),
722                    first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
723                        .alias(ROOT_OPERATION_COL),
724                    first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
725                    first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
726                        .alias(RESOURCE_ATTRIBUTES_COL),
727                    array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
728                    array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
729                ],
730            )?
731            // Phase 2: derive computed columns from hidden aggregates, then drop them
732            .with_column(
733                DURATION_MS_COL,
734                (col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
735            )?
736            .with_column(
737                ENTITY_IDS_COL,
738                array_distinct(flatten(col("_entity_ids_raw"))),
739            )?
740            .with_column(
741                QUEUE_IDS_COL,
742                array_distinct(flatten(col("_queue_ids_raw"))),
743            )?
744            .drop_columns(&[
745                "_max_end_us",
746                "_min_start_us",
747                "_entity_ids_raw",
748                "_queue_ids_raw",
749            ])?;
750
751        // ── Secondary filters ────────────────────────────────────────────────
752        if let Some(ref svc) = filters.service_name {
753            df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
754        }
755        match filters.has_errors {
756            Some(true) => {
757                df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
758            }
759            Some(false) => {
760                df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
761            }
762            None => {}
763        }
764        if let Some(sc) = filters.status_code {
765            df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
766        }
767
768        // ── entity_uid filter via array_has on List column ────────────────
769        if let Some(ref uid) = filters.entity_uid {
770            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
771                col(ENTITY_IDS_COL),
772                lit(uid.as_str()),
773            ))?;
774        }
775
776        // ── queue_uid filter via array_has on List column ─────────────────
777        if let Some(ref uid) = filters.queue_uid {
778            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
779                col(QUEUE_IDS_COL),
780                lit(uid.as_str()),
781            ))?;
782        }
783
784        // ── trace_ids IN filter ──────────────────────────────────────────────
785        if let Some(ref ids) = filters.trace_ids {
786            if !ids.is_empty() {
787                let binary_ids: Vec<Expr> = ids
788                    .iter()
789                    .filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
790                    .map(|b| lit(ScalarValue::Binary(Some(b))))
791                    .collect();
792                if !binary_ids.is_empty() {
793                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
794                }
795            }
796        }
797
798        // ── Cursor filter in DataFusion ──────────────────────────────────────
799        // Equivalent to Postgres: `(start_time, trace_id) < (cursor_time, cursor_id)`
800        // for "next" or `> (cursor_time, cursor_id)` for "previous".
801        if let (Some(cursor_time), Some(ref cursor_id)) =
802            (filters.cursor_start_time, &filters.cursor_trace_id)
803        {
804            if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
805                let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
806                    Some(cursor_time.timestamp_micros()),
807                    Some("UTC".into()),
808                ));
809                let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
810                let cursor_expr = if direction == "previous" {
811                    col(START_TIME_COL)
812                        .gt(cursor_ts.clone())
813                        .or(col(START_TIME_COL)
814                            .eq(cursor_ts)
815                            .and(col(TRACE_ID_COL).gt(cursor_tid)))
816                } else {
817                    col(START_TIME_COL)
818                        .lt(cursor_ts.clone())
819                        .or(col(START_TIME_COL)
820                            .eq(cursor_ts)
821                            .and(col(TRACE_ID_COL).lt(cursor_tid)))
822                };
823                df = df.filter(cursor_expr)?;
824            }
825        }
826
827        // ── Attribute filters via span lookup → IN list ──────────────────────
828        // Requires shared SessionContext (trace_spans must be registered in self.ctx).
829        // We execute the span query eagerly to collect matching trace IDs, then filter
830        // the summaries DataFrame with an IN-list predicate. This avoids a cross-table
831        // JOIN that causes DataFusion to report ambiguous `trace_id` column references.
832        if let Some(ref attr_filters) = filters.attribute_filters {
833            if !attr_filters.is_empty() {
834                let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
835                    TRACE_ID_COL,
836                    START_TIME_COL,
837                    SEARCH_BLOB_COL,
838                ])?;
839
840                // Time predicates on spans for partition pruning
841                if let Some(start) = filters.start_time {
842                    spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
843                        ScalarValue::TimestampMicrosecond(
844                            Some(start.timestamp_micros()),
845                            Some("UTC".into()),
846                        ),
847                    )))?;
848                }
849                if let Some(end) = filters.end_time {
850                    spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
851                        ScalarValue::TimestampMicrosecond(
852                            Some(end.timestamp_micros()),
853                            Some("UTC".into()),
854                        ),
855                    )))?;
856                }
857
858                // OR-match each filter against search_blob.
859                // normalize_attr_filter converts "key:value" → "%key=value%" so the LIKE
860                // pattern matches the new pipe-bounded `|key=value|` blob format.
861                let mut attr_expr: Option<Expr> = None;
862                for f in attr_filters {
863                    let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
864                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
865                    attr_expr = Some(match attr_expr {
866                        None => cond,
867                        Some(e) => e.or(cond),
868                    });
869                }
870                if let Some(expr) = attr_expr {
871                    spans_df = spans_df.filter(expr)?;
872                }
873
874                // Collect matching trace IDs eagerly, then apply as IN-list filter.
875                // Use HashSet for O(1) dedup instead of O(n²) Vec::contains().
876                let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
877                let mut seen_ids: std::collections::HashSet<Vec<u8>> =
878                    std::collections::HashSet::new();
879                let mut binary_ids: Vec<Expr> = Vec::new();
880                for batch in &span_batches {
881                    // trace_id may be FixedSizeBinary(16) or Binary after Delta round-trip.
882                    // Cast to Binary to handle both uniformly.
883                    if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
884                        let casted = compute::cast(col_ref, &DataType::Binary)?;
885                        let col_arr =
886                            casted
887                                .as_any()
888                                .downcast_ref::<BinaryArray>()
889                                .ok_or_else(|| {
890                                    TraceEngineError::DowncastError("trace_id to BinaryArray")
891                                })?;
892                        for i in 0..batch.num_rows() {
893                            let id_bytes = col_arr.value(i).to_vec();
894                            if seen_ids.insert(id_bytes.clone()) {
895                                binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
896                            }
897                        }
898                    }
899                }
900
901                if !binary_ids.is_empty() {
902                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
903                } else {
904                    // No matching spans → return empty result
905                    df = df.filter(lit(false))?;
906                }
907            }
908        }
909
910        // ── Sort: DESC for "next", ASC for "previous" ────────────────────────
911        // "previous" direction fetches the oldest limit+1 items newer than the cursor,
912        // which matches the original Rust post-reversal behavior.
913        df = if direction == "previous" {
914            df.sort(vec![
915                col(START_TIME_COL).sort(true, true),
916                col(TRACE_ID_COL).sort(true, true),
917            ])?
918        } else {
919            df.sort(vec![
920                col(START_TIME_COL).sort(false, false),
921                col(TRACE_ID_COL).sort(false, false),
922            ])?
923        };
924
925        // ── LIMIT pushed into DataFusion (fetch limit+1 to detect next page) ─
926        df = df.limit(0, Some(limit + 1))?;
927
928        let batches = df.collect().await?;
929        let mut items = batches_to_trace_list_items(batches)?;
930
931        let has_more = items.len() > limit;
932        if has_more {
933            items.pop(); // remove N+1 sentinel
934        }
935
936        // Direction-specific cursor logic — mirrors the original PostgreSQL implementation.
937        //
938        // "next" (DESC order): items are newest-first. The sentinel tells us if older
939        // items exist (has_next). Cursor presence means we navigated forward, so newer
940        // items exist behind us (has_previous).
941        //
942        // "previous" (ASC order): items are oldest-first (closest-to-cursor first).
943        // The sentinel tells us if even more newer items exist (has_previous). Cursor
944        // presence means we navigated backward, so older items exist ahead (has_next).
945        // Items stay in ASC order — no reversal — matching PG behavior exactly.
946        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
947            "next" => {
948                let next_cursor = if has_more {
949                    items.last().map(|item| TraceCursor {
950                        start_time: item.start_time,
951                        trace_id: item.trace_id.clone(),
952                    })
953                } else {
954                    None
955                };
956
957                let previous_cursor = items.first().map(|item| TraceCursor {
958                    start_time: item.start_time,
959                    trace_id: item.trace_id.clone(),
960                });
961
962                (
963                    has_more,
964                    next_cursor,
965                    filters.cursor_start_time.is_some(),
966                    previous_cursor,
967                )
968            }
969            "previous" => {
970                // ASC order: items.last() is the newest (largest start_time).
971                // To continue backward (fetch even newer items), the cursor must
972                // point past the current page's newest item so `> cursor` excludes
973                // everything already returned.
974                let previous_cursor = if has_more {
975                    items.last().map(|item| TraceCursor {
976                        start_time: item.start_time,
977                        trace_id: item.trace_id.clone(),
978                    })
979                } else {
980                    None
981                };
982
983                // items.first() is the oldest (smallest start_time).
984                // To go forward (back toward newer-first / DESC pages), the cursor
985                // must point at the oldest item so `< cursor` fetches older items.
986                let next_cursor = items.first().map(|item| TraceCursor {
987                    start_time: item.start_time,
988                    trace_id: item.trace_id.clone(),
989                });
990
991                (
992                    filters.cursor_start_time.is_some(),
993                    next_cursor,
994                    has_more,
995                    previous_cursor,
996                )
997            }
998            _ => (false, None, false, None),
999        };
1000
1001        Ok(TracePaginationResponse {
1002            items,
1003            has_next,
1004            next_cursor,
1005            has_previous,
1006            previous_cursor,
1007        })
1008    }
1009}
1010
1011// ── Arrow → TraceListItem conversion ─────────────────────────────────────────
1012
1013/// Extract attributes from a MapArray at a given row index.
1014fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
1015    if map_array.is_null(row_idx) {
1016        return Vec::new();
1017    }
1018    let entry = map_array.value(row_idx);
1019    let struct_array = entry.as_any().downcast_ref::<StructArray>().unwrap();
1020    let keys_arr = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).unwrap();
1021    let keys = keys_arr.as_any().downcast_ref::<StringArray>().unwrap();
1022    let values_arr = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).unwrap();
1023    let values = values_arr.as_any().downcast_ref::<StringArray>().unwrap();
1024
1025    (0..struct_array.len())
1026        .map(|i| Attribute {
1027            key: keys.value(i).to_string(),
1028            value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
1029        })
1030        .collect()
1031}
1032
1033/// Extract a `Vec<String>` from a nullable `ListArray` at a given row index.
1034fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
1035    let Some(list) = list else {
1036        return Vec::new();
1037    };
1038    if list.is_null(row_idx) {
1039        return Vec::new();
1040    }
1041    let inner = list.value(row_idx);
1042    let str_arr = compute::cast(&inner, &DataType::Utf8)
1043        .ok()
1044        .and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
1045    match str_arr {
1046        Some(arr) => (0..arr.len())
1047            .filter(|i| !arr.is_null(*i))
1048            .map(|i| arr.value(i).to_string())
1049            .collect(),
1050        None => Vec::new(),
1051    }
1052}
1053
1054fn batches_to_trace_list_items(
1055    batches: Vec<RecordBatch>,
1056) -> Result<Vec<TraceListItem>, TraceEngineError> {
1057    let mut items = Vec::new();
1058
1059    for batch in &batches {
1060        // trace_id may come back as FixedSizeBinary(16) or Binary depending on
1061        // whether DataFusion/Delta round-tripped the schema. Handle both.
1062        let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
1063            TraceEngineError::UnsupportedOperation("missing trace_id column".into())
1064        })?;
1065        let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
1066        let trace_ids = trace_id_binary
1067            .as_any()
1068            .downcast_ref::<BinaryArray>()
1069            .ok_or_else(|| {
1070                TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
1071            })?;
1072
1073        // Cast all string/dictionary columns to Utf8 uniformly (handles Utf8View,
1074        // Dictionary(Int32, Utf8), LargeUtf8, etc.).
1075        let svc_arr = compute::cast(
1076            batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
1077                TraceEngineError::UnsupportedOperation("missing service_name column".into())
1078            })?,
1079            &DataType::Utf8,
1080        )?;
1081        let service_names = svc_arr
1082            .as_any()
1083            .downcast_ref::<StringArray>()
1084            .ok_or_else(|| {
1085                TraceEngineError::UnsupportedOperation(
1086                    "service_name cast to StringArray failed".into(),
1087                )
1088            })?;
1089
1090        let scope_arr = compute::cast(
1091            batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
1092                TraceEngineError::UnsupportedOperation("missing scope_name column".into())
1093            })?,
1094            &DataType::Utf8,
1095        )?;
1096        let scope_names = scope_arr
1097            .as_any()
1098            .downcast_ref::<StringArray>()
1099            .ok_or_else(|| {
1100                TraceEngineError::UnsupportedOperation(
1101                    "scope_name cast to StringArray failed".into(),
1102                )
1103            })?;
1104
1105        let scopev_arr = compute::cast(
1106            batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
1107                TraceEngineError::UnsupportedOperation("missing scope_version column".into())
1108            })?,
1109            &DataType::Utf8,
1110        )?;
1111        let scope_versions = scopev_arr
1112            .as_any()
1113            .downcast_ref::<StringArray>()
1114            .ok_or_else(|| {
1115                TraceEngineError::UnsupportedOperation(
1116                    "scope_version cast to StringArray failed".into(),
1117                )
1118            })?;
1119
1120        let root_arr = compute::cast(
1121            batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
1122                TraceEngineError::UnsupportedOperation("missing root_operation column".into())
1123            })?,
1124            &DataType::Utf8,
1125        )?;
1126        let root_operations = root_arr
1127            .as_any()
1128            .downcast_ref::<StringArray>()
1129            .ok_or_else(|| {
1130                TraceEngineError::UnsupportedOperation(
1131                    "root_operation cast to StringArray failed".into(),
1132                )
1133            })?;
1134
1135        let sm_arr = compute::cast(
1136            batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
1137                TraceEngineError::UnsupportedOperation("missing status_message column".into())
1138            })?,
1139            &DataType::Utf8,
1140        )?;
1141        let status_messages = sm_arr
1142            .as_any()
1143            .downcast_ref::<StringArray>()
1144            .ok_or_else(|| {
1145                TraceEngineError::UnsupportedOperation(
1146                    "status_message cast to StringArray failed".into(),
1147                )
1148            })?;
1149
1150        let resource_attrs_map = batch
1151            .column_by_name(RESOURCE_ATTRIBUTES_COL)
1152            .and_then(|c| c.as_any().downcast_ref::<MapArray>())
1153            .ok_or_else(|| {
1154                TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
1155            })?;
1156
1157        let entity_ids_list = batch
1158            .column_by_name(ENTITY_IDS_COL)
1159            .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1160
1161        let queue_ids_list = batch
1162            .column_by_name(QUEUE_IDS_COL)
1163            .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1164
1165        let start_times = batch
1166            .column_by_name(START_TIME_COL)
1167            .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1168            .ok_or_else(|| {
1169                TraceEngineError::UnsupportedOperation("missing start_time column".into())
1170            })?;
1171
1172        let end_times = batch
1173            .column_by_name(END_TIME_COL)
1174            .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1175            .ok_or_else(|| {
1176                TraceEngineError::UnsupportedOperation("missing end_time column".into())
1177            })?;
1178
1179        let durations = batch
1180            .column_by_name(DURATION_MS_COL)
1181            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1182            .ok_or_else(|| {
1183                TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
1184            })?;
1185
1186        let status_codes = batch
1187            .column_by_name(STATUS_CODE_COL)
1188            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1189            .ok_or_else(|| {
1190                TraceEngineError::UnsupportedOperation("missing status_code column".into())
1191            })?;
1192
1193        let span_counts = batch
1194            .column_by_name(SPAN_COUNT_COL)
1195            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1196            .ok_or_else(|| {
1197                TraceEngineError::UnsupportedOperation("missing span_count column".into())
1198            })?;
1199
1200        let error_counts = batch
1201            .column_by_name(ERROR_COUNT_COL)
1202            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1203            .ok_or_else(|| {
1204                TraceEngineError::UnsupportedOperation("missing error_count column".into())
1205            })?;
1206
1207        for i in 0..batch.num_rows() {
1208            let trace_id_hex = hex::encode(trace_ids.value(i));
1209
1210            let start_time = micros_to_datetime(start_times.value(i));
1211            let end_time = if end_times.is_null(i) {
1212                None
1213            } else {
1214                Some(micros_to_datetime(end_times.value(i)))
1215            };
1216            let duration_ms = if durations.is_null(i) {
1217                None
1218            } else {
1219                Some(durations.value(i))
1220            };
1221            let error_count = error_counts.value(i);
1222
1223            let resource_attributes = extract_map_attributes(resource_attrs_map, i);
1224
1225            let entity_ids = extract_list_strings(entity_ids_list, i);
1226            let queue_ids = extract_list_strings(queue_ids_list, i);
1227
1228            items.push(TraceListItem {
1229                trace_id: trace_id_hex,
1230                service_name: service_names.value(i).to_string(),
1231                scope_name: scope_names.value(i).to_string(),
1232                scope_version: scope_versions.value(i).to_string(),
1233                root_operation: root_operations.value(i).to_string(),
1234                start_time,
1235                end_time,
1236                duration_ms,
1237                status_code: status_codes.value(i),
1238                status_message: if status_messages.is_null(i) {
1239                    None
1240                } else {
1241                    Some(status_messages.value(i).to_string())
1242                },
1243                span_count: span_counts.value(i),
1244                has_errors: error_count > 0,
1245                error_count,
1246                resource_attributes,
1247                entity_ids,
1248                queue_ids,
1249            });
1250        }
1251    }
1252
1253    Ok(items)
1254}
1255
1256fn micros_to_datetime(micros: i64) -> DateTime<Utc> {
1257    let secs = micros / 1_000_000;
1258    let nanos = ((micros % 1_000_000) * 1_000) as u32;
1259    Utc.timestamp_opt(secs, nanos).unwrap()
1260}
1261
1262#[cfg(test)]
1263mod tests {
1264    use super::*;
1265    use crate::storage::ObjectStore;
1266    use scouter_settings::ObjectStorageSettings;
1267    use scouter_types::sql::TraceFilters;
1268    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
1269    use tracing_subscriber;
1270
1271    fn cleanup() {
1272        let _ = tracing_subscriber::fmt()
1273            .with_max_level(tracing::Level::INFO)
1274            .try_init();
1275
1276        let storage_settings = ObjectStorageSettings::default();
1277        let current_dir = std::env::current_dir().unwrap();
1278        let storage_path = current_dir.join(storage_settings.storage_root());
1279        if storage_path.exists() {
1280            std::fs::remove_dir_all(storage_path).unwrap();
1281        }
1282    }
1283
1284    /// Build a standalone SessionContext for test use (no trace_spans registered).
1285    /// Attribute-filter paths that need trace_spans are not exercised in these tests.
1286    fn make_test_ctx(storage_settings: &ObjectStorageSettings) -> Arc<SessionContext> {
1287        Arc::new(
1288            ObjectStore::new(storage_settings)
1289                .unwrap()
1290                .get_session()
1291                .unwrap(),
1292        )
1293    }
1294
1295    fn make_summary(
1296        trace_id_bytes: [u8; 16],
1297        service_name: &str,
1298        error_count: i64,
1299        resource_attributes: Vec<Attribute>,
1300    ) -> TraceSummaryRecord {
1301        let now = Utc::now();
1302        TraceSummaryRecord {
1303            trace_id: TraceId::from_bytes(trace_id_bytes),
1304            service_name: service_name.to_string(),
1305            scope_name: "test.scope".to_string(),
1306            scope_version: String::new(),
1307            root_operation: "root_op".to_string(),
1308            start_time: now,
1309            end_time: Some(now + chrono::Duration::milliseconds(200)),
1310            status_code: if error_count > 0 { 2 } else { 0 },
1311            status_message: if error_count > 0 {
1312                "Internal Server Error".to_string()
1313            } else {
1314                "OK".to_string()
1315            },
1316            span_count: 3,
1317            error_count,
1318            resource_attributes,
1319            entity_ids: vec![],
1320            queue_ids: vec![],
1321        }
1322    }
1323
1324    /// Basic write + paginate round-trip: writes two summaries and verifies both appear.
1325    #[tokio::test]
1326    async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
1327        cleanup();
1328
1329        let storage_settings = ObjectStorageSettings::default();
1330        let ctx = make_test_ctx(&storage_settings);
1331        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1332
1333        let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
1334        let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
1335        service.write_summaries(vec![s1, s2]).await?;
1336        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1337
1338        let start = Utc::now() - chrono::Duration::hours(1);
1339        let end = Utc::now() + chrono::Duration::hours(1);
1340        let filters = TraceFilters {
1341            service_name: None,
1342            has_errors: None,
1343            status_code: None,
1344            start_time: Some(start),
1345            end_time: Some(end),
1346            limit: Some(25),
1347            cursor_start_time: None,
1348            cursor_trace_id: None,
1349            direction: None,
1350            attribute_filters: None,
1351            trace_ids: None,
1352            entity_uid: None,
1353            queue_uid: None,
1354        };
1355
1356        let response = service.query_service.get_paginated_traces(&filters).await?;
1357        assert!(
1358            response.items.len() >= 2,
1359            "Expected at least 2 items, got {}",
1360            response.items.len()
1361        );
1362
1363        service.shutdown().await?;
1364        cleanup();
1365        Ok(())
1366    }
1367
1368    /// `has_errors = Some(true)` returns only error traces; `Some(false)` returns only non-errors.
1369    #[tokio::test]
1370    async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
1371        cleanup();
1372
1373        let storage_settings = ObjectStorageSettings::default();
1374        let ctx = make_test_ctx(&storage_settings);
1375        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1376
1377        let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
1378        let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
1379        service
1380            .write_summaries(vec![ok_summary, err_summary])
1381            .await?;
1382        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1383
1384        let start = Utc::now() - chrono::Duration::hours(1);
1385        let end = Utc::now() + chrono::Duration::hours(1);
1386
1387        let base_filters = TraceFilters {
1388            service_name: None,
1389            has_errors: None,
1390            status_code: None,
1391            start_time: Some(start),
1392            end_time: Some(end),
1393            limit: Some(25),
1394            cursor_start_time: None,
1395            cursor_trace_id: None,
1396            direction: None,
1397            attribute_filters: None,
1398            trace_ids: None,
1399            entity_uid: None,
1400            queue_uid: None,
1401        };
1402
1403        // has_errors = true → only error trace
1404        let mut filters_err = base_filters.clone();
1405        filters_err.has_errors = Some(true);
1406        let errors_only = service
1407            .query_service
1408            .get_paginated_traces(&filters_err)
1409            .await?;
1410        for item in &errors_only.items {
1411            assert!(
1412                item.error_count > 0,
1413                "Expected error trace, got: {:?}",
1414                item
1415            );
1416        }
1417        assert!(
1418            !errors_only.items.is_empty(),
1419            "Expected at least one error trace"
1420        );
1421
1422        // has_errors = false → only non-error traces
1423        let mut filters_ok = base_filters.clone();
1424        filters_ok.has_errors = Some(false);
1425        let no_errors = service
1426            .query_service
1427            .get_paginated_traces(&filters_ok)
1428            .await?;
1429        for item in &no_errors.items {
1430            assert_eq!(
1431                item.error_count, 0,
1432                "Expected non-error trace, got error_count={}",
1433                item.error_count
1434            );
1435        }
1436        assert!(
1437            !no_errors.items.is_empty(),
1438            "Expected at least one non-error trace"
1439        );
1440
1441        service.shutdown().await?;
1442        cleanup();
1443        Ok(())
1444    }
1445
1446    /// service_name filter returns only matching service traces.
1447    #[tokio::test]
1448    async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
1449        cleanup();
1450
1451        let storage_settings = ObjectStorageSettings::default();
1452        let ctx = make_test_ctx(&storage_settings);
1453        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1454
1455        let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
1456        let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
1457        service.write_summaries(vec![s_alpha, s_beta]).await?;
1458        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1459
1460        let start = Utc::now() - chrono::Duration::hours(1);
1461        let end = Utc::now() + chrono::Duration::hours(1);
1462        let filters = TraceFilters {
1463            service_name: Some("alpha_service".to_string()),
1464            has_errors: None,
1465            status_code: None,
1466            start_time: Some(start),
1467            end_time: Some(end),
1468            limit: Some(25),
1469            cursor_start_time: None,
1470            cursor_trace_id: None,
1471            direction: None,
1472            attribute_filters: None,
1473            trace_ids: None,
1474            entity_uid: None,
1475            queue_uid: None,
1476        };
1477
1478        let response = service.query_service.get_paginated_traces(&filters).await?;
1479        assert!(
1480            !response.items.is_empty(),
1481            "Expected results for alpha_service"
1482        );
1483        for item in &response.items {
1484            assert_eq!(
1485                item.service_name, "alpha_service",
1486                "Expected only alpha_service items, got: {}",
1487                item.service_name
1488            );
1489        }
1490
1491        service.shutdown().await?;
1492        cleanup();
1493        Ok(())
1494    }
1495
1496    /// trace_ids IN filter returns only the specified traces.
1497    #[tokio::test]
1498    async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
1499        cleanup();
1500
1501        let storage_settings = ObjectStorageSettings::default();
1502        let ctx = make_test_ctx(&storage_settings);
1503        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1504
1505        let wanted_id = TraceId::from_bytes([7u8; 16]);
1506        let unwanted_id = TraceId::from_bytes([8u8; 16]);
1507
1508        let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
1509        let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
1510        service.write_summaries(vec![s1, s2]).await?;
1511        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1512
1513        let start = Utc::now() - chrono::Duration::hours(1);
1514        let end = Utc::now() + chrono::Duration::hours(1);
1515        let filters = TraceFilters {
1516            service_name: None,
1517            has_errors: None,
1518            status_code: None,
1519            start_time: Some(start),
1520            end_time: Some(end),
1521            limit: Some(25),
1522            cursor_start_time: None,
1523            cursor_trace_id: None,
1524            direction: None,
1525            attribute_filters: None,
1526            trace_ids: Some(vec![wanted_id.to_hex()]),
1527            entity_uid: None,
1528            queue_uid: None,
1529        };
1530
1531        let response = service.query_service.get_paginated_traces(&filters).await?;
1532        assert_eq!(
1533            response.items.len(),
1534            1,
1535            "Expected exactly 1 item from trace_ids filter"
1536        );
1537        assert_eq!(
1538            response.items[0].trace_id,
1539            wanted_id.to_hex(),
1540            "Returned wrong trace_id"
1541        );
1542        assert_ne!(
1543            response.items[0].trace_id,
1544            unwanted_id.to_hex(),
1545            "Should not have returned unwanted trace_id"
1546        );
1547
1548        service.shutdown().await?;
1549        cleanup();
1550        Ok(())
1551    }
1552
1553    /// Cursor pagination: first page → next → previous all return correct item counts.
1554    #[tokio::test]
1555    async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
1556        cleanup();
1557        let storage_settings = ObjectStorageSettings::default();
1558        let ctx = make_test_ctx(&storage_settings);
1559        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1560
1561        let now = Utc::now();
1562        let summaries: Vec<TraceSummaryRecord> = (0u8..100)
1563            .map(|i| {
1564                let mut s = make_summary([i; 16], "svc", 0, vec![]);
1565                s.start_time = now - chrono::Duration::minutes(i as i64);
1566                s
1567            })
1568            .collect();
1569        service.write_summaries(summaries).await?;
1570        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1571
1572        let mut filters = TraceFilters {
1573            start_time: Some(now - chrono::Duration::hours(2)),
1574            end_time: Some(now + chrono::Duration::hours(1)),
1575            limit: Some(50),
1576            ..Default::default()
1577        };
1578
1579        // First page
1580        let first = service.query_service.get_paginated_traces(&filters).await?;
1581        assert_eq!(first.items.len(), 50, "first page: 50 items");
1582        assert!(
1583            first.next_cursor.is_some(),
1584            "first page: should have next_cursor"
1585        );
1586
1587        // Next page
1588        let next_cur = first.next_cursor.clone().unwrap();
1589        filters.cursor_start_time = Some(next_cur.start_time);
1590        filters.cursor_trace_id = Some(next_cur.trace_id.clone());
1591        filters.direction = Some("next".to_string());
1592        let second = service.query_service.get_paginated_traces(&filters).await?;
1593        assert_eq!(second.items.len(), 50, "second page: 50 items");
1594        assert!(
1595            second.items[0].start_time <= next_cur.start_time,
1596            "second page first item must be <= cursor"
1597        );
1598        assert!(second.previous_cursor.is_some());
1599
1600        // Previous page
1601        let prev_cur = second.previous_cursor.unwrap();
1602        filters.cursor_start_time = Some(prev_cur.start_time);
1603        filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
1604        filters.direction = Some("previous".to_string());
1605        let prev = service.query_service.get_paginated_traces(&filters).await?;
1606        assert_eq!(prev.items.len(), 50, "previous page: 50 items");
1607
1608        service.shutdown().await?;
1609        cleanup();
1610        Ok(())
1611    }
1612
1613    /// Attribute-filter JOIN path: only traces with matching span attributes are returned.
1614    #[tokio::test]
1615    async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
1616        use crate::parquet::tracing::service::TraceSpanService;
1617
1618        cleanup();
1619        let storage_settings = ObjectStorageSettings::default();
1620
1621        // TraceSpanService owns the SessionContext (trace_spans registered in it)
1622        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
1623        let shared_ctx = span_service.ctx.clone();
1624
1625        // TraceSummaryService shares the same ctx — JOIN to trace_spans will work
1626        let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?;
1627
1628        let now = Utc::now();
1629        let kafka_trace = TraceId::from_bytes([70u8; 16]);
1630        let plain_trace = TraceId::from_bytes([80u8; 16]);
1631
1632        let kafka_span = make_span_record(
1633            &kafka_trace,
1634            SpanId::from_bytes([70u8; 8]),
1635            "svc",
1636            vec![Attribute {
1637                key: "component".to_string(),
1638                value: serde_json::Value::String("kafka".to_string()),
1639            }],
1640        );
1641        let plain_span =
1642            make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
1643        span_service
1644            .write_spans(vec![kafka_span, plain_span])
1645            .await?;
1646
1647        let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
1648        kafka_summary.start_time = now;
1649        let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
1650        plain_summary.start_time = now;
1651        summary_service
1652            .write_summaries(vec![kafka_summary, plain_summary])
1653            .await?;
1654
1655        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
1656
1657        let filters = TraceFilters {
1658            start_time: Some(now - chrono::Duration::hours(1)),
1659            end_time: Some(now + chrono::Duration::hours(1)),
1660            attribute_filters: Some(vec!["component:kafka".to_string()]),
1661            limit: Some(25),
1662            ..Default::default()
1663        };
1664
1665        let response = summary_service
1666            .query_service
1667            .get_paginated_traces(&filters)
1668            .await?;
1669
1670        assert!(
1671            !response.items.is_empty(),
1672            "attribute filter must return results"
1673        );
1674        assert!(
1675            response
1676                .items
1677                .iter()
1678                .all(|i| i.trace_id == kafka_trace.to_hex()),
1679            "only kafka trace should appear; got {:?}",
1680            response
1681                .items
1682                .iter()
1683                .map(|i| &i.trace_id)
1684                .collect::<Vec<_>>()
1685        );
1686
1687        span_service.shutdown().await?;
1688        summary_service.shutdown().await?;
1689        cleanup();
1690        Ok(())
1691    }
1692
1693    /// queue_uid filter: only traces whose queue_ids contain the target UID are returned,
1694    /// and the matching trace's spans can be fetched by trace_id.
1695    #[tokio::test]
1696    async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
1697        use crate::parquet::tracing::service::TraceSpanService;
1698
1699        cleanup();
1700        let storage_settings = ObjectStorageSettings::default();
1701
1702        // TraceSpanService owns the SessionContext
1703        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
1704        let shared_ctx = span_service.ctx.clone();
1705
1706        // TraceSummaryService shares the same ctx so JOIN path works
1707        let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?;
1708
1709        let now = Utc::now();
1710        let queue_trace = TraceId::from_bytes([90u8; 16]);
1711        let plain_trace = TraceId::from_bytes([91u8; 16]);
1712        let target_queue_uid = "queue-record-abc123";
1713
1714        // Write spans for both traces
1715        let queue_span = make_span_record(
1716            &queue_trace,
1717            SpanId::from_bytes([90u8; 8]),
1718            "svc_queue",
1719            vec![],
1720        );
1721        let plain_span = make_span_record(
1722            &plain_trace,
1723            SpanId::from_bytes([91u8; 8]),
1724            "svc_queue",
1725            vec![],
1726        );
1727        span_service
1728            .write_spans_direct(vec![queue_span, plain_span])
1729            .await?;
1730
1731        // Write summaries: one with a matching queue_id, one without
1732        let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
1733        queue_summary.start_time = now;
1734        queue_summary.queue_ids = vec![target_queue_uid.to_string()];
1735
1736        let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
1737        plain_summary.start_time = now;
1738        // queue_ids left empty — should not appear in results
1739
1740        summary_service
1741            .write_summaries(vec![queue_summary, plain_summary])
1742            .await?;
1743
1744        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
1745
1746        // ── Step 1: query summaries by queue_uid ─────────────────────────────────
1747        let filters = TraceFilters {
1748            start_time: Some(now - chrono::Duration::hours(1)),
1749            end_time: Some(now + chrono::Duration::hours(1)),
1750            queue_uid: Some(target_queue_uid.to_string()),
1751            limit: Some(25),
1752            ..Default::default()
1753        };
1754
1755        let response = summary_service
1756            .query_service
1757            .get_paginated_traces(&filters)
1758            .await?;
1759
1760        assert!(
1761            !response.items.is_empty(),
1762            "queue_uid filter must return at least one result"
1763        );
1764        assert!(
1765            response
1766                .items
1767                .iter()
1768                .all(|i| i.trace_id == queue_trace.to_hex()),
1769            "only the queue trace should appear; got {:?}",
1770            response
1771                .items
1772                .iter()
1773                .map(|i| &i.trace_id)
1774                .collect::<Vec<_>>()
1775        );
1776
1777        // ── Step 2: fetch spans for the returned trace_id ─────────────────────────
1778        let returned_trace_id =
1779            TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
1780        let spans = span_service
1781            .query_service
1782            .get_trace_spans(
1783                Some(returned_trace_id.as_bytes()),
1784                None,
1785                Some(&(now - chrono::Duration::hours(1))),
1786                Some(&(now + chrono::Duration::hours(1))),
1787                None,
1788            )
1789            .await?;
1790
1791        assert!(
1792            !spans.is_empty(),
1793            "should find spans for the returned trace_id"
1794        );
1795
1796        span_service.shutdown().await?;
1797        summary_service.shutdown().await?;
1798        cleanup();
1799        Ok(())
1800    }
1801
1802    /// Build a deterministic `TraceSpanRecord` for use in summary tests.
1803    fn make_span_record(
1804        trace_id: &TraceId,
1805        span_id: SpanId,
1806        service_name: &str,
1807        attributes: Vec<Attribute>,
1808    ) -> TraceSpanRecord {
1809        let now = Utc::now();
1810        TraceSpanRecord {
1811            created_at: now,
1812            trace_id: trace_id.clone(),
1813            span_id,
1814            parent_span_id: None,
1815            flags: 1,
1816            trace_state: String::new(),
1817            scope_name: "test.scope".to_string(),
1818            scope_version: None,
1819            span_name: "op".to_string(),
1820            span_kind: "INTERNAL".to_string(),
1821            start_time: now,
1822            end_time: now + chrono::Duration::milliseconds(100),
1823            duration_ms: 100,
1824            status_code: 0,
1825            status_message: "OK".to_string(),
1826            attributes,
1827            events: vec![],
1828            links: vec![],
1829            label: None,
1830            input: serde_json::Value::Null,
1831            output: serde_json::Value::Null,
1832            service_name: service_name.to_string(),
1833            resource_attributes: vec![],
1834        }
1835    }
1836
1837    /// `resource_attributes` survive a write → read round-trip.
1838    #[tokio::test]
1839    async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
1840        cleanup();
1841
1842        let storage_settings = ObjectStorageSettings::default();
1843        let ctx = make_test_ctx(&storage_settings);
1844        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1845
1846        let attrs = vec![Attribute {
1847            key: "cloud.region".to_string(),
1848            value: serde_json::Value::String("us-east-1".to_string()),
1849        }];
1850        let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
1851        service.write_summaries(vec![summary]).await?;
1852        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1853
1854        let start = Utc::now() - chrono::Duration::hours(1);
1855        let end = Utc::now() + chrono::Duration::hours(1);
1856        let filters = TraceFilters {
1857            service_name: None,
1858            has_errors: None,
1859            status_code: None,
1860            start_time: Some(start),
1861            end_time: Some(end),
1862            limit: Some(25),
1863            cursor_start_time: None,
1864            cursor_trace_id: None,
1865            direction: None,
1866            attribute_filters: None,
1867            trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
1868            entity_uid: None,
1869            queue_uid: None,
1870        };
1871
1872        let response = service.query_service.get_paginated_traces(&filters).await?;
1873        assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
1874        assert_eq!(
1875            response.items[0].resource_attributes.len(),
1876            1,
1877            "Expected 1 resource attribute"
1878        );
1879        assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");
1880
1881        service.shutdown().await?;
1882        cleanup();
1883        Ok(())
1884    }
1885}