// scouter_dataframe/parquet/tracing/service.rs
1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use datafusion::prelude::SessionContext;
5use scouter_settings::ObjectStorageSettings;
6use scouter_types::TraceSpanRecord;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::time::{interval, Duration};
10use tracing::{debug, info};
11
/// Default buffer flush cadence in seconds, used when callers pass
/// `flush_interval_secs: None` to `TraceSpanService::new`.
const FLUSH_INTERVAL_SECS: u64 = 5;
13
/// Global singleton for the TraceSpanService.
///
/// Initialized via `init_trace_span_service()` in server setup.
/// Consumer workers call `get_trace_span_service()` to obtain the Arc for writing.
/// Uses `RwLock<Option<...>>` so tests can re-initialize with a fresh service.
///
/// A `std::sync` (not `tokio::sync`) lock is fine here: every access in this file
/// takes the guard in a short scope and drops it before any `.await`.
static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
    std::sync::RwLock::new(None);
21
22/// Initialize the global `TraceSpanService`.
23///
24/// If a previous service exists (e.g. in test re-initialization), it is signaled to
25/// shut down before being replaced.
26pub async fn init_trace_span_service(
27    storage_settings: &ObjectStorageSettings,
28    compaction_interval_hours: u64,
29    flush_interval_secs: Option<u64>,
30    retention_days: Option<u32>,
31) -> Result<Arc<TraceSpanService>, TraceEngineError> {
32    // Shut down any existing service before replacing
33    let old_service = {
34        let guard = TRACE_SPAN_SERVICE.read().unwrap();
35        guard.clone()
36    };
37    if let Some(old) = old_service {
38        info!("Shutting down previous TraceSpanService before re-initialization");
39        old.signal_shutdown().await;
40    }
41
42    let service = Arc::new(
43        TraceSpanService::new(
44            storage_settings,
45            compaction_interval_hours,
46            flush_interval_secs,
47            retention_days,
48        )
49        .await?,
50    );
51
52    {
53        let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
54        *guard = Some(service.clone());
55    }
56
57    info!("TraceSpanService global singleton initialized");
58    Ok(service)
59}
60
61/// Retrieve the global `TraceSpanService` initialized during startup.
62///
63/// Returns `None` if called before `init_trace_span_service()`.
64pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
65    TRACE_SPAN_SERVICE.read().unwrap().clone()
66}
67
/// Owns the channels and task handles for the trace-span write pipeline
/// (buffering actor → engine actor) and exposes read access via `query_service`.
pub struct TraceSpanService {
    // Commands (write/optimize/vacuum/expire/shutdown) sent to the engine actor.
    engine_tx: mpsc::Sender<TableCommand>,
    // Span batches destined for the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    // Signals the buffering actor to flush any remaining spans and exit.
    shutdown_tx: mpsc::Sender<()>,
    // Join handle for the engine actor task (awaited in `shutdown()`).
    engine_handle: tokio::task::JoinHandle<()>,
    // Join handle for the buffering actor task (awaited in `shutdown()`).
    buffer_handle: tokio::task::JoinHandle<()>,
    pub query_service: TraceQueries,
    /// Shared SessionContext — exposes `trace_spans` registration for TraceSummaryService.
    pub ctx: Arc<SessionContext>,
}
78
impl TraceSpanService {
    /// Create a new `TraceSpanService` with the given storage settings and start the engine and buffering actors.
    /// The buffering actor will flush spans to storage when the buffer reaches capacity or after a time interval.
    /// # Arguments
    /// * `storage_settings` - Configuration for object storage where trace spans will be persisted.
    /// * `compaction_interval_hours` - How often the engine should perform compaction
    ///   (merging small files into larger ones). Longer intervals reduce write amplification
    ///   but may increase read latency.
    /// * `flush_interval_secs` - Optional interval in seconds for flushing the buffer to storage
    ///   (defaults to `FLUSH_INTERVAL_SECS` when `None`).
    /// * `retention_days` - Optional retention window forwarded to the engine actor.
    ///
    /// # Errors
    /// Returns `TraceEngineError` if the underlying `TraceSpanDBEngine` fails to initialize.
    pub async fn new(
        storage_settings: &ObjectStorageSettings,
        compaction_interval_hours: u64,
        flush_interval_secs: Option<u64>,
        retention_days: Option<u32>,
    ) -> Result<Self, TraceEngineError> {
        let buffer_size = storage_settings.trace_buffer_size();
        let engine = TraceSpanDBEngine::new(storage_settings).await?;

        info!(
            "TraceSpanService initialized with buffer_size: {}",
            buffer_size
        );

        // Grab the SessionContext before the engine is moved into its actor task.
        let ctx = engine.ctx.clone();
        let (engine_tx, engine_handle) =
            engine.start_actor(compaction_interval_hours, retention_days);
        // Up to 100 pending span batches; senders experience back-pressure beyond that.
        let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);

        let buffer_handle = Self::start_buffering_actor(
            engine_tx.clone(),
            span_rx,
            shutdown_rx,
            flush_interval_secs,
            buffer_size,
        );

        Ok(TraceSpanService {
            engine_tx,
            span_tx,
            shutdown_tx,
            engine_handle,
            buffer_handle,
            query_service: TraceQueries::new(ctx.clone()),
            ctx,
        })
    }

    /// Spawn the buffering actor: accumulates incoming span batches and flushes
    /// them to the engine actor when the buffer reaches `buffer_size`, on each
    /// flush-interval tick, or one final time when the shutdown signal arrives.
    fn start_buffering_actor(
        engine_tx: mpsc::Sender<TableCommand>,
        mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
        mut shutdown_rx: mpsc::Receiver<()>,
        flush_interval_secs: Option<u64>,
        buffer_size: usize,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
            let mut flush_ticker = interval(Duration::from_secs(
                flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
            ));
            // A tokio interval's first tick completes immediately; consume it so
            // the first timed flush happens a full interval from now.
            flush_ticker.tick().await;

            loop {
                tokio::select! {
                    // NOTE: when all senders drop, `recv()` yields `None`, this
                    // branch becomes disabled, and the loop only exits via the
                    // shutdown signal below (the ticker keeps the select alive).
                    Some(spans) = span_rx.recv() => {
                        buffer.extend(spans);
                        // Size-based flush: don't wait for the ticker once full.
                        if buffer.len() >= buffer_size {
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                    }
                    // Time-based flush: bounds how stale buffered spans can get.
                    _ = flush_ticker.tick() => {
                        if !buffer.is_empty() {
                            info!("Flushing spans buffer with {} spans", buffer.len());
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        info!("Buffer actor received shutdown signal");
                        // Drain any remaining spans so nothing is lost on shutdown.
                        if !buffer.is_empty() {
                            info!("Flushing final {} spans before shutdown", buffer.len());
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                        break;
                    }
                }
            }

            info!("Buffering actor shutting down");
        })
    }

    /// Swap the buffer for an empty one (preserving its capacity) and send the
    /// drained spans to the engine actor as a single write command, awaiting the
    /// result. Failures are logged, not propagated — the actor loop must keep
    /// running regardless.
    async fn flush_buffer(
        engine_tx: &mpsc::Sender<TableCommand>,
        buffer: &mut Vec<TraceSpanRecord>,
    ) {
        if buffer.is_empty() {
            return;
        }

        // mem::replace moves the spans out without cloning and leaves an empty
        // Vec with the same capacity for the next batch.
        let capacity = buffer.capacity();
        let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
        let span_count = spans_to_write.len();

        debug!("Sending write command to engine for {} spans", span_count);

        let (tx, rx) = tokio::sync::oneshot::channel();

        if let Err(e) = engine_tx
            .send(TableCommand::Write {
                spans: spans_to_write,
                respond_to: tx,
            })
            .await
        {
            tracing::error!("Failed to send write command: {}", e);
            return;
        }

        match rx.await {
            Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
            Ok(Err(e)) => tracing::error!("Write failed: {}", e),
            Err(e) => tracing::error!("Failed to receive write response: {}", e),
        }
    }

    /// Send spans to the buffering actor for async write to Delta Lake.
    /// # Arguments
    /// * `spans` - A batch of `TraceSpanRecord` to write. The buffering actor will flush to storage
    ///   when the buffer reaches capacity or after a time interval.
    ///
    /// # Errors
    /// Returns `TraceEngineError::ChannelClosed` if the buffering actor has exited.
    pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
        self.span_tx
            .send(spans)
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;
        Ok(())
    }

    /// Write spans directly to the engine actor, bypassing the buffer.
    ///
    /// Unlike `write_spans()`, this method sends a single large batch as one
    /// Delta Lake commit and awaits the result. Use for bulk seeding/migration
    /// where you need deterministic commit boundaries and maximum throughput.
    /// This is used in benchmarks and stress tests to simulate high-volume writes without caching effects
    pub async fn write_spans_direct(
        &self,
        spans: Vec<TraceSpanRecord>,
    ) -> Result<(), TraceEngineError> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.engine_tx
            .send(TableCommand::Write {
                spans,
                respond_to: tx,
            })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;
        // Double-layered result: outer is the oneshot delivery, inner the write itself.
        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
    }

    // Admin operations — these can be called directly or via the control table for single-writer cron-style execution across
    // Primarily used in tests and benches. In Production, we default to the control table approach to ensure only worker runs them at a time, but the direct methods remain available for manual invocation when needed.

    /// Ask the engine actor to run compaction (OPTIMIZE) and await completion.
    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
        let (tx, rx) = tokio::sync::oneshot::channel();

        self.engine_tx
            .send(TableCommand::Optimize { respond_to: tx })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;

        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
    }

    /// Ask the engine actor to run VACUUM with the given retention window and
    /// await completion.
    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
        let (tx, rx) = tokio::sync::oneshot::channel();

        self.engine_tx
            .send(TableCommand::Vacuum {
                retention_hours,
                respond_to: tx,
            })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;

        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
    }

    /// Delete all spans with a `partition_date` older than `retention_days` ago,
    /// then run VACUUM to physically reclaim disk space.
    ///
    /// Call order matters: DELETE marks files as unreferenced in the Delta log;
    /// VACUUM then removes the orphaned Parquet files from storage.
    pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
        let cutoff_date =
            (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();

        // Step 1: DELETE WHERE partition_date < cutoff (marks rows removed in Delta log)
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.engine_tx
            .send(TableCommand::Expire {
                cutoff_date,
                respond_to: tx,
            })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;
        // `??`: first `?` unwraps the oneshot delivery, second the expire result.
        rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;

        // Step 2: VACUUM — physically deletes orphaned files from storage.
        // retention_hours=0 with enforce_retention_duration=false removes all
        // post-DELETE orphans immediately.
        self.vacuum(0).await
    }

    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSpanService>`.
    ///
    /// Sends the shutdown signal to the buffering actor and engine actor.
    /// Callers that own `self` should prefer `shutdown()` to await full drain.
    pub async fn signal_shutdown(&self) {
        info!("TraceSpanService signaling shutdown");
        // Send errors are ignored: a closed channel means the actor already exited.
        let _ = self.shutdown_tx.send(()).await;
        let _ = self.engine_tx.send(TableCommand::Shutdown).await;
    }

    /// Shut down both actors and await their tasks, draining buffered spans.
    ///
    /// Ordering matters: the buffering actor is stopped (and awaited) first so its
    /// final flush reaches the engine actor before the engine is told to shut down.
    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
        info!("TraceSpanService shutting down");

        let _ = self.shutdown_tx.send(()).await;

        // Join errors (panic/cancel) are logged but don't abort the shutdown sequence.
        if let Err(e) = self.buffer_handle.await {
            tracing::error!("Buffer handle error: {}", e);
        }

        self.engine_tx
            .send(TableCommand::Shutdown)
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;

        if let Err(e) = self.engine_handle.await {
            tracing::error!("Engine handle error: {}", e);
        }

        info!("TraceSpanService shutdown complete");
        Ok(())
    }
}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::tracing::queries::{
        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
    };
    use arrow_array::Array;
    use chrono::Utc;
    use datafusion::logical_expr::col;
    use scouter_mocks::generate_trace_with_spans;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceSpan;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use serde_json::Value;
    use tracing_subscriber;

    /// Install a test logger (idempotent via `try_init`) and remove the on-disk
    /// storage root so each test starts against an empty table.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Build a deterministic `TraceSpanRecord` with the given IDs and attributes.
    /// All timestamps derive from `Utc::now()`; duration is fixed at 100 ms.
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: trace_id.clone(),
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    /// Smoke test: the service starts and shuts down cleanly with no writes.
    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Spans written through the buffered path become queryable after a timed flush.
    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
        info!("Test: writing {} spans", spans.len());

        let first_trace_id = spans.first().unwrap().trace_id.clone();
        service.write_spans(spans).await?;

        info!("Test: waiting for flush");
        // Flush interval is 2s; sleeping 5s guarantees at least one timed flush.
        tokio::time::sleep(Duration::from_secs(5)).await;

        let trace_id_bytes = first_trace_id.as_bytes();
        let result_spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
            .await?;

        assert!(
            !result_spans.is_empty(),
            "Expected at least 1 span but got 0"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify that `build_span_tree` returns spans in DFS (depth-first) order with correct
    /// depth, path, and root_span_id fields — matching what the Postgres recursive CTE produced.
    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Build a deterministic tree: root → child → grandchild
        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        // Wait past the 2s flush interval so the buffering actor persists the batch.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        // DFS order: root(0), child(1), grandchild(2)
        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        // All spans should share the same root_span_id
        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify `get_trace_metrics` returns time-bucketed aggregate rows.
    #[tokio::test]
    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
        service.write_spans(spans).await?;
        // Wait past the 2s flush interval so the buffering actor persists the batch.
        tokio::time::sleep(Duration::from_secs(4)).await;

        // Query a ±1h window around now so the just-written spans are included.
        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;

        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify `get_trace_metrics` with a service_name filter excludes other services.
    #[tokio::test]
    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Write spans for two distinct services using deterministic IDs
        let trace_a = TraceId::from_bytes([10u8; 16]);
        let trace_b = TraceId::from_bytes([20u8; 16]);

        let span_a = make_span(
            &trace_a,
            SpanId::from_bytes([10u8; 8]),
            None,
            "service_alpha",
            "op_a",
            vec![],
        );
        let span_b = make_span(
            &trace_b,
            SpanId::from_bytes([20u8; 8]),
            None,
            "service_beta",
            "op_b",
            vec![],
        );

        service.write_spans(vec![span_a, span_b]).await?;
        // Wait past the 2s flush interval so the buffering actor persists the batch.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        // Filter to service_alpha only
        let metrics_alpha = service
            .query_service
            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
            .await?;

        // Filter to service_beta only
        let metrics_beta = service
            .query_service
            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
            .await?;

        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();

        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
        assert!(beta_count > 0, "Expected non-zero count for service_beta");

        // Querying with a non-existent service returns nothing
        let metrics_none = service
            .query_service
            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
            .await?;
        assert!(
            metrics_none.is_empty(),
            "Expected no buckets for nonexistent service"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify all three DataFusion filter layers are wired correctly by inspecting the
    /// physical plan produced for a partitioned time-window query.
    ///
    /// Layer 1 — partition pruning: `partition_date` appears in the plan because Delta Lake
    /// pushes partition column filters to directory enumeration before reading any files.
    ///
    /// Layer 2 — row-group stats: `start_time` appears because typed Timestamp literals
    /// enable Parquet min/max pruning across row groups.
    ///
    /// Layer 3 — bloom filter: querying a nonexistent `trace_id` within a tight window
    /// returns 0 rows; bloom filters discard row groups instantly without page-level scanning.
    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Write a known batch directly so it is immediately queryable
        // Use distinct byte values that don't collide with other tests.
        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        service.write_spans_direct(spans).await?;

        // ── Layers 1 & 2: inspect physical plan ──────────────────────────────────
        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        // Build the same DataFrame the query path would produce internally
        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Partition filter — pushed to directory enumeration (Layer 1)
        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Timestamp filter — enables row-group min/max pruning (Layer 2)
        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Collect the physical plan as a string via EXPLAIN
        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Flatten the EXPLAIN output's "plan" column into a single searchable string.
        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        // Layer 1: partition filter must appear in plan
        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        // Layer 2: start_time row-group filter must appear in plan
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        // ── Layer 3: bloom filter — behavioral proof ──────────────────────────────
        // A nonexistent trace_id with bloom filters enabled returns 0 rows.
        // Without bloom filters, DataFusion would scan every row group to confirm absence.
        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Verify `get_trace_metrics` with attribute_filters narrows results to matching spans.
    #[tokio::test]
    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_kafka = TraceId::from_bytes([30u8; 16]);
        let trace_http = TraceId::from_bytes([40u8; 16]);

        // span with component:kafka attribute
        let span_kafka = make_span(
            &trace_kafka,
            SpanId::from_bytes([30u8; 8]),
            None,
            "my_service",
            "kafka_consumer",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("kafka".to_string()),
            }],
        );
        // span with component:http attribute (should NOT match kafka filter)
        let span_http = make_span(
            &trace_http,
            SpanId::from_bytes([40u8; 8]),
            None,
            "my_service",
            "http_handler",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("http".to_string()),
            }],
        );

        service.write_spans(vec![span_kafka, span_http]).await?;
        // Wait past the 2s flush interval so the buffering actor persists the batch.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let kafka_filter = vec!["component:kafka".to_string()];

        // With filter, only kafka trace should appear
        let filtered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
            .await?;

        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
        assert!(
            filtered_count > 0,
            "Expected non-zero count with kafka attribute filter"
        );

        // Without filter, both traces appear
        let unfiltered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;
        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
        assert!(
            unfiltered_count >= filtered_count,
            "Unfiltered count ({}) should be >= filtered count ({})",
            unfiltered_count,
            filtered_count
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}