//! scouter_dataframe/parquet/tracing/summary.rs — trace-summary Delta table engine.
1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
4use crate::parquet::utils::match_attr_expr;
5use crate::parquet::utils::register_cloud_logstore_factories;
6use crate::storage::ObjectStore;
7use arrow::array::*;
8use arrow::compute;
9use arrow::datatypes::*;
10use arrow_array::Array;
11use arrow_array::RecordBatch;
12use chrono::{DateTime, Datelike, Utc};
13use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
14use datafusion::prelude::*;
15use datafusion::scalar::ScalarValue;
16use deltalake::operations::optimize::OptimizeType;
17use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
18use scouter_types::sql::{TraceFilters, TraceListItem};
19use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
20use std::sync::Arc;
21use tokio::sync::oneshot;
22use tokio::sync::{mpsc, RwLock as AsyncRwLock};
23use tokio::time::{interval, Duration};
24use tracing::{error, info};
25use url::Url;
26
/// Days from CE epoch to Unix epoch (1970-01-01).
/// Equivalent to `NaiveDate::from_ymd_opt(1970, 1, 1).unwrap().num_days_from_ce()`.
const UNIX_EPOCH_DAYS: i32 = 719_163;

/// Delta table name; also the name the table is registered under in the `SessionContext`.
const SUMMARY_TABLE_NAME: &str = "trace_summaries";

/// Control table task name for summary compaction coordination.
const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";

// ── Column name constants ────────────────────────────────────────────────────
// Scalar summary columns.
const TRACE_ID_COL: &str = "trace_id";
const SERVICE_NAME_COL: &str = "service_name";
const SCOPE_NAME_COL: &str = "scope_name";
const SCOPE_VERSION_COL: &str = "scope_version";
const ROOT_OPERATION_COL: &str = "root_operation";
const START_TIME_COL: &str = "start_time";
const END_TIME_COL: &str = "end_time";
const DURATION_MS_COL: &str = "duration_ms";
const STATUS_CODE_COL: &str = "status_code";
const STATUS_MESSAGE_COL: &str = "status_message";
const SPAN_COUNT_COL: &str = "span_count";
const ERROR_COUNT_COL: &str = "error_count";
const SEARCH_BLOB_COL: &str = "search_blob";

// Nested (map/list) columns.
const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
const ENTITY_IDS_COL: &str = "entity_ids";
const QUEUE_IDS_COL: &str = "queue_ids";

// Hive-style partition key column (Date32, derived from start_time).
const PARTITION_DATE_COL: &str = "partition_date";
56
57// ── Schema ───────────────────────────────────────────────────────────────────
58
/// Build the Arrow schema for the `trace_summaries` Delta table.
///
/// Column order here is positional and must stay in sync with
/// `TraceSummaryBatchBuilder::finish`, which assembles columns in the same order.
fn create_summary_schema() -> Schema {
    Schema::new(vec![
        // Raw 16-byte trace id (not hex-encoded).
        Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
        // Dictionary-encoded: service names repeat heavily across rows.
        Field::new(
            SERVICE_NAME_COL,
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        ),
        Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
        Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
        Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
        Field::new(
            START_TIME_COL,
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        // Nullable: a trace may still be in flight when summarized.
        Field::new(
            END_TIME_COL,
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            true,
        ),
        Field::new(DURATION_MS_COL, DataType::Int64, true),
        Field::new(STATUS_CODE_COL, DataType::Int32, false),
        Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
        Field::new(SPAN_COUNT_COL, DataType::Int64, false),
        Field::new(ERROR_COUNT_COL, DataType::Int64, false),
        // Map field shape comes from the shared traits helper.
        resource_attribute_field(),
        Field::new(
            ENTITY_IDS_COL,
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        ),
        Field::new(
            QUEUE_IDS_COL,
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        ),
        // Partition key: days since the Unix epoch (Date32).
        Field::new(PARTITION_DATE_COL, DataType::Date32, false),
    ])
}
99
100// ── BatchBuilder ─────────────────────────────────────────────────────────────
101
/// Column-wise Arrow builders used to assemble `TraceSummaryRecord`s into a
/// single `RecordBatch`. One builder per schema column; field order mirrors
/// `create_summary_schema`.
struct TraceSummaryBatchBuilder {
    schema: Arc<Schema>,
    trace_id: FixedSizeBinaryBuilder,
    // Dictionary-encoded to match the schema's Dictionary(Int32, Utf8) column.
    service_name: StringDictionaryBuilder<Int32Type>,
    scope_name: StringBuilder,
    scope_version: StringBuilder,
    root_operation: StringBuilder,
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,
    status_code: Int32Builder,
    status_message: StringBuilder,
    span_count: Int64Builder,
    error_count: Int64Builder,
    // Map of attribute key -> JSON-serialized attribute value.
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    entity_ids: ListBuilder<StringBuilder>,
    queue_ids: ListBuilder<StringBuilder>,
    // Date32 partition key derived from start_time.
    partition_date: Date32Builder,
}
121
impl TraceSummaryBatchBuilder {
    /// Create a builder with per-column capacity hints sized for `capacity` rows.
    fn new(schema: Arc<Schema>, capacity: usize) -> Self {
        // Custom entry/key/value field names for the resource-attributes map
        // column — assumed to match resource_attribute_field(); confirm in traits.rs.
        let map_field_names = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let resource_attributes = MapBuilder::new(
            Some(map_field_names),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );
        Self {
            schema,
            trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
            service_name: StringDictionaryBuilder::new(),
            // Byte-capacity hints are rough per-row averages, not limits.
            scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
            scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
            root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
            start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            duration_ms: Int64Builder::with_capacity(capacity),
            status_code: Int32Builder::with_capacity(capacity),
            status_message: StringBuilder::with_capacity(capacity, capacity * 16),
            span_count: Int64Builder::with_capacity(capacity),
            error_count: Int64Builder::with_capacity(capacity),
            resource_attributes,
            entity_ids: ListBuilder::new(StringBuilder::new()),
            queue_ids: ListBuilder::new(StringBuilder::new()),
            partition_date: Date32Builder::with_capacity(capacity),
        }
    }

    /// Append one summary record as a row across all column builders.
    ///
    /// Null conventions: empty `scope_version` / `status_message` strings are
    /// stored as NULL, and empty attribute/ID collections are stored as NULL
    /// containers rather than empty ones.
    fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
        self.trace_id.append_value(rec.trace_id.as_bytes())?;
        self.service_name.append_value(&rec.service_name);
        self.scope_name.append_value(&rec.scope_name);
        if rec.scope_version.is_empty() {
            self.scope_version.append_null();
        } else {
            self.scope_version.append_value(&rec.scope_version);
        }
        self.root_operation.append_value(&rec.root_operation);
        self.start_time
            .append_value(rec.start_time.timestamp_micros());
        match rec.end_time {
            Some(end) => self.end_time.append_value(end.timestamp_micros()),
            None => self.end_time.append_null(),
        }
        // Duration only exists once the trace has ended; clamped to >= 0 to
        // guard against end < start (e.g. clock skew between span sources).
        let duration = rec
            .end_time
            .map(|end| (end - rec.start_time).num_milliseconds().max(0));
        match duration {
            Some(d) => self.duration_ms.append_value(d),
            None => self.duration_ms.append_null(),
        }
        self.status_code.append_value(rec.status_code);
        if rec.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&rec.status_message);
        }
        self.span_count.append_value(rec.span_count);
        self.error_count.append_value(rec.error_count);
        if rec.resource_attributes.is_empty() {
            self.resource_attributes.append(false)?; // null map
        } else {
            for attr in &rec.resource_attributes {
                self.resource_attributes.keys().append_value(&attr.key);
                // Attribute values are stored as their JSON encoding; a failed
                // serialization degrades to the literal string "null".
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                self.resource_attributes.values().append_value(value_str);
            }
            self.resource_attributes.append(true)?;
        }
        if rec.entity_ids.is_empty() {
            self.entity_ids.append_null();
        } else {
            for id in &rec.entity_ids {
                self.entity_ids.values().append_value(id);
            }
            self.entity_ids.append(true);
        }
        if rec.queue_ids.is_empty() {
            self.queue_ids.append_null();
        } else {
            for id in &rec.queue_ids {
                self.queue_ids.values().append_value(id);
            }
            self.queue_ids.append(true);
        }
        // Partition key — days since Unix epoch, derived from start_time
        let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);
        Ok(())
    }

    /// Finalize all builders into a `RecordBatch`.
    ///
    /// Column order here is positional and MUST match `create_summary_schema`.
    fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(self.trace_id.finish()),
            Arc::new(self.service_name.finish()),
            Arc::new(self.scope_name.finish()),
            Arc::new(self.scope_version.finish()),
            Arc::new(self.root_operation.finish()),
            Arc::new(self.start_time.finish()),
            Arc::new(self.end_time.finish()),
            Arc::new(self.duration_ms.finish()),
            Arc::new(self.status_code.finish()),
            Arc::new(self.status_message.finish()),
            Arc::new(self.span_count.finish()),
            Arc::new(self.error_count.finish()),
            Arc::new(self.resource_attributes.finish()),
            Arc::new(self.entity_ids.finish()),
            Arc::new(self.queue_ids.finish()),
            Arc::new(self.partition_date.finish()),
        ];
        RecordBatch::try_new(self.schema, columns).map_err(Into::into)
    }
}
241
242// ── TableCommand ─────────────────────────────────────────────────────────────
243
/// Commands handled by the `TraceSummaryDBEngine` actor loop.
///
/// Each variant (except `Shutdown`) carries a oneshot responder so callers can
/// await the operation's result.
pub enum SummaryTableCommand {
    /// Append a batch of summary records to the Delta table.
    Write {
        records: Vec<TraceSummaryRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Compact/Z-order the table (direct admin request, bypasses control table).
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Delete tombstoned files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop.
    Shutdown,
}
258
259async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
260    let mut base = object_store.get_base_url()?;
261    let mut path = base.path().to_string();
262    if !path.ends_with('/') {
263        path.push('/');
264    }
265    path.push_str(SUMMARY_TABLE_NAME);
266    base.set_path(&path);
267    Ok(base)
268}
269
/// Create the `trace_summaries` Delta table at `table_url`.
///
/// The table is partitioned by `partition_date` and configured to collect
/// data-skipping statistics only for the explicitly listed columns.
async fn create_summary_table(
    object_store: &ObjectStore,
    table_url: Url,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    // Log only scheme + last path segment to keep bucket paths out of the logs.
    info!(
        "Creating trace summary table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(SUMMARY_TABLE_NAME)
    );
    // Explicit storage backend: required for cloud stores where the backend
    // cannot be inferred from the URL scheme alone.
    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;
    let delta_fields = arrow_schema_to_delta(&schema);
    table
        .create()
        .with_table_name(SUMMARY_TABLE_NAME)
        .with_columns(delta_fields)
        .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
        // Only collect min/max statistics for non-binary columns.
        // trace_id (FixedSizeBinary) has no meaningful ordering for file-level pruning.
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
        )
        .await
        .map_err(Into::into)
}
302
303async fn build_or_create_summary_table(
304    object_store: &ObjectStore,
305    schema: SchemaRef,
306) -> Result<DeltaTable, TraceEngineError> {
307    register_cloud_logstore_factories();
308    let table_url = build_summary_url(object_store).await?;
309    info!(
310        "Loading trace summary table [{}://.../{} ]",
311        table_url.scheme(),
312        table_url
313            .path_segments()
314            .and_then(|mut s| s.next_back())
315            .unwrap_or(SUMMARY_TABLE_NAME)
316    );
317
318    // Check whether a Delta log actually exists. For local tables, check the
319    // filesystem directly. For remote tables, attempt a full load with the
320    // explicit storage backend — required for S3/GCS/Azure where Delta Lake
321    // cannot infer the object store from the URL scheme alone.
322    let is_delta_table = if table_url.scheme() == "file" {
323        if let Ok(path) = table_url.to_file_path() {
324            if !path.exists() {
325                info!("Creating directory for summary table: {:?}", path);
326                std::fs::create_dir_all(&path)?;
327            }
328            path.join("_delta_log").exists()
329        } else {
330            false
331        }
332    } else {
333        let store = object_store.as_dyn_object_store();
334        match DeltaTableBuilder::from_url(table_url.clone()) {
335            Ok(builder) => builder
336                .with_storage_backend(store, table_url.clone())
337                .load()
338                .await
339                .is_ok(),
340            Err(_) => false,
341        }
342    };
343
344    if is_delta_table {
345        info!(
346            "Loaded existing trace summary table [{}://.../{} ]",
347            table_url.scheme(),
348            table_url
349                .path_segments()
350                .and_then(|mut s| s.next_back())
351                .unwrap_or(SUMMARY_TABLE_NAME)
352        );
353        let store = object_store.as_dyn_object_store();
354        DeltaTableBuilder::from_url(table_url.clone())?
355            .with_storage_backend(store, table_url)
356            .load()
357            .await
358            .map_err(Into::into)
359    } else {
360        info!("Summary table does not exist, creating new table");
361        create_summary_table(object_store, table_url, schema).await
362    }
363}
364
/// Single-writer engine owning the `trace_summaries` Delta table.
///
/// Runs as an actor (see `start_actor`); queries go through the shared
/// `SessionContext`, writes/maintenance go through the command channel.
pub struct TraceSummaryDBEngine {
    // Arrow schema used for building write batches.
    schema: Arc<Schema>,
    // Current Delta table snapshot; write-locked for each mutating operation.
    table: Arc<AsyncRwLock<DeltaTable>>,
    // Shared DataFusion context; also used by TraceSummaryQueries.
    pub ctx: Arc<SessionContext>,
    // Cross-pod task coordination for scheduled compaction.
    control: ControlTableEngine,
}
371
372impl TraceSummaryDBEngine {
373    /// Create a new `TraceSummaryDBEngine` using the provided shared `SessionContext`.
374    ///
375    /// The caller is responsible for passing a `SessionContext` that already has the object-store
376    /// backend configured (e.g. the one from `TraceSpanDBEngine`). This ensures both
377    /// `trace_spans` and `trace_summaries` live in the same context and can participate in
378    /// JOIN queries.
379    pub async fn new(
380        object_store: &ObjectStore,
381        ctx: Arc<SessionContext>,
382    ) -> Result<Self, TraceEngineError> {
383        let schema = Arc::new(create_summary_schema());
384        let delta_table = build_or_create_summary_table(object_store, schema.clone()).await?;
385        // A freshly-created table has no committed Parquet files yet — table_provider()
386        // returns an error in that case. Defer registration until the first write.
387        if let Ok(provider) = delta_table.table_provider().await {
388            ctx.register_table(SUMMARY_TABLE_NAME, provider)?;
389        } else {
390            info!("Empty summary table at init — deferring SessionContext registration until first write");
391        }
392
393        let control = ControlTableEngine::new(object_store, get_pod_id()).await?;
394
395        Ok(TraceSummaryDBEngine {
396            schema,
397            table: Arc::new(AsyncRwLock::new(delta_table)),
398            ctx,
399            control,
400        })
401    }
402
403    fn build_batch(
404        &self,
405        records: Vec<TraceSummaryRecord>,
406    ) -> Result<RecordBatch, TraceEngineError> {
407        let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
408        for rec in &records {
409            builder.append(rec)?;
410        }
411        builder.finish()
412    }
413
414    async fn write_records(
415        &self,
416        records: Vec<TraceSummaryRecord>,
417    ) -> Result<(), TraceEngineError> {
418        let count = records.len();
419        info!("Writing {} trace summaries", count);
420        let batch = self.build_batch(records)?;
421
422        let mut table_guard = self.table.write().await;
423        // update_incremental is intentionally omitted here.
424        //
425        // This engine runs as a single-writer actor — no other process commits to this
426        // Delta table, so the in-memory state is always current. Calling update_incremental
427        // can mutate table_guard into a corrupted intermediate state before the error
428        // propagates, producing a DeltaTable whose snapshot may not reflect newly written
429        // files — causing stale query results until restart.
430
431        let current_table = table_guard.clone();
432        let updated_table = current_table
433            .write(vec![batch])
434            .with_save_mode(deltalake::protocol::SaveMode::Append)
435            .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
436            .await?;
437
438        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
439        self.ctx
440            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
441        updated_table.update_datafusion_session(&self.ctx.state())?;
442
443        *table_guard = updated_table;
444        info!("Summary table updated with {} records", count);
445        Ok(())
446    }
447
448    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
449        let mut table_guard = self.table.write().await;
450        let (updated_table, _metrics) = table_guard
451            .clone()
452            .optimize()
453            .with_target_size(128 * 1024 * 1024)
454            .with_type(OptimizeType::ZOrder(vec![
455                START_TIME_COL.to_string(),
456                SERVICE_NAME_COL.to_string(),
457            ]))
458            .await?;
459
460        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
461        self.ctx
462            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
463        updated_table.update_datafusion_session(&self.ctx.state())?;
464        *table_guard = updated_table;
465        Ok(())
466    }
467
468    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
469        let mut table_guard = self.table.write().await;
470        let (updated_table, _metrics) = table_guard
471            .clone()
472            .vacuum()
473            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
474            .with_enforce_retention_duration(false)
475            .await?;
476
477        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
478        self.ctx
479            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
480        updated_table.update_datafusion_session(&self.ctx.state())?;
481        *table_guard = updated_table;
482        Ok(())
483    }
484
485    /// Try to claim and run the summary optimize task via the control table.
486    async fn try_run_optimize(&self, interval_hours: u64) {
487        match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
488            Ok(true) => match self.optimize_table().await {
489                Ok(()) => {
490                    if let Err(e) = self.vacuum_table(0).await {
491                        error!("Post-optimize vacuum failed: {}", e);
492                    }
493
494                    let _ = self
495                        .control
496                        .release_task(
497                            TASK_SUMMARY_OPTIMIZE,
498                            chrono::Duration::hours(interval_hours as i64),
499                        )
500                        .await;
501                }
502                Err(e) => {
503                    error!("Summary optimize failed: {}", e);
504                    let _ = self
505                        .control
506                        .release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
507                        .await;
508                }
509            },
510            Ok(false) => { /* not due or another pod owns it */ }
511            Err(e) => error!("Summary optimize claim check failed: {}", e),
512        }
513    }
514
515    pub fn start_actor(
516        self,
517        compaction_interval_hours: u64,
518    ) -> (
519        mpsc::Sender<SummaryTableCommand>,
520        tokio::task::JoinHandle<()>,
521    ) {
522        let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);
523
524        let handle = tokio::spawn(async move {
525            // Poll every 5 minutes — the actual schedule is in the control table.
526            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
527            scheduler_ticker.tick().await; // skip immediate tick
528
529            loop {
530                tokio::select! {
531                    Some(cmd) = rx.recv() => {
532                        match cmd {
533                            SummaryTableCommand::Write { records, respond_to } => {
534                                let result = self.write_records(records).await;
535                                if let Err(ref e) = result {
536                                    error!("Summary write failed: {}", e);
537                                }
538                                let _ = respond_to.send(result);
539                            }
540                            SummaryTableCommand::Optimize { respond_to } => {
541                                // Direct admin request — bypass control table.
542                                // Response is sent before vacuum so callers aren't blocked
543                                // on the potentially slow file-deletion pass.
544                                let _ = respond_to.send(self.optimize_table().await);
545                                if let Err(e) = self.vacuum_table(0).await {
546                                    error!("Post-optimize vacuum failed: {}", e);
547                                }
548                            }
549                            SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
550                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
551                            }
552                            SummaryTableCommand::Shutdown => {
553                                info!("TraceSummaryDBEngine actor shutting down");
554                                break;
555                            }
556                        }
557                    }
558                    _ = scheduler_ticker.tick() => {
559                        self.try_run_optimize(compaction_interval_hours).await;
560                    }
561                }
562            }
563        });
564
565        (tx, handle)
566    }
567}
568
569// ── Service ──────────────────────────────────────────────────────────────────
570
/// Public facade over the summary engine actor.
///
/// Owns the command channel and the actor's join handle; read paths go
/// through `query_service`.
pub struct TraceSummaryService {
    // Sender for commands to the engine actor.
    engine_tx: mpsc::Sender<SummaryTableCommand>,
    // Join handle for the spawned actor task (awaited on shutdown).
    engine_handle: tokio::task::JoinHandle<()>,
    pub query_service: TraceSummaryQueries,
}
576
577impl TraceSummaryService {
578    pub async fn new(
579        object_store: &ObjectStore,
580        compaction_interval_hours: u64,
581        ctx: Arc<SessionContext>,
582    ) -> Result<Self, TraceEngineError> {
583        let engine = TraceSummaryDBEngine::new(object_store, ctx).await?;
584        let engine_ctx = engine.ctx.clone();
585        let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours);
586
587        Ok(TraceSummaryService {
588            engine_tx,
589            engine_handle,
590            query_service: TraceSummaryQueries::new(engine_ctx),
591        })
592    }
593
594    /// Write a batch of `TraceSummaryRecord`s to the Delta Lake summary table.
595    pub async fn write_summaries(
596        &self,
597        records: Vec<TraceSummaryRecord>,
598    ) -> Result<(), TraceEngineError> {
599        let (tx, rx) = oneshot::channel();
600        self.engine_tx
601            .send(SummaryTableCommand::Write {
602                records,
603                respond_to: tx,
604            })
605            .await
606            .map_err(|_| TraceEngineError::ChannelClosed)?;
607        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
608    }
609
610    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
611        let (tx, rx) = oneshot::channel();
612        self.engine_tx
613            .send(SummaryTableCommand::Optimize { respond_to: tx })
614            .await
615            .map_err(|_| TraceEngineError::ChannelClosed)?;
616        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
617    }
618
619    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
620        let (tx, rx) = oneshot::channel();
621        self.engine_tx
622            .send(SummaryTableCommand::Vacuum {
623                retention_hours,
624                respond_to: tx,
625            })
626            .await
627            .map_err(|_| TraceEngineError::ChannelClosed)?;
628        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
629    }
630
631    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSummaryService>`.
632    pub async fn signal_shutdown(&self) {
633        info!("TraceSummaryService signaling shutdown");
634        let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
635    }
636
637    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
638        info!("TraceSummaryService shutting down");
639        self.engine_tx
640            .send(SummaryTableCommand::Shutdown)
641            .await
642            .map_err(|_| TraceEngineError::ChannelClosed)?;
643        if let Err(e) = self.engine_handle.await {
644            error!("Summary engine handle error: {}", e);
645        }
646        info!("TraceSummaryService shutdown complete");
647        Ok(())
648    }
649}
650
651// ── Queries ──────────────────────────────────────────────────────────────────
652
/// Read-only query layer over the shared `SessionContext` in which the
/// summary (and span) tables are registered.
pub struct TraceSummaryQueries {
    ctx: Arc<SessionContext>,
}
656
657impl TraceSummaryQueries {
658    pub fn new(ctx: Arc<SessionContext>) -> Self {
659        Self { ctx }
660    }
661
662    /// Get paginated traces from the Delta Lake summary table.
663    ///
664    /// The first step is a `GROUP BY trace_id` dedup query that merges any duplicate
665    /// rows (from late-arriving spans) using the same rules as `TraceAggregator`:
666    ///   - `SUM` for span/error counts, `MIN`/`MAX` for times, `MAX` for status_code
667    ///   - `FIRST_VALUE` ordered by `span_count DESC` for string fields
668    ///   - `array_distinct(flatten(array_agg(...)))` for entity/queue ID lists (full union)
669    ///
670    ///   Time filters are pushed into the SQL WHERE clause for partition pruning.
671    ///
672    ///   Secondary filters (service, errors, cursor) apply to the deduplicated DataFrame.
673    pub async fn get_paginated_traces(
674        &self,
675        filters: &TraceFilters,
676    ) -> Result<TracePaginationResponse, TraceEngineError> {
677        let limit = filters.limit.unwrap_or(50) as usize;
678        let direction = filters.direction.as_deref().unwrap_or("next");
679
680        // ── Dedup: time-filtered GROUP BY trace_id (DataFrame API) ───────────
681        use crate::parquet::tracing::queries::{date_lit, ts_lit};
682        use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
683        use datafusion::functions_nested::set_ops::array_distinct;
684
685        let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;
686
687        // ① Partition date — directory-level pruning (skips entire partition folders)
688        // ② start_time     — row-group-level pruning within matched files
689        if let Some(start) = filters.start_time {
690            df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
691            df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
692        }
693        if let Some(end) = filters.end_time {
694            df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
695            df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
696        }
697
698        // ORDER BY specs for FIRST_VALUE aggregates
699        // span_count DESC NULLS LAST, end_time DESC NULLS LAST
700        let by_span_end: Vec<SortExpr> = vec![
701            col(SPAN_COUNT_COL).sort(false, false),
702            col(END_TIME_COL).sort(false, false),
703        ];
704        // status_code DESC, span_count DESC
705        let by_status_span: Vec<SortExpr> = vec![
706            col(STATUS_CODE_COL).sort(false, false),
707            col(SPAN_COUNT_COL).sort(false, false),
708        ];
709
710        // Phase 1: aggregate
711        // _max_end_us / _min_start_us are hidden Int64 columns used to compute
712        // duration_ms post-aggregation (arithmetic across two aggregate exprs
713        // cannot be expressed in a single aggregate slot).
714        //
715        // entity_ids / queue_ids: array_agg without FILTER is intentional.
716        // array_flatten treats NULL outer-list elements as empty and skips them,
717        // giving identical results to FILTER (WHERE IS NOT NULL) for the GROUP BY
718        // case. Unlike the original SQL, this produces [] rather than NULL when
719        // ALL rows have null IDs — the safer outcome for downstream deserialization.
720        let mut df = df
721            .aggregate(
722                vec![col(TRACE_ID_COL)],
723                vec![
724                    min(col(START_TIME_COL)).alias(START_TIME_COL),
725                    max(col(END_TIME_COL)).alias(END_TIME_COL),
726                    max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
727                    min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
728                    max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
729                    sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
730                    sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
731                    first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
732                    first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
733                    first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
734                        .alias(SCOPE_VERSION_COL),
735                    first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
736                        .alias(ROOT_OPERATION_COL),
737                    first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
738                    first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
739                        .alias(RESOURCE_ATTRIBUTES_COL),
740                    array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
741                    array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
742                ],
743            )?
744            // Phase 2: derive computed columns from hidden aggregates, then drop them
745            .with_column(
746                DURATION_MS_COL,
747                (col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
748            )?
749            .with_column(
750                ENTITY_IDS_COL,
751                array_distinct(flatten(col("_entity_ids_raw"))),
752            )?
753            .with_column(
754                QUEUE_IDS_COL,
755                array_distinct(flatten(col("_queue_ids_raw"))),
756            )?
757            .drop_columns(&[
758                "_max_end_us",
759                "_min_start_us",
760                "_entity_ids_raw",
761                "_queue_ids_raw",
762            ])?;
763
764        // ── Secondary filters ────────────────────────────────────────────────
765        if let Some(ref svc) = filters.service_name {
766            df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
767        }
768        match filters.has_errors {
769            Some(true) => {
770                df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
771            }
772            Some(false) => {
773                df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
774            }
775            None => {}
776        }
777        if let Some(sc) = filters.status_code {
778            df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
779        }
780
781        // ── entity_uid filter via array_has on List column ────────────────
782        if let Some(ref uid) = filters.entity_uid {
783            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
784                col(ENTITY_IDS_COL),
785                lit(uid.as_str()),
786            ))?;
787        }
788
789        // ── queue_uid filter via array_has on List column ─────────────────
790        if let Some(ref uid) = filters.queue_uid {
791            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
792                col(QUEUE_IDS_COL),
793                lit(uid.as_str()),
794            ))?;
795        }
796
797        // ── trace_ids IN filter ──────────────────────────────────────────────
798        if let Some(ref ids) = filters.trace_ids {
799            if !ids.is_empty() {
800                let binary_ids: Vec<Expr> = ids
801                    .iter()
802                    .filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
803                    .map(|b| lit(ScalarValue::Binary(Some(b))))
804                    .collect();
805                if !binary_ids.is_empty() {
806                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
807                }
808            }
809        }
810
811        // ── Cursor filter in DataFusion ──────────────────────────────────────
812        // Equivalent to Postgres: `(start_time, trace_id) < (cursor_time, cursor_id)`
813        // for "next" or `> (cursor_time, cursor_id)` for "previous".
814        if let (Some(cursor_time), Some(ref cursor_id)) =
815            (filters.cursor_start_time, &filters.cursor_trace_id)
816        {
817            if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
818                let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
819                    Some(cursor_time.timestamp_micros()),
820                    Some("UTC".into()),
821                ));
822                let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
823                let cursor_expr = if direction == "previous" {
824                    col(START_TIME_COL)
825                        .gt(cursor_ts.clone())
826                        .or(col(START_TIME_COL)
827                            .eq(cursor_ts)
828                            .and(col(TRACE_ID_COL).gt(cursor_tid)))
829                } else {
830                    col(START_TIME_COL)
831                        .lt(cursor_ts.clone())
832                        .or(col(START_TIME_COL)
833                            .eq(cursor_ts)
834                            .and(col(TRACE_ID_COL).lt(cursor_tid)))
835                };
836                df = df.filter(cursor_expr)?;
837            }
838        }
839
840        // ── Attribute filters via span lookup → IN list ──────────────────────
841        // Requires shared SessionContext (trace_spans must be registered in self.ctx).
842        // We execute the span query eagerly to collect matching trace IDs, then filter
843        // the summaries DataFrame with an IN-list predicate. This avoids a cross-table
844        // JOIN that causes DataFusion to report ambiguous `trace_id` column references.
845        if let Some(ref attr_filters) = filters.attribute_filters {
846            if !attr_filters.is_empty() {
847                let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
848                    TRACE_ID_COL,
849                    START_TIME_COL,
850                    SEARCH_BLOB_COL,
851                ])?;
852
853                // Time predicates on spans for partition pruning
854                if let Some(start) = filters.start_time {
855                    spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
856                        ScalarValue::TimestampMicrosecond(
857                            Some(start.timestamp_micros()),
858                            Some("UTC".into()),
859                        ),
860                    )))?;
861                }
862                if let Some(end) = filters.end_time {
863                    spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
864                        ScalarValue::TimestampMicrosecond(
865                            Some(end.timestamp_micros()),
866                            Some("UTC".into()),
867                        ),
868                    )))?;
869                }
870
871                // OR-match each filter against search_blob.
872                // normalize_attr_filter converts "key:value" → "%key=value%" so the LIKE
873                // pattern matches the new pipe-bounded `|key=value|` blob format.
874                let mut attr_expr: Option<Expr> = None;
875                for f in attr_filters {
876                    let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
877                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
878                    attr_expr = Some(match attr_expr {
879                        None => cond,
880                        Some(e) => e.or(cond),
881                    });
882                }
883                if let Some(expr) = attr_expr {
884                    spans_df = spans_df.filter(expr)?;
885                }
886
887                // Collect matching trace IDs eagerly, then apply as IN-list filter.
888                // Use HashSet for O(1) dedup instead of O(n²) Vec::contains().
889                let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
890                let mut seen_ids: std::collections::HashSet<Vec<u8>> =
891                    std::collections::HashSet::new();
892                let mut binary_ids: Vec<Expr> = Vec::new();
893                for batch in &span_batches {
894                    // trace_id may be FixedSizeBinary(16) or Binary after Delta round-trip.
895                    // Cast to Binary to handle both uniformly.
896                    if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
897                        let casted = compute::cast(col_ref, &DataType::Binary)?;
898                        let col_arr =
899                            casted
900                                .as_any()
901                                .downcast_ref::<BinaryArray>()
902                                .ok_or_else(|| {
903                                    TraceEngineError::DowncastError("trace_id to BinaryArray")
904                                })?;
905                        for i in 0..batch.num_rows() {
906                            let id_bytes = col_arr.value(i).to_vec();
907                            if seen_ids.insert(id_bytes.clone()) {
908                                binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
909                            }
910                        }
911                    }
912                }
913
914                if !binary_ids.is_empty() {
915                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
916                } else {
917                    // No matching spans → return empty result
918                    df = df.filter(lit(false))?;
919                }
920            }
921        }
922
923        // ── Sort: DESC for "next", ASC for "previous" ────────────────────────
924        // "previous" direction fetches the oldest limit+1 items newer than the cursor,
925        // which matches the original Rust post-reversal behavior.
926        df = if direction == "previous" {
927            df.sort(vec![
928                col(START_TIME_COL).sort(true, true),
929                col(TRACE_ID_COL).sort(true, true),
930            ])?
931        } else {
932            df.sort(vec![
933                col(START_TIME_COL).sort(false, false),
934                col(TRACE_ID_COL).sort(false, false),
935            ])?
936        };
937
938        // ── LIMIT pushed into DataFusion (fetch limit+1 to detect next page) ─
939        df = df.limit(0, Some(limit + 1))?;
940
941        let batches = df.collect().await?;
942        let mut items = batches_to_trace_list_items(batches)?;
943
944        let has_more = items.len() > limit;
945        if has_more {
946            items.pop(); // remove N+1 sentinel
947        }
948
949        // Direction-specific cursor logic — mirrors the original PostgreSQL implementation.
950        //
951        // "next" (DESC order): items are newest-first. The sentinel tells us if older
952        // items exist (has_next). Cursor presence means we navigated forward, so newer
953        // items exist behind us (has_previous).
954        //
955        // "previous" (ASC order): items are oldest-first (closest-to-cursor first).
956        // The sentinel tells us if even more newer items exist (has_previous). Cursor
957        // presence means we navigated backward, so older items exist ahead (has_next).
958        // Items stay in ASC order — no reversal — matching PG behavior exactly.
959        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
960            "next" => {
961                let next_cursor = if has_more {
962                    items.last().map(|item| TraceCursor {
963                        start_time: item.start_time,
964                        trace_id: item.trace_id.clone(),
965                    })
966                } else {
967                    None
968                };
969
970                let previous_cursor = items.first().map(|item| TraceCursor {
971                    start_time: item.start_time,
972                    trace_id: item.trace_id.clone(),
973                });
974
975                (
976                    has_more,
977                    next_cursor,
978                    filters.cursor_start_time.is_some(),
979                    previous_cursor,
980                )
981            }
982            "previous" => {
983                // ASC order: items.last() is the newest (largest start_time).
984                // To continue backward (fetch even newer items), the cursor must
985                // point past the current page's newest item so `> cursor` excludes
986                // everything already returned.
987                let previous_cursor = if has_more {
988                    items.last().map(|item| TraceCursor {
989                        start_time: item.start_time,
990                        trace_id: item.trace_id.clone(),
991                    })
992                } else {
993                    None
994                };
995
996                // items.first() is the oldest (smallest start_time).
997                // To go forward (back toward newer-first / DESC pages), the cursor
998                // must point at the oldest item so `< cursor` fetches older items.
999                let next_cursor = items.first().map(|item| TraceCursor {
1000                    start_time: item.start_time,
1001                    trace_id: item.trace_id.clone(),
1002                });
1003
1004                (
1005                    filters.cursor_start_time.is_some(),
1006                    next_cursor,
1007                    has_more,
1008                    previous_cursor,
1009                )
1010            }
1011            _ => (false, None, false, None),
1012        };
1013
1014        Ok(TracePaginationResponse {
1015            items,
1016            has_next,
1017            next_cursor,
1018            has_previous,
1019            previous_cursor,
1020        })
1021    }
1022}
1023
1024// ── Arrow → TraceListItem conversion ─────────────────────────────────────────
1025
1026/// Extract attributes from a MapArray at a given row index.
1027fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
1028    if map_array.is_null(row_idx) {
1029        return Vec::new();
1030    }
1031    let entry = map_array.value(row_idx);
1032    let Some(struct_array) = entry.as_any().downcast_ref::<StructArray>() else {
1033        tracing::warn!("extract_map_attributes: failed to downcast to StructArray");
1034        return Vec::new();
1035    };
1036    let Some(keys_arr) = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).ok()
1037    else {
1038        tracing::warn!("extract_map_attributes: failed to cast keys to Utf8");
1039        return Vec::new();
1040    };
1041    let Some(keys) = keys_arr.as_any().downcast_ref::<StringArray>() else {
1042        tracing::warn!("extract_map_attributes: failed to downcast keys to StringArray");
1043        return Vec::new();
1044    };
1045    let Some(values_arr) = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).ok()
1046    else {
1047        tracing::warn!("extract_map_attributes: failed to cast values to Utf8");
1048        return Vec::new();
1049    };
1050    let Some(values) = values_arr.as_any().downcast_ref::<StringArray>() else {
1051        tracing::warn!("extract_map_attributes: failed to downcast values to StringArray");
1052        return Vec::new();
1053    };
1054
1055    (0..struct_array.len())
1056        .map(|i| Attribute {
1057            key: keys.value(i).to_string(),
1058            value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
1059        })
1060        .collect()
1061}
1062
1063/// Extract a `Vec<String>` from a nullable `ListArray` at a given row index.
1064fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
1065    let Some(list) = list else {
1066        return Vec::new();
1067    };
1068    if list.is_null(row_idx) {
1069        return Vec::new();
1070    }
1071    let inner = list.value(row_idx);
1072    let str_arr = compute::cast(&inner, &DataType::Utf8)
1073        .ok()
1074        .and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
1075    match str_arr {
1076        Some(arr) => (0..arr.len())
1077            .filter(|i| !arr.is_null(*i))
1078            .map(|i| arr.value(i).to_string())
1079            .collect(),
1080        None => Vec::new(),
1081    }
1082}
1083
1084fn batches_to_trace_list_items(
1085    batches: Vec<RecordBatch>,
1086) -> Result<Vec<TraceListItem>, TraceEngineError> {
1087    let mut items = Vec::new();
1088
1089    for batch in &batches {
1090        // trace_id may come back as FixedSizeBinary(16) or Binary depending on
1091        // whether DataFusion/Delta round-tripped the schema. Handle both.
1092        let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
1093            TraceEngineError::UnsupportedOperation("missing trace_id column".into())
1094        })?;
1095        let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
1096        let trace_ids = trace_id_binary
1097            .as_any()
1098            .downcast_ref::<BinaryArray>()
1099            .ok_or_else(|| {
1100                TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
1101            })?;
1102
1103        // Cast all string/dictionary columns to Utf8 uniformly (handles Utf8View,
1104        // Dictionary(Int32, Utf8), LargeUtf8, etc.).
1105        let svc_arr = compute::cast(
1106            batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
1107                TraceEngineError::UnsupportedOperation("missing service_name column".into())
1108            })?,
1109            &DataType::Utf8,
1110        )?;
1111        let service_names = svc_arr
1112            .as_any()
1113            .downcast_ref::<StringArray>()
1114            .ok_or_else(|| {
1115                TraceEngineError::UnsupportedOperation(
1116                    "service_name cast to StringArray failed".into(),
1117                )
1118            })?;
1119
1120        let scope_arr = compute::cast(
1121            batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
1122                TraceEngineError::UnsupportedOperation("missing scope_name column".into())
1123            })?,
1124            &DataType::Utf8,
1125        )?;
1126        let scope_names = scope_arr
1127            .as_any()
1128            .downcast_ref::<StringArray>()
1129            .ok_or_else(|| {
1130                TraceEngineError::UnsupportedOperation(
1131                    "scope_name cast to StringArray failed".into(),
1132                )
1133            })?;
1134
1135        let scopev_arr = compute::cast(
1136            batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
1137                TraceEngineError::UnsupportedOperation("missing scope_version column".into())
1138            })?,
1139            &DataType::Utf8,
1140        )?;
1141        let scope_versions = scopev_arr
1142            .as_any()
1143            .downcast_ref::<StringArray>()
1144            .ok_or_else(|| {
1145                TraceEngineError::UnsupportedOperation(
1146                    "scope_version cast to StringArray failed".into(),
1147                )
1148            })?;
1149
1150        let root_arr = compute::cast(
1151            batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
1152                TraceEngineError::UnsupportedOperation("missing root_operation column".into())
1153            })?,
1154            &DataType::Utf8,
1155        )?;
1156        let root_operations = root_arr
1157            .as_any()
1158            .downcast_ref::<StringArray>()
1159            .ok_or_else(|| {
1160                TraceEngineError::UnsupportedOperation(
1161                    "root_operation cast to StringArray failed".into(),
1162                )
1163            })?;
1164
1165        let sm_arr = compute::cast(
1166            batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
1167                TraceEngineError::UnsupportedOperation("missing status_message column".into())
1168            })?,
1169            &DataType::Utf8,
1170        )?;
1171        let status_messages = sm_arr
1172            .as_any()
1173            .downcast_ref::<StringArray>()
1174            .ok_or_else(|| {
1175                TraceEngineError::UnsupportedOperation(
1176                    "status_message cast to StringArray failed".into(),
1177                )
1178            })?;
1179
1180        let resource_attrs_map = batch
1181            .column_by_name(RESOURCE_ATTRIBUTES_COL)
1182            .and_then(|c| c.as_any().downcast_ref::<MapArray>())
1183            .ok_or_else(|| {
1184                TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
1185            })?;
1186
1187        let entity_ids_list = batch
1188            .column_by_name(ENTITY_IDS_COL)
1189            .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1190
1191        let queue_ids_list = batch
1192            .column_by_name(QUEUE_IDS_COL)
1193            .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1194
1195        let start_times = batch
1196            .column_by_name(START_TIME_COL)
1197            .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1198            .ok_or_else(|| {
1199                TraceEngineError::UnsupportedOperation("missing start_time column".into())
1200            })?;
1201
1202        let end_times = batch
1203            .column_by_name(END_TIME_COL)
1204            .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1205            .ok_or_else(|| {
1206                TraceEngineError::UnsupportedOperation("missing end_time column".into())
1207            })?;
1208
1209        let durations = batch
1210            .column_by_name(DURATION_MS_COL)
1211            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1212            .ok_or_else(|| {
1213                TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
1214            })?;
1215
1216        let status_codes = batch
1217            .column_by_name(STATUS_CODE_COL)
1218            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1219            .ok_or_else(|| {
1220                TraceEngineError::UnsupportedOperation("missing status_code column".into())
1221            })?;
1222
1223        let span_counts = batch
1224            .column_by_name(SPAN_COUNT_COL)
1225            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1226            .ok_or_else(|| {
1227                TraceEngineError::UnsupportedOperation("missing span_count column".into())
1228            })?;
1229
1230        let error_counts = batch
1231            .column_by_name(ERROR_COUNT_COL)
1232            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1233            .ok_or_else(|| {
1234                TraceEngineError::UnsupportedOperation("missing error_count column".into())
1235            })?;
1236
1237        for i in 0..batch.num_rows() {
1238            let trace_id_hex = hex::encode(trace_ids.value(i));
1239
1240            let start_time = micros_to_datetime(start_times.value(i))?;
1241            let end_time = if end_times.is_null(i) {
1242                None
1243            } else {
1244                Some(micros_to_datetime(end_times.value(i))?)
1245            };
1246            let duration_ms = if durations.is_null(i) {
1247                None
1248            } else {
1249                Some(durations.value(i))
1250            };
1251            let error_count = error_counts.value(i);
1252
1253            let resource_attributes = extract_map_attributes(resource_attrs_map, i);
1254
1255            let entity_ids = extract_list_strings(entity_ids_list, i);
1256            let queue_ids = extract_list_strings(queue_ids_list, i);
1257
1258            items.push(TraceListItem {
1259                trace_id: trace_id_hex,
1260                service_name: service_names.value(i).to_string(),
1261                scope_name: scope_names.value(i).to_string(),
1262                scope_version: scope_versions.value(i).to_string(),
1263                root_operation: root_operations.value(i).to_string(),
1264                start_time,
1265                end_time,
1266                duration_ms,
1267                status_code: status_codes.value(i),
1268                status_message: if status_messages.is_null(i) {
1269                    None
1270                } else {
1271                    Some(status_messages.value(i).to_string())
1272                },
1273                span_count: span_counts.value(i),
1274                has_errors: error_count > 0,
1275                error_count,
1276                resource_attributes,
1277                entity_ids,
1278                queue_ids,
1279            });
1280        }
1281    }
1282
1283    Ok(items)
1284}
1285
1286fn micros_to_datetime(micros: i64) -> Result<DateTime<Utc>, TraceEngineError> {
1287    DateTime::from_timestamp_micros(micros).ok_or(TraceEngineError::InvalidTimestamp(
1288        "out-of-range microsecond timestamp",
1289    ))
1290}
1291
1292#[cfg(test)]
1293mod tests {
1294    use super::*;
1295    use crate::storage::ObjectStore;
1296    use scouter_settings::ObjectStorageSettings;
1297    use scouter_types::sql::TraceFilters;
1298    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
1299    use tracing_subscriber;
1300
1301    fn cleanup() {
1302        let _ = tracing_subscriber::fmt()
1303            .with_max_level(tracing::Level::INFO)
1304            .try_init();
1305
1306        let storage_settings = ObjectStorageSettings::default();
1307        let current_dir = std::env::current_dir().unwrap();
1308        let storage_path = current_dir.join(storage_settings.storage_root());
1309        if storage_path.exists() {
1310            let _ = std::fs::remove_dir_all(storage_path);
1311        }
1312    }
1313
1314    fn make_test_object_store(storage_settings: &ObjectStorageSettings) -> ObjectStore {
1315        ObjectStore::new(storage_settings).unwrap()
1316    }
1317
1318    /// Build a standalone SessionContext for test use (no trace_spans registered).
1319    /// Attribute-filter paths that need trace_spans are not exercised in these tests.
1320    fn make_test_ctx(object_store: &ObjectStore) -> Arc<SessionContext> {
1321        Arc::new(object_store.get_session().unwrap())
1322    }
1323
1324    fn make_summary(
1325        trace_id_bytes: [u8; 16],
1326        service_name: &str,
1327        error_count: i64,
1328        resource_attributes: Vec<Attribute>,
1329    ) -> TraceSummaryRecord {
1330        let now = Utc::now();
1331        TraceSummaryRecord {
1332            trace_id: TraceId::from_bytes(trace_id_bytes),
1333            service_name: service_name.to_string(),
1334            scope_name: "test.scope".to_string(),
1335            scope_version: String::new(),
1336            root_operation: "root_op".to_string(),
1337            start_time: now,
1338            end_time: Some(now + chrono::Duration::milliseconds(200)),
1339            status_code: if error_count > 0 { 2 } else { 0 },
1340            status_message: if error_count > 0 {
1341                "Internal Server Error".to_string()
1342            } else {
1343                "OK".to_string()
1344            },
1345            span_count: 3,
1346            error_count,
1347            resource_attributes,
1348            entity_ids: vec![],
1349            queue_ids: vec![],
1350        }
1351    }
1352
1353    /// Basic write + paginate round-trip: writes two summaries and verifies both appear.
1354    #[tokio::test]
1355    async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
1356        cleanup();
1357
1358        let storage_settings = ObjectStorageSettings::default();
1359        let object_store = make_test_object_store(&storage_settings);
1360        let ctx = make_test_ctx(&object_store);
1361        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;
1362
1363        let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
1364        let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
1365        service.write_summaries(vec![s1, s2]).await?;
1366        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1367
1368        let start = Utc::now() - chrono::Duration::hours(1);
1369        let end = Utc::now() + chrono::Duration::hours(1);
1370        let filters = TraceFilters {
1371            service_name: None,
1372            has_errors: None,
1373            status_code: None,
1374            start_time: Some(start),
1375            end_time: Some(end),
1376            limit: Some(25),
1377            cursor_start_time: None,
1378            cursor_trace_id: None,
1379            direction: None,
1380            attribute_filters: None,
1381            trace_ids: None,
1382            entity_uid: None,
1383            queue_uid: None,
1384        };
1385
1386        let response = service.query_service.get_paginated_traces(&filters).await?;
1387        assert!(
1388            response.items.len() >= 2,
1389            "Expected at least 2 items, got {}",
1390            response.items.len()
1391        );
1392
1393        service.shutdown().await?;
1394        cleanup();
1395        Ok(())
1396    }
1397
1398    /// `has_errors = Some(true)` returns only error traces; `Some(false)` returns only non-errors.
1399    #[tokio::test]
1400    async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
1401        cleanup();
1402
1403        let storage_settings = ObjectStorageSettings::default();
1404        let object_store = make_test_object_store(&storage_settings);
1405        let ctx = make_test_ctx(&object_store);
1406        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;
1407
1408        let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
1409        let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
1410        service
1411            .write_summaries(vec![ok_summary, err_summary])
1412            .await?;
1413        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1414
1415        let start = Utc::now() - chrono::Duration::hours(1);
1416        let end = Utc::now() + chrono::Duration::hours(1);
1417
1418        let base_filters = TraceFilters {
1419            service_name: None,
1420            has_errors: None,
1421            status_code: None,
1422            start_time: Some(start),
1423            end_time: Some(end),
1424            limit: Some(25),
1425            cursor_start_time: None,
1426            cursor_trace_id: None,
1427            direction: None,
1428            attribute_filters: None,
1429            trace_ids: None,
1430            entity_uid: None,
1431            queue_uid: None,
1432        };
1433
1434        // has_errors = true → only error trace
1435        let mut filters_err = base_filters.clone();
1436        filters_err.has_errors = Some(true);
1437        let errors_only = service
1438            .query_service
1439            .get_paginated_traces(&filters_err)
1440            .await?;
1441        for item in &errors_only.items {
1442            assert!(
1443                item.error_count > 0,
1444                "Expected error trace, got: {:?}",
1445                item
1446            );
1447        }
1448        assert!(
1449            !errors_only.items.is_empty(),
1450            "Expected at least one error trace"
1451        );
1452
1453        // has_errors = false → only non-error traces
1454        let mut filters_ok = base_filters.clone();
1455        filters_ok.has_errors = Some(false);
1456        let no_errors = service
1457            .query_service
1458            .get_paginated_traces(&filters_ok)
1459            .await?;
1460        for item in &no_errors.items {
1461            assert_eq!(
1462                item.error_count, 0,
1463                "Expected non-error trace, got error_count={}",
1464                item.error_count
1465            );
1466        }
1467        assert!(
1468            !no_errors.items.is_empty(),
1469            "Expected at least one non-error trace"
1470        );
1471
1472        service.shutdown().await?;
1473        cleanup();
1474        Ok(())
1475    }
1476
1477    /// service_name filter returns only matching service traces.
1478    #[tokio::test]
1479    async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
1480        cleanup();
1481
1482        let storage_settings = ObjectStorageSettings::default();
1483        let object_store = make_test_object_store(&storage_settings);
1484        let ctx = make_test_ctx(&object_store);
1485        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;
1486
1487        let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
1488        let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
1489        service.write_summaries(vec![s_alpha, s_beta]).await?;
1490        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1491
1492        let start = Utc::now() - chrono::Duration::hours(1);
1493        let end = Utc::now() + chrono::Duration::hours(1);
1494        let filters = TraceFilters {
1495            service_name: Some("alpha_service".to_string()),
1496            has_errors: None,
1497            status_code: None,
1498            start_time: Some(start),
1499            end_time: Some(end),
1500            limit: Some(25),
1501            cursor_start_time: None,
1502            cursor_trace_id: None,
1503            direction: None,
1504            attribute_filters: None,
1505            trace_ids: None,
1506            entity_uid: None,
1507            queue_uid: None,
1508        };
1509
1510        let response = service.query_service.get_paginated_traces(&filters).await?;
1511        assert!(
1512            !response.items.is_empty(),
1513            "Expected results for alpha_service"
1514        );
1515        for item in &response.items {
1516            assert_eq!(
1517                item.service_name, "alpha_service",
1518                "Expected only alpha_service items, got: {}",
1519                item.service_name
1520            );
1521        }
1522
1523        service.shutdown().await?;
1524        cleanup();
1525        Ok(())
1526    }
1527
1528    /// trace_ids IN filter returns only the specified traces.
1529    #[tokio::test]
1530    async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
1531        cleanup();
1532
1533        let storage_settings = ObjectStorageSettings::default();
1534        let object_store = make_test_object_store(&storage_settings);
1535        let ctx = make_test_ctx(&object_store);
1536        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;
1537
1538        let wanted_id = TraceId::from_bytes([7u8; 16]);
1539        let unwanted_id = TraceId::from_bytes([8u8; 16]);
1540
1541        let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
1542        let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
1543        service.write_summaries(vec![s1, s2]).await?;
1544        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1545
1546        let start = Utc::now() - chrono::Duration::hours(1);
1547        let end = Utc::now() + chrono::Duration::hours(1);
1548        let filters = TraceFilters {
1549            service_name: None,
1550            has_errors: None,
1551            status_code: None,
1552            start_time: Some(start),
1553            end_time: Some(end),
1554            limit: Some(25),
1555            cursor_start_time: None,
1556            cursor_trace_id: None,
1557            direction: None,
1558            attribute_filters: None,
1559            trace_ids: Some(vec![wanted_id.to_hex()]),
1560            entity_uid: None,
1561            queue_uid: None,
1562        };
1563
1564        let response = service.query_service.get_paginated_traces(&filters).await?;
1565        assert_eq!(
1566            response.items.len(),
1567            1,
1568            "Expected exactly 1 item from trace_ids filter"
1569        );
1570        assert_eq!(
1571            response.items[0].trace_id,
1572            wanted_id.to_hex(),
1573            "Returned wrong trace_id"
1574        );
1575        assert_ne!(
1576            response.items[0].trace_id,
1577            unwanted_id.to_hex(),
1578            "Should not have returned unwanted trace_id"
1579        );
1580
1581        service.shutdown().await?;
1582        cleanup();
1583        Ok(())
1584    }
1585
1586    /// Cursor pagination: first page → next → previous all return correct item counts.
1587    #[tokio::test]
1588    async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
1589        cleanup();
1590        let storage_settings = ObjectStorageSettings::default();
1591        let object_store = make_test_object_store(&storage_settings);
1592        let ctx = make_test_ctx(&object_store);
1593        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;
1594
1595        let now = Utc::now();
1596        let summaries: Vec<TraceSummaryRecord> = (0u8..100)
1597            .map(|i| {
1598                let mut s = make_summary([i; 16], "svc", 0, vec![]);
1599                s.start_time = now - chrono::Duration::minutes(i as i64);
1600                s
1601            })
1602            .collect();
1603        service.write_summaries(summaries).await?;
1604        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1605
1606        let mut filters = TraceFilters {
1607            start_time: Some(now - chrono::Duration::hours(2)),
1608            end_time: Some(now + chrono::Duration::hours(1)),
1609            limit: Some(50),
1610            ..Default::default()
1611        };
1612
1613        // First page
1614        let first = service.query_service.get_paginated_traces(&filters).await?;
1615        assert_eq!(first.items.len(), 50, "first page: 50 items");
1616        assert!(
1617            first.next_cursor.is_some(),
1618            "first page: should have next_cursor"
1619        );
1620
1621        // Next page
1622        let next_cur = first.next_cursor.clone().unwrap();
1623        filters.cursor_start_time = Some(next_cur.start_time);
1624        filters.cursor_trace_id = Some(next_cur.trace_id.clone());
1625        filters.direction = Some("next".to_string());
1626        let second = service.query_service.get_paginated_traces(&filters).await?;
1627        assert_eq!(second.items.len(), 50, "second page: 50 items");
1628        assert!(
1629            second.items[0].start_time <= next_cur.start_time,
1630            "second page first item must be <= cursor"
1631        );
1632        assert!(second.previous_cursor.is_some());
1633
1634        // Previous page
1635        let prev_cur = second.previous_cursor.unwrap();
1636        filters.cursor_start_time = Some(prev_cur.start_time);
1637        filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
1638        filters.direction = Some("previous".to_string());
1639        let prev = service.query_service.get_paginated_traces(&filters).await?;
1640        assert_eq!(prev.items.len(), 50, "previous page: 50 items");
1641
1642        service.shutdown().await?;
1643        cleanup();
1644        Ok(())
1645    }
1646
1647    /// Attribute-filter JOIN path: only traces with matching span attributes are returned.
1648    #[tokio::test]
1649    async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
1650        use crate::parquet::tracing::service::TraceSpanService;
1651
1652        cleanup();
1653        let storage_settings = ObjectStorageSettings::default();
1654
1655        // TraceSpanService owns the SessionContext (trace_spans registered in it)
1656        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
1657        let shared_ctx = span_service.ctx.clone();
1658
1659        // TraceSummaryService shares the same ctx — JOIN to trace_spans will work
1660        let summary_service =
1661            TraceSummaryService::new(&span_service.object_store, 24, shared_ctx).await?;
1662
1663        let now = Utc::now();
1664        let kafka_trace = TraceId::from_bytes([70u8; 16]);
1665        let plain_trace = TraceId::from_bytes([80u8; 16]);
1666
1667        let kafka_span = make_span_record(
1668            &kafka_trace,
1669            SpanId::from_bytes([70u8; 8]),
1670            "svc",
1671            vec![Attribute {
1672                key: "component".to_string(),
1673                value: serde_json::Value::String("kafka".to_string()),
1674            }],
1675        );
1676        let plain_span =
1677            make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
1678        span_service
1679            .write_spans(vec![kafka_span, plain_span])
1680            .await?;
1681
1682        let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
1683        kafka_summary.start_time = now;
1684        let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
1685        plain_summary.start_time = now;
1686        summary_service
1687            .write_summaries(vec![kafka_summary, plain_summary])
1688            .await?;
1689
1690        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
1691
1692        let filters = TraceFilters {
1693            start_time: Some(now - chrono::Duration::hours(1)),
1694            end_time: Some(now + chrono::Duration::hours(1)),
1695            attribute_filters: Some(vec!["component:kafka".to_string()]),
1696            limit: Some(25),
1697            ..Default::default()
1698        };
1699
1700        let response = summary_service
1701            .query_service
1702            .get_paginated_traces(&filters)
1703            .await?;
1704
1705        assert!(
1706            !response.items.is_empty(),
1707            "attribute filter must return results"
1708        );
1709        assert!(
1710            response
1711                .items
1712                .iter()
1713                .all(|i| i.trace_id == kafka_trace.to_hex()),
1714            "only kafka trace should appear; got {:?}",
1715            response
1716                .items
1717                .iter()
1718                .map(|i| &i.trace_id)
1719                .collect::<Vec<_>>()
1720        );
1721
1722        span_service.shutdown().await?;
1723        summary_service.shutdown().await?;
1724        cleanup();
1725        Ok(())
1726    }
1727
1728    /// queue_uid filter: only traces whose queue_ids contain the target UID are returned,
1729    /// and the matching trace's spans can be fetched by trace_id.
1730    #[tokio::test]
1731    async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
1732        use crate::parquet::tracing::service::TraceSpanService;
1733
1734        cleanup();
1735        let storage_settings = ObjectStorageSettings::default();
1736
1737        // TraceSpanService owns the SessionContext
1738        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
1739        let shared_ctx = span_service.ctx.clone();
1740
1741        // TraceSummaryService shares the same ctx so JOIN path works
1742        let summary_service =
1743            TraceSummaryService::new(&span_service.object_store, 24, shared_ctx).await?;
1744
1745        let now = Utc::now();
1746        let queue_trace = TraceId::from_bytes([90u8; 16]);
1747        let plain_trace = TraceId::from_bytes([91u8; 16]);
1748        let target_queue_uid = "queue-record-abc123";
1749
1750        // Write spans for both traces
1751        let queue_span = make_span_record(
1752            &queue_trace,
1753            SpanId::from_bytes([90u8; 8]),
1754            "svc_queue",
1755            vec![],
1756        );
1757        let plain_span = make_span_record(
1758            &plain_trace,
1759            SpanId::from_bytes([91u8; 8]),
1760            "svc_queue",
1761            vec![],
1762        );
1763        span_service
1764            .write_spans_direct(vec![queue_span, plain_span])
1765            .await?;
1766
1767        // Write summaries: one with a matching queue_id, one without
1768        let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
1769        queue_summary.start_time = now;
1770        queue_summary.queue_ids = vec![target_queue_uid.to_string()];
1771
1772        let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
1773        plain_summary.start_time = now;
1774        // queue_ids left empty — should not appear in results
1775
1776        summary_service
1777            .write_summaries(vec![queue_summary, plain_summary])
1778            .await?;
1779
1780        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
1781
1782        // ── Step 1: query summaries by queue_uid ─────────────────────────────────
1783        let filters = TraceFilters {
1784            start_time: Some(now - chrono::Duration::hours(1)),
1785            end_time: Some(now + chrono::Duration::hours(1)),
1786            queue_uid: Some(target_queue_uid.to_string()),
1787            limit: Some(25),
1788            ..Default::default()
1789        };
1790
1791        let response = summary_service
1792            .query_service
1793            .get_paginated_traces(&filters)
1794            .await?;
1795
1796        assert!(
1797            !response.items.is_empty(),
1798            "queue_uid filter must return at least one result"
1799        );
1800        assert!(
1801            response
1802                .items
1803                .iter()
1804                .all(|i| i.trace_id == queue_trace.to_hex()),
1805            "only the queue trace should appear; got {:?}",
1806            response
1807                .items
1808                .iter()
1809                .map(|i| &i.trace_id)
1810                .collect::<Vec<_>>()
1811        );
1812
1813        // ── Step 2: fetch spans for the returned trace_id ─────────────────────────
1814        let returned_trace_id =
1815            TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
1816        let spans = span_service
1817            .query_service
1818            .get_trace_spans(
1819                Some(returned_trace_id.as_bytes()),
1820                None,
1821                Some(&(now - chrono::Duration::hours(1))),
1822                Some(&(now + chrono::Duration::hours(1))),
1823                None,
1824            )
1825            .await?;
1826
1827        assert!(
1828            !spans.is_empty(),
1829            "should find spans for the returned trace_id"
1830        );
1831
1832        span_service.shutdown().await?;
1833        summary_service.shutdown().await?;
1834        cleanup();
1835        Ok(())
1836    }
1837
1838    /// Build a deterministic `TraceSpanRecord` for use in summary tests.
1839    fn make_span_record(
1840        trace_id: &TraceId,
1841        span_id: SpanId,
1842        service_name: &str,
1843        attributes: Vec<Attribute>,
1844    ) -> TraceSpanRecord {
1845        let now = Utc::now();
1846        TraceSpanRecord {
1847            created_at: now,
1848            trace_id: *trace_id,
1849            span_id,
1850            parent_span_id: None,
1851            flags: 1,
1852            trace_state: String::new(),
1853            scope_name: "test.scope".to_string(),
1854            scope_version: None,
1855            span_name: "op".to_string(),
1856            span_kind: "INTERNAL".to_string(),
1857            start_time: now,
1858            end_time: now + chrono::Duration::milliseconds(100),
1859            duration_ms: 100,
1860            status_code: 0,
1861            status_message: "OK".to_string(),
1862            attributes,
1863            events: vec![],
1864            links: vec![],
1865            label: None,
1866            input: serde_json::Value::Null,
1867            output: serde_json::Value::Null,
1868            service_name: service_name.to_string(),
1869            resource_attributes: vec![],
1870        }
1871    }
1872
1873    /// `resource_attributes` survive a write → read round-trip.
1874    #[tokio::test]
1875    async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
1876        cleanup();
1877
1878        let storage_settings = ObjectStorageSettings::default();
1879        let object_store = make_test_object_store(&storage_settings);
1880        let ctx = make_test_ctx(&object_store);
1881        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;
1882
1883        let attrs = vec![Attribute {
1884            key: "cloud.region".to_string(),
1885            value: serde_json::Value::String("us-east-1".to_string()),
1886        }];
1887        let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
1888        service.write_summaries(vec![summary]).await?;
1889        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1890
1891        let start = Utc::now() - chrono::Duration::hours(1);
1892        let end = Utc::now() + chrono::Duration::hours(1);
1893        let filters = TraceFilters {
1894            service_name: None,
1895            has_errors: None,
1896            status_code: None,
1897            start_time: Some(start),
1898            end_time: Some(end),
1899            limit: Some(25),
1900            cursor_start_time: None,
1901            cursor_trace_id: None,
1902            direction: None,
1903            attribute_filters: None,
1904            trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
1905            entity_uid: None,
1906            queue_uid: None,
1907        };
1908
1909        let response = service.query_service.get_paginated_traces(&filters).await?;
1910        assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
1911        assert_eq!(
1912            response.items[0].resource_attributes.len(),
1913            1,
1914            "Expected 1 resource attribute"
1915        );
1916        assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");
1917
1918        service.shutdown().await?;
1919        cleanup();
1920        Ok(())
1921    }
1922
1923    /// Regression test: multiple sequential writes must be immediately visible to queries.
1924    /// This catches the stale snapshot bug where re-registration doesn't refresh the
1925    /// DataFusion session's object store, causing queries after subsequent writes to
1926    /// return stale results.
1927    #[tokio::test]
1928    async fn test_summary_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError>
1929    {
1930        cleanup();
1931
1932        let storage_settings = ObjectStorageSettings::default();
1933        let object_store = make_test_object_store(&storage_settings);
1934        let ctx = make_test_ctx(&object_store);
1935        let service = TraceSummaryService::new(&object_store, 24, ctx).await?;
1936
1937        let start = Utc::now() - chrono::Duration::hours(1);
1938        let end = Utc::now() + chrono::Duration::hours(1);
1939        let filters = TraceFilters {
1940            start_time: Some(start),
1941            end_time: Some(end),
1942            limit: Some(100),
1943            ..Default::default()
1944        };
1945
1946        // Write batch #1 (2 summaries)
1947        let s1 = make_summary([0xA0; 16], "svc_vis", 0, vec![]);
1948        let s2 = make_summary([0xA1; 16], "svc_vis", 0, vec![]);
1949        service.write_summaries(vec![s1, s2]).await?;
1950
1951        let response = service.query_service.get_paginated_traces(&filters).await?;
1952        assert_eq!(
1953            response.items.len(),
1954            2,
1955            "After write #1: expected 2 items, got {}",
1956            response.items.len()
1957        );
1958
1959        // Write batch #2 (2 more summaries)
1960        let s3 = make_summary([0xA2; 16], "svc_vis", 0, vec![]);
1961        let s4 = make_summary([0xA3; 16], "svc_vis", 0, vec![]);
1962        service.write_summaries(vec![s3, s4]).await?;
1963
1964        let response = service.query_service.get_paginated_traces(&filters).await?;
1965        assert_eq!(
1966            response.items.len(),
1967            4,
1968            "After write #2: expected 4 items, got {} (stale snapshot?)",
1969            response.items.len()
1970        );
1971
1972        // Write batch #3 (2 more summaries)
1973        let s5 = make_summary([0xA4; 16], "svc_vis", 0, vec![]);
1974        let s6 = make_summary([0xA5; 16], "svc_vis", 0, vec![]);
1975        service.write_summaries(vec![s5, s6]).await?;
1976
1977        let response = service.query_service.get_paginated_traces(&filters).await?;
1978        assert_eq!(
1979            response.items.len(),
1980            6,
1981            "After write #3: expected 6 items, got {} (stale snapshot?)",
1982            response.items.len()
1983        );
1984
1985        service.shutdown().await?;
1986        cleanup();
1987        Ok(())
1988    }
1989}