// scouter_dataframe/parquet/tracing/service.rs
1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use datafusion::prelude::SessionContext;
5use scouter_settings::ObjectStorageSettings;
6use scouter_types::TraceSpanRecord;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::time::{interval, Duration};
10use tracing::{debug, info};
11
/// Default flush cadence (seconds) for the buffering actor when no override
/// is supplied via `flush_interval_secs`.
const FLUSH_INTERVAL_SECS: u64 = 5;

/// Global singleton for the TraceSpanService.
///
/// Initialized via `init_trace_span_service()` in server setup.
/// Consumer workers call `get_trace_span_service()` to obtain the Arc for writing.
/// Uses `RwLock<Option<...>>` so tests can re-initialize with a fresh service.
static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
    std::sync::RwLock::new(None);
21
/// Initialize the global `TraceSpanService`.
///
/// If a previous service exists (e.g. in test re-initialization), it is signaled to
/// shut down before being replaced.
///
/// # Arguments
/// * `storage_settings` - Object storage configuration for span persistence.
/// * `compaction_interval_hours` - How often the engine actor performs compaction.
/// * `flush_interval_secs` - Optional buffer flush cadence; falls back to
///   `FLUSH_INTERVAL_SECS` inside the buffering actor when `None`.
/// * `retention_days` - Optional retention window forwarded to the engine actor.
///
/// # Errors
/// Propagates any `TraceEngineError` from `TraceSpanService::new`.
pub async fn init_trace_span_service(
    storage_settings: &ObjectStorageSettings,
    compaction_interval_hours: u64,
    flush_interval_secs: Option<u64>,
    retention_days: Option<u32>,
) -> Result<Arc<TraceSpanService>, TraceEngineError> {
    // Shut down any existing service before replacing.
    // The Arc is cloned inside a short block so the std RwLock guard is dropped
    // before the `.await` below (std guards must not be held across awaits).
    let old_service = {
        // unwrap(): lock poisoning means another thread panicked while holding
        // the lock — treated as unrecoverable here.
        let guard = TRACE_SPAN_SERVICE.read().unwrap();
        guard.clone()
    };
    if let Some(old) = old_service {
        info!("Shutting down previous TraceSpanService before re-initialization");
        // Signal only — does not await actor drain (see `signal_shutdown`).
        old.signal_shutdown().await;
    }

    let service = Arc::new(
        TraceSpanService::new(
            storage_settings,
            compaction_interval_hours,
            flush_interval_secs,
            retention_days,
        )
        .await?,
    );

    // NOTE(review): the read-above / write-below sequence is not atomic — two
    // concurrent initializations could both observe the old service and race on
    // the replacement. Appears to assume a single startup call (plus serialized
    // test re-init) — confirm with callers.
    {
        let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
        *guard = Some(service.clone());
    }

    info!("TraceSpanService global singleton initialized");
    Ok(service)
}
60
61/// Retrieve the global `TraceSpanService` initialized during startup.
62///
63/// Returns `None` if called before `init_trace_span_service()`.
64pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
65    TRACE_SPAN_SERVICE.read().unwrap().clone()
66}
67
/// Owns the span-ingestion pipeline: a buffering actor that batches incoming
/// spans and an engine actor that commits them to storage.
pub struct TraceSpanService {
    /// Command channel to the engine actor (write/optimize/vacuum/expire/shutdown).
    engine_tx: mpsc::Sender<TableCommand>,
    /// Span batches destined for the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Signals the buffering actor to perform a final flush and exit.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handles awaited by `shutdown()` to confirm both actors have exited.
    engine_handle: tokio::task::JoinHandle<()>,
    buffer_handle: tokio::task::JoinHandle<()>,
    pub query_service: TraceQueries,
    /// Shared SessionContext — exposes `trace_spans` registration for TraceSummaryService.
    pub ctx: Arc<SessionContext>,
}
78
impl TraceSpanService {
    /// Create a new `TraceSpanService` with the given storage settings and start the engine and buffering actors.
    /// The buffering actor will flush spans to storage when the buffer reaches capacity or after a time interval.
    /// # Arguments
    /// * `storage_settings` - Configuration for object storage where trace spans will be persisted.
    /// * `compaction_interval_hours` - How often the engine should perform compaction
    ///   (merging small files into larger ones). Longer intervals reduce write amplification
    ///   but may increase read latency.
    /// * `flush_interval_secs` - Optional interval in seconds for flushing the buffer to storage
    /// * `retention_days` - Optional retention window forwarded to the engine actor.
    pub async fn new(
        storage_settings: &ObjectStorageSettings,
        compaction_interval_hours: u64,
        flush_interval_secs: Option<u64>,
        retention_days: Option<u32>,
    ) -> Result<Self, TraceEngineError> {
        let buffer_size = storage_settings.trace_buffer_size();
        let engine = TraceSpanDBEngine::new(storage_settings).await?;

        info!(
            "TraceSpanService initialized with buffer_size: {}",
            buffer_size
        );

        // Clone the context before `engine` is moved into its actor task.
        let ctx = engine.ctx.clone();
        let (engine_tx, engine_handle) =
            engine.start_actor(compaction_interval_hours, retention_days);
        // Channel capacities: up to 100 in-flight span batches; shutdown needs
        // only a single slot.
        let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);

        let buffer_handle = Self::start_buffering_actor(
            engine_tx.clone(),
            span_rx,
            shutdown_rx,
            flush_interval_secs,
            buffer_size,
        );

        Ok(TraceSpanService {
            engine_tx,
            span_tx,
            shutdown_tx,
            engine_handle,
            buffer_handle,
            query_service: TraceQueries::new(ctx.clone()),
            ctx,
        })
    }

    /// Spawn the buffering actor task.
    ///
    /// The actor accumulates span batches and flushes to the engine when the
    /// buffer reaches `buffer_size`, on every ticker interval, and one final
    /// time when a shutdown signal arrives.
    fn start_buffering_actor(
        engine_tx: mpsc::Sender<TableCommand>,
        mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
        mut shutdown_rx: mpsc::Receiver<()>,
        flush_interval_secs: Option<u64>,
        buffer_size: usize,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
            let mut flush_ticker = interval(Duration::from_secs(
                flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
            ));
            // `interval`'s first tick completes immediately; consume it so the
            // first real flush happens one full interval from now.
            flush_ticker.tick().await;

            loop {
                tokio::select! {
                    // `Some(...)` pattern: this branch is disabled once the span
                    // channel closes; the ticker/shutdown branches keep running.
                    Some(spans) = span_rx.recv() => {
                        buffer.extend(spans);
                        if buffer.len() >= buffer_size {
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                    }
                    _ = flush_ticker.tick() => {
                        if !buffer.is_empty() {
                            info!("Flushing spans buffer with {} spans", buffer.len());
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                    }
                    // `_` also matches `None`, so this fires when every
                    // shutdown sender is dropped — dropping the service
                    // therefore shuts this actor down too.
                    _ = shutdown_rx.recv() => {
                        info!("Buffer actor received shutdown signal");
                        if !buffer.is_empty() {
                            info!("Flushing final {} spans before shutdown", buffer.len());
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                        break;
                    }
                }
            }

            info!("Buffering actor shutting down");
        })
    }

    /// Drain `buffer` into a write command for the engine actor and await the
    /// acknowledgment.
    ///
    /// Failures are logged rather than propagated — a failed flush drops the
    /// drained batch.
    async fn flush_buffer(
        engine_tx: &mpsc::Sender<TableCommand>,
        buffer: &mut Vec<TraceSpanRecord>,
    ) {
        if buffer.is_empty() {
            return;
        }

        // Swap in a fresh Vec with the same capacity so the hot path never
        // reallocates while keeping ownership of the spans to send.
        let capacity = buffer.capacity();
        let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
        let span_count = spans_to_write.len();

        debug!("Sending write command to engine for {} spans", span_count);

        let (tx, rx) = tokio::sync::oneshot::channel();

        if let Err(e) = engine_tx
            .send(TableCommand::Write {
                spans: spans_to_write,
                respond_to: tx,
            })
            .await
        {
            tracing::error!("Failed to send write command: {}", e);
            return;
        }

        // Three outcomes: write ok, write failed inside the engine, or the
        // engine dropped the responder (e.g. it shut down mid-write).
        match rx.await {
            Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
            Ok(Err(e)) => tracing::error!("Write failed: {}", e),
            Err(e) => tracing::error!("Failed to receive write response: {}", e),
        }
    }

    /// Send spans to the buffering actor for async write to Delta Lake.
    /// # Arguments
    /// * `spans` - A batch of `TraceSpanRecord` to write. The buffering actor will flush to storage
    ///   when the buffer reaches capacity or after a time interval.
    /// # Errors
    /// Returns `TraceEngineError::ChannelClosed` if the buffering actor has exited.
    pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
        self.span_tx
            .send(spans)
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;
        Ok(())
    }

    /// Write spans directly to the engine actor, bypassing the buffer.
    ///
    /// Unlike `write_spans()`, this method sends a single large batch as one
    /// Delta Lake commit and awaits the result. Use for bulk seeding/migration
    /// where you need deterministic commit boundaries and maximum throughput.
    /// This is used in benchmarks and stress tests to simulate high-volume writes without caching effects
    pub async fn write_spans_direct(
        &self,
        spans: Vec<TraceSpanRecord>,
    ) -> Result<(), TraceEngineError> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.engine_tx
            .send(TableCommand::Write {
                spans,
                respond_to: tx,
            })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;
        // Outer `?` handles a dropped responder; the inner write result is returned as-is.
        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
    }

    /// Ask the engine actor to compact small files and await completion.
    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
        let (tx, rx) = tokio::sync::oneshot::channel();

        self.engine_tx
            .send(TableCommand::Optimize { respond_to: tx })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;

        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
    }

    /// Ask the engine actor to VACUUM files older than `retention_hours` and
    /// await completion.
    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
        let (tx, rx) = tokio::sync::oneshot::channel();

        self.engine_tx
            .send(TableCommand::Vacuum {
                retention_hours,
                respond_to: tx,
            })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;

        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
    }

    /// Delete all spans with a `partition_date` older than `retention_days` ago,
    /// then run VACUUM to physically reclaim disk space.
    ///
    /// Call order matters: DELETE marks files as unreferenced in the Delta log;
    /// VACUUM then removes the orphaned Parquet files from storage.
    pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
        let cutoff_date =
            (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();

        // Step 1: DELETE WHERE partition_date < cutoff (marks rows removed in Delta log)
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.engine_tx
            .send(TableCommand::Expire {
                cutoff_date,
                respond_to: tx,
            })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;
        // First `?` unwraps the oneshot recv; second unwraps the engine's own result.
        rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;

        // Step 2: VACUUM — physically deletes orphaned files from storage.
        // retention_hours=0 with enforce_retention_duration=false removes all
        // post-DELETE orphans immediately.
        self.vacuum(0).await
    }

    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSpanService>`.
    ///
    /// Sends the shutdown signal to the buffering actor and engine actor.
    /// Callers that own `self` should prefer `shutdown()` to await full drain.
    pub async fn signal_shutdown(&self) {
        info!("TraceSpanService signaling shutdown");
        // Send errors are ignored — an actor may already have exited.
        let _ = self.shutdown_tx.send(()).await;
        let _ = self.engine_tx.send(TableCommand::Shutdown).await;
    }

    /// Gracefully shut down both actors, consuming the service.
    ///
    /// Order matters: the buffer actor is drained first so its final flush
    /// reaches the engine before the engine receives `Shutdown`.
    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
        info!("TraceSpanService shutting down");

        // Ignored send error: the buffer actor may already have exited.
        let _ = self.shutdown_tx.send(()).await;

        if let Err(e) = self.buffer_handle.await {
            tracing::error!("Buffer handle error: {}", e);
        }

        self.engine_tx
            .send(TableCommand::Shutdown)
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;

        if let Err(e) = self.engine_handle.await {
            tracing::error!("Engine handle error: {}", e);
        }

        info!("TraceSpanService shutdown complete");
        Ok(())
    }
}
320
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::tracing::queries::{
        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
    };
    use arrow_array::Array;
    use chrono::Utc;
    use datafusion::logical_expr::col;
    use scouter_mocks::generate_trace_with_spans;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceSpan;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use serde_json::Value;
    use tracing_subscriber;

    /// Initialize test tracing (idempotent via `try_init`) and wipe the default
    /// storage root so each test starts from an empty table.
    ///
    /// NOTE(review): all tests share the same default storage root — this
    /// assumes they run serially (or at least don't overlap on storage);
    /// confirm with `cargo test` settings if flakiness appears.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Build a deterministic `TraceSpanRecord` with the given IDs and attributes.
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: trace_id.clone(),
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            // Fixed 100ms duration keeps metrics deterministic.
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    /// Smoke test: service starts and shuts down cleanly with no writes.
    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        // flush_interval_secs=2 so the buffered batch lands within the sleep below.
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
        info!("Test: writing {} spans", spans.len());

        let first_trace_id = spans.first().unwrap().trace_id.clone();
        service.write_spans(spans).await?;

        info!("Test: waiting for flush");
        // Sleep past the 2s flush interval so the buffering actor writes the batch.
        tokio::time::sleep(Duration::from_secs(5)).await;

        let trace_id_bytes = first_trace_id.as_bytes();
        let result_spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
            .await?;

        assert!(
            !result_spans.is_empty(),
            "Expected at least 1 span but got 0"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify that `build_span_tree` returns spans in DFS (depth-first) order with correct
    /// depth, path, and root_span_id fields — matching what the Postgres recursive CTE produced.
    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Build a deterministic tree: root → child → grandchild
        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        // Wait past the 2s flush interval.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        // DFS order: root(0), child(1), grandchild(2)
        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        // All spans should share the same root_span_id
        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify `get_trace_metrics` returns time-bucketed aggregate rows.
    #[tokio::test]
    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
        service.write_spans(spans).await?;
        // Wait past the 2s flush interval.
        tokio::time::sleep(Duration::from_secs(4)).await;

        // Query a window guaranteed to contain the just-written spans.
        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;

        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify `get_trace_metrics` with a service_name filter excludes other services.
    #[tokio::test]
    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Write spans for two distinct services using deterministic IDs
        let trace_a = TraceId::from_bytes([10u8; 16]);
        let trace_b = TraceId::from_bytes([20u8; 16]);

        let span_a = make_span(
            &trace_a,
            SpanId::from_bytes([10u8; 8]),
            None,
            "service_alpha",
            "op_a",
            vec![],
        );
        let span_b = make_span(
            &trace_b,
            SpanId::from_bytes([20u8; 8]),
            None,
            "service_beta",
            "op_b",
            vec![],
        );

        service.write_spans(vec![span_a, span_b]).await?;
        // Wait past the 2s flush interval.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        // Filter to service_alpha only
        let metrics_alpha = service
            .query_service
            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
            .await?;

        // Filter to service_beta only
        let metrics_beta = service
            .query_service
            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
            .await?;

        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();

        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
        assert!(beta_count > 0, "Expected non-zero count for service_beta");

        // Querying with a non-existent service returns nothing
        let metrics_none = service
            .query_service
            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
            .await?;
        assert!(
            metrics_none.is_empty(),
            "Expected no buckets for nonexistent service"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify all three DataFusion filter layers are wired correctly by inspecting the
    /// physical plan produced for a partitioned time-window query.
    ///
    /// Layer 1 — partition pruning: `partition_date` appears in the plan because Delta Lake
    /// pushes partition column filters to directory enumeration before reading any files.
    ///
    /// Layer 2 — row-group stats: `start_time` appears because typed Timestamp literals
    /// enable Parquet min/max pruning across row groups.
    ///
    /// Layer 3 — bloom filter: querying a nonexistent `trace_id` within a tight window
    /// returns 0 rows; bloom filters discard row groups instantly without page-level scanning.
    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Write a known batch directly so it is immediately queryable
        // Use distinct byte values that don't collide with other tests.
        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        // Direct write bypasses the buffer, so no flush sleep is needed here.
        service.write_spans_direct(spans).await?;

        // ── Layers 1 & 2: inspect physical plan ──────────────────────────────────
        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        // Build the same DataFrame the query path would produce internally
        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Partition filter — pushed to directory enumeration (Layer 1)
        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Timestamp filter — enables row-group min/max pruning (Layer 2)
        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Collect the physical plan as a string via EXPLAIN
        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // EXPLAIN output arrives as record batches; flatten the "plan" column
        // into one string for substring assertions.
        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        // Layer 1: partition filter must appear in plan
        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        // Layer 2: start_time row-group filter must appear in plan
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        // ── Layer 3: bloom filter — behavioral proof ──────────────────────────────
        // A nonexistent trace_id with bloom filters enabled returns 0 rows.
        // Without bloom filters, DataFusion would scan every row group to confirm absence.
        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify `get_trace_metrics` with attribute_filters narrows results to matching spans.
    #[tokio::test]
    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_kafka = TraceId::from_bytes([30u8; 16]);
        let trace_http = TraceId::from_bytes([40u8; 16]);

        // span with component:kafka attribute
        let span_kafka = make_span(
            &trace_kafka,
            SpanId::from_bytes([30u8; 8]),
            None,
            "my_service",
            "kafka_consumer",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("kafka".to_string()),
            }],
        );
        // span with component:http attribute (should NOT match kafka filter)
        let span_http = make_span(
            &trace_http,
            SpanId::from_bytes([40u8; 8]),
            None,
            "my_service",
            "http_handler",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("http".to_string()),
            }],
        );

        service.write_spans(vec![span_kafka, span_http]).await?;
        // Wait past the 2s flush interval.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let kafka_filter = vec!["component:kafka".to_string()];

        // With filter, only kafka trace should appear
        let filtered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
            .await?;

        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
        assert!(
            filtered_count > 0,
            "Expected non-zero count with kafka attribute filter"
        );

        // Without filter, both traces appear
        let unfiltered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;
        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
        assert!(
            unfiltered_count >= filtered_count,
            "Unfiltered count ({}) should be >= filtered count ({})",
            unfiltered_count,
            filtered_count
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}