Skip to main content

scouter_dataframe/parquet/tracing/
summary.rs

1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::catalog::TraceCatalogProvider;
4use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
5use crate::parquet::utils::match_attr_expr;
6use crate::parquet::utils::register_cloud_logstore_factories;
7use crate::storage::ObjectStore;
8use arrow::array::*;
9use arrow::compute;
10use arrow::datatypes::*;
11use arrow_array::Array;
12use arrow_array::RecordBatch;
13use chrono::{DateTime, Datelike, Utc};
14use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
15use datafusion::prelude::*;
16use datafusion::scalar::ScalarValue;
17use deltalake::operations::optimize::OptimizeType;
18use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
19use scouter_types::sql::{TraceFilters, TraceListItem};
20use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
21use std::sync::Arc;
22use tokio::sync::oneshot;
23use tokio::sync::{mpsc, RwLock as AsyncRwLock};
24use tokio::time::{interval, Duration};
25use tracing::{debug, error, info, instrument};
26use url::Url;
27
/// Days from CE epoch to Unix epoch (1970-01-01).
/// Equivalent to `NaiveDate::from_ymd_opt(1970, 1, 1).unwrap().num_days_from_ce()`.
const UNIX_EPOCH_DAYS: i32 = 719_163;

/// Delta table name — also the key under which the table provider is
/// registered in the shared DataFusion catalog.
const SUMMARY_TABLE_NAME: &str = "trace_summaries";

/// Control table task name for summary compaction coordination.
const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";

// ── Column name constants ────────────────────────────────────────────────────
// Scalar columns (one value per trace summary row).
const TRACE_ID_COL: &str = "trace_id";
const SERVICE_NAME_COL: &str = "service_name";
const SCOPE_NAME_COL: &str = "scope_name";
const SCOPE_VERSION_COL: &str = "scope_version";
const ROOT_OPERATION_COL: &str = "root_operation";
const START_TIME_COL: &str = "start_time";
const END_TIME_COL: &str = "end_time";
const DURATION_MS_COL: &str = "duration_ms";
const STATUS_CODE_COL: &str = "status_code";
const STATUS_MESSAGE_COL: &str = "status_message";
const SPAN_COUNT_COL: &str = "span_count";
const ERROR_COUNT_COL: &str = "error_count";
// NOTE(review): not referenced in this file's visible portion — presumably used
// by the query layer; confirm before removing.
const SEARCH_BLOB_COL: &str = "search_blob";

// Nested columns: attribute map and string-list ID columns.
const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

// Partition column — Date32 days since Unix epoch, derived from start_time.
const PARTITION_DATE_COL: &str = "partition_date";
57
58// ── Schema ───────────────────────────────────────────────────────────────────
59
60fn create_summary_schema() -> Schema {
61    Schema::new(vec![
62        Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
63        Field::new(
64            SERVICE_NAME_COL,
65            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
66            false,
67        ),
68        Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
69        Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
70        Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
71        Field::new(
72            START_TIME_COL,
73            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
74            false,
75        ),
76        Field::new(
77            END_TIME_COL,
78            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
79            true,
80        ),
81        Field::new(DURATION_MS_COL, DataType::Int64, true),
82        Field::new(STATUS_CODE_COL, DataType::Int32, false),
83        Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
84        Field::new(SPAN_COUNT_COL, DataType::Int64, false),
85        Field::new(ERROR_COUNT_COL, DataType::Int64, false),
86        resource_attribute_field(),
87        Field::new(
88            ENTITY_IDS_COL,
89            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
90            true,
91        ),
92        Field::new(
93            QUEUE_IDS_COL,
94            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
95            true,
96        ),
97        Field::new(PARTITION_DATE_COL, DataType::Date32, false),
98    ])
99}
100
101// ── BatchBuilder ─────────────────────────────────────────────────────────────
102
/// Column-wise Arrow builders for a batch of `TraceSummaryRecord`s.
///
/// One builder per schema column; field order here mirrors
/// `create_summary_schema()`, and `finish()` must emit the arrays in the
/// same order.
struct TraceSummaryBatchBuilder {
    schema: Arc<Schema>,
    // 16-byte fixed-width trace IDs.
    trace_id: FixedSizeBinaryBuilder,
    // Dictionary-encoded: service names repeat heavily across traces.
    service_name: StringDictionaryBuilder<Int32Type>,
    scope_name: StringBuilder,
    scope_version: StringBuilder,
    root_operation: StringBuilder,
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,
    status_code: Int32Builder,
    status_message: StringBuilder,
    span_count: Int64Builder,
    error_count: Int64Builder,
    // Map of attribute key → JSON-encoded value string.
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    entity_ids: ListBuilder<StringBuilder>,
    queue_ids: ListBuilder<StringBuilder>,
    // Partition key: days since Unix epoch, derived from start_time.
    partition_date: Date32Builder,
}
122
123impl TraceSummaryBatchBuilder {
124    fn new(schema: Arc<Schema>, capacity: usize) -> Self {
125        let map_field_names = MapFieldNames {
126            entry: "key_value".to_string(),
127            key: "key".to_string(),
128            value: "value".to_string(),
129        };
130        let resource_attributes = MapBuilder::new(
131            Some(map_field_names),
132            StringBuilder::new(),
133            StringViewBuilder::new(),
134        );
135        Self {
136            schema,
137            trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
138            service_name: StringDictionaryBuilder::new(),
139            scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
140            scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
141            root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
142            start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
143            end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
144            duration_ms: Int64Builder::with_capacity(capacity),
145            status_code: Int32Builder::with_capacity(capacity),
146            status_message: StringBuilder::with_capacity(capacity, capacity * 16),
147            span_count: Int64Builder::with_capacity(capacity),
148            error_count: Int64Builder::with_capacity(capacity),
149            resource_attributes,
150            entity_ids: ListBuilder::new(StringBuilder::new()),
151            queue_ids: ListBuilder::new(StringBuilder::new()),
152            partition_date: Date32Builder::with_capacity(capacity),
153        }
154    }
155
156    fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
157        self.trace_id.append_value(rec.trace_id.as_bytes())?;
158        self.service_name.append_value(&rec.service_name);
159        self.scope_name.append_value(&rec.scope_name);
160        if rec.scope_version.is_empty() {
161            self.scope_version.append_null();
162        } else {
163            self.scope_version.append_value(&rec.scope_version);
164        }
165        self.root_operation.append_value(&rec.root_operation);
166        self.start_time
167            .append_value(rec.start_time.timestamp_micros());
168        match rec.end_time {
169            Some(end) => self.end_time.append_value(end.timestamp_micros()),
170            None => self.end_time.append_null(),
171        }
172        let duration = rec
173            .end_time
174            .map(|end| (end - rec.start_time).num_milliseconds().max(0));
175        match duration {
176            Some(d) => self.duration_ms.append_value(d),
177            None => self.duration_ms.append_null(),
178        }
179        self.status_code.append_value(rec.status_code);
180        if rec.status_message.is_empty() {
181            self.status_message.append_null();
182        } else {
183            self.status_message.append_value(&rec.status_message);
184        }
185        self.span_count.append_value(rec.span_count);
186        self.error_count.append_value(rec.error_count);
187        if rec.resource_attributes.is_empty() {
188            self.resource_attributes.append(false)?; // null map
189        } else {
190            for attr in &rec.resource_attributes {
191                self.resource_attributes.keys().append_value(&attr.key);
192                let value_str =
193                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
194                self.resource_attributes.values().append_value(value_str);
195            }
196            self.resource_attributes.append(true)?;
197        }
198        if rec.entity_ids.is_empty() {
199            self.entity_ids.append_null();
200        } else {
201            for id in &rec.entity_ids {
202                self.entity_ids.values().append_value(id);
203            }
204            self.entity_ids.append(true);
205        }
206        if rec.queue_ids.is_empty() {
207            self.queue_ids.append_null();
208        } else {
209            for id in &rec.queue_ids {
210                self.queue_ids.values().append_value(id);
211            }
212            self.queue_ids.append(true);
213        }
214        // Partition key — days since Unix epoch, derived from start_time
215        let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
216        self.partition_date.append_value(days);
217        Ok(())
218    }
219
220    fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
221        let columns: Vec<Arc<dyn Array>> = vec![
222            Arc::new(self.trace_id.finish()),
223            Arc::new(self.service_name.finish()),
224            Arc::new(self.scope_name.finish()),
225            Arc::new(self.scope_version.finish()),
226            Arc::new(self.root_operation.finish()),
227            Arc::new(self.start_time.finish()),
228            Arc::new(self.end_time.finish()),
229            Arc::new(self.duration_ms.finish()),
230            Arc::new(self.status_code.finish()),
231            Arc::new(self.status_message.finish()),
232            Arc::new(self.span_count.finish()),
233            Arc::new(self.error_count.finish()),
234            Arc::new(self.resource_attributes.finish()),
235            Arc::new(self.entity_ids.finish()),
236            Arc::new(self.queue_ids.finish()),
237            Arc::new(self.partition_date.finish()),
238        ];
239        RecordBatch::try_new(self.schema, columns).map_err(Into::into)
240    }
241}
242
243// ── TableCommand ─────────────────────────────────────────────────────────────
244
/// Commands accepted by the summary-table actor loop (`TraceSummaryDBEngine::start_actor`).
pub enum SummaryTableCommand {
    /// Append a batch of summary records; outcome is reported via `respond_to`.
    Write {
        records: Vec<TraceSummaryRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Run table optimization directly (the actor responds before its
    /// follow-up vacuum pass completes).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Delete unreferenced files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
259
260async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
261    let mut base = object_store.get_base_url()?;
262    let mut path = base.path().to_string();
263    if !path.ends_with('/') {
264        path.push('/');
265    }
266    path.push_str(SUMMARY_TABLE_NAME);
267    base.set_path(&path);
268    Ok(base)
269}
270
/// Create the `trace_summaries` Delta table at `table_url`.
///
/// The table is partitioned by `partition_date` and configured so Delta only
/// collects data-skipping statistics for orderable columns. Uses the
/// explicit storage backend from `object_store` so remote schemes
/// (S3/GCS/Azure) work without URL-based inference.
async fn create_summary_table(
    object_store: &ObjectStore,
    table_url: Url,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    info!(
        "Creating trace summary table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(SUMMARY_TABLE_NAME)
    );
    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;
    // Delta needs its own field representation; convert from the Arrow schema.
    let delta_fields = arrow_schema_to_delta(&schema);
    table
        .create()
        .with_table_name(SUMMARY_TABLE_NAME)
        .with_columns(delta_fields)
        .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
        // Only collect min/max statistics for non-binary columns.
        // trace_id (FixedSizeBinary) has no meaningful ordering for file-level pruning.
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
        )
        .await
        .map_err(Into::into)
}
303
304async fn build_or_create_summary_table(
305    object_store: &ObjectStore,
306    schema: SchemaRef,
307) -> Result<DeltaTable, TraceEngineError> {
308    register_cloud_logstore_factories();
309    let table_url = build_summary_url(object_store).await?;
310    info!(
311        "Loading trace summary table [{}://.../{} ]",
312        table_url.scheme(),
313        table_url
314            .path_segments()
315            .and_then(|mut s| s.next_back())
316            .unwrap_or(SUMMARY_TABLE_NAME)
317    );
318
319    // Check whether a Delta log actually exists. For local tables, check the
320    // filesystem directly. For remote tables, attempt a full load with the
321    // explicit storage backend — required for S3/GCS/Azure where Delta Lake
322    // cannot infer the object store from the URL scheme alone.
323    let is_delta_table = if table_url.scheme() == "file" {
324        if let Ok(path) = table_url.to_file_path() {
325            if !path.exists() {
326                info!("Creating directory for summary table: {:?}", path);
327                std::fs::create_dir_all(&path)?;
328            }
329            path.join("_delta_log").exists()
330        } else {
331            false
332        }
333    } else {
334        let store = object_store.as_dyn_object_store();
335        match DeltaTableBuilder::from_url(table_url.clone()) {
336            Ok(builder) => builder
337                .with_storage_backend(store, table_url.clone())
338                .load()
339                .await
340                .is_ok(),
341            Err(_) => false,
342        }
343    };
344
345    if is_delta_table {
346        info!(
347            "Loaded existing trace summary table [{}://.../{} ]",
348            table_url.scheme(),
349            table_url
350                .path_segments()
351                .and_then(|mut s| s.next_back())
352                .unwrap_or(SUMMARY_TABLE_NAME)
353        );
354        let store = object_store.as_dyn_object_store();
355        DeltaTableBuilder::from_url(table_url.clone())?
356            .with_storage_backend(store, table_url)
357            .load()
358            .await
359            .map_err(Into::into)
360    } else {
361        info!("Summary table does not exist, creating new table");
362        create_summary_table(object_store, table_url, schema).await
363    }
364}
365
/// Single-writer engine owning the `trace_summaries` Delta table.
pub struct TraceSummaryDBEngine {
    /// Arrow schema for summary batches (see `create_summary_schema`).
    schema: Arc<Schema>,
    /// Current Delta table snapshot; the write lock is held for commits,
    /// maintenance, and refreshes.
    table: Arc<AsyncRwLock<DeltaTable>>,
    pub ctx: Arc<SessionContext>,
    /// Shared atomic catalog — calls `swap()` to update the summary `TableProvider`
    /// without a deregister/register gap visible to concurrent readers.
    catalog: Arc<TraceCatalogProvider>,
    /// Control-table client used to coordinate the optimize task across pods.
    control: ControlTableEngine,
}
375
376impl TraceSummaryDBEngine {
377    /// Create a new `TraceSummaryDBEngine` using the provided shared `SessionContext` and catalog.
378    ///
379    /// The caller is responsible for passing the `SessionContext` and `TraceCatalogProvider`
380    /// created by `TraceSpanDBEngine::new()`. This ensures both `trace_spans` and
381    /// `trace_summaries` share the same context and atomic DashMap-backed catalog.
382    pub async fn new(
383        object_store: &ObjectStore,
384        ctx: Arc<SessionContext>,
385        catalog: Arc<TraceCatalogProvider>,
386    ) -> Result<Self, TraceEngineError> {
387        let schema = Arc::new(create_summary_schema());
388        let delta_table = build_or_create_summary_table(object_store, schema.clone()).await?;
389        // A freshly-created table has no committed Parquet files yet — table_provider()
390        // returns an error in that case. Defer registration until the first write.
391        if let Ok(provider) = delta_table.table_provider().await {
392            catalog.swap(SUMMARY_TABLE_NAME, provider);
393        } else {
394            info!("Empty summary table at init — deferring catalog registration until first write");
395        }
396
397        let control = ControlTableEngine::new(object_store, get_pod_id()).await?;
398
399        Ok(TraceSummaryDBEngine {
400            schema,
401            table: Arc::new(AsyncRwLock::new(delta_table)),
402            ctx,
403            catalog,
404            control,
405        })
406    }
407
408    fn build_batch(
409        &self,
410        records: Vec<TraceSummaryRecord>,
411    ) -> Result<RecordBatch, TraceEngineError> {
412        let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
413        for rec in &records {
414            builder.append(rec)?;
415        }
416        builder.finish()
417    }
418
419    async fn write_records(
420        &self,
421        records: Vec<TraceSummaryRecord>,
422    ) -> Result<(), TraceEngineError> {
423        let count = records.len();
424        info!("Writing {} trace summaries", count);
425        let batch = self.build_batch(records)?;
426
427        let mut table_guard = self.table.write().await;
428        // update_incremental is intentionally omitted here.
429        //
430        // This engine runs as a single-writer actor — no other process commits to this
431        // Delta table, so the in-memory state is always current. Calling update_incremental
432        // can mutate table_guard into a corrupted intermediate state before the error
433        // propagates, producing a DeltaTable whose snapshot may not reflect newly written
434        // files — causing stale query results until restart.
435
436        let current_table = table_guard.clone();
437        let updated_table = current_table
438            .write(vec![batch])
439            .with_save_mode(deltalake::protocol::SaveMode::Append)
440            .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
441            .await?;
442
443        let new_provider = updated_table.table_provider().await?;
444        // Atomic single-step swap — no deregister/register gap where queries see "not found".
445        self.catalog.swap(SUMMARY_TABLE_NAME, new_provider);
446        updated_table.update_datafusion_session(&self.ctx.state())?;
447
448        *table_guard = updated_table;
449        info!("Summary table updated with {} records", count);
450        Ok(())
451    }
452
453    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
454        let mut table_guard = self.table.write().await;
455        let (updated_table, _metrics) = table_guard
456            .clone()
457            .optimize()
458            .with_target_size(std::num::NonZero::new(128 * 1024 * 1024).unwrap())
459            .with_type(OptimizeType::ZOrder(vec![
460                START_TIME_COL.to_string(),
461                SERVICE_NAME_COL.to_string(),
462            ]))
463            .await?;
464
465        self.catalog
466            .swap(SUMMARY_TABLE_NAME, updated_table.table_provider().await?);
467        updated_table.update_datafusion_session(&self.ctx.state())?;
468        *table_guard = updated_table;
469        Ok(())
470    }
471
472    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
473        let mut table_guard = self.table.write().await;
474        let (updated_table, _metrics) = table_guard
475            .clone()
476            .vacuum()
477            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
478            .with_enforce_retention_duration(false)
479            .await?;
480
481        self.catalog
482            .swap(SUMMARY_TABLE_NAME, updated_table.table_provider().await?);
483        updated_table.update_datafusion_session(&self.ctx.state())?;
484        *table_guard = updated_table;
485        Ok(())
486    }
487
488    /// Refresh the in-memory Delta table snapshot from shared object storage.
489    ///
490    /// Runs periodically on every pod so that read pods pick up commits written
491    /// by the write pod. Safety: clones the table before `update_incremental` so a
492    /// failure leaves the original guard intact.
493    async fn refresh_table(&self) -> Result<(), TraceEngineError> {
494        let mut table_guard = self.table.write().await;
495        let current_version = table_guard.version();
496
497        let mut refreshed = table_guard.clone();
498        match refreshed.update_incremental(None).await {
499            Ok(_) => {
500                if refreshed.version() > current_version {
501                    info!(
502                        "Summary table refreshed: v{:?} → v{:?}",
503                        current_version,
504                        refreshed.version()
505                    );
506                    let new_provider = refreshed.table_provider().await?;
507                    // Atomic swap — no gap between deregister and register.
508                    self.catalog.swap(SUMMARY_TABLE_NAME, new_provider);
509                    refreshed.update_datafusion_session(&self.ctx.state())?;
510                    *table_guard = refreshed;
511                }
512            }
513            Err(e) => {
514                // Tolerate: empty tables (no log yet), transient network errors.
515                // These are expected on freshly-created tables and do not indicate a bug.
516                debug!("Summary table refresh skipped: {}", e);
517            }
518        }
519        Ok(())
520    }
521
522    /// Try to claim and run the summary optimize task via the control table.
523    async fn try_run_optimize(&self, interval_hours: u64) {
524        match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
525            Ok(true) => match self.optimize_table().await {
526                Ok(()) => {
527                    if let Err(e) = self.vacuum_table(0).await {
528                        error!("Post-optimize vacuum failed: {}", e);
529                    }
530
531                    let _ = self
532                        .control
533                        .release_task(
534                            TASK_SUMMARY_OPTIMIZE,
535                            chrono::Duration::hours(interval_hours as i64),
536                        )
537                        .await;
538                }
539                Err(e) => {
540                    error!("Summary optimize failed: {}", e);
541                    let _ = self
542                        .control
543                        .release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
544                        .await;
545                }
546            },
547            Ok(false) => { /* not due or another pod owns it */ }
548            Err(e) => error!("Summary optimize claim check failed: {}", e),
549        }
550    }
551
552    #[instrument(skip_all, name = "summary_engine_actor")]
553    pub fn start_actor(
554        self,
555        compaction_interval_hours: u64,
556        refresh_interval_secs: u64,
557    ) -> (
558        mpsc::Sender<SummaryTableCommand>,
559        tokio::task::JoinHandle<()>,
560    ) {
561        let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);
562
563        let handle = tokio::spawn(async move {
564            info!(refresh_interval_secs, "TraceSummaryDBEngine actor started");
565
566            // Poll every 5 minutes — the actual schedule is in the control table.
567            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
568            scheduler_ticker.tick().await; // skip immediate tick
569
570            // Refresh ticker: picks up commits from the write pod on shared storage.
571            // Every pod must refresh its own in-memory snapshot independently.
572            // Clamp to 1s minimum — tokio::time::interval panics on Duration::ZERO.
573            let mut refresh_ticker = interval(Duration::from_secs(refresh_interval_secs.max(1)));
574            refresh_ticker.tick().await; // skip immediate tick
575
576            loop {
577                tokio::select! {
578                    Some(cmd) = rx.recv() => {
579                        match cmd {
580                            SummaryTableCommand::Write { records, respond_to } => {
581                                let result = self.write_records(records).await;
582                                if let Err(ref e) = result {
583                                    error!("Summary write failed: {}", e);
584                                }
585                                let _ = respond_to.send(result);
586                            }
587                            SummaryTableCommand::Optimize { respond_to } => {
588                                // Direct admin request — bypass control table.
589                                // Response is sent before vacuum so callers aren't blocked
590                                // on the potentially slow file-deletion pass.
591                                let _ = respond_to.send(self.optimize_table().await);
592                                if let Err(e) = self.vacuum_table(0).await {
593                                    error!("Post-optimize vacuum failed: {}", e);
594                                }
595                            }
596                            SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
597                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
598                            }
599                            SummaryTableCommand::Shutdown => {
600                                info!("TraceSummaryDBEngine actor shutting down");
601                                break;
602                            }
603                        }
604                    }
605                    _ = scheduler_ticker.tick() => {
606                        self.try_run_optimize(compaction_interval_hours).await;
607                    }
608                    _ = refresh_ticker.tick() => {
609                        if let Err(e) = self.refresh_table().await {
610                            error!("Summary table refresh failed: {}", e);
611                        }
612                    }
613                }
614            }
615        });
616
617        (tx, handle)
618    }
619}
620
621// ── Service ──────────────────────────────────────────────────────────────────
622
/// Public facade over the summary engine actor: forwards write/maintenance
/// commands over a channel and exposes read queries via `query_service`.
pub struct TraceSummaryService {
    /// Command channel into the `TraceSummaryDBEngine` actor.
    engine_tx: mpsc::Sender<SummaryTableCommand>,
    /// Join handle of the spawned actor task (awaited in `shutdown`).
    engine_handle: tokio::task::JoinHandle<()>,
    pub query_service: TraceSummaryQueries,
}
628
629impl TraceSummaryService {
630    pub async fn new(
631        object_store: &ObjectStore,
632        compaction_interval_hours: u64,
633        ctx: Arc<SessionContext>,
634        catalog: Arc<TraceCatalogProvider>,
635        refresh_interval_secs: u64,
636    ) -> Result<Self, TraceEngineError> {
637        let engine = TraceSummaryDBEngine::new(object_store, ctx, catalog).await?;
638        let engine_ctx = engine.ctx.clone();
639        let (engine_tx, engine_handle) =
640            engine.start_actor(compaction_interval_hours, refresh_interval_secs);
641
642        Ok(TraceSummaryService {
643            engine_tx,
644            engine_handle,
645            query_service: TraceSummaryQueries::new(engine_ctx),
646        })
647    }
648
649    /// Write a batch of `TraceSummaryRecord`s to the Delta Lake summary table.
650    pub async fn write_summaries(
651        &self,
652        records: Vec<TraceSummaryRecord>,
653    ) -> Result<(), TraceEngineError> {
654        let (tx, rx) = oneshot::channel();
655        self.engine_tx
656            .send(SummaryTableCommand::Write {
657                records,
658                respond_to: tx,
659            })
660            .await
661            .map_err(|_| TraceEngineError::ChannelClosed)?;
662        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
663    }
664
665    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
666        let (tx, rx) = oneshot::channel();
667        self.engine_tx
668            .send(SummaryTableCommand::Optimize { respond_to: tx })
669            .await
670            .map_err(|_| TraceEngineError::ChannelClosed)?;
671        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
672    }
673
674    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
675        let (tx, rx) = oneshot::channel();
676        self.engine_tx
677            .send(SummaryTableCommand::Vacuum {
678                retention_hours,
679                respond_to: tx,
680            })
681            .await
682            .map_err(|_| TraceEngineError::ChannelClosed)?;
683        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
684    }
685
686    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSummaryService>`.
687    pub async fn signal_shutdown(&self) {
688        info!("TraceSummaryService signaling shutdown");
689        let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
690    }
691
692    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
693        info!("TraceSummaryService shutting down");
694        self.engine_tx
695            .send(SummaryTableCommand::Shutdown)
696            .await
697            .map_err(|_| TraceEngineError::ChannelClosed)?;
698        if let Err(e) = self.engine_handle.await {
699            error!("Summary engine handle error: {}", e);
700        }
701        info!("TraceSummaryService shutdown complete");
702        Ok(())
703    }
704}
705
706// ── Queries ──────────────────────────────────────────────────────────────────
707
/// Read-side queries over the registered `trace_summaries` table.
pub struct TraceSummaryQueries {
    /// Shared DataFusion session — the same context the engines register
    /// their table providers into.
    ctx: Arc<SessionContext>,
}
711
712impl TraceSummaryQueries {
713    pub fn new(ctx: Arc<SessionContext>) -> Self {
714        Self { ctx }
715    }
716
717    /// Get paginated traces from the Delta Lake summary table.
718    ///
719    /// The first step is a `GROUP BY trace_id` dedup query that merges any duplicate
720    /// rows (from late-arriving spans) using the same rules as `TraceAggregator`:
721    ///   - `SUM` for span/error counts, `MIN`/`MAX` for times, `MAX` for status_code
722    ///   - `FIRST_VALUE` ordered by `span_count DESC` for string fields
723    ///   - `array_distinct(flatten(array_agg(...)))` for entity/queue ID lists (full union)
724    ///
725    ///   Time filters are pushed into the SQL WHERE clause for partition pruning.
726    ///
727    ///   Secondary filters (service, errors, cursor) apply to the deduplicated DataFrame.
728    pub async fn get_paginated_traces(
729        &self,
730        filters: &TraceFilters,
731    ) -> Result<TracePaginationResponse, TraceEngineError> {
732        let limit = filters.limit.unwrap_or(50) as usize;
733        let direction = filters.direction.as_deref().unwrap_or("next");
734
735        // ── Dedup: time-filtered GROUP BY trace_id (DataFrame API) ───────────
736        use crate::parquet::tracing::queries::{date_lit, ts_lit};
737        use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
738        use datafusion::functions_nested::set_ops::array_distinct;
739
740        let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;
741
742        // ① Partition date — directory-level pruning (skips entire partition folders)
743        // ② start_time     — row-group-level pruning within matched files
744        if let Some(start) = filters.start_time {
745            df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
746            df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
747        }
748        if let Some(end) = filters.end_time {
749            df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
750            df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
751        }
752
753        // ORDER BY specs for FIRST_VALUE aggregates
754        // span_count DESC NULLS LAST, end_time DESC NULLS LAST
755        let by_span_end: Vec<SortExpr> = vec![
756            col(SPAN_COUNT_COL).sort(false, false),
757            col(END_TIME_COL).sort(false, false),
758        ];
759        // status_code DESC, span_count DESC
760        let by_status_span: Vec<SortExpr> = vec![
761            col(STATUS_CODE_COL).sort(false, false),
762            col(SPAN_COUNT_COL).sort(false, false),
763        ];
764
765        // Phase 1: aggregate
766        // _max_end_us / _min_start_us are hidden Int64 columns used to compute
767        // duration_ms post-aggregation (arithmetic across two aggregate exprs
768        // cannot be expressed in a single aggregate slot).
769        //
770        // entity_ids / queue_ids: array_agg without FILTER is intentional.
771        // array_flatten treats NULL outer-list elements as empty and skips them,
772        // giving identical results to FILTER (WHERE IS NOT NULL) for the GROUP BY
773        // case. Unlike the original SQL, this produces [] rather than NULL when
774        // ALL rows have null IDs — the safer outcome for downstream deserialization.
775        let mut df = df
776            .aggregate(
777                vec![col(TRACE_ID_COL)],
778                vec![
779                    min(col(START_TIME_COL)).alias(START_TIME_COL),
780                    max(col(END_TIME_COL)).alias(END_TIME_COL),
781                    max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
782                    min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
783                    max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
784                    sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
785                    sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
786                    first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
787                    first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
788                    first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
789                        .alias(SCOPE_VERSION_COL),
790                    first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
791                        .alias(ROOT_OPERATION_COL),
792                    first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
793                    first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
794                        .alias(RESOURCE_ATTRIBUTES_COL),
795                    array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
796                    array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
797                ],
798            )?
799            // Phase 2: derive computed columns from hidden aggregates, then drop them
800            .with_column(
801                DURATION_MS_COL,
802                (col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
803            )?
804            .with_column(
805                ENTITY_IDS_COL,
806                array_distinct(flatten(col("_entity_ids_raw"))),
807            )?
808            .with_column(
809                QUEUE_IDS_COL,
810                array_distinct(flatten(col("_queue_ids_raw"))),
811            )?
812            .drop_columns(&[
813                "_max_end_us",
814                "_min_start_us",
815                "_entity_ids_raw",
816                "_queue_ids_raw",
817            ])?;
818
819        // ── Secondary filters ────────────────────────────────────────────────
820        if let Some(ref svc) = filters.service_name {
821            df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
822        }
823        match filters.has_errors {
824            Some(true) => {
825                df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
826            }
827            Some(false) => {
828                df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
829            }
830            None => {}
831        }
832        if let Some(sc) = filters.status_code {
833            df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
834        }
835
836        // ── entity_uid filter via array_has on List column ────────────────
837        if let Some(ref uid) = filters.entity_uid {
838            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
839                col(ENTITY_IDS_COL),
840                lit(uid.as_str()),
841            ))?;
842        }
843
844        // ── queue_uid filter via array_has on List column ─────────────────
845        if let Some(ref uid) = filters.queue_uid {
846            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
847                col(QUEUE_IDS_COL),
848                lit(uid.as_str()),
849            ))?;
850        }
851
852        // ── trace_ids IN filter ──────────────────────────────────────────────
853        if let Some(ref ids) = filters.trace_ids {
854            if !ids.is_empty() {
855                let binary_ids: Vec<Expr> = ids
856                    .iter()
857                    .filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
858                    .map(|b| lit(ScalarValue::Binary(Some(b))))
859                    .collect();
860                if !binary_ids.is_empty() {
861                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
862                }
863            }
864        }
865
866        // ── Cursor filter in DataFusion ──────────────────────────────────────
867        // Equivalent to Postgres: `(start_time, trace_id) < (cursor_time, cursor_id)`
868        // for "next" or `> (cursor_time, cursor_id)` for "previous".
869        if let (Some(cursor_time), Some(ref cursor_id)) =
870            (filters.cursor_start_time, &filters.cursor_trace_id)
871        {
872            if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
873                let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
874                    Some(cursor_time.timestamp_micros()),
875                    Some("UTC".into()),
876                ));
877                let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
878                let cursor_expr = if direction == "previous" {
879                    col(START_TIME_COL)
880                        .gt(cursor_ts.clone())
881                        .or(col(START_TIME_COL)
882                            .eq(cursor_ts)
883                            .and(col(TRACE_ID_COL).gt(cursor_tid)))
884                } else {
885                    col(START_TIME_COL)
886                        .lt(cursor_ts.clone())
887                        .or(col(START_TIME_COL)
888                            .eq(cursor_ts)
889                            .and(col(TRACE_ID_COL).lt(cursor_tid)))
890                };
891                df = df.filter(cursor_expr)?;
892            }
893        }
894
895        // ── Attribute filters via span lookup → IN list ──────────────────────
896        // Requires shared SessionContext (trace_spans must be registered in self.ctx).
897        // We execute the span query eagerly to collect matching trace IDs, then filter
898        // the summaries DataFrame with an IN-list predicate. This avoids a cross-table
899        // JOIN that causes DataFusion to report ambiguous `trace_id` column references.
900        if let Some(ref attr_filters) = filters.attribute_filters {
901            if !attr_filters.is_empty() {
902                let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
903                    TRACE_ID_COL,
904                    START_TIME_COL,
905                    SEARCH_BLOB_COL,
906                ])?;
907
908                // Time predicates on spans for partition pruning
909                if let Some(start) = filters.start_time {
910                    spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
911                        ScalarValue::TimestampMicrosecond(
912                            Some(start.timestamp_micros()),
913                            Some("UTC".into()),
914                        ),
915                    )))?;
916                }
917                if let Some(end) = filters.end_time {
918                    spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
919                        ScalarValue::TimestampMicrosecond(
920                            Some(end.timestamp_micros()),
921                            Some("UTC".into()),
922                        ),
923                    )))?;
924                }
925
926                // OR-match each filter against search_blob.
927                // normalize_attr_filter converts "key:value" → "%key=value%" so the LIKE
928                // pattern matches the new pipe-bounded `|key=value|` blob format.
929                let mut attr_expr: Option<Expr> = None;
930                for f in attr_filters {
931                    let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
932                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
933                    attr_expr = Some(match attr_expr {
934                        None => cond,
935                        Some(e) => e.or(cond),
936                    });
937                }
938                if let Some(expr) = attr_expr {
939                    spans_df = spans_df.filter(expr)?;
940                }
941
942                // Collect matching trace IDs eagerly, then apply as IN-list filter.
943                // Use HashSet for O(1) dedup instead of O(n²) Vec::contains().
944                let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
945                let mut seen_ids: std::collections::HashSet<Vec<u8>> =
946                    std::collections::HashSet::new();
947                let mut binary_ids: Vec<Expr> = Vec::new();
948                for batch in &span_batches {
949                    // trace_id may be FixedSizeBinary(16) or Binary after Delta round-trip.
950                    // Cast to Binary to handle both uniformly.
951                    if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
952                        let casted = compute::cast(col_ref, &DataType::Binary)?;
953                        let col_arr =
954                            casted
955                                .as_any()
956                                .downcast_ref::<BinaryArray>()
957                                .ok_or_else(|| {
958                                    TraceEngineError::DowncastError("trace_id to BinaryArray")
959                                })?;
960                        for i in 0..batch.num_rows() {
961                            let id_bytes = col_arr.value(i).to_vec();
962                            if seen_ids.insert(id_bytes.clone()) {
963                                binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
964                            }
965                        }
966                    }
967                }
968
969                if !binary_ids.is_empty() {
970                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
971                } else {
972                    // No matching spans → return empty result
973                    df = df.filter(lit(false))?;
974                }
975            }
976        }
977
978        // ── Sort: DESC for "next", ASC for "previous" ────────────────────────
979        // "previous" direction fetches the oldest limit+1 items newer than the cursor,
980        // which matches the original Rust post-reversal behavior.
981        df = if direction == "previous" {
982            df.sort(vec![
983                col(START_TIME_COL).sort(true, true),
984                col(TRACE_ID_COL).sort(true, true),
985            ])?
986        } else {
987            df.sort(vec![
988                col(START_TIME_COL).sort(false, false),
989                col(TRACE_ID_COL).sort(false, false),
990            ])?
991        };
992
993        // ── LIMIT pushed into DataFusion (fetch limit+1 to detect next page) ─
994        df = df.limit(0, Some(limit + 1))?;
995
996        let batches = df.collect().await?;
997        let mut items = batches_to_trace_list_items(batches)?;
998
999        let has_more = items.len() > limit;
1000        if has_more {
1001            items.pop(); // remove N+1 sentinel
1002        }
1003
1004        // Direction-specific cursor logic — mirrors the original PostgreSQL implementation.
1005        //
1006        // "next" (DESC order): items are newest-first. The sentinel tells us if older
1007        // items exist (has_next). Cursor presence means we navigated forward, so newer
1008        // items exist behind us (has_previous).
1009        //
1010        // "previous" (ASC order): items are oldest-first (closest-to-cursor first).
1011        // The sentinel tells us if even more newer items exist (has_previous). Cursor
1012        // presence means we navigated backward, so older items exist ahead (has_next).
1013        // Items stay in ASC order — no reversal — matching PG behavior exactly.
1014        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
1015            "next" => {
1016                let next_cursor = if has_more {
1017                    items.last().map(|item| TraceCursor {
1018                        start_time: item.start_time,
1019                        trace_id: item.trace_id.clone(),
1020                    })
1021                } else {
1022                    None
1023                };
1024
1025                let previous_cursor = items.first().map(|item| TraceCursor {
1026                    start_time: item.start_time,
1027                    trace_id: item.trace_id.clone(),
1028                });
1029
1030                (
1031                    has_more,
1032                    next_cursor,
1033                    filters.cursor_start_time.is_some(),
1034                    previous_cursor,
1035                )
1036            }
1037            "previous" => {
1038                // ASC order: items.last() is the newest (largest start_time).
1039                // To continue backward (fetch even newer items), the cursor must
1040                // point past the current page's newest item so `> cursor` excludes
1041                // everything already returned.
1042                let previous_cursor = if has_more {
1043                    items.last().map(|item| TraceCursor {
1044                        start_time: item.start_time,
1045                        trace_id: item.trace_id.clone(),
1046                    })
1047                } else {
1048                    None
1049                };
1050
1051                // items.first() is the oldest (smallest start_time).
1052                // To go forward (back toward newer-first / DESC pages), the cursor
1053                // must point at the oldest item so `< cursor` fetches older items.
1054                let next_cursor = items.first().map(|item| TraceCursor {
1055                    start_time: item.start_time,
1056                    trace_id: item.trace_id.clone(),
1057                });
1058
1059                (
1060                    filters.cursor_start_time.is_some(),
1061                    next_cursor,
1062                    has_more,
1063                    previous_cursor,
1064                )
1065            }
1066            _ => (false, None, false, None),
1067        };
1068
1069        Ok(TracePaginationResponse {
1070            items,
1071            has_next,
1072            next_cursor,
1073            has_previous,
1074            previous_cursor,
1075        })
1076    }
1077}
1078
1079// ── Arrow → TraceListItem conversion ─────────────────────────────────────────
1080
1081/// Extract attributes from a MapArray at a given row index.
1082fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
1083    if map_array.is_null(row_idx) {
1084        return Vec::new();
1085    }
1086    let entry = map_array.value(row_idx);
1087    let Some(struct_array) = entry.as_any().downcast_ref::<StructArray>() else {
1088        tracing::warn!("extract_map_attributes: failed to downcast to StructArray");
1089        return Vec::new();
1090    };
1091    let Some(keys_arr) = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).ok()
1092    else {
1093        tracing::warn!("extract_map_attributes: failed to cast keys to Utf8");
1094        return Vec::new();
1095    };
1096    let Some(keys) = keys_arr.as_any().downcast_ref::<StringArray>() else {
1097        tracing::warn!("extract_map_attributes: failed to downcast keys to StringArray");
1098        return Vec::new();
1099    };
1100    let Some(values_arr) = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).ok()
1101    else {
1102        tracing::warn!("extract_map_attributes: failed to cast values to Utf8");
1103        return Vec::new();
1104    };
1105    let Some(values) = values_arr.as_any().downcast_ref::<StringArray>() else {
1106        tracing::warn!("extract_map_attributes: failed to downcast values to StringArray");
1107        return Vec::new();
1108    };
1109
1110    (0..struct_array.len())
1111        .map(|i| Attribute {
1112            key: keys.value(i).to_string(),
1113            value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
1114        })
1115        .collect()
1116}
1117
1118/// Extract a `Vec<String>` from a nullable `ListArray` at a given row index.
1119fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
1120    let Some(list) = list else {
1121        return Vec::new();
1122    };
1123    if list.is_null(row_idx) {
1124        return Vec::new();
1125    }
1126    let inner = list.value(row_idx);
1127    let str_arr = compute::cast(&inner, &DataType::Utf8)
1128        .ok()
1129        .and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
1130    match str_arr {
1131        Some(arr) => (0..arr.len())
1132            .filter(|i| !arr.is_null(*i))
1133            .map(|i| arr.value(i).to_string())
1134            .collect(),
1135        None => Vec::new(),
1136    }
1137}
1138
1139fn batches_to_trace_list_items(
1140    batches: Vec<RecordBatch>,
1141) -> Result<Vec<TraceListItem>, TraceEngineError> {
1142    let mut items = Vec::new();
1143
1144    for batch in &batches {
1145        // trace_id may come back as FixedSizeBinary(16) or Binary depending on
1146        // whether DataFusion/Delta round-tripped the schema. Handle both.
1147        let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
1148            TraceEngineError::UnsupportedOperation("missing trace_id column".into())
1149        })?;
1150        let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
1151        let trace_ids = trace_id_binary
1152            .as_any()
1153            .downcast_ref::<BinaryArray>()
1154            .ok_or_else(|| {
1155                TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
1156            })?;
1157
1158        // Cast all string/dictionary columns to Utf8 uniformly (handles Utf8View,
1159        // Dictionary(Int32, Utf8), LargeUtf8, etc.).
1160        let svc_arr = compute::cast(
1161            batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
1162                TraceEngineError::UnsupportedOperation("missing service_name column".into())
1163            })?,
1164            &DataType::Utf8,
1165        )?;
1166        let service_names = svc_arr
1167            .as_any()
1168            .downcast_ref::<StringArray>()
1169            .ok_or_else(|| {
1170                TraceEngineError::UnsupportedOperation(
1171                    "service_name cast to StringArray failed".into(),
1172                )
1173            })?;
1174
1175        let scope_arr = compute::cast(
1176            batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
1177                TraceEngineError::UnsupportedOperation("missing scope_name column".into())
1178            })?,
1179            &DataType::Utf8,
1180        )?;
1181        let scope_names = scope_arr
1182            .as_any()
1183            .downcast_ref::<StringArray>()
1184            .ok_or_else(|| {
1185                TraceEngineError::UnsupportedOperation(
1186                    "scope_name cast to StringArray failed".into(),
1187                )
1188            })?;
1189
1190        let scopev_arr = compute::cast(
1191            batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
1192                TraceEngineError::UnsupportedOperation("missing scope_version column".into())
1193            })?,
1194            &DataType::Utf8,
1195        )?;
1196        let scope_versions = scopev_arr
1197            .as_any()
1198            .downcast_ref::<StringArray>()
1199            .ok_or_else(|| {
1200                TraceEngineError::UnsupportedOperation(
1201                    "scope_version cast to StringArray failed".into(),
1202                )
1203            })?;
1204
1205        let root_arr = compute::cast(
1206            batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
1207                TraceEngineError::UnsupportedOperation("missing root_operation column".into())
1208            })?,
1209            &DataType::Utf8,
1210        )?;
1211        let root_operations = root_arr
1212            .as_any()
1213            .downcast_ref::<StringArray>()
1214            .ok_or_else(|| {
1215                TraceEngineError::UnsupportedOperation(
1216                    "root_operation cast to StringArray failed".into(),
1217                )
1218            })?;
1219
1220        let sm_arr = compute::cast(
1221            batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
1222                TraceEngineError::UnsupportedOperation("missing status_message column".into())
1223            })?,
1224            &DataType::Utf8,
1225        )?;
1226        let status_messages = sm_arr
1227            .as_any()
1228            .downcast_ref::<StringArray>()
1229            .ok_or_else(|| {
1230                TraceEngineError::UnsupportedOperation(
1231                    "status_message cast to StringArray failed".into(),
1232                )
1233            })?;
1234
1235        let resource_attrs_map = batch
1236            .column_by_name(RESOURCE_ATTRIBUTES_COL)
1237            .and_then(|c| c.as_any().downcast_ref::<MapArray>())
1238            .ok_or_else(|| {
1239                TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
1240            })?;
1241
1242        let entity_ids_list = batch
1243            .column_by_name(ENTITY_IDS_COL)
1244            .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1245
1246        let queue_ids_list = batch
1247            .column_by_name(QUEUE_IDS_COL)
1248            .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1249
1250        let start_times = batch
1251            .column_by_name(START_TIME_COL)
1252            .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1253            .ok_or_else(|| {
1254                TraceEngineError::UnsupportedOperation("missing start_time column".into())
1255            })?;
1256
1257        let end_times = batch
1258            .column_by_name(END_TIME_COL)
1259            .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1260            .ok_or_else(|| {
1261                TraceEngineError::UnsupportedOperation("missing end_time column".into())
1262            })?;
1263
1264        let durations = batch
1265            .column_by_name(DURATION_MS_COL)
1266            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1267            .ok_or_else(|| {
1268                TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
1269            })?;
1270
1271        let status_codes = batch
1272            .column_by_name(STATUS_CODE_COL)
1273            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1274            .ok_or_else(|| {
1275                TraceEngineError::UnsupportedOperation("missing status_code column".into())
1276            })?;
1277
1278        let span_counts = batch
1279            .column_by_name(SPAN_COUNT_COL)
1280            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1281            .ok_or_else(|| {
1282                TraceEngineError::UnsupportedOperation("missing span_count column".into())
1283            })?;
1284
1285        let error_counts = batch
1286            .column_by_name(ERROR_COUNT_COL)
1287            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1288            .ok_or_else(|| {
1289                TraceEngineError::UnsupportedOperation("missing error_count column".into())
1290            })?;
1291
1292        for i in 0..batch.num_rows() {
1293            let trace_id_hex = hex::encode(trace_ids.value(i));
1294
1295            let start_time = micros_to_datetime(start_times.value(i))?;
1296            let end_time = if end_times.is_null(i) {
1297                None
1298            } else {
1299                Some(micros_to_datetime(end_times.value(i))?)
1300            };
1301            let duration_ms = if durations.is_null(i) {
1302                None
1303            } else {
1304                Some(durations.value(i))
1305            };
1306            let error_count = error_counts.value(i);
1307
1308            let resource_attributes = extract_map_attributes(resource_attrs_map, i);
1309
1310            let entity_ids = extract_list_strings(entity_ids_list, i);
1311            let queue_ids = extract_list_strings(queue_ids_list, i);
1312
1313            items.push(TraceListItem {
1314                trace_id: trace_id_hex,
1315                service_name: service_names.value(i).to_string(),
1316                scope_name: scope_names.value(i).to_string(),
1317                scope_version: scope_versions.value(i).to_string(),
1318                root_operation: root_operations.value(i).to_string(),
1319                start_time,
1320                end_time,
1321                duration_ms,
1322                status_code: status_codes.value(i),
1323                status_message: if status_messages.is_null(i) {
1324                    None
1325                } else {
1326                    Some(status_messages.value(i).to_string())
1327                },
1328                span_count: span_counts.value(i),
1329                has_errors: error_count > 0,
1330                error_count,
1331                resource_attributes,
1332                entity_ids,
1333                queue_ids,
1334            });
1335        }
1336    }
1337
1338    Ok(items)
1339}
1340
1341fn micros_to_datetime(micros: i64) -> Result<DateTime<Utc>, TraceEngineError> {
1342    DateTime::from_timestamp_micros(micros).ok_or(TraceEngineError::InvalidTimestamp(
1343        "out-of-range microsecond timestamp",
1344    ))
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349    use super::*;
1350    use crate::storage::ObjectStore;
1351    use scouter_settings::ObjectStorageSettings;
1352    use scouter_types::sql::TraceFilters;
1353    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
1354    use tracing_subscriber;
1355
1356    fn cleanup() {
1357        let _ = tracing_subscriber::fmt()
1358            .with_max_level(tracing::Level::INFO)
1359            .try_init();
1360
1361        let storage_settings = ObjectStorageSettings::default();
1362        let current_dir = std::env::current_dir().unwrap();
1363        let storage_path = current_dir.join(storage_settings.storage_root());
1364        if storage_path.exists() {
1365            let _ = std::fs::remove_dir_all(storage_path);
1366        }
1367    }
1368
    /// Construct an `ObjectStore` from the given settings, panicking on failure
    /// (acceptable inside tests).
    fn make_test_object_store(storage_settings: &ObjectStorageSettings) -> ObjectStore {
        ObjectStore::new(storage_settings).unwrap()
    }
1372
1373    /// Build a standalone `SessionContext` for test use with the scouter_tracing catalog
1374    /// configured as the default so unqualified table names resolve through our DashMap.
1375    fn make_test_ctx(object_store: &ObjectStore) -> Arc<SessionContext> {
1376        Arc::new(
1377            object_store
1378                .get_session_with_catalog(
1379                    crate::parquet::tracing::engine::TRACE_CATALOG_NAME,
1380                    "default",
1381                )
1382                .unwrap(),
1383        )
1384    }
1385
1386    /// Create a `TraceCatalogProvider`, register it on `ctx`, and return it.
1387    /// Call this after `make_test_ctx` to get the catalog for standalone summary tests.
1388    fn make_test_catalog(ctx: &Arc<SessionContext>) -> Arc<TraceCatalogProvider> {
1389        use datafusion::catalog::CatalogProvider;
1390        let catalog = Arc::new(TraceCatalogProvider::new());
1391        ctx.register_catalog(
1392            crate::parquet::tracing::engine::TRACE_CATALOG_NAME,
1393            Arc::clone(&catalog) as Arc<dyn CatalogProvider>,
1394        );
1395        catalog
1396    }
1397
1398    fn make_summary(
1399        trace_id_bytes: [u8; 16],
1400        service_name: &str,
1401        error_count: i64,
1402        resource_attributes: Vec<Attribute>,
1403    ) -> TraceSummaryRecord {
1404        let now = Utc::now();
1405        TraceSummaryRecord {
1406            trace_id: TraceId::from_bytes(trace_id_bytes),
1407            service_name: service_name.to_string(),
1408            scope_name: "test.scope".to_string(),
1409            scope_version: String::new(),
1410            root_operation: "root_op".to_string(),
1411            start_time: now,
1412            end_time: Some(now + chrono::Duration::milliseconds(200)),
1413            status_code: if error_count > 0 { 2 } else { 0 },
1414            status_message: if error_count > 0 {
1415                "Internal Server Error".to_string()
1416            } else {
1417                "OK".to_string()
1418            },
1419            span_count: 3,
1420            error_count,
1421            resource_attributes,
1422            entity_ids: vec![],
1423            queue_ids: vec![],
1424        }
1425    }
1426
    /// Basic write + paginate round-trip: writes two summaries and verifies both appear.
    #[tokio::test]
    async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
        cleanup();

        // Fresh local storage + standalone session/catalog for this test.
        let storage_settings = ObjectStorageSettings::default();
        let object_store = make_test_object_store(&storage_settings);
        let ctx = make_test_ctx(&object_store);
        let catalog = make_test_catalog(&ctx);
        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;

        let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
        let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
        service.write_summaries(vec![s1, s2]).await?;
        // Fixed sleep to let the async write path commit to Delta before querying.
        // NOTE(review): presumably tied to the service's flush interval — a
        // deterministic flush hook would make this test less timing-sensitive.
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        // Query a window guaranteed to bracket the records just written.
        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let filters = TraceFilters {
            service_name: None,
            has_errors: None,
            status_code: None,
            start_time: Some(start),
            end_time: Some(end),
            limit: Some(25),
            cursor_start_time: None,
            cursor_trace_id: None,
            direction: None,
            attribute_filters: None,
            trace_ids: None,
            entity_uid: None,
            queue_uid: None,
        };

        // ">= 2" rather than "== 2": other writes in the window are tolerated.
        let response = service.query_service.get_paginated_traces(&filters).await?;
        assert!(
            response.items.len() >= 2,
            "Expected at least 2 items, got {}",
            response.items.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
1472
1473    /// `has_errors = Some(true)` returns only error traces; `Some(false)` returns only non-errors.
1474    #[tokio::test]
1475    async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
1476        cleanup();
1477
1478        let storage_settings = ObjectStorageSettings::default();
1479        let object_store = make_test_object_store(&storage_settings);
1480        let ctx = make_test_ctx(&object_store);
1481        let catalog = make_test_catalog(&ctx);
1482        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
1483
1484        let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
1485        let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
1486        service
1487            .write_summaries(vec![ok_summary, err_summary])
1488            .await?;
1489        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1490
1491        let start = Utc::now() - chrono::Duration::hours(1);
1492        let end = Utc::now() + chrono::Duration::hours(1);
1493
1494        let base_filters = TraceFilters {
1495            service_name: None,
1496            has_errors: None,
1497            status_code: None,
1498            start_time: Some(start),
1499            end_time: Some(end),
1500            limit: Some(25),
1501            cursor_start_time: None,
1502            cursor_trace_id: None,
1503            direction: None,
1504            attribute_filters: None,
1505            trace_ids: None,
1506            entity_uid: None,
1507            queue_uid: None,
1508        };
1509
1510        // has_errors = true → only error trace
1511        let mut filters_err = base_filters.clone();
1512        filters_err.has_errors = Some(true);
1513        let errors_only = service
1514            .query_service
1515            .get_paginated_traces(&filters_err)
1516            .await?;
1517        for item in &errors_only.items {
1518            assert!(
1519                item.error_count > 0,
1520                "Expected error trace, got: {:?}",
1521                item
1522            );
1523        }
1524        assert!(
1525            !errors_only.items.is_empty(),
1526            "Expected at least one error trace"
1527        );
1528
1529        // has_errors = false → only non-error traces
1530        let mut filters_ok = base_filters.clone();
1531        filters_ok.has_errors = Some(false);
1532        let no_errors = service
1533            .query_service
1534            .get_paginated_traces(&filters_ok)
1535            .await?;
1536        for item in &no_errors.items {
1537            assert_eq!(
1538                item.error_count, 0,
1539                "Expected non-error trace, got error_count={}",
1540                item.error_count
1541            );
1542        }
1543        assert!(
1544            !no_errors.items.is_empty(),
1545            "Expected at least one non-error trace"
1546        );
1547
1548        service.shutdown().await?;
1549        cleanup();
1550        Ok(())
1551    }
1552
1553    /// service_name filter returns only matching service traces.
1554    #[tokio::test]
1555    async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
1556        cleanup();
1557
1558        let storage_settings = ObjectStorageSettings::default();
1559        let object_store = make_test_object_store(&storage_settings);
1560        let ctx = make_test_ctx(&object_store);
1561        let catalog = make_test_catalog(&ctx);
1562        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
1563
1564        let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
1565        let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
1566        service.write_summaries(vec![s_alpha, s_beta]).await?;
1567        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1568
1569        let start = Utc::now() - chrono::Duration::hours(1);
1570        let end = Utc::now() + chrono::Duration::hours(1);
1571        let filters = TraceFilters {
1572            service_name: Some("alpha_service".to_string()),
1573            has_errors: None,
1574            status_code: None,
1575            start_time: Some(start),
1576            end_time: Some(end),
1577            limit: Some(25),
1578            cursor_start_time: None,
1579            cursor_trace_id: None,
1580            direction: None,
1581            attribute_filters: None,
1582            trace_ids: None,
1583            entity_uid: None,
1584            queue_uid: None,
1585        };
1586
1587        let response = service.query_service.get_paginated_traces(&filters).await?;
1588        assert!(
1589            !response.items.is_empty(),
1590            "Expected results for alpha_service"
1591        );
1592        for item in &response.items {
1593            assert_eq!(
1594                item.service_name, "alpha_service",
1595                "Expected only alpha_service items, got: {}",
1596                item.service_name
1597            );
1598        }
1599
1600        service.shutdown().await?;
1601        cleanup();
1602        Ok(())
1603    }
1604
1605    /// trace_ids IN filter returns only the specified traces.
1606    #[tokio::test]
1607    async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
1608        cleanup();
1609
1610        let storage_settings = ObjectStorageSettings::default();
1611        let object_store = make_test_object_store(&storage_settings);
1612        let ctx = make_test_ctx(&object_store);
1613        let catalog = make_test_catalog(&ctx);
1614        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
1615
1616        let wanted_id = TraceId::from_bytes([7u8; 16]);
1617        let unwanted_id = TraceId::from_bytes([8u8; 16]);
1618
1619        let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
1620        let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
1621        service.write_summaries(vec![s1, s2]).await?;
1622        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1623
1624        let start = Utc::now() - chrono::Duration::hours(1);
1625        let end = Utc::now() + chrono::Duration::hours(1);
1626        let filters = TraceFilters {
1627            service_name: None,
1628            has_errors: None,
1629            status_code: None,
1630            start_time: Some(start),
1631            end_time: Some(end),
1632            limit: Some(25),
1633            cursor_start_time: None,
1634            cursor_trace_id: None,
1635            direction: None,
1636            attribute_filters: None,
1637            trace_ids: Some(vec![wanted_id.to_hex()]),
1638            entity_uid: None,
1639            queue_uid: None,
1640        };
1641
1642        let response = service.query_service.get_paginated_traces(&filters).await?;
1643        assert_eq!(
1644            response.items.len(),
1645            1,
1646            "Expected exactly 1 item from trace_ids filter"
1647        );
1648        assert_eq!(
1649            response.items[0].trace_id,
1650            wanted_id.to_hex(),
1651            "Returned wrong trace_id"
1652        );
1653        assert_ne!(
1654            response.items[0].trace_id,
1655            unwanted_id.to_hex(),
1656            "Should not have returned unwanted trace_id"
1657        );
1658
1659        service.shutdown().await?;
1660        cleanup();
1661        Ok(())
1662    }
1663
1664    /// Cursor pagination: first page → next → previous all return correct item counts.
1665    #[tokio::test]
1666    async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
1667        cleanup();
1668        let storage_settings = ObjectStorageSettings::default();
1669        let object_store = make_test_object_store(&storage_settings);
1670        let ctx = make_test_ctx(&object_store);
1671        let catalog = make_test_catalog(&ctx);
1672        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
1673
1674        let now = Utc::now();
1675        let summaries: Vec<TraceSummaryRecord> = (0u8..100)
1676            .map(|i| {
1677                let mut s = make_summary([i; 16], "svc", 0, vec![]);
1678                s.start_time = now - chrono::Duration::minutes(i as i64);
1679                s
1680            })
1681            .collect();
1682        service.write_summaries(summaries).await?;
1683        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1684
1685        let mut filters = TraceFilters {
1686            start_time: Some(now - chrono::Duration::hours(2)),
1687            end_time: Some(now + chrono::Duration::hours(1)),
1688            limit: Some(50),
1689            ..Default::default()
1690        };
1691
1692        // First page
1693        let first = service.query_service.get_paginated_traces(&filters).await?;
1694        assert_eq!(first.items.len(), 50, "first page: 50 items");
1695        assert!(
1696            first.next_cursor.is_some(),
1697            "first page: should have next_cursor"
1698        );
1699
1700        // Next page
1701        let next_cur = first.next_cursor.clone().unwrap();
1702        filters.cursor_start_time = Some(next_cur.start_time);
1703        filters.cursor_trace_id = Some(next_cur.trace_id.clone());
1704        filters.direction = Some("next".to_string());
1705        let second = service.query_service.get_paginated_traces(&filters).await?;
1706        assert_eq!(second.items.len(), 50, "second page: 50 items");
1707        assert!(
1708            second.items[0].start_time <= next_cur.start_time,
1709            "second page first item must be <= cursor"
1710        );
1711        assert!(second.previous_cursor.is_some());
1712
1713        // Previous page
1714        let prev_cur = second.previous_cursor.unwrap();
1715        filters.cursor_start_time = Some(prev_cur.start_time);
1716        filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
1717        filters.direction = Some("previous".to_string());
1718        let prev = service.query_service.get_paginated_traces(&filters).await?;
1719        assert_eq!(prev.items.len(), 50, "previous page: 50 items");
1720
1721        service.shutdown().await?;
1722        cleanup();
1723        Ok(())
1724    }
1725
    /// Attribute-filter JOIN path: only traces with matching span attributes are returned.
    ///
    /// Writes two traces: one whose span carries `component=kafka` and one with no
    /// attributes, then queries summaries with `attribute_filters = ["component:kafka"]`
    /// and expects only the kafka trace back.
    #[tokio::test]
    async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // TraceSpanService owns the SessionContext (trace_spans registered in it)
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        let shared_ctx = span_service.ctx.clone();

        // TraceSummaryService shares the same ctx + catalog — JOIN to trace_spans will work
        let summary_service = TraceSummaryService::new(
            &span_service.object_store,
            24,
            shared_ctx,
            span_service.catalog.clone(),
            10,
        )
        .await?;

        let now = Utc::now();
        // Distinct byte patterns keep the two trace ids unambiguous.
        let kafka_trace = TraceId::from_bytes([70u8; 16]);
        let plain_trace = TraceId::from_bytes([80u8; 16]);

        // Only the kafka span carries the attribute the filter will match on.
        let kafka_span = make_span_record(
            &kafka_trace,
            SpanId::from_bytes([70u8; 8]),
            "svc",
            vec![Attribute {
                key: "component".to_string(),
                value: serde_json::Value::String("kafka".to_string()),
            }],
        );
        let plain_span =
            make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
        span_service
            .write_spans(vec![kafka_span, plain_span])
            .await?;

        // Pin both summaries to `now` so the query window below covers them.
        let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
        kafka_summary.start_time = now;
        let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
        plain_summary.start_time = now;
        summary_service
            .write_summaries(vec![kafka_summary, plain_summary])
            .await?;

        // NOTE(review): 4s assumed to cover both the span and summary flush paths —
        // confirm against the services' flush intervals if this test flakes.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        // "key:value" filter string — presumably parsed by match_attr_expr; verify format.
        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            attribute_filters: Some(vec!["component:kafka".to_string()]),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "attribute filter must return results"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == kafka_trace.to_hex()),
            "only kafka trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }
1812
    /// queue_uid filter: only traces whose queue_ids contain the target UID are returned,
    /// and the matching trace's spans can be fetched by trace_id.
    #[tokio::test]
    async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // TraceSpanService owns the SessionContext
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        let shared_ctx = span_service.ctx.clone();

        // TraceSummaryService shares the same ctx + catalog so JOIN path works
        let summary_service = TraceSummaryService::new(
            &span_service.object_store,
            24,
            shared_ctx,
            span_service.catalog.clone(),
            10,
        )
        .await?;

        let now = Utc::now();
        let queue_trace = TraceId::from_bytes([90u8; 16]);
        let plain_trace = TraceId::from_bytes([91u8; 16]);
        let target_queue_uid = "queue-record-abc123";

        // Write spans for both traces
        let queue_span = make_span_record(
            &queue_trace,
            SpanId::from_bytes([90u8; 8]),
            "svc_queue",
            vec![],
        );
        let plain_span = make_span_record(
            &plain_trace,
            SpanId::from_bytes([91u8; 8]),
            "svc_queue",
            vec![],
        );
        // NOTE(review): uses write_spans_direct here, while the attribute-filter test
        // uses write_spans — presumably this bypasses the buffered write path so the
        // spans are queryable without waiting on a flush; confirm the intended difference.
        span_service
            .write_spans_direct(vec![queue_span, plain_span])
            .await?;

        // Write summaries: one with a matching queue_id, one without
        let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
        queue_summary.start_time = now;
        queue_summary.queue_ids = vec![target_queue_uid.to_string()];

        let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
        plain_summary.start_time = now;
        // queue_ids left empty — should not appear in results

        summary_service
            .write_summaries(vec![queue_summary, plain_summary])
            .await?;

        // Allow the async write/flush paths to land before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        // ── Step 1: query summaries by queue_uid ─────────────────────────────────
        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            queue_uid: Some(target_queue_uid.to_string()),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "queue_uid filter must return at least one result"
        );
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == queue_trace.to_hex()),
            "only the queue trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        // ── Step 2: fetch spans for the returned trace_id ─────────────────────────
        // Round-trip the hex trace_id from the summary response back into bytes.
        let returned_trace_id =
            TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
        let spans = span_service
            .query_service
            .get_trace_spans(
                Some(returned_trace_id.as_bytes()),
                None,
                Some(&(now - chrono::Duration::hours(1))),
                Some(&(now + chrono::Duration::hours(1))),
                None,
            )
            .await?;

        assert!(
            !spans.is_empty(),
            "should find spans for the returned trace_id"
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }
1928
1929    /// Build a deterministic `TraceSpanRecord` for use in summary tests.
1930    fn make_span_record(
1931        trace_id: &TraceId,
1932        span_id: SpanId,
1933        service_name: &str,
1934        attributes: Vec<Attribute>,
1935    ) -> TraceSpanRecord {
1936        let now = Utc::now();
1937        TraceSpanRecord {
1938            created_at: now,
1939            trace_id: *trace_id,
1940            span_id,
1941            parent_span_id: None,
1942            flags: 1,
1943            trace_state: String::new(),
1944            scope_name: "test.scope".to_string(),
1945            scope_version: None,
1946            span_name: "op".to_string(),
1947            span_kind: "INTERNAL".to_string(),
1948            start_time: now,
1949            end_time: now + chrono::Duration::milliseconds(100),
1950            duration_ms: 100,
1951            status_code: 0,
1952            status_message: "OK".to_string(),
1953            attributes,
1954            events: vec![],
1955            links: vec![],
1956            label: None,
1957            input: serde_json::Value::Null,
1958            output: serde_json::Value::Null,
1959            service_name: service_name.to_string(),
1960            resource_attributes: vec![],
1961        }
1962    }
1963
1964    /// `resource_attributes` survive a write → read round-trip.
1965    #[tokio::test]
1966    async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
1967        cleanup();
1968
1969        let storage_settings = ObjectStorageSettings::default();
1970        let object_store = make_test_object_store(&storage_settings);
1971        let ctx = make_test_ctx(&object_store);
1972        let catalog = make_test_catalog(&ctx);
1973        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
1974
1975        let attrs = vec![Attribute {
1976            key: "cloud.region".to_string(),
1977            value: serde_json::Value::String("us-east-1".to_string()),
1978        }];
1979        let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
1980        service.write_summaries(vec![summary]).await?;
1981        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1982
1983        let start = Utc::now() - chrono::Duration::hours(1);
1984        let end = Utc::now() + chrono::Duration::hours(1);
1985        let filters = TraceFilters {
1986            service_name: None,
1987            has_errors: None,
1988            status_code: None,
1989            start_time: Some(start),
1990            end_time: Some(end),
1991            limit: Some(25),
1992            cursor_start_time: None,
1993            cursor_trace_id: None,
1994            direction: None,
1995            attribute_filters: None,
1996            trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
1997            entity_uid: None,
1998            queue_uid: None,
1999        };
2000
2001        let response = service.query_service.get_paginated_traces(&filters).await?;
2002        assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
2003        assert_eq!(
2004            response.items[0].resource_attributes.len(),
2005            1,
2006            "Expected 1 resource attribute"
2007        );
2008        assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");
2009
2010        service.shutdown().await?;
2011        cleanup();
2012        Ok(())
2013    }
2014
2015    /// Regression test: multiple sequential writes must be immediately visible to queries.
2016    /// This catches the stale snapshot bug where re-registration doesn't refresh the
2017    /// DataFusion session's object store, causing queries after subsequent writes to
2018    /// return stale results.
2019    #[tokio::test]
2020    async fn test_summary_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError>
2021    {
2022        cleanup();
2023
2024        let storage_settings = ObjectStorageSettings::default();
2025        let object_store = make_test_object_store(&storage_settings);
2026        let ctx = make_test_ctx(&object_store);
2027        let catalog = make_test_catalog(&ctx);
2028        let service = TraceSummaryService::new(&object_store, 24, ctx, catalog, 10).await?;
2029
2030        let start = Utc::now() - chrono::Duration::hours(1);
2031        let end = Utc::now() + chrono::Duration::hours(1);
2032        let filters = TraceFilters {
2033            start_time: Some(start),
2034            end_time: Some(end),
2035            limit: Some(100),
2036            ..Default::default()
2037        };
2038
2039        // Write batch #1 (2 summaries)
2040        let s1 = make_summary([0xA0; 16], "svc_vis", 0, vec![]);
2041        let s2 = make_summary([0xA1; 16], "svc_vis", 0, vec![]);
2042        service.write_summaries(vec![s1, s2]).await?;
2043
2044        let response = service.query_service.get_paginated_traces(&filters).await?;
2045        assert_eq!(
2046            response.items.len(),
2047            2,
2048            "After write #1: expected 2 items, got {}",
2049            response.items.len()
2050        );
2051
2052        // Write batch #2 (2 more summaries)
2053        let s3 = make_summary([0xA2; 16], "svc_vis", 0, vec![]);
2054        let s4 = make_summary([0xA3; 16], "svc_vis", 0, vec![]);
2055        service.write_summaries(vec![s3, s4]).await?;
2056
2057        let response = service.query_service.get_paginated_traces(&filters).await?;
2058        assert_eq!(
2059            response.items.len(),
2060            4,
2061            "After write #2: expected 4 items, got {} (stale snapshot?)",
2062            response.items.len()
2063        );
2064
2065        // Write batch #3 (2 more summaries)
2066        let s5 = make_summary([0xA4; 16], "svc_vis", 0, vec![]);
2067        let s6 = make_summary([0xA5; 16], "svc_vis", 0, vec![]);
2068        service.write_summaries(vec![s5, s6]).await?;
2069
2070        let response = service.query_service.get_paginated_traces(&filters).await?;
2071        assert_eq!(
2072            response.items.len(),
2073            6,
2074            "After write #3: expected 6 items, got {} (stale snapshot?)",
2075            response.items.len()
2076        );
2077
2078        service.shutdown().await?;
2079        cleanup();
2080        Ok(())
2081    }
2082
2083    /// Simulate a 2-pod deployment: writer pod commits summaries, reader pod picks them
2084    /// up via the refresh ticker.
2085    ///
2086    /// Both pods share the same local storage directory (equivalent to a shared GCS/S3
2087    /// bucket in production). Each pod has its own `ObjectStore` + `SessionContext` — there
2088    /// is no shared memory. The reader's refresh ticker (1s interval) calls
2089    /// `update_incremental()`, detects the new Delta log entry, and re-registers the
2090    /// `SessionContext` so subsequent queries return fresh data.
2091    ///
2092    /// We build each pod by creating a `TraceSpanService` first (which sets up the
2093    /// `ObjectStore` + `SessionContext` correctly) and then attaching a `TraceSummaryService`
2094    /// on top — the same pattern used in server setup. Using `make_test_ctx()` alone is
2095    /// insufficient because it does not register the object-store URL scheme that the Delta
2096    /// log uses for Parquet file paths, causing a "Failed to fetch metadata" error at query time.
2097    #[tokio::test]
2098    async fn test_distributed_refresh() -> Result<(), TraceEngineError> {
2099        use crate::parquet::tracing::service::TraceSpanService;
2100
2101        // Use a unique storage dir to avoid racing with service::tests::test_distributed_refresh,
2102        // which also uses scouter_storage and runs concurrently in the same binary.
2103        let storage_settings = ObjectStorageSettings {
2104            storage_uri: "./scouter_storage_summary_dist".to_string(),
2105            ..ObjectStorageSettings::default()
2106        };
2107        let current_dir = std::env::current_dir().unwrap();
2108        let storage_path = current_dir.join(storage_settings.storage_root());
2109        if storage_path.exists() {
2110            let _ = std::fs::remove_dir_all(&storage_path);
2111        }
2112
2113        // "Writer pod" — owns writes; standard refresh interval (not needed on the writer)
2114        let writer_spans = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
2115        let writer = TraceSummaryService::new(
2116            &writer_spans.object_store,
2117            24,
2118            writer_spans.ctx.clone(),
2119            writer_spans.catalog.clone(),
2120            10,
2121        )
2122        .await?;
2123
2124        // "Reader pod" — separate ObjectStore + SessionContext; 1s refresh for fast turnaround
2125        let reader_spans = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
2126        let reader = TraceSummaryService::new(
2127            &reader_spans.object_store,
2128            24,
2129            reader_spans.ctx.clone(),
2130            reader_spans.catalog.clone(),
2131            1,
2132        )
2133        .await?;
2134
2135        let summary = make_summary([0xDD_u8; 16], "distributed-svc", 0, vec![]);
2136        writer.write_summaries(vec![summary]).await?;
2137
2138        // Wait for the reader's refresh ticker to fire (1s interval + margin)
2139        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
2140
2141        let start = Utc::now() - chrono::Duration::hours(1);
2142        let end = Utc::now() + chrono::Duration::hours(1);
2143        let filters = TraceFilters {
2144            service_name: Some("distributed-svc".to_string()),
2145            has_errors: None,
2146            status_code: None,
2147            start_time: Some(start),
2148            end_time: Some(end),
2149            limit: Some(25),
2150            cursor_start_time: None,
2151            cursor_trace_id: None,
2152            direction: None,
2153            attribute_filters: None,
2154            trace_ids: None,
2155            entity_uid: None,
2156            queue_uid: None,
2157        };
2158
2159        let response = reader.query_service.get_paginated_traces(&filters).await?;
2160        assert!(
2161            !response.items.is_empty(),
2162            "Reader pod should see summaries written by writer pod after refresh"
2163        );
2164
2165        writer.shutdown().await?;
2166        reader.shutdown().await?;
2167        writer_spans.shutdown().await?;
2168        reader_spans.shutdown().await?;
2169        if storage_path.exists() {
2170            let _ = std::fs::remove_dir_all(&storage_path);
2171        }
2172        Ok(())
2173    }
2174}