Skip to main content

scouter_dataframe/parquet/tracing/
summary.rs

1use crate::error::TraceEngineError;
2use crate::parquet::control::{get_pod_id, ControlTableEngine};
3use crate::parquet::tracing::traits::{arrow_schema_to_delta, resource_attribute_field};
4use crate::parquet::utils::match_attr_expr;
5use crate::parquet::utils::register_cloud_logstore_factories;
6use crate::storage::ObjectStore;
7use arrow::array::*;
8use arrow::compute;
9use arrow::datatypes::*;
10use arrow_array::Array;
11use arrow_array::RecordBatch;
12use chrono::{DateTime, Datelike, TimeZone, Utc};
13use datafusion::logical_expr::{cast as df_cast, col, lit, SortExpr};
14use datafusion::prelude::*;
15use datafusion::scalar::ScalarValue;
16use deltalake::operations::optimize::OptimizeType;
17use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
18use scouter_settings::ObjectStorageSettings;
19use scouter_types::sql::{TraceFilters, TraceListItem};
20use scouter_types::{Attribute, TraceCursor, TraceId, TracePaginationResponse, TraceSummaryRecord};
21use std::sync::Arc;
22use tokio::sync::oneshot;
23use tokio::sync::{mpsc, RwLock as AsyncRwLock};
24use tokio::time::{interval, Duration};
25use tracing::{error, info};
26use url::Url;
27
28/// Days from CE epoch to Unix epoch (1970-01-01).
29/// Equivalent to `NaiveDate::from_ymd_opt(1970, 1, 1).unwrap().num_days_from_ce()`.
30const UNIX_EPOCH_DAYS: i32 = 719_163;
31
32const SUMMARY_TABLE_NAME: &str = "trace_summaries";
33
34/// Control table task name for summary compaction coordination.
35const TASK_SUMMARY_OPTIMIZE: &str = "summary_optimize";
36
37// ── Column name constants ────────────────────────────────────────────────────
38const TRACE_ID_COL: &str = "trace_id";
39const SERVICE_NAME_COL: &str = "service_name";
40const SCOPE_NAME_COL: &str = "scope_name";
41const SCOPE_VERSION_COL: &str = "scope_version";
42const ROOT_OPERATION_COL: &str = "root_operation";
43const START_TIME_COL: &str = "start_time";
44const END_TIME_COL: &str = "end_time";
45const DURATION_MS_COL: &str = "duration_ms";
46const STATUS_CODE_COL: &str = "status_code";
47const STATUS_MESSAGE_COL: &str = "status_message";
48const SPAN_COUNT_COL: &str = "span_count";
49const ERROR_COUNT_COL: &str = "error_count";
50const SEARCH_BLOB_COL: &str = "search_blob";
51
52const RESOURCE_ATTRIBUTES_COL: &str = "resource_attributes";
53const ENTITY_IDS_COL: &str = "entity_ids";
54const QUEUE_IDS_COL: &str = "queue_ids";
55
56const PARTITION_DATE_COL: &str = "partition_date";
57
58// ── Schema ───────────────────────────────────────────────────────────────────
59
60fn create_summary_schema() -> Schema {
61    Schema::new(vec![
62        Field::new(TRACE_ID_COL, DataType::FixedSizeBinary(16), false),
63        Field::new(
64            SERVICE_NAME_COL,
65            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
66            false,
67        ),
68        Field::new(SCOPE_NAME_COL, DataType::Utf8, false),
69        Field::new(SCOPE_VERSION_COL, DataType::Utf8, true),
70        Field::new(ROOT_OPERATION_COL, DataType::Utf8, false),
71        Field::new(
72            START_TIME_COL,
73            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
74            false,
75        ),
76        Field::new(
77            END_TIME_COL,
78            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
79            true,
80        ),
81        Field::new(DURATION_MS_COL, DataType::Int64, true),
82        Field::new(STATUS_CODE_COL, DataType::Int32, false),
83        Field::new(STATUS_MESSAGE_COL, DataType::Utf8, true),
84        Field::new(SPAN_COUNT_COL, DataType::Int64, false),
85        Field::new(ERROR_COUNT_COL, DataType::Int64, false),
86        resource_attribute_field(),
87        Field::new(
88            ENTITY_IDS_COL,
89            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
90            true,
91        ),
92        Field::new(
93            QUEUE_IDS_COL,
94            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
95            true,
96        ),
97        Field::new(PARTITION_DATE_COL, DataType::Date32, false),
98    ])
99}
100
101// ── BatchBuilder ─────────────────────────────────────────────────────────────
102
/// Columnar accumulator that converts `TraceSummaryRecord`s into one Arrow
/// `RecordBatch`. One builder per column; `finish` emits them positionally, so
/// the field order below must stay in sync with `create_summary_schema()`.
struct TraceSummaryBatchBuilder {
    schema: Arc<Schema>,
    // 16-byte fixed-width trace IDs.
    trace_id: FixedSizeBinaryBuilder,
    // Dictionary-encoded: low-cardinality service names.
    service_name: StringDictionaryBuilder<Int32Type>,
    scope_name: StringBuilder,
    scope_version: StringBuilder,
    root_operation: StringBuilder,
    start_time: TimestampMicrosecondBuilder,
    end_time: TimestampMicrosecondBuilder,
    duration_ms: Int64Builder,
    status_code: Int32Builder,
    status_message: StringBuilder,
    span_count: Int64Builder,
    error_count: Int64Builder,
    // Map of attribute key -> JSON-serialized value (see `append`).
    resource_attributes: MapBuilder<StringBuilder, StringViewBuilder>,
    entity_ids: ListBuilder<StringBuilder>,
    queue_ids: ListBuilder<StringBuilder>,
    // Delta partition key: days since Unix epoch, derived from start_time.
    partition_date: Date32Builder,
}
122
impl TraceSummaryBatchBuilder {
    /// Create a builder pre-sized for `capacity` rows.
    ///
    /// The `capacity * N` byte hints are heuristics to reduce reallocation;
    /// every builder still grows on demand if a row exceeds them.
    fn new(schema: Arc<Schema>, capacity: usize) -> Self {
        // Map entries are named "key_value"/"key"/"value".
        // NOTE(review): presumably chosen to match the field produced by
        // `resource_attribute_field()` — confirm against that helper.
        let map_field_names = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let resource_attributes = MapBuilder::new(
            Some(map_field_names),
            StringBuilder::new(),
            StringViewBuilder::new(),
        );
        Self {
            schema,
            trace_id: FixedSizeBinaryBuilder::with_capacity(capacity, 16),
            service_name: StringDictionaryBuilder::new(),
            scope_name: StringBuilder::with_capacity(capacity, capacity * 16),
            scope_version: StringBuilder::with_capacity(capacity, capacity * 8),
            root_operation: StringBuilder::with_capacity(capacity, capacity * 32),
            start_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            end_time: TimestampMicrosecondBuilder::with_capacity(capacity).with_timezone("UTC"),
            duration_ms: Int64Builder::with_capacity(capacity),
            status_code: Int32Builder::with_capacity(capacity),
            status_message: StringBuilder::with_capacity(capacity, capacity * 16),
            span_count: Int64Builder::with_capacity(capacity),
            error_count: Int64Builder::with_capacity(capacity),
            resource_attributes,
            entity_ids: ListBuilder::new(StringBuilder::new()),
            queue_ids: ListBuilder::new(StringBuilder::new()),
            partition_date: Date32Builder::with_capacity(capacity),
        }
    }

    /// Append one record as a row across every column builder.
    ///
    /// Null-mapping conventions:
    /// - empty `scope_version` / `status_message` strings become NULL;
    /// - missing `end_time` yields NULL `end_time` AND NULL `duration_ms`;
    /// - empty attribute/ID collections become NULL map/list entries.
    ///
    /// Errors propagate only from the fixed-size binary and map builders
    /// (wrong-length trace_id bytes, map append failure).
    fn append(&mut self, rec: &TraceSummaryRecord) -> Result<(), TraceEngineError> {
        self.trace_id.append_value(rec.trace_id.as_bytes())?;
        self.service_name.append_value(&rec.service_name);
        self.scope_name.append_value(&rec.scope_name);
        if rec.scope_version.is_empty() {
            self.scope_version.append_null();
        } else {
            self.scope_version.append_value(&rec.scope_version);
        }
        self.root_operation.append_value(&rec.root_operation);
        self.start_time
            .append_value(rec.start_time.timestamp_micros());
        match rec.end_time {
            Some(end) => self.end_time.append_value(end.timestamp_micros()),
            None => self.end_time.append_null(),
        }
        // duration_ms is derivable only when the trace has finished.
        let duration = rec
            .end_time
            .map(|end| (end - rec.start_time).num_milliseconds());
        match duration {
            Some(d) => self.duration_ms.append_value(d),
            None => self.duration_ms.append_null(),
        }
        self.status_code.append_value(rec.status_code);
        if rec.status_message.is_empty() {
            self.status_message.append_null();
        } else {
            self.status_message.append_value(&rec.status_message);
        }
        self.span_count.append_value(rec.span_count);
        self.error_count.append_value(rec.error_count);
        if rec.resource_attributes.is_empty() {
            self.resource_attributes.append(false)?; // null map
        } else {
            for attr in &rec.resource_attributes {
                self.resource_attributes.keys().append_value(&attr.key);
                // Attribute values are stored as JSON text; serialization
                // failure degrades to the literal string "null" rather than
                // failing the whole batch.
                let value_str =
                    serde_json::to_string(&attr.value).unwrap_or_else(|_| "null".to_string());
                self.resource_attributes.values().append_value(value_str);
            }
            self.resource_attributes.append(true)?;
        }
        if rec.entity_ids.is_empty() {
            self.entity_ids.append_null();
        } else {
            for id in &rec.entity_ids {
                self.entity_ids.values().append_value(id);
            }
            self.entity_ids.append(true);
        }
        if rec.queue_ids.is_empty() {
            self.queue_ids.append_null();
        } else {
            for id in &rec.queue_ids {
                self.queue_ids.values().append_value(id);
            }
            self.queue_ids.append(true);
        }
        // Partition key — days since Unix epoch, derived from start_time
        let days = rec.start_time.date_naive().num_days_from_ce() - UNIX_EPOCH_DAYS;
        self.partition_date.append_value(days);
        Ok(())
    }

    /// Finalize all column builders into a `RecordBatch`.
    ///
    /// The vector order here is positional and must match
    /// `create_summary_schema()` exactly, or `try_new` fails on type mismatch.
    fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(self.trace_id.finish()),
            Arc::new(self.service_name.finish()),
            Arc::new(self.scope_name.finish()),
            Arc::new(self.scope_version.finish()),
            Arc::new(self.root_operation.finish()),
            Arc::new(self.start_time.finish()),
            Arc::new(self.end_time.finish()),
            Arc::new(self.duration_ms.finish()),
            Arc::new(self.status_code.finish()),
            Arc::new(self.status_message.finish()),
            Arc::new(self.span_count.finish()),
            Arc::new(self.error_count.finish()),
            Arc::new(self.resource_attributes.finish()),
            Arc::new(self.entity_ids.finish()),
            Arc::new(self.queue_ids.finish()),
            Arc::new(self.partition_date.finish()),
        ];
        RecordBatch::try_new(self.schema, columns).map_err(Into::into)
    }
}
242
243// ── TableCommand ─────────────────────────────────────────────────────────────
244
/// Commands processed by the summary-table actor spawned in
/// `TraceSummaryDBEngine::start_actor`. Each request variant carries a oneshot
/// sender so the caller can await the operation's result.
pub enum SummaryTableCommand {
    /// Append a batch of summary records to the Delta table.
    Write {
        records: Vec<TraceSummaryRecord>,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Compact/Z-order the table on direct (admin) request.
    Optimize {
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Remove unreferenced files older than `retention_hours`.
    Vacuum {
        retention_hours: u64,
        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
    },
    /// Stop the actor loop; no response is sent.
    Shutdown,
}
259
260async fn build_summary_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
261    let mut base = object_store.get_base_url()?;
262    let mut path = base.path().to_string();
263    if !path.ends_with('/') {
264        path.push('/');
265    }
266    path.push_str(SUMMARY_TABLE_NAME);
267    base.set_path(&path);
268    Ok(base)
269}
270
/// Create a brand-new Delta table for trace summaries at `table_url`.
///
/// The table is partitioned by `partition_date`, and data-skipping (min/max)
/// statistics are restricted to the listed scalar columns.
async fn create_summary_table(
    object_store: &ObjectStore,
    table_url: Url,
    schema: SchemaRef,
) -> Result<DeltaTable, TraceEngineError> {
    info!(
        "Creating trace summary table [{}://.../{} ]",
        table_url.scheme(),
        table_url
            .path_segments()
            .and_then(|mut s| s.next_back())
            .unwrap_or(SUMMARY_TABLE_NAME)
    );
    // Explicit storage backend so remote (S3/GCS/Azure) URLs resolve without
    // relying on scheme inference.
    let store = object_store.as_dyn_object_store();
    let table = DeltaTableBuilder::from_url(table_url.clone())?
        .with_storage_backend(store, table_url)
        .build()?;
    // Translate the Arrow schema into Delta column definitions.
    let delta_fields = arrow_schema_to_delta(&schema);
    table
        .create()
        .with_table_name(SUMMARY_TABLE_NAME)
        .with_columns(delta_fields)
        .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
        // Only collect min/max statistics for non-binary columns.
        // trace_id (FixedSizeBinary) has no meaningful ordering for file-level pruning.
        .with_configuration_property(
            TableProperty::DataSkippingStatsColumns,
            Some("start_time,end_time,service_name,duration_ms,status_code,span_count,error_count,partition_date"),
        )
        .await
        .map_err(Into::into)
}
303
304async fn build_or_create_summary_table(
305    object_store: &ObjectStore,
306    schema: SchemaRef,
307) -> Result<DeltaTable, TraceEngineError> {
308    register_cloud_logstore_factories();
309    let table_url = build_summary_url(object_store).await?;
310    info!(
311        "Loading trace summary table [{}://.../{} ]",
312        table_url.scheme(),
313        table_url
314            .path_segments()
315            .and_then(|mut s| s.next_back())
316            .unwrap_or(SUMMARY_TABLE_NAME)
317    );
318
319    // Check whether a Delta log actually exists. For local tables, check the
320    // filesystem directly. For remote tables, attempt a full load with the
321    // explicit storage backend — required for S3/GCS/Azure where Delta Lake
322    // cannot infer the object store from the URL scheme alone.
323    let is_delta_table = if table_url.scheme() == "file" {
324        if let Ok(path) = table_url.to_file_path() {
325            if !path.exists() {
326                info!("Creating directory for summary table: {:?}", path);
327                std::fs::create_dir_all(&path)?;
328            }
329            path.join("_delta_log").exists()
330        } else {
331            false
332        }
333    } else {
334        let store = object_store.as_dyn_object_store();
335        match DeltaTableBuilder::from_url(table_url.clone()) {
336            Ok(builder) => builder
337                .with_storage_backend(store, table_url.clone())
338                .load()
339                .await
340                .is_ok(),
341            Err(_) => false,
342        }
343    };
344
345    if is_delta_table {
346        info!(
347            "Loaded existing trace summary table [{}://.../{} ]",
348            table_url.scheme(),
349            table_url
350                .path_segments()
351                .and_then(|mut s| s.next_back())
352                .unwrap_or(SUMMARY_TABLE_NAME)
353        );
354        let store = object_store.as_dyn_object_store();
355        DeltaTableBuilder::from_url(table_url.clone())?
356            .with_storage_backend(store, table_url)
357            .load()
358            .await
359            .map_err(Into::into)
360    } else {
361        info!("Summary table does not exist, creating new table");
362        create_summary_table(object_store, table_url, schema).await
363    }
364}
365
/// Owner of the `trace_summaries` Delta table.
///
/// Holds the Arrow schema, the Delta table handle (behind an async `RwLock`
/// so writes and maintenance serialize), the shared DataFusion context the
/// table is registered in, and the control-table engine used for cross-pod
/// maintenance coordination.
pub struct TraceSummaryDBEngine {
    schema: Arc<Schema>,
    table: Arc<AsyncRwLock<DeltaTable>>,
    pub ctx: Arc<SessionContext>,
    control: ControlTableEngine,
}
372
impl TraceSummaryDBEngine {
    /// Create a new `TraceSummaryDBEngine` using the provided shared `SessionContext`.
    ///
    /// The caller is responsible for passing a `SessionContext` that already has the object-store
    /// backend configured (e.g. the one from `TraceSpanDBEngine`). This ensures both
    /// `trace_spans` and `trace_summaries` live in the same context and can participate in
    /// JOIN queries.
    pub async fn new(
        storage_settings: &ObjectStorageSettings,
        ctx: Arc<SessionContext>,
    ) -> Result<Self, TraceEngineError> {
        let object_store = ObjectStore::new(storage_settings)?;
        let schema = Arc::new(create_summary_schema());
        let delta_table = build_or_create_summary_table(&object_store, schema.clone()).await?;
        // A freshly-created table has no committed Parquet files yet — table_provider()
        // returns an error in that case. Defer registration until the first write.
        if let Ok(provider) = delta_table.table_provider().await {
            ctx.register_table(SUMMARY_TABLE_NAME, provider)?;
        } else {
            info!("Empty summary table at init — deferring SessionContext registration until first write");
        }

        // Control table coordinates scheduled maintenance across pods.
        let control = ControlTableEngine::new(storage_settings, get_pod_id()).await?;

        Ok(TraceSummaryDBEngine {
            schema,
            table: Arc::new(AsyncRwLock::new(delta_table)),
            ctx,
            control,
        })
    }

    /// Convert `records` into a single Arrow `RecordBatch` via the column builder.
    fn build_batch(
        &self,
        records: Vec<TraceSummaryRecord>,
    ) -> Result<RecordBatch, TraceEngineError> {
        let mut builder = TraceSummaryBatchBuilder::new(self.schema.clone(), records.len());
        for rec in &records {
            builder.append(rec)?;
        }
        builder.finish()
    }

    /// Append `records` to the Delta table and refresh the SessionContext
    /// registration to the new table version.
    ///
    /// Holds the table write-lock across sync + commit so a concurrent
    /// optimize/vacuum cannot race the append.
    async fn write_records(
        &self,
        records: Vec<TraceSummaryRecord>,
    ) -> Result<(), TraceEngineError> {
        let count = records.len();
        info!("Writing {} trace summaries", count);
        let batch = self.build_batch(records)?;

        let mut table_guard = self.table.write().await;

        // Pick up commits made by other writers; a brand-new table with no
        // commits yet errors here, which is safe to log and ignore.
        if let Err(e) = table_guard.update_incremental(None).await {
            info!("Summary table update skipped (new table): {}", e);
        }

        let updated_table = table_guard
            .clone()
            .write(vec![batch])
            .with_save_mode(deltalake::protocol::SaveMode::Append)
            .with_partition_columns(vec![PARTITION_DATE_COL.to_string()])
            .await?;

        // Swap the registered provider so queries see the new version.
        // NOTE(review): if table_provider() fails after deregistration the
        // name stays unregistered until the next successful write — confirm
        // this window is acceptable.
        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
        self.ctx
            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;

        *table_guard = updated_table;
        info!("Summary table updated with {} records", count);
        Ok(())
    }

    /// Compact files to ~128 MiB and Z-order by (start_time, service_name),
    /// then re-register the table in the SessionContext.
    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let (updated_table, _metrics) = table_guard
            .clone()
            .optimize()
            .with_target_size(128 * 1024 * 1024)
            .with_type(OptimizeType::ZOrder(vec![
                START_TIME_COL.to_string(),
                SERVICE_NAME_COL.to_string(),
            ]))
            .await?;

        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
        self.ctx
            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
        *table_guard = updated_table;
        Ok(())
    }

    /// Delete unreferenced files older than `retention_hours`, then
    /// re-register the table.
    ///
    /// Retention enforcement is disabled, so retention windows shorter than
    /// the table's configured minimum are permitted (used by post-optimize
    /// vacuum with 0 hours).
    async fn vacuum_table(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
        let mut table_guard = self.table.write().await;
        let (updated_table, _metrics) = table_guard
            .clone()
            .vacuum()
            .with_retention_period(chrono::Duration::hours(retention_hours as i64))
            .with_enforce_retention_duration(false)
            .await?;

        self.ctx.deregister_table(SUMMARY_TABLE_NAME)?;
        self.ctx
            .register_table(SUMMARY_TABLE_NAME, updated_table.table_provider().await?)?;
        *table_guard = updated_table;
        Ok(())
    }

    /// Try to claim and run the summary optimize task via the control table.
    ///
    /// On a successful optimize the task is released with the next-due
    /// interval; on failure it is released via the failure path so another
    /// pod can retry. Release errors are intentionally ignored.
    async fn try_run_optimize(&self, interval_hours: u64) {
        match self.control.try_claim_task(TASK_SUMMARY_OPTIMIZE).await {
            Ok(true) => match self.optimize_table().await {
                Ok(()) => {
                    // Immediately vacuum superseded files (0h retention).
                    if let Err(e) = self.vacuum_table(0).await {
                        error!("Post-optimize vacuum failed: {}", e);
                    }

                    let _ = self
                        .control
                        .release_task(
                            TASK_SUMMARY_OPTIMIZE,
                            chrono::Duration::hours(interval_hours as i64),
                        )
                        .await;
                }
                Err(e) => {
                    error!("Summary optimize failed: {}", e);
                    let _ = self
                        .control
                        .release_task_on_failure(TASK_SUMMARY_OPTIMIZE)
                        .await;
                }
            },
            Ok(false) => { /* not due or another pod owns it */ }
            Err(e) => error!("Summary optimize claim check failed: {}", e),
        }
    }

    /// Consume the engine and spawn its actor loop.
    ///
    /// Returns the command sender and the task handle. The loop serves
    /// commands from the channel and, every 5 minutes, attempts the
    /// control-table-scheduled optimize. `Shutdown` breaks the loop.
    pub fn start_actor(
        self,
        compaction_interval_hours: u64,
    ) -> (
        mpsc::Sender<SummaryTableCommand>,
        tokio::task::JoinHandle<()>,
    ) {
        let (tx, mut rx) = mpsc::channel::<SummaryTableCommand>(100);

        let handle = tokio::spawn(async move {
            // Poll every 5 minutes — the actual schedule is in the control table.
            let mut scheduler_ticker = interval(Duration::from_secs(5 * 60));
            scheduler_ticker.tick().await; // skip immediate tick

            loop {
                tokio::select! {
                    Some(cmd) = rx.recv() => {
                        match cmd {
                            SummaryTableCommand::Write { records, respond_to } => {
                                let result = self.write_records(records).await;
                                if let Err(ref e) = result {
                                    error!("Summary write failed: {}", e);
                                }
                                let _ = respond_to.send(result);
                            }
                            SummaryTableCommand::Optimize { respond_to } => {
                                // Direct admin request — bypass control table
                                let _ = respond_to.send(self.optimize_table().await);
                                // vacuum table
                                // NOTE(review): the caller is answered before this
                                // vacuum runs, so vacuum failures are log-only.
                                if let Err(e) = self.vacuum_table(0).await {
                                    error!("Post-optimize vacuum failed: {}", e);
                                }
                            }
                            SummaryTableCommand::Vacuum { retention_hours, respond_to } => {
                                let _ = respond_to.send(self.vacuum_table(retention_hours).await);
                            }
                            SummaryTableCommand::Shutdown => {
                                info!("TraceSummaryDBEngine actor shutting down");
                                break;
                            }
                        }
                    }
                    _ = scheduler_ticker.tick() => {
                        self.try_run_optimize(compaction_interval_hours).await;
                    }
                }
            }
        });

        (tx, handle)
    }
}
563
564// ── Service ──────────────────────────────────────────────────────────────────
565
/// Public facade over the summary engine actor: write/maintenance commands go
/// through the channel; reads go through `query_service`.
pub struct TraceSummaryService {
    // Command channel into the engine actor.
    engine_tx: mpsc::Sender<SummaryTableCommand>,
    // Actor task handle; awaited in `shutdown`.
    engine_handle: tokio::task::JoinHandle<()>,
    pub query_service: TraceSummaryQueries,
}
571
572impl TraceSummaryService {
573    pub async fn new(
574        storage_settings: &ObjectStorageSettings,
575        compaction_interval_hours: u64,
576        ctx: Arc<SessionContext>,
577    ) -> Result<Self, TraceEngineError> {
578        let engine = TraceSummaryDBEngine::new(storage_settings, ctx).await?;
579        let engine_ctx = engine.ctx.clone();
580        let (engine_tx, engine_handle) = engine.start_actor(compaction_interval_hours);
581
582        Ok(TraceSummaryService {
583            engine_tx,
584            engine_handle,
585            query_service: TraceSummaryQueries::new(engine_ctx),
586        })
587    }
588
589    /// Write a batch of `TraceSummaryRecord`s to the Delta Lake summary table.
590    pub async fn write_summaries(
591        &self,
592        records: Vec<TraceSummaryRecord>,
593    ) -> Result<(), TraceEngineError> {
594        let (tx, rx) = oneshot::channel();
595        self.engine_tx
596            .send(SummaryTableCommand::Write {
597                records,
598                respond_to: tx,
599            })
600            .await
601            .map_err(|_| TraceEngineError::ChannelClosed)?;
602        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
603    }
604
605    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
606        let (tx, rx) = oneshot::channel();
607        self.engine_tx
608            .send(SummaryTableCommand::Optimize { respond_to: tx })
609            .await
610            .map_err(|_| TraceEngineError::ChannelClosed)?;
611        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
612    }
613
614    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
615        let (tx, rx) = oneshot::channel();
616        self.engine_tx
617            .send(SummaryTableCommand::Vacuum {
618                retention_hours,
619                respond_to: tx,
620            })
621            .await
622            .map_err(|_| TraceEngineError::ChannelClosed)?;
623        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
624    }
625
626    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSummaryService>`.
627    pub async fn signal_shutdown(&self) {
628        info!("TraceSummaryService signaling shutdown");
629        let _ = self.engine_tx.send(SummaryTableCommand::Shutdown).await;
630    }
631
632    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
633        info!("TraceSummaryService shutting down");
634        self.engine_tx
635            .send(SummaryTableCommand::Shutdown)
636            .await
637            .map_err(|_| TraceEngineError::ChannelClosed)?;
638        if let Err(e) = self.engine_handle.await {
639            error!("Summary engine handle error: {}", e);
640        }
641        info!("TraceSummaryService shutdown complete");
642        Ok(())
643    }
644}
645
646// ── Queries ──────────────────────────────────────────────────────────────────
647
/// Read-side query helpers over the `trace_summaries` table registered in the
/// shared DataFusion `SessionContext`.
pub struct TraceSummaryQueries {
    ctx: Arc<SessionContext>,
}
651
652impl TraceSummaryQueries {
    /// Wrap an existing `SessionContext`; callers must ensure the summary
    /// table is (or will be) registered in it.
    pub fn new(ctx: Arc<SessionContext>) -> Self {
        Self { ctx }
    }
656
657    /// Get paginated traces from the Delta Lake summary table.
658    ///
659    /// The first step is a `GROUP BY trace_id` dedup query that merges any duplicate
660    /// rows (from late-arriving spans) using the same rules as `TraceAggregator`:
661    ///   - `SUM` for span/error counts, `MIN`/`MAX` for times, `MAX` for status_code
662    ///   - `FIRST_VALUE` ordered by `span_count DESC` for string fields
663    ///   - `array_distinct(flatten(array_agg(...)))` for entity/queue ID lists (full union)
664    ///
665    ///   Time filters are pushed into the SQL WHERE clause for partition pruning.
666    ///
667    ///   Secondary filters (service, errors, cursor) apply to the deduplicated DataFrame.
668    pub async fn get_paginated_traces(
669        &self,
670        filters: &TraceFilters,
671    ) -> Result<TracePaginationResponse, TraceEngineError> {
672        let limit = filters.limit.unwrap_or(50) as usize;
673        let direction = filters.direction.as_deref().unwrap_or("next");
674
675        // ── Dedup: time-filtered GROUP BY trace_id (DataFrame API) ───────────
676        use crate::parquet::tracing::queries::{date_lit, ts_lit};
677        use datafusion::functions_aggregate::expr_fn::{array_agg, first_value, max, min, sum};
678        use datafusion::functions_nested::set_ops::array_distinct;
679
680        let mut df = self.ctx.table(SUMMARY_TABLE_NAME).await?;
681
682        // ① Partition date — directory-level pruning (skips entire partition folders)
683        // ② start_time     — row-group-level pruning within matched files
684        if let Some(start) = filters.start_time {
685            df = df.filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&start)))?;
686            df = df.filter(col(START_TIME_COL).gt_eq(ts_lit(&start)))?;
687        }
688        if let Some(end) = filters.end_time {
689            df = df.filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&end)))?;
690            df = df.filter(col(START_TIME_COL).lt(ts_lit(&end)))?;
691        }
692
693        // ORDER BY specs for FIRST_VALUE aggregates
694        // span_count DESC NULLS LAST, end_time DESC NULLS LAST
695        let by_span_end: Vec<SortExpr> = vec![
696            col(SPAN_COUNT_COL).sort(false, false),
697            col(END_TIME_COL).sort(false, false),
698        ];
699        // status_code DESC, span_count DESC
700        let by_status_span: Vec<SortExpr> = vec![
701            col(STATUS_CODE_COL).sort(false, false),
702            col(SPAN_COUNT_COL).sort(false, false),
703        ];
704
705        // Phase 1: aggregate
706        // _max_end_us / _min_start_us are hidden Int64 columns used to compute
707        // duration_ms post-aggregation (arithmetic across two aggregate exprs
708        // cannot be expressed in a single aggregate slot).
709        //
710        // entity_ids / queue_ids: array_agg without FILTER is intentional.
711        // array_flatten treats NULL outer-list elements as empty and skips them,
712        // giving identical results to FILTER (WHERE IS NOT NULL) for the GROUP BY
713        // case. Unlike the original SQL, this produces [] rather than NULL when
714        // ALL rows have null IDs — the safer outcome for downstream deserialization.
715        let mut df = df
716            .aggregate(
717                vec![col(TRACE_ID_COL)],
718                vec![
719                    min(col(START_TIME_COL)).alias(START_TIME_COL),
720                    max(col(END_TIME_COL)).alias(END_TIME_COL),
721                    max(df_cast(col(END_TIME_COL), DataType::Int64)).alias("_max_end_us"),
722                    min(df_cast(col(START_TIME_COL), DataType::Int64)).alias("_min_start_us"),
723                    max(col(STATUS_CODE_COL)).alias(STATUS_CODE_COL),
724                    sum(col(SPAN_COUNT_COL)).alias(SPAN_COUNT_COL),
725                    sum(col(ERROR_COUNT_COL)).alias(ERROR_COUNT_COL),
726                    first_value(col(SERVICE_NAME_COL), by_span_end.clone()).alias(SERVICE_NAME_COL),
727                    first_value(col(SCOPE_NAME_COL), by_span_end.clone()).alias(SCOPE_NAME_COL),
728                    first_value(col(SCOPE_VERSION_COL), by_span_end.clone())
729                        .alias(SCOPE_VERSION_COL),
730                    first_value(col(ROOT_OPERATION_COL), by_span_end.clone())
731                        .alias(ROOT_OPERATION_COL),
732                    first_value(col(STATUS_MESSAGE_COL), by_status_span).alias(STATUS_MESSAGE_COL),
733                    first_value(col(RESOURCE_ATTRIBUTES_COL), by_span_end)
734                        .alias(RESOURCE_ATTRIBUTES_COL),
735                    array_agg(col(ENTITY_IDS_COL)).alias("_entity_ids_raw"),
736                    array_agg(col(QUEUE_IDS_COL)).alias("_queue_ids_raw"),
737                ],
738            )?
739            // Phase 2: derive computed columns from hidden aggregates, then drop them
740            .with_column(
741                DURATION_MS_COL,
742                (col("_max_end_us") - col("_min_start_us")) / lit(1000i64),
743            )?
744            .with_column(
745                ENTITY_IDS_COL,
746                array_distinct(flatten(col("_entity_ids_raw"))),
747            )?
748            .with_column(
749                QUEUE_IDS_COL,
750                array_distinct(flatten(col("_queue_ids_raw"))),
751            )?
752            .drop_columns(&[
753                "_max_end_us",
754                "_min_start_us",
755                "_entity_ids_raw",
756                "_queue_ids_raw",
757            ])?;
758
759        // ── Secondary filters ────────────────────────────────────────────────
760        if let Some(ref svc) = filters.service_name {
761            df = df.filter(col(SERVICE_NAME_COL).eq(lit(svc.as_str())))?;
762        }
763        match filters.has_errors {
764            Some(true) => {
765                df = df.filter(col(ERROR_COUNT_COL).gt(lit(0i64)))?;
766            }
767            Some(false) => {
768                df = df.filter(col(ERROR_COUNT_COL).eq(lit(0i64)))?;
769            }
770            None => {}
771        }
772        if let Some(sc) = filters.status_code {
773            df = df.filter(col(STATUS_CODE_COL).eq(lit(sc)))?;
774        }
775
776        // ── entity_uid filter via array_has on List column ────────────────
777        if let Some(ref uid) = filters.entity_uid {
778            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
779                col(ENTITY_IDS_COL),
780                lit(uid.as_str()),
781            ))?;
782        }
783
784        // ── queue_uid filter via array_has on List column ─────────────────
785        if let Some(ref uid) = filters.queue_uid {
786            df = df.filter(datafusion::functions_nested::expr_fn::array_has(
787                col(QUEUE_IDS_COL),
788                lit(uid.as_str()),
789            ))?;
790        }
791
792        // ── trace_ids IN filter ──────────────────────────────────────────────
793        if let Some(ref ids) = filters.trace_ids {
794            if !ids.is_empty() {
795                let binary_ids: Vec<Expr> = ids
796                    .iter()
797                    .filter_map(|hex| TraceId::hex_to_bytes(hex).ok())
798                    .map(|b| lit(ScalarValue::Binary(Some(b))))
799                    .collect();
800                if !binary_ids.is_empty() {
801                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
802                }
803            }
804        }
805
806        // ── Cursor filter in DataFusion ──────────────────────────────────────
807        // Equivalent to Postgres: `(start_time, trace_id) < (cursor_time, cursor_id)`
808        // for "next" or `> (cursor_time, cursor_id)` for "previous".
809        if let (Some(cursor_time), Some(ref cursor_id)) =
810            (filters.cursor_start_time, &filters.cursor_trace_id)
811        {
812            if let Ok(cursor_bytes) = TraceId::hex_to_bytes(cursor_id) {
813                let cursor_ts = lit(ScalarValue::TimestampMicrosecond(
814                    Some(cursor_time.timestamp_micros()),
815                    Some("UTC".into()),
816                ));
817                let cursor_tid = lit(ScalarValue::Binary(Some(cursor_bytes)));
818                let cursor_expr = if direction == "previous" {
819                    col(START_TIME_COL)
820                        .gt(cursor_ts.clone())
821                        .or(col(START_TIME_COL)
822                            .eq(cursor_ts)
823                            .and(col(TRACE_ID_COL).gt(cursor_tid)))
824                } else {
825                    col(START_TIME_COL)
826                        .lt(cursor_ts.clone())
827                        .or(col(START_TIME_COL)
828                            .eq(cursor_ts)
829                            .and(col(TRACE_ID_COL).lt(cursor_tid)))
830                };
831                df = df.filter(cursor_expr)?;
832            }
833        }
834
835        // ── Attribute filters via span lookup → IN list ──────────────────────
836        // Requires shared SessionContext (trace_spans must be registered in self.ctx).
837        // We execute the span query eagerly to collect matching trace IDs, then filter
838        // the summaries DataFrame with an IN-list predicate. This avoids a cross-table
839        // JOIN that causes DataFusion to report ambiguous `trace_id` column references.
840        if let Some(ref attr_filters) = filters.attribute_filters {
841            if !attr_filters.is_empty() {
842                let mut spans_df = self.ctx.table("trace_spans").await?.select_columns(&[
843                    TRACE_ID_COL,
844                    START_TIME_COL,
845                    SEARCH_BLOB_COL,
846                ])?;
847
848                // Time predicates on spans for partition pruning
849                if let Some(start) = filters.start_time {
850                    spans_df = spans_df.filter(col(START_TIME_COL).gt_eq(lit(
851                        ScalarValue::TimestampMicrosecond(
852                            Some(start.timestamp_micros()),
853                            Some("UTC".into()),
854                        ),
855                    )))?;
856                }
857                if let Some(end) = filters.end_time {
858                    spans_df = spans_df.filter(col(START_TIME_COL).lt(lit(
859                        ScalarValue::TimestampMicrosecond(
860                            Some(end.timestamp_micros()),
861                            Some("UTC".into()),
862                        ),
863                    )))?;
864                }
865
866                // OR-match each filter against search_blob.
867                // normalize_attr_filter converts "key:value" → "%key=value%" so the LIKE
868                // pattern matches the new pipe-bounded `|key=value|` blob format.
869                let mut attr_expr: Option<Expr> = None;
870                for f in attr_filters {
871                    let pattern = crate::parquet::tracing::queries::normalize_attr_filter(f);
872                    let cond = match_attr_expr(col(SEARCH_BLOB_COL), lit(pattern));
873                    attr_expr = Some(match attr_expr {
874                        None => cond,
875                        Some(e) => e.or(cond),
876                    });
877                }
878                if let Some(expr) = attr_expr {
879                    spans_df = spans_df.filter(expr)?;
880                }
881
882                // Collect matching trace IDs eagerly, then apply as IN-list filter.
883                // Use HashSet for O(1) dedup instead of O(n²) Vec::contains().
884                let span_batches = spans_df.select_columns(&[TRACE_ID_COL])?.collect().await?;
885                let mut seen_ids: std::collections::HashSet<Vec<u8>> =
886                    std::collections::HashSet::new();
887                let mut binary_ids: Vec<Expr> = Vec::new();
888                for batch in &span_batches {
889                    // trace_id may be FixedSizeBinary(16) or Binary after Delta round-trip.
890                    // Cast to Binary to handle both uniformly.
891                    if let Some(col_ref) = batch.column_by_name(TRACE_ID_COL) {
892                        let casted = compute::cast(col_ref, &DataType::Binary)?;
893                        let col_arr =
894                            casted
895                                .as_any()
896                                .downcast_ref::<BinaryArray>()
897                                .ok_or_else(|| {
898                                    TraceEngineError::DowncastError("trace_id to BinaryArray")
899                                })?;
900                        for i in 0..batch.num_rows() {
901                            let id_bytes = col_arr.value(i).to_vec();
902                            if seen_ids.insert(id_bytes.clone()) {
903                                binary_ids.push(lit(ScalarValue::Binary(Some(id_bytes))));
904                            }
905                        }
906                    }
907                }
908
909                if !binary_ids.is_empty() {
910                    df = df.filter(col(TRACE_ID_COL).in_list(binary_ids, false))?;
911                } else {
912                    // No matching spans → return empty result
913                    df = df.filter(lit(false))?;
914                }
915            }
916        }
917
918        // ── Sort: DESC for "next", ASC for "previous" ────────────────────────
919        // "previous" direction fetches the oldest limit+1 items newer than the cursor,
920        // which matches the original Rust post-reversal behavior.
921        df = if direction == "previous" {
922            df.sort(vec![
923                col(START_TIME_COL).sort(true, true),
924                col(TRACE_ID_COL).sort(true, true),
925            ])?
926        } else {
927            df.sort(vec![
928                col(START_TIME_COL).sort(false, false),
929                col(TRACE_ID_COL).sort(false, false),
930            ])?
931        };
932
933        // ── LIMIT pushed into DataFusion (fetch limit+1 to detect next page) ─
934        df = df.limit(0, Some(limit + 1))?;
935
936        let batches = df.collect().await?;
937        let mut items = batches_to_trace_list_items(batches)?;
938
939        let has_more = items.len() > limit;
940        if has_more {
941            items.pop(); // remove N+1 sentinel
942        }
943
944        // Direction-specific cursor logic — mirrors the original PostgreSQL implementation.
945        //
946        // "next" (DESC order): items are newest-first. The sentinel tells us if older
947        // items exist (has_next). Cursor presence means we navigated forward, so newer
948        // items exist behind us (has_previous).
949        //
950        // "previous" (ASC order): items are oldest-first (closest-to-cursor first).
951        // The sentinel tells us if even more newer items exist (has_previous). Cursor
952        // presence means we navigated backward, so older items exist ahead (has_next).
953        // Items stay in ASC order — no reversal — matching PG behavior exactly.
954        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
955            "next" => {
956                let next_cursor = if has_more {
957                    items.last().map(|item| TraceCursor {
958                        start_time: item.start_time,
959                        trace_id: item.trace_id.clone(),
960                    })
961                } else {
962                    None
963                };
964
965                let previous_cursor = items.first().map(|item| TraceCursor {
966                    start_time: item.start_time,
967                    trace_id: item.trace_id.clone(),
968                });
969
970                (
971                    has_more,
972                    next_cursor,
973                    filters.cursor_start_time.is_some(),
974                    previous_cursor,
975                )
976            }
977            "previous" => {
978                // ASC order: items.last() is the newest (largest start_time).
979                // To continue backward (fetch even newer items), the cursor must
980                // point past the current page's newest item so `> cursor` excludes
981                // everything already returned.
982                let previous_cursor = if has_more {
983                    items.last().map(|item| TraceCursor {
984                        start_time: item.start_time,
985                        trace_id: item.trace_id.clone(),
986                    })
987                } else {
988                    None
989                };
990
991                // items.first() is the oldest (smallest start_time).
992                // To go forward (back toward newer-first / DESC pages), the cursor
993                // must point at the oldest item so `< cursor` fetches older items.
994                let next_cursor = items.first().map(|item| TraceCursor {
995                    start_time: item.start_time,
996                    trace_id: item.trace_id.clone(),
997                });
998
999                (
1000                    filters.cursor_start_time.is_some(),
1001                    next_cursor,
1002                    has_more,
1003                    previous_cursor,
1004                )
1005            }
1006            _ => (false, None, false, None),
1007        };
1008
1009        Ok(TracePaginationResponse {
1010            items,
1011            has_next,
1012            next_cursor,
1013            has_previous,
1014            previous_cursor,
1015        })
1016    }
1017}
1018
1019// ── Arrow → TraceListItem conversion ─────────────────────────────────────────
1020
1021/// Extract attributes from a MapArray at a given row index.
1022fn extract_map_attributes(map_array: &MapArray, row_idx: usize) -> Vec<Attribute> {
1023    if map_array.is_null(row_idx) {
1024        return Vec::new();
1025    }
1026    let entry = map_array.value(row_idx);
1027    let struct_array = entry.as_any().downcast_ref::<StructArray>().unwrap();
1028    let keys_arr = compute::cast(struct_array.column(0).as_ref(), &DataType::Utf8).unwrap();
1029    let keys = keys_arr.as_any().downcast_ref::<StringArray>().unwrap();
1030    let values_arr = compute::cast(struct_array.column(1).as_ref(), &DataType::Utf8).unwrap();
1031    let values = values_arr.as_any().downcast_ref::<StringArray>().unwrap();
1032
1033    (0..struct_array.len())
1034        .map(|i| Attribute {
1035            key: keys.value(i).to_string(),
1036            value: serde_json::from_str(values.value(i)).unwrap_or(serde_json::Value::Null),
1037        })
1038        .collect()
1039}
1040
1041/// Extract a `Vec<String>` from a nullable `ListArray` at a given row index.
1042fn extract_list_strings(list: Option<&ListArray>, row_idx: usize) -> Vec<String> {
1043    let Some(list) = list else {
1044        return Vec::new();
1045    };
1046    if list.is_null(row_idx) {
1047        return Vec::new();
1048    }
1049    let inner = list.value(row_idx);
1050    let str_arr = compute::cast(&inner, &DataType::Utf8)
1051        .ok()
1052        .and_then(|a| a.as_any().downcast_ref::<StringArray>().cloned());
1053    match str_arr {
1054        Some(arr) => (0..arr.len())
1055            .filter(|i| !arr.is_null(*i))
1056            .map(|i| arr.value(i).to_string())
1057            .collect(),
1058        None => Vec::new(),
1059    }
1060}
1061
1062fn batches_to_trace_list_items(
1063    batches: Vec<RecordBatch>,
1064) -> Result<Vec<TraceListItem>, TraceEngineError> {
1065    let mut items = Vec::new();
1066
1067    for batch in &batches {
1068        // trace_id may come back as FixedSizeBinary(16) or Binary depending on
1069        // whether DataFusion/Delta round-tripped the schema. Handle both.
1070        let trace_id_col = batch.column_by_name(TRACE_ID_COL).ok_or_else(|| {
1071            TraceEngineError::UnsupportedOperation("missing trace_id column".into())
1072        })?;
1073        let trace_id_binary = compute::cast(trace_id_col, &DataType::Binary)?;
1074        let trace_ids = trace_id_binary
1075            .as_any()
1076            .downcast_ref::<BinaryArray>()
1077            .ok_or_else(|| {
1078                TraceEngineError::UnsupportedOperation("trace_id cast to BinaryArray failed".into())
1079            })?;
1080
1081        // Cast all string/dictionary columns to Utf8 uniformly (handles Utf8View,
1082        // Dictionary(Int32, Utf8), LargeUtf8, etc.).
1083        let svc_arr = compute::cast(
1084            batch.column_by_name(SERVICE_NAME_COL).ok_or_else(|| {
1085                TraceEngineError::UnsupportedOperation("missing service_name column".into())
1086            })?,
1087            &DataType::Utf8,
1088        )?;
1089        let service_names = svc_arr
1090            .as_any()
1091            .downcast_ref::<StringArray>()
1092            .ok_or_else(|| {
1093                TraceEngineError::UnsupportedOperation(
1094                    "service_name cast to StringArray failed".into(),
1095                )
1096            })?;
1097
1098        let scope_arr = compute::cast(
1099            batch.column_by_name(SCOPE_NAME_COL).ok_or_else(|| {
1100                TraceEngineError::UnsupportedOperation("missing scope_name column".into())
1101            })?,
1102            &DataType::Utf8,
1103        )?;
1104        let scope_names = scope_arr
1105            .as_any()
1106            .downcast_ref::<StringArray>()
1107            .ok_or_else(|| {
1108                TraceEngineError::UnsupportedOperation(
1109                    "scope_name cast to StringArray failed".into(),
1110                )
1111            })?;
1112
1113        let scopev_arr = compute::cast(
1114            batch.column_by_name(SCOPE_VERSION_COL).ok_or_else(|| {
1115                TraceEngineError::UnsupportedOperation("missing scope_version column".into())
1116            })?,
1117            &DataType::Utf8,
1118        )?;
1119        let scope_versions = scopev_arr
1120            .as_any()
1121            .downcast_ref::<StringArray>()
1122            .ok_or_else(|| {
1123                TraceEngineError::UnsupportedOperation(
1124                    "scope_version cast to StringArray failed".into(),
1125                )
1126            })?;
1127
1128        let root_arr = compute::cast(
1129            batch.column_by_name(ROOT_OPERATION_COL).ok_or_else(|| {
1130                TraceEngineError::UnsupportedOperation("missing root_operation column".into())
1131            })?,
1132            &DataType::Utf8,
1133        )?;
1134        let root_operations = root_arr
1135            .as_any()
1136            .downcast_ref::<StringArray>()
1137            .ok_or_else(|| {
1138                TraceEngineError::UnsupportedOperation(
1139                    "root_operation cast to StringArray failed".into(),
1140                )
1141            })?;
1142
1143        let sm_arr = compute::cast(
1144            batch.column_by_name(STATUS_MESSAGE_COL).ok_or_else(|| {
1145                TraceEngineError::UnsupportedOperation("missing status_message column".into())
1146            })?,
1147            &DataType::Utf8,
1148        )?;
1149        let status_messages = sm_arr
1150            .as_any()
1151            .downcast_ref::<StringArray>()
1152            .ok_or_else(|| {
1153                TraceEngineError::UnsupportedOperation(
1154                    "status_message cast to StringArray failed".into(),
1155                )
1156            })?;
1157
1158        let resource_attrs_map = batch
1159            .column_by_name(RESOURCE_ATTRIBUTES_COL)
1160            .and_then(|c| c.as_any().downcast_ref::<MapArray>())
1161            .ok_or_else(|| {
1162                TraceEngineError::UnsupportedOperation("missing resource_attributes column".into())
1163            })?;
1164
1165        let entity_ids_list = batch
1166            .column_by_name(ENTITY_IDS_COL)
1167            .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1168
1169        let queue_ids_list = batch
1170            .column_by_name(QUEUE_IDS_COL)
1171            .and_then(|c| c.as_any().downcast_ref::<ListArray>());
1172
1173        let start_times = batch
1174            .column_by_name(START_TIME_COL)
1175            .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1176            .ok_or_else(|| {
1177                TraceEngineError::UnsupportedOperation("missing start_time column".into())
1178            })?;
1179
1180        let end_times = batch
1181            .column_by_name(END_TIME_COL)
1182            .and_then(|c| c.as_any().downcast_ref::<TimestampMicrosecondArray>())
1183            .ok_or_else(|| {
1184                TraceEngineError::UnsupportedOperation("missing end_time column".into())
1185            })?;
1186
1187        let durations = batch
1188            .column_by_name(DURATION_MS_COL)
1189            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1190            .ok_or_else(|| {
1191                TraceEngineError::UnsupportedOperation("missing duration_ms column".into())
1192            })?;
1193
1194        let status_codes = batch
1195            .column_by_name(STATUS_CODE_COL)
1196            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
1197            .ok_or_else(|| {
1198                TraceEngineError::UnsupportedOperation("missing status_code column".into())
1199            })?;
1200
1201        let span_counts = batch
1202            .column_by_name(SPAN_COUNT_COL)
1203            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1204            .ok_or_else(|| {
1205                TraceEngineError::UnsupportedOperation("missing span_count column".into())
1206            })?;
1207
1208        let error_counts = batch
1209            .column_by_name(ERROR_COUNT_COL)
1210            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1211            .ok_or_else(|| {
1212                TraceEngineError::UnsupportedOperation("missing error_count column".into())
1213            })?;
1214
1215        for i in 0..batch.num_rows() {
1216            let trace_id_hex = hex::encode(trace_ids.value(i));
1217
1218            let start_time = micros_to_datetime(start_times.value(i));
1219            let end_time = if end_times.is_null(i) {
1220                None
1221            } else {
1222                Some(micros_to_datetime(end_times.value(i)))
1223            };
1224            let duration_ms = if durations.is_null(i) {
1225                None
1226            } else {
1227                Some(durations.value(i))
1228            };
1229            let error_count = error_counts.value(i);
1230
1231            let resource_attributes = extract_map_attributes(resource_attrs_map, i);
1232
1233            let entity_ids = extract_list_strings(entity_ids_list, i);
1234            let queue_ids = extract_list_strings(queue_ids_list, i);
1235
1236            items.push(TraceListItem {
1237                trace_id: trace_id_hex,
1238                service_name: service_names.value(i).to_string(),
1239                scope_name: scope_names.value(i).to_string(),
1240                scope_version: scope_versions.value(i).to_string(),
1241                root_operation: root_operations.value(i).to_string(),
1242                start_time,
1243                end_time,
1244                duration_ms,
1245                status_code: status_codes.value(i),
1246                status_message: if status_messages.is_null(i) {
1247                    None
1248                } else {
1249                    Some(status_messages.value(i).to_string())
1250                },
1251                span_count: span_counts.value(i),
1252                has_errors: error_count > 0,
1253                error_count,
1254                resource_attributes,
1255                entity_ids,
1256                queue_ids,
1257            });
1258        }
1259    }
1260
1261    Ok(items)
1262}
1263
1264fn micros_to_datetime(micros: i64) -> DateTime<Utc> {
1265    let secs = micros / 1_000_000;
1266    let nanos = ((micros % 1_000_000) * 1_000) as u32;
1267    Utc.timestamp_opt(secs, nanos).unwrap()
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272    use super::*;
1273    use crate::storage::ObjectStore;
1274    use scouter_settings::ObjectStorageSettings;
1275    use scouter_types::sql::TraceFilters;
1276    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
1277    use tracing_subscriber;
1278
1279    fn cleanup() {
1280        let _ = tracing_subscriber::fmt()
1281            .with_max_level(tracing::Level::INFO)
1282            .try_init();
1283
1284        let storage_settings = ObjectStorageSettings::default();
1285        let current_dir = std::env::current_dir().unwrap();
1286        let storage_path = current_dir.join(storage_settings.storage_root());
1287        if storage_path.exists() {
1288            std::fs::remove_dir_all(storage_path).unwrap();
1289        }
1290    }
1291
1292    /// Build a standalone SessionContext for test use (no trace_spans registered).
1293    /// Attribute-filter paths that need trace_spans are not exercised in these tests.
1294    fn make_test_ctx(storage_settings: &ObjectStorageSettings) -> Arc<SessionContext> {
1295        Arc::new(
1296            ObjectStore::new(storage_settings)
1297                .unwrap()
1298                .get_session()
1299                .unwrap(),
1300        )
1301    }
1302
1303    fn make_summary(
1304        trace_id_bytes: [u8; 16],
1305        service_name: &str,
1306        error_count: i64,
1307        resource_attributes: Vec<Attribute>,
1308    ) -> TraceSummaryRecord {
1309        let now = Utc::now();
1310        TraceSummaryRecord {
1311            trace_id: TraceId::from_bytes(trace_id_bytes),
1312            service_name: service_name.to_string(),
1313            scope_name: "test.scope".to_string(),
1314            scope_version: String::new(),
1315            root_operation: "root_op".to_string(),
1316            start_time: now,
1317            end_time: Some(now + chrono::Duration::milliseconds(200)),
1318            status_code: if error_count > 0 { 2 } else { 0 },
1319            status_message: if error_count > 0 {
1320                "Internal Server Error".to_string()
1321            } else {
1322                "OK".to_string()
1323            },
1324            span_count: 3,
1325            error_count,
1326            resource_attributes,
1327            entity_ids: vec![],
1328            queue_ids: vec![],
1329        }
1330    }
1331
1332    /// Basic write + paginate round-trip: writes two summaries and verifies both appear.
1333    #[tokio::test]
1334    async fn test_summary_write_and_paginate_basic() -> Result<(), TraceEngineError> {
1335        cleanup();
1336
1337        let storage_settings = ObjectStorageSettings::default();
1338        let ctx = make_test_ctx(&storage_settings);
1339        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1340
1341        let s1 = make_summary([1u8; 16], "svc_a", 0, vec![]);
1342        let s2 = make_summary([2u8; 16], "svc_b", 0, vec![]);
1343        service.write_summaries(vec![s1, s2]).await?;
1344        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1345
1346        let start = Utc::now() - chrono::Duration::hours(1);
1347        let end = Utc::now() + chrono::Duration::hours(1);
1348        let filters = TraceFilters {
1349            service_name: None,
1350            has_errors: None,
1351            status_code: None,
1352            start_time: Some(start),
1353            end_time: Some(end),
1354            limit: Some(25),
1355            cursor_start_time: None,
1356            cursor_trace_id: None,
1357            direction: None,
1358            attribute_filters: None,
1359            trace_ids: None,
1360            entity_uid: None,
1361            queue_uid: None,
1362        };
1363
1364        let response = service.query_service.get_paginated_traces(&filters).await?;
1365        assert!(
1366            response.items.len() >= 2,
1367            "Expected at least 2 items, got {}",
1368            response.items.len()
1369        );
1370
1371        service.shutdown().await?;
1372        cleanup();
1373        Ok(())
1374    }
1375
1376    /// `has_errors = Some(true)` returns only error traces; `Some(false)` returns only non-errors.
1377    #[tokio::test]
1378    async fn test_summary_has_errors_filter() -> Result<(), TraceEngineError> {
1379        cleanup();
1380
1381        let storage_settings = ObjectStorageSettings::default();
1382        let ctx = make_test_ctx(&storage_settings);
1383        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1384
1385        let ok_summary = make_summary([3u8; 16], "svc", 0, vec![]);
1386        let err_summary = make_summary([4u8; 16], "svc", 2, vec![]);
1387        service
1388            .write_summaries(vec![ok_summary, err_summary])
1389            .await?;
1390        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1391
1392        let start = Utc::now() - chrono::Duration::hours(1);
1393        let end = Utc::now() + chrono::Duration::hours(1);
1394
1395        let base_filters = TraceFilters {
1396            service_name: None,
1397            has_errors: None,
1398            status_code: None,
1399            start_time: Some(start),
1400            end_time: Some(end),
1401            limit: Some(25),
1402            cursor_start_time: None,
1403            cursor_trace_id: None,
1404            direction: None,
1405            attribute_filters: None,
1406            trace_ids: None,
1407            entity_uid: None,
1408            queue_uid: None,
1409        };
1410
1411        // has_errors = true → only error trace
1412        let mut filters_err = base_filters.clone();
1413        filters_err.has_errors = Some(true);
1414        let errors_only = service
1415            .query_service
1416            .get_paginated_traces(&filters_err)
1417            .await?;
1418        for item in &errors_only.items {
1419            assert!(
1420                item.error_count > 0,
1421                "Expected error trace, got: {:?}",
1422                item
1423            );
1424        }
1425        assert!(
1426            !errors_only.items.is_empty(),
1427            "Expected at least one error trace"
1428        );
1429
1430        // has_errors = false → only non-error traces
1431        let mut filters_ok = base_filters.clone();
1432        filters_ok.has_errors = Some(false);
1433        let no_errors = service
1434            .query_service
1435            .get_paginated_traces(&filters_ok)
1436            .await?;
1437        for item in &no_errors.items {
1438            assert_eq!(
1439                item.error_count, 0,
1440                "Expected non-error trace, got error_count={}",
1441                item.error_count
1442            );
1443        }
1444        assert!(
1445            !no_errors.items.is_empty(),
1446            "Expected at least one non-error trace"
1447        );
1448
1449        service.shutdown().await?;
1450        cleanup();
1451        Ok(())
1452    }
1453
1454    /// service_name filter returns only matching service traces.
1455    #[tokio::test]
1456    async fn test_summary_service_name_filter() -> Result<(), TraceEngineError> {
1457        cleanup();
1458
1459        let storage_settings = ObjectStorageSettings::default();
1460        let ctx = make_test_ctx(&storage_settings);
1461        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1462
1463        let s_alpha = make_summary([5u8; 16], "alpha_service", 0, vec![]);
1464        let s_beta = make_summary([6u8; 16], "beta_service", 0, vec![]);
1465        service.write_summaries(vec![s_alpha, s_beta]).await?;
1466        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1467
1468        let start = Utc::now() - chrono::Duration::hours(1);
1469        let end = Utc::now() + chrono::Duration::hours(1);
1470        let filters = TraceFilters {
1471            service_name: Some("alpha_service".to_string()),
1472            has_errors: None,
1473            status_code: None,
1474            start_time: Some(start),
1475            end_time: Some(end),
1476            limit: Some(25),
1477            cursor_start_time: None,
1478            cursor_trace_id: None,
1479            direction: None,
1480            attribute_filters: None,
1481            trace_ids: None,
1482            entity_uid: None,
1483            queue_uid: None,
1484        };
1485
1486        let response = service.query_service.get_paginated_traces(&filters).await?;
1487        assert!(
1488            !response.items.is_empty(),
1489            "Expected results for alpha_service"
1490        );
1491        for item in &response.items {
1492            assert_eq!(
1493                item.service_name, "alpha_service",
1494                "Expected only alpha_service items, got: {}",
1495                item.service_name
1496            );
1497        }
1498
1499        service.shutdown().await?;
1500        cleanup();
1501        Ok(())
1502    }
1503
1504    /// trace_ids IN filter returns only the specified traces.
1505    #[tokio::test]
1506    async fn test_summary_trace_ids_filter() -> Result<(), TraceEngineError> {
1507        cleanup();
1508
1509        let storage_settings = ObjectStorageSettings::default();
1510        let ctx = make_test_ctx(&storage_settings);
1511        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1512
1513        let wanted_id = TraceId::from_bytes([7u8; 16]);
1514        let unwanted_id = TraceId::from_bytes([8u8; 16]);
1515
1516        let s1 = make_summary([7u8; 16], "svc", 0, vec![]);
1517        let s2 = make_summary([8u8; 16], "svc", 0, vec![]);
1518        service.write_summaries(vec![s1, s2]).await?;
1519        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1520
1521        let start = Utc::now() - chrono::Duration::hours(1);
1522        let end = Utc::now() + chrono::Duration::hours(1);
1523        let filters = TraceFilters {
1524            service_name: None,
1525            has_errors: None,
1526            status_code: None,
1527            start_time: Some(start),
1528            end_time: Some(end),
1529            limit: Some(25),
1530            cursor_start_time: None,
1531            cursor_trace_id: None,
1532            direction: None,
1533            attribute_filters: None,
1534            trace_ids: Some(vec![wanted_id.to_hex()]),
1535            entity_uid: None,
1536            queue_uid: None,
1537        };
1538
1539        let response = service.query_service.get_paginated_traces(&filters).await?;
1540        assert_eq!(
1541            response.items.len(),
1542            1,
1543            "Expected exactly 1 item from trace_ids filter"
1544        );
1545        assert_eq!(
1546            response.items[0].trace_id,
1547            wanted_id.to_hex(),
1548            "Returned wrong trace_id"
1549        );
1550        assert_ne!(
1551            response.items[0].trace_id,
1552            unwanted_id.to_hex(),
1553            "Should not have returned unwanted trace_id"
1554        );
1555
1556        service.shutdown().await?;
1557        cleanup();
1558        Ok(())
1559    }
1560
1561    /// Cursor pagination: first page → next → previous all return correct item counts.
1562    #[tokio::test]
1563    async fn test_summary_cursor_pagination() -> Result<(), TraceEngineError> {
1564        cleanup();
1565        let storage_settings = ObjectStorageSettings::default();
1566        let ctx = make_test_ctx(&storage_settings);
1567        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1568
1569        let now = Utc::now();
1570        let summaries: Vec<TraceSummaryRecord> = (0u8..100)
1571            .map(|i| {
1572                let mut s = make_summary([i; 16], "svc", 0, vec![]);
1573                s.start_time = now - chrono::Duration::minutes(i as i64);
1574                s
1575            })
1576            .collect();
1577        service.write_summaries(summaries).await?;
1578        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1579
1580        let mut filters = TraceFilters {
1581            start_time: Some(now - chrono::Duration::hours(2)),
1582            end_time: Some(now + chrono::Duration::hours(1)),
1583            limit: Some(50),
1584            ..Default::default()
1585        };
1586
1587        // First page
1588        let first = service.query_service.get_paginated_traces(&filters).await?;
1589        assert_eq!(first.items.len(), 50, "first page: 50 items");
1590        assert!(
1591            first.next_cursor.is_some(),
1592            "first page: should have next_cursor"
1593        );
1594
1595        // Next page
1596        let next_cur = first.next_cursor.clone().unwrap();
1597        filters.cursor_start_time = Some(next_cur.start_time);
1598        filters.cursor_trace_id = Some(next_cur.trace_id.clone());
1599        filters.direction = Some("next".to_string());
1600        let second = service.query_service.get_paginated_traces(&filters).await?;
1601        assert_eq!(second.items.len(), 50, "second page: 50 items");
1602        assert!(
1603            second.items[0].start_time <= next_cur.start_time,
1604            "second page first item must be <= cursor"
1605        );
1606        assert!(second.previous_cursor.is_some());
1607
1608        // Previous page
1609        let prev_cur = second.previous_cursor.unwrap();
1610        filters.cursor_start_time = Some(prev_cur.start_time);
1611        filters.cursor_trace_id = Some(prev_cur.trace_id.clone());
1612        filters.direction = Some("previous".to_string());
1613        let prev = service.query_service.get_paginated_traces(&filters).await?;
1614        assert_eq!(prev.items.len(), 50, "previous page: 50 items");
1615
1616        service.shutdown().await?;
1617        cleanup();
1618        Ok(())
1619    }
1620
1621    /// Attribute-filter JOIN path: only traces with matching span attributes are returned.
1622    #[tokio::test]
1623    async fn test_summary_attribute_filter_via_join() -> Result<(), TraceEngineError> {
1624        use crate::parquet::tracing::service::TraceSpanService;
1625
1626        cleanup();
1627        let storage_settings = ObjectStorageSettings::default();
1628
1629        // TraceSpanService owns the SessionContext (trace_spans registered in it)
1630        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
1631        let shared_ctx = span_service.ctx.clone();
1632
1633        // TraceSummaryService shares the same ctx — JOIN to trace_spans will work
1634        let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?;
1635
1636        let now = Utc::now();
1637        let kafka_trace = TraceId::from_bytes([70u8; 16]);
1638        let plain_trace = TraceId::from_bytes([80u8; 16]);
1639
1640        let kafka_span = make_span_record(
1641            &kafka_trace,
1642            SpanId::from_bytes([70u8; 8]),
1643            "svc",
1644            vec![Attribute {
1645                key: "component".to_string(),
1646                value: serde_json::Value::String("kafka".to_string()),
1647            }],
1648        );
1649        let plain_span =
1650            make_span_record(&plain_trace, SpanId::from_bytes([80u8; 8]), "svc", vec![]);
1651        span_service
1652            .write_spans(vec![kafka_span, plain_span])
1653            .await?;
1654
1655        let mut kafka_summary = make_summary([70u8; 16], "svc", 0, vec![]);
1656        kafka_summary.start_time = now;
1657        let mut plain_summary = make_summary([80u8; 16], "svc", 0, vec![]);
1658        plain_summary.start_time = now;
1659        summary_service
1660            .write_summaries(vec![kafka_summary, plain_summary])
1661            .await?;
1662
1663        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
1664
1665        let filters = TraceFilters {
1666            start_time: Some(now - chrono::Duration::hours(1)),
1667            end_time: Some(now + chrono::Duration::hours(1)),
1668            attribute_filters: Some(vec!["component:kafka".to_string()]),
1669            limit: Some(25),
1670            ..Default::default()
1671        };
1672
1673        let response = summary_service
1674            .query_service
1675            .get_paginated_traces(&filters)
1676            .await?;
1677
1678        assert!(
1679            !response.items.is_empty(),
1680            "attribute filter must return results"
1681        );
1682        assert!(
1683            response
1684                .items
1685                .iter()
1686                .all(|i| i.trace_id == kafka_trace.to_hex()),
1687            "only kafka trace should appear; got {:?}",
1688            response
1689                .items
1690                .iter()
1691                .map(|i| &i.trace_id)
1692                .collect::<Vec<_>>()
1693        );
1694
1695        span_service.shutdown().await?;
1696        summary_service.shutdown().await?;
1697        cleanup();
1698        Ok(())
1699    }
1700
    /// queue_uid filter: only traces whose queue_ids contain the target UID are returned,
    /// and the matching trace's spans can be fetched by trace_id.
    #[tokio::test]
    async fn test_summary_queue_id_filter_and_span_lookup() -> Result<(), TraceEngineError> {
        use crate::parquet::tracing::service::TraceSpanService;

        cleanup();
        let storage_settings = ObjectStorageSettings::default();

        // TraceSpanService owns the SessionContext
        let span_service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        let shared_ctx = span_service.ctx.clone();

        // TraceSummaryService shares the same ctx so JOIN path works
        let summary_service = TraceSummaryService::new(&storage_settings, 24, shared_ctx).await?;

        let now = Utc::now();
        let queue_trace = TraceId::from_bytes([90u8; 16]);
        let plain_trace = TraceId::from_bytes([91u8; 16]);
        let target_queue_uid = "queue-record-abc123";

        // Write spans for both traces
        let queue_span = make_span_record(
            &queue_trace,
            SpanId::from_bytes([90u8; 8]),
            "svc_queue",
            vec![],
        );
        let plain_span = make_span_record(
            &plain_trace,
            SpanId::from_bytes([91u8; 8]),
            "svc_queue",
            vec![],
        );
        // NOTE(review): this test uses `write_spans_direct`, while the attribute-filter
        // test uses `write_spans` — presumably `_direct` bypasses an internal buffer;
        // confirm the difference is intentional before unifying the two tests.
        span_service
            .write_spans_direct(vec![queue_span, plain_span])
            .await?;

        // Write summaries: one with a matching queue_id, one without
        let mut queue_summary = make_summary([90u8; 16], "svc_queue", 0, vec![]);
        queue_summary.start_time = now;
        queue_summary.queue_ids = vec![target_queue_uid.to_string()];

        let mut plain_summary = make_summary([91u8; 16], "svc_queue", 0, vec![]);
        plain_summary.start_time = now;
        // queue_ids left empty — should not appear in results

        summary_service
            .write_summaries(vec![queue_summary, plain_summary])
            .await?;

        // Give the async write paths time to flush before querying.
        tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;

        // ── Step 1: query summaries by queue_uid ─────────────────────────────────
        let filters = TraceFilters {
            start_time: Some(now - chrono::Duration::hours(1)),
            end_time: Some(now + chrono::Duration::hours(1)),
            queue_uid: Some(target_queue_uid.to_string()),
            limit: Some(25),
            ..Default::default()
        };

        let response = summary_service
            .query_service
            .get_paginated_traces(&filters)
            .await?;

        assert!(
            !response.items.is_empty(),
            "queue_uid filter must return at least one result"
        );
        // Every returned item must belong to the queue trace — the plain trace
        // (empty queue_ids) must be filtered out.
        assert!(
            response
                .items
                .iter()
                .all(|i| i.trace_id == queue_trace.to_hex()),
            "only the queue trace should appear; got {:?}",
            response
                .items
                .iter()
                .map(|i| &i.trace_id)
                .collect::<Vec<_>>()
        );

        // ── Step 2: fetch spans for the returned trace_id ─────────────────────────
        // Round-trip the hex trace_id from the summary result back into a TraceId
        // to prove the two tables agree on the identifier encoding.
        let returned_trace_id =
            TraceId::from_hex(&response.items[0].trace_id).expect("trace_id must be valid hex");
        let spans = span_service
            .query_service
            .get_trace_spans(
                Some(returned_trace_id.as_bytes()),
                None,
                Some(&(now - chrono::Duration::hours(1))),
                Some(&(now + chrono::Duration::hours(1))),
                None,
            )
            .await?;

        assert!(
            !spans.is_empty(),
            "should find spans for the returned trace_id"
        );

        span_service.shutdown().await?;
        summary_service.shutdown().await?;
        cleanup();
        Ok(())
    }
1809
1810    /// Build a deterministic `TraceSpanRecord` for use in summary tests.
1811    fn make_span_record(
1812        trace_id: &TraceId,
1813        span_id: SpanId,
1814        service_name: &str,
1815        attributes: Vec<Attribute>,
1816    ) -> TraceSpanRecord {
1817        let now = Utc::now();
1818        TraceSpanRecord {
1819            created_at: now,
1820            trace_id: trace_id.clone(),
1821            span_id,
1822            parent_span_id: None,
1823            flags: 1,
1824            trace_state: String::new(),
1825            scope_name: "test.scope".to_string(),
1826            scope_version: None,
1827            span_name: "op".to_string(),
1828            span_kind: "INTERNAL".to_string(),
1829            start_time: now,
1830            end_time: now + chrono::Duration::milliseconds(100),
1831            duration_ms: 100,
1832            status_code: 0,
1833            status_message: "OK".to_string(),
1834            attributes,
1835            events: vec![],
1836            links: vec![],
1837            label: None,
1838            input: serde_json::Value::Null,
1839            output: serde_json::Value::Null,
1840            service_name: service_name.to_string(),
1841            resource_attributes: vec![],
1842        }
1843    }
1844
1845    /// `resource_attributes` survive a write → read round-trip.
1846    #[tokio::test]
1847    async fn test_summary_resource_attributes_roundtrip() -> Result<(), TraceEngineError> {
1848        cleanup();
1849
1850        let storage_settings = ObjectStorageSettings::default();
1851        let ctx = make_test_ctx(&storage_settings);
1852        let service = TraceSummaryService::new(&storage_settings, 24, ctx).await?;
1853
1854        let attrs = vec![Attribute {
1855            key: "cloud.region".to_string(),
1856            value: serde_json::Value::String("us-east-1".to_string()),
1857        }];
1858        let summary = make_summary([9u8; 16], "svc", 0, attrs.clone());
1859        service.write_summaries(vec![summary]).await?;
1860        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1861
1862        let start = Utc::now() - chrono::Duration::hours(1);
1863        let end = Utc::now() + chrono::Duration::hours(1);
1864        let filters = TraceFilters {
1865            service_name: None,
1866            has_errors: None,
1867            status_code: None,
1868            start_time: Some(start),
1869            end_time: Some(end),
1870            limit: Some(25),
1871            cursor_start_time: None,
1872            cursor_trace_id: None,
1873            direction: None,
1874            attribute_filters: None,
1875            trace_ids: Some(vec![TraceId::from_bytes([9u8; 16]).to_hex()]),
1876            entity_uid: None,
1877            queue_uid: None,
1878        };
1879
1880        let response = service.query_service.get_paginated_traces(&filters).await?;
1881        assert_eq!(response.items.len(), 1, "Expected exactly 1 item");
1882        assert_eq!(
1883            response.items[0].resource_attributes.len(),
1884            1,
1885            "Expected 1 resource attribute"
1886        );
1887        assert_eq!(response.items[0].resource_attributes[0].key, "cloud.region");
1888
1889        service.shutdown().await?;
1890        cleanup();
1891        Ok(())
1892    }
1893}