Skip to main content

scouter_dataframe/parquet/tracing/
service.rs

1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use crate::storage::ObjectStore;
5use datafusion::prelude::SessionContext;
6use scouter_settings::ObjectStorageSettings;
7use scouter_types::TraceSpanRecord;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use tokio::time::{interval, Duration};
11use tracing::{debug, info};
12
13const FLUSH_INTERVAL_SECS: u64 = 5;
14
/// Global singleton for the TraceSpanService.
///
/// Initialized via `init_trace_span_service()` in server setup.
/// Consumer workers call `get_trace_span_service()` to obtain the Arc for writing.
/// Uses `RwLock<Option<...>>` so tests can re-initialize with a fresh service.
///
/// A std (non-async) `RwLock` is sufficient here: guards are only held long
/// enough to clone or store the `Arc`, never across an `.await`.
static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
    std::sync::RwLock::new(None);
22
23/// Initialize the global `TraceSpanService`.
24///
25/// If a previous service exists (e.g. in test re-initialization), it is signaled to
26/// shut down before being replaced.
27pub async fn init_trace_span_service(
28    storage_settings: &ObjectStorageSettings,
29    compaction_interval_hours: u64,
30    flush_interval_secs: Option<u64>,
31    retention_days: Option<u32>,
32) -> Result<Arc<TraceSpanService>, TraceEngineError> {
33    // Shut down any existing service before replacing
34    let old_service = {
35        let guard = TRACE_SPAN_SERVICE.read().unwrap();
36        guard.clone()
37    };
38    if let Some(old) = old_service {
39        info!("Shutting down previous TraceSpanService before re-initialization");
40        old.signal_shutdown().await;
41    }
42
43    let service = Arc::new(
44        TraceSpanService::new(
45            storage_settings,
46            compaction_interval_hours,
47            flush_interval_secs,
48            retention_days,
49        )
50        .await?,
51    );
52
53    {
54        let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
55        *guard = Some(service.clone());
56    }
57
58    info!("TraceSpanService global singleton initialized");
59    Ok(service)
60}
61
62/// Retrieve the global `TraceSpanService` initialized during startup.
63///
64/// Returns `None` if called before `init_trace_span_service()`.
65pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
66    TRACE_SPAN_SERVICE.read().unwrap().clone()
67}
68
/// Actor-based service that buffers incoming trace spans and persists them
/// through a background engine task.
pub struct TraceSpanService {
    /// Command channel to the engine actor (write / optimize / vacuum / expire / shutdown).
    engine_tx: mpsc::Sender<TableCommand>,
    /// Channel feeding raw span batches into the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Signals the buffering actor to flush remaining spans and exit.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle for the engine actor task; awaited in `shutdown()`.
    engine_handle: tokio::task::JoinHandle<()>,
    /// Join handle for the buffering actor task; awaited in `shutdown()`.
    buffer_handle: tokio::task::JoinHandle<()>,
    /// Query helpers bound to the shared `SessionContext`.
    pub query_service: TraceQueries,
    /// Shared SessionContext — exposes `trace_spans` registration for TraceSummaryService.
    pub ctx: Arc<SessionContext>,
    /// Shared ObjectStore — passed to TraceSummaryService so both engines use the same
    /// CachingStore instance, preventing stale reads on cloud backends (GCS/S3).
    pub object_store: ObjectStore,
}
82
83impl TraceSpanService {
    /// Create a new `TraceSpanService` with the given storage settings and start the engine and buffering actors.
    /// The buffering actor will flush spans to storage when the buffer reaches capacity or after a time interval.
    /// # Arguments
    /// * `storage_settings` - Configuration for object storage where trace spans will be persisted.
    /// * `compaction_interval_hours` - How often the engine should perform compaction
    ///   (merging small files into larger ones). Longer intervals reduce write amplification
    ///   but may increase read latency.
    /// * `flush_interval_secs` - Optional interval in seconds for flushing the buffer to storage
    /// * `retention_days` - Optional retention window forwarded to the engine actor
    ///   (presumably drives periodic expiry inside the engine — see `expire()` for the
    ///   direct equivalent; confirm against `TraceSpanDBEngine::start_actor`).
    pub async fn new(
        storage_settings: &ObjectStorageSettings,
        compaction_interval_hours: u64,
        flush_interval_secs: Option<u64>,
        retention_days: Option<u32>,
    ) -> Result<Self, TraceEngineError> {
        let buffer_size = storage_settings.trace_buffer_size();
        let engine = TraceSpanDBEngine::new(storage_settings).await?;

        info!(
            "TraceSpanService initialized with buffer_size: {}",
            buffer_size
        );

        // Clone the context/store handles before `engine` is moved into its actor task.
        let ctx = engine.ctx.clone();
        let object_store = engine.object_store.clone();
        let (engine_tx, engine_handle) =
            engine.start_actor(compaction_interval_hours, retention_days);
        // Up to 100 span batches can queue ahead of the buffering actor.
        let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);

        let buffer_handle = Self::start_buffering_actor(
            engine_tx.clone(),
            span_rx,
            shutdown_rx,
            flush_interval_secs,
            buffer_size,
        );

        Ok(TraceSpanService {
            engine_tx,
            span_tx,
            shutdown_tx,
            engine_handle,
            buffer_handle,
            query_service: TraceQueries::new(ctx.clone()),
            ctx,
            object_store,
        })
    }
132
    /// Spawn the buffering actor task.
    ///
    /// The actor accumulates incoming span batches and forwards them to the
    /// engine when either (a) the buffer reaches `buffer_size`, (b) the flush
    /// ticker fires with a non-empty buffer, or (c) a shutdown signal arrives,
    /// in which case it performs a final flush and exits.
    fn start_buffering_actor(
        engine_tx: mpsc::Sender<TableCommand>,
        mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
        mut shutdown_rx: mpsc::Receiver<()>,
        flush_interval_secs: Option<u64>,
        buffer_size: usize,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
            let mut flush_ticker = interval(Duration::from_secs(
                flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
            ));
            // `interval` fires immediately on the first tick; consume it here so
            // the select loop only sees real interval expirations.
            flush_ticker.tick().await;

            loop {
                tokio::select! {
                    // `Some(...)` pattern: this branch is disabled (not matched)
                    // once all span senders are dropped and recv() yields None.
                    Some(spans) = span_rx.recv() => {
                        buffer.extend(spans);
                        if buffer.len() >= buffer_size {
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                    }
                    _ = flush_ticker.tick() => {
                        if !buffer.is_empty() {
                            info!("Flushing spans buffer with {} spans", buffer.len());
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                    }
                    // Wildcard pattern: matches both an explicit `()` signal and
                    // `None` (shutdown_tx dropped), so dropping the service also
                    // terminates the actor after a final flush.
                    _ = shutdown_rx.recv() => {
                        info!("Buffer actor received shutdown signal");
                        if !buffer.is_empty() {
                            info!("Flushing final {} spans before shutdown", buffer.len());
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                        break;
                    }
                }
            }

            info!("Buffering actor shutting down");
        })
    }
175
    /// Swap the buffer out and send its contents to the engine as one write
    /// command, awaiting the engine's result.
    ///
    /// Errors are logged rather than propagated: the actor loop must keep
    /// running even if a single flush fails.
    async fn flush_buffer(
        engine_tx: &mpsc::Sender<TableCommand>,
        buffer: &mut Vec<TraceSpanRecord>,
    ) {
        if buffer.is_empty() {
            return;
        }

        // Replace the buffer with an empty Vec of the same capacity so the
        // caller can keep accumulating while this batch is written.
        let capacity = buffer.capacity();
        let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
        let span_count = spans_to_write.len();

        debug!("Sending write command to engine for {} spans", span_count);

        // One-shot channel carries the engine's write result back.
        let (tx, rx) = tokio::sync::oneshot::channel();

        if let Err(e) = engine_tx
            .send(TableCommand::Write {
                spans: spans_to_write,
                respond_to: tx,
            })
            .await
        {
            // Engine actor is gone; the swapped-out spans are dropped here.
            tracing::error!("Failed to send write command: {}", e);
            return;
        }

        match rx.await {
            Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
            Ok(Err(e)) => tracing::error!("Write failed: {}", e),
            Err(e) => tracing::error!("Failed to receive write response: {}", e),
        }
    }
209
210    /// Send spans to the buffering actor for async write to Delta Lake.
211    /// # Arguments
212    /// * `spans` - A batch of `TraceSpanRecord` to write. The buffering actor will flush to storage
213    ///   when the buffer reaches capacity or after a time interval.
214    pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
215        self.span_tx
216            .send(spans)
217            .await
218            .map_err(|_| TraceEngineError::ChannelClosed)?;
219        Ok(())
220    }
221
222    /// Write spans directly to the engine actor, bypassing the buffer.
223    ///
224    /// Unlike `write_spans()`, this method sends a single large batch as one
225    /// Delta Lake commit and awaits the result. Use for bulk seeding/migration
226    /// where you need deterministic commit boundaries and maximum throughput.
227    /// This is used in benchmarks and stress tests to simulate high-volume writes without caching effects
228    pub async fn write_spans_direct(
229        &self,
230        spans: Vec<TraceSpanRecord>,
231    ) -> Result<(), TraceEngineError> {
232        let (tx, rx) = tokio::sync::oneshot::channel();
233        self.engine_tx
234            .send(TableCommand::Write {
235                spans,
236                respond_to: tx,
237            })
238            .await
239            .map_err(|_| TraceEngineError::ChannelClosed)?;
240        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
241    }
242
243    // Admin operations — these can be called directly or via the control table for single-writer cron-style execution across
244    // Primarily used in tests and benches. In Production, we default to the control table approach to ensure only worker runs them at a time, but the direct methods remain available for manual invocation when needed.
245    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
246        let (tx, rx) = tokio::sync::oneshot::channel();
247
248        self.engine_tx
249            .send(TableCommand::Optimize { respond_to: tx })
250            .await
251            .map_err(|_| TraceEngineError::ChannelClosed)?;
252
253        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
254    }
255
256    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
257        let (tx, rx) = tokio::sync::oneshot::channel();
258
259        self.engine_tx
260            .send(TableCommand::Vacuum {
261                retention_hours,
262                respond_to: tx,
263            })
264            .await
265            .map_err(|_| TraceEngineError::ChannelClosed)?;
266
267        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
268    }
269
    /// Delete all spans with a `partition_date` older than `retention_days` ago,
    /// then run VACUUM to physically reclaim disk space.
    ///
    /// Call order matters: DELETE marks files as unreferenced in the Delta log;
    /// VACUUM then removes the orphaned Parquet files from storage.
    ///
    /// # Errors
    /// `TraceEngineError::ChannelClosed` if the engine actor is gone, or the
    /// engine's own error from either the expire or the vacuum step.
    pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
        let cutoff_date =
            (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();

        // Step 1: DELETE WHERE partition_date < cutoff (marks rows removed in Delta log)
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.engine_tx
            .send(TableCommand::Expire {
                cutoff_date,
                respond_to: tx,
            })
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;
        // Double `?`: outer unwraps the oneshot recv, inner the engine result.
        rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;

        // Step 2: VACUUM — physically deletes orphaned files from storage.
        // retention_hours=0 with enforce_retention_duration=false removes all
        // post-DELETE orphans immediately.
        self.vacuum(0).await
    }
295
296    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSpanService>`.
297    ///
298    /// Sends the shutdown signal to the buffering actor and engine actor.
299    /// Callers that own `self` should prefer `shutdown()` to await full drain.
300    pub async fn signal_shutdown(&self) {
301        info!("TraceSpanService signaling shutdown");
302        let _ = self.shutdown_tx.send(()).await;
303        let _ = self.engine_tx.send(TableCommand::Shutdown).await;
304    }
305
    /// Shut the service down, draining both actors.
    ///
    /// Ordering matters: the buffering actor is signaled and awaited *first* so
    /// its final flush reaches the engine before the engine actor is told to stop.
    ///
    /// # Errors
    /// `TraceEngineError::ChannelClosed` if the engine command channel is gone.
    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
        info!("TraceSpanService shutting down");

        // Best-effort signal; an error only means the actor already exited.
        let _ = self.shutdown_tx.send(()).await;

        if let Err(e) = self.buffer_handle.await {
            tracing::error!("Buffer handle error: {}", e);
        }

        self.engine_tx
            .send(TableCommand::Shutdown)
            .await
            .map_err(|_| TraceEngineError::ChannelClosed)?;

        if let Err(e) = self.engine_handle.await {
            tracing::error!("Engine handle error: {}", e);
        }

        info!("TraceSpanService shutdown complete");
        Ok(())
    }
327}
328
329#[cfg(test)]
330mod tests {
331    use super::*;
332    use crate::parquet::tracing::queries::{
333        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
334    };
335    use arrow_array::Array;
336    use chrono::Utc;
337    use datafusion::logical_expr::col;
338    use scouter_mocks::generate_trace_with_spans;
339    use scouter_settings::ObjectStorageSettings;
340    use scouter_types::sql::TraceSpan;
341    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
342    use serde_json::Value;
343    use tracing_subscriber;
344
345    fn cleanup() {
346        let _ = tracing_subscriber::fmt()
347            .with_max_level(tracing::Level::INFO)
348            .try_init();
349
350        let storage_settings = ObjectStorageSettings::default();
351        let current_dir = std::env::current_dir().unwrap();
352        let storage_path = current_dir.join(storage_settings.storage_root());
353        if storage_path.exists() {
354            let _ = std::fs::remove_dir_all(storage_path);
355        }
356    }
357
    /// Build a deterministic `TraceSpanRecord` with the given IDs and attributes.
    ///
    /// All timing fields derive from a single `Utc::now()` capture: the span
    /// starts "now" and ends 100ms later, with `duration_ms` set to match.
    ///
    /// # Arguments
    /// * `trace_id` / `span_id` / `parent_span_id` - Identity and tree position.
    /// * `service_name` / `span_name` - Used by tests to assert filtering/ordering.
    /// * `attributes` - Span attributes (e.g. for attribute-filter tests).
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: *trace_id,
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }
394
395    #[tokio::test]
396    async fn test_service_initialization() -> Result<(), TraceEngineError> {
397        cleanup();
398
399        let storage_settings = ObjectStorageSettings::default();
400        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
401        service.shutdown().await?;
402        cleanup();
403        Ok(())
404    }
405
    /// Write one batch through the buffered path and verify it becomes queryable
    /// after the flush interval elapses.
    ///
    /// NOTE(review): synchronization is sleep-based — 5s sleep vs a 2s flush
    /// interval. This is timing-sensitive and could flake on a very slow runner.
    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
        info!("Test: writing {} spans", spans.len());

        // Capture the trace id before the batch is moved into write_spans().
        let first_trace_id = spans.first().unwrap().trace_id;
        service.write_spans(spans).await?;

        info!("Test: waiting for flush");
        tokio::time::sleep(Duration::from_secs(5)).await;

        let trace_id_bytes = first_trace_id.as_bytes();
        let result_spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
            .await?;

        assert!(
            !result_spans.is_empty(),
            "Expected at least 1 span but got 0"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
437
    /// Verify that `build_span_tree` returns spans in DFS (depth-first) order with correct
    /// depth, path, and root_span_id fields — matching what the Postgres recursive CTE produced.
    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Build a deterministic tree: root → child → grandchild
        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        // Sleep past the 2s flush interval so the buffered write lands.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        // DFS order: root(0), child(1), grandchild(2)
        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        // All spans should share the same root_span_id
        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
530
531    /// Verify `get_trace_metrics` returns time-bucketed aggregate rows.
532    #[tokio::test]
533    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
534        cleanup();
535
536        let storage_settings = ObjectStorageSettings::default();
537        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
538
539        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
540        service.write_spans(spans).await?;
541        tokio::time::sleep(Duration::from_secs(4)).await;
542
543        let start = Utc::now() - chrono::Duration::hours(1);
544        let end = Utc::now() + chrono::Duration::hours(1);
545
546        let metrics = service
547            .query_service
548            .get_trace_metrics(None, start, end, "hour", None, None)
549            .await?;
550
551        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
552        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");
553
554        service.shutdown().await?;
555        cleanup();
556        Ok(())
557    }
558
    /// Verify `get_trace_metrics` with a service_name filter excludes other services.
    #[tokio::test]
    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Write spans for two distinct services using deterministic IDs
        let trace_a = TraceId::from_bytes([10u8; 16]);
        let trace_b = TraceId::from_bytes([20u8; 16]);

        let span_a = make_span(
            &trace_a,
            SpanId::from_bytes([10u8; 8]),
            None,
            "service_alpha",
            "op_a",
            vec![],
        );
        let span_b = make_span(
            &trace_b,
            SpanId::from_bytes([20u8; 8]),
            None,
            "service_beta",
            "op_b",
            vec![],
        );

        service.write_spans(vec![span_a, span_b]).await?;
        // Sleep past the 2s flush interval so the buffered write lands.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        // Filter to service_alpha only
        let metrics_alpha = service
            .query_service
            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
            .await?;

        // Filter to service_beta only
        let metrics_beta = service
            .query_service
            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
            .await?;

        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();

        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
        assert!(beta_count > 0, "Expected non-zero count for service_beta");

        // Querying with a non-existent service returns nothing
        let metrics_none = service
            .query_service
            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
            .await?;
        assert!(
            metrics_none.is_empty(),
            "Expected no buckets for nonexistent service"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
626
    /// Verify all three DataFusion filter layers are wired correctly by inspecting the
    /// physical plan produced for a partitioned time-window query.
    ///
    /// Layer 1 — partition pruning: `partition_date` appears in the plan because Delta Lake
    /// pushes partition column filters to directory enumeration before reading any files.
    ///
    /// Layer 2 — row-group stats: `start_time` appears because typed Timestamp literals
    /// enable Parquet min/max pruning across row groups.
    ///
    /// Layer 3 — bloom filter: querying a nonexistent `trace_id` within a tight window
    /// returns 0 rows; bloom filters discard row groups instantly without page-level scanning.
    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        // Write a known batch directly so it is immediately queryable
        // Use distinct byte values that don't collide with other tests.
        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        // Direct write bypasses the buffer — no sleep needed before querying.
        service.write_spans_direct(spans).await?;

        // ── Layers 1 & 2: inspect physical plan ──────────────────────────────────
        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        // Build the same DataFrame the query path would produce internally
        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Partition filter — pushed to directory enumeration (Layer 1)
        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Timestamp filter — enables row-group min/max pruning (Layer 2)
        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Collect the physical plan as a string via EXPLAIN
        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // EXPLAIN output arrives as record batches; cast the "plan" column to
        // UTF-8 and concatenate every row into one searchable string.
        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        // Layer 1: partition filter must appear in plan
        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        // Layer 2: start_time row-group filter must appear in plan
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        // ── Layer 3: bloom filter — behavioral proof ──────────────────────────────
        // A nonexistent trace_id with bloom filters enabled returns 0 rows.
        // Without bloom filters, DataFusion would scan every row group to confirm absence.
        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
753
    /// Verify `get_trace_metrics` with attribute_filters narrows results to matching spans.
    ///
    /// Filters are expressed as "key:value" strings (e.g. "component:kafka");
    /// the unfiltered query serves as the baseline for the count comparison.
    #[tokio::test]
    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_kafka = TraceId::from_bytes([30u8; 16]);
        let trace_http = TraceId::from_bytes([40u8; 16]);

        // span with component:kafka attribute
        let span_kafka = make_span(
            &trace_kafka,
            SpanId::from_bytes([30u8; 8]),
            None,
            "my_service",
            "kafka_consumer",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("kafka".to_string()),
            }],
        );
        // span with component:http attribute (should NOT match kafka filter)
        let span_http = make_span(
            &trace_http,
            SpanId::from_bytes([40u8; 8]),
            None,
            "my_service",
            "http_handler",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("http".to_string()),
            }],
        );

        service.write_spans(vec![span_kafka, span_http]).await?;
        // Sleep past the 2s flush interval so the buffered write lands.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let kafka_filter = vec!["component:kafka".to_string()];

        // With filter, only kafka trace should appear
        let filtered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
            .await?;

        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
        assert!(
            filtered_count > 0,
            "Expected non-zero count with kafka attribute filter"
        );

        // Without filter, both traces appear
        let unfiltered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;
        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
        assert!(
            unfiltered_count >= filtered_count,
            "Unfiltered count ({}) should be >= filtered count ({})",
            unfiltered_count,
            filtered_count
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
826
827    /// Regression test for stale DataFusion session state after Delta Lake writes.
828    ///
829    /// Before the fix, `SessionContext` cached file metadata from the first write and
830    /// subsequent writes were invisible to queries — the session held a stale snapshot
831    /// of the Delta log. The fix calls `update_datafusion_session()` after each write
832    /// to re-register the table provider with the latest Delta log state.
833    #[tokio::test]
834    async fn test_span_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError> {
835        cleanup();
836
837        let storage_settings = ObjectStorageSettings::default();
838        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
839
840        let start = Utc::now() - chrono::Duration::hours(1);
841        let end = Utc::now() + chrono::Duration::hours(1);
842
843        // Write batch #1 (2 spans)
844        let trace1 = TraceId::from_bytes([0xB0; 16]);
845        let spans1 = vec![
846            make_span(
847                &trace1,
848                SpanId::from_bytes([0xB0; 8]),
849                None,
850                "svc_vis",
851                "op1",
852                vec![],
853            ),
854            make_span(
855                &trace1,
856                SpanId::from_bytes([0xB1; 8]),
857                Some(SpanId::from_bytes([0xB0; 8])),
858                "svc_vis",
859                "op2",
860                vec![],
861            ),
862        ];
863        service.write_spans_direct(spans1).await?;
864
865        let result = service
866            .query_service
867            .get_trace_spans(
868                Some(trace1.as_bytes()),
869                None,
870                Some(&start),
871                Some(&end),
872                None,
873            )
874            .await?;
875        assert_eq!(
876            result.len(),
877            2,
878            "After write #1: expected 2 spans, got {}",
879            result.len()
880        );
881
882        // Write batch #2 (2 more spans, different trace)
883        let trace2 = TraceId::from_bytes([0xB2; 16]);
884        let spans2 = vec![
885            make_span(
886                &trace2,
887                SpanId::from_bytes([0xB2; 8]),
888                None,
889                "svc_vis",
890                "op3",
891                vec![],
892            ),
893            make_span(
894                &trace2,
895                SpanId::from_bytes([0xB3; 8]),
896                Some(SpanId::from_bytes([0xB2; 8])),
897                "svc_vis",
898                "op4",
899                vec![],
900            ),
901        ];
902        service.write_spans_direct(spans2).await?;
903
904        let result = service
905            .query_service
906            .get_trace_spans(
907                Some(trace2.as_bytes()),
908                None,
909                Some(&start),
910                Some(&end),
911                None,
912            )
913            .await?;
914        assert_eq!(
915            result.len(),
916            2,
917            "After write #2: expected 2 spans for trace2, got {} (stale snapshot?)",
918            result.len()
919        );
920
921        // Write batch #3 (2 more spans, third trace)
922        let trace3 = TraceId::from_bytes([0xB4; 16]);
923        let spans3 = vec![
924            make_span(
925                &trace3,
926                SpanId::from_bytes([0xB4; 8]),
927                None,
928                "svc_vis",
929                "op5",
930                vec![],
931            ),
932            make_span(
933                &trace3,
934                SpanId::from_bytes([0xB5; 8]),
935                Some(SpanId::from_bytes([0xB4; 8])),
936                "svc_vis",
937                "op6",
938                vec![],
939            ),
940        ];
941        service.write_spans_direct(spans3).await?;
942
943        let result = service
944            .query_service
945            .get_trace_spans(
946                Some(trace3.as_bytes()),
947                None,
948                Some(&start),
949                Some(&end),
950                None,
951            )
952            .await?;
953        assert_eq!(
954            result.len(),
955            2,
956            "After write #3: expected 2 spans for trace3, got {} (stale snapshot?)",
957            result.len()
958        );
959
960        service.shutdown().await?;
961        cleanup();
962        Ok(())
963    }
964}