//! scouter_dataframe/parquet/tracing/service.rs
//!
//! `TraceSpanService`: buffered ingestion and query access for trace spans
//! persisted to a Delta table via the trace-span engine actor.
1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use crate::storage::ObjectStore;
5use datafusion::prelude::SessionContext;
6use scouter_settings::ObjectStorageSettings;
7use scouter_types::TraceSpanRecord;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use tokio::time::{interval, Duration};
11use tracing::{debug, info};
12
/// Default buffer flush interval (seconds), used when callers pass
/// `flush_interval_secs = None`.
const FLUSH_INTERVAL_SECS: u64 = 5;

/// Global singleton for the TraceSpanService.
///
/// Initialized via `init_trace_span_service()` in server setup.
/// Consumer workers call `get_trace_span_service()` to obtain the Arc for writing.
/// Uses `RwLock<Option<...>>` so tests can re-initialize with a fresh service.
static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
    std::sync::RwLock::new(None);
22
23/// Initialize the global `TraceSpanService`.
24///
25/// If a previous service exists (e.g. in test re-initialization), it is signaled to
26/// shut down before being replaced.
27pub async fn init_trace_span_service(
28    storage_settings: &ObjectStorageSettings,
29    compaction_interval_hours: u64,
30    flush_interval_secs: Option<u64>,
31    retention_days: Option<u32>,
32    refresh_interval_secs: u64,
33) -> Result<Arc<TraceSpanService>, TraceEngineError> {
34    // Shut down any existing service before replacing
35    let old_service = {
36        let guard = TRACE_SPAN_SERVICE.read().unwrap();
37        guard.clone()
38    };
39    if let Some(old) = old_service {
40        info!("Shutting down previous TraceSpanService before re-initialization");
41        old.signal_shutdown().await;
42    }
43
44    let service = Arc::new(
45        TraceSpanService::new(
46            storage_settings,
47            compaction_interval_hours,
48            flush_interval_secs,
49            retention_days,
50            refresh_interval_secs,
51        )
52        .await?,
53    );
54
55    {
56        let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
57        *guard = Some(service.clone());
58    }
59
60    info!("TraceSpanService global singleton initialized");
61    Ok(service)
62}
63
64/// Retrieve the global `TraceSpanService` initialized during startup.
65///
66/// Returns `None` if called before `init_trace_span_service()`.
67pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
68    TRACE_SPAN_SERVICE.read().unwrap().clone()
69}
70
/// Buffered writer and query facade for trace spans backed by two actors:
/// an engine actor (owns the Delta table) and a buffering actor (batches spans).
pub struct TraceSpanService {
    /// Commands to the engine actor (write/optimize/vacuum/expire/shutdown).
    engine_tx: mpsc::Sender<TableCommand>,
    /// Span batches destined for the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Signal channel used to stop the buffering actor (triggers a final flush).
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle for the engine actor task; awaited in `shutdown()`.
    engine_handle: tokio::task::JoinHandle<()>,
    /// Join handle for the buffering actor task; awaited in `shutdown()`.
    buffer_handle: tokio::task::JoinHandle<()>,
    /// Read-side query API over the shared SessionContext.
    pub query_service: TraceQueries,
    /// Shared SessionContext — exposes `trace_spans` registration for TraceSummaryService.
    pub ctx: Arc<SessionContext>,
    /// Shared ObjectStore — passed to TraceSummaryService so both engines use the same
    /// CachingStore instance, preventing stale reads on cloud backends (GCS/S3).
    pub object_store: ObjectStore,
}
84
85impl TraceSpanService {
86    /// Create a new `TraceSpanService` with the given storage settings and start the engine and buffering actors.
87    /// The buffering actor will flush spans to storage when the buffer reaches capacity or after a time interval.
88    /// # Arguments
89    /// * `storage_settings` - Configuration for object storage where trace spans will be persisted.
90    /// * `compaction_interval_hours` - How often the engine should perform compaction
91    ///   (merging small files into larger ones). Longer intervals reduce write amplification
92    ///   but may increase read latency.
93    /// * `flush_interval_secs` - Optional interval in seconds for flushing the buffer to storage
94    /// * `refresh_interval_secs` - How often to refresh the in-memory Delta table snapshot from shared object storage. Useful for multi-pod deployments to pick up new commits.
95    pub async fn new(
96        storage_settings: &ObjectStorageSettings,
97        compaction_interval_hours: u64,
98        flush_interval_secs: Option<u64>,
99        retention_days: Option<u32>,
100        refresh_interval_secs: u64,
101    ) -> Result<Self, TraceEngineError> {
102        let buffer_size = storage_settings.trace_buffer_size();
103        let engine = TraceSpanDBEngine::new(storage_settings).await?;
104
105        info!(
106            "TraceSpanService initialized with buffer_size: {}",
107            buffer_size
108        );
109
110        let ctx = engine.ctx.clone();
111        let object_store = engine.object_store.clone();
112        let (engine_tx, engine_handle) = engine.start_actor(
113            compaction_interval_hours,
114            retention_days,
115            refresh_interval_secs,
116        );
117        let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
118        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
119
120        let buffer_handle = Self::start_buffering_actor(
121            engine_tx.clone(),
122            span_rx,
123            shutdown_rx,
124            flush_interval_secs,
125            buffer_size,
126        );
127
128        Ok(TraceSpanService {
129            engine_tx,
130            span_tx,
131            shutdown_tx,
132            engine_handle,
133            buffer_handle,
134            query_service: TraceQueries::new(ctx.clone()),
135            ctx,
136            object_store,
137        })
138    }
139
    /// Spawn the buffering actor: accumulates incoming span batches and flushes
    /// them to the engine actor when the buffer reaches `buffer_size`, when the
    /// periodic flush ticker fires, or (finally) on shutdown.
    fn start_buffering_actor(
        engine_tx: mpsc::Sender<TableCommand>,
        mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
        mut shutdown_rx: mpsc::Receiver<()>,
        flush_interval_secs: Option<u64>,
        buffer_size: usize,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
            let mut flush_ticker = interval(Duration::from_secs(
                flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
            ));
            // The first tick of a tokio interval completes immediately; consume
            // it here so the loop doesn't treat startup as a flush deadline.
            flush_ticker.tick().await;

            loop {
                tokio::select! {
                    // `Some(..)` pattern: once the span channel closes (all
                    // senders dropped), recv() yields None and select! disables
                    // this branch; the loop keeps running on the other arms.
                    Some(spans) = span_rx.recv() => {
                        buffer.extend(spans);
                        // Size-based flush.
                        if buffer.len() >= buffer_size {
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                    }
                    // Time-based flush.
                    _ = flush_ticker.tick() => {
                        if !buffer.is_empty() {
                            info!("Flushing spans buffer with {} spans", buffer.len());
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                    }
                    // `_` matches both Some(()) and None, so a dropped shutdown
                    // sender also terminates the actor (after a final flush).
                    _ = shutdown_rx.recv() => {
                        info!("Buffer actor received shutdown signal");
                        if !buffer.is_empty() {
                            info!("Flushing final {} spans before shutdown", buffer.len());
                            Self::flush_buffer(&engine_tx, &mut buffer).await;
                        }
                        break;
                    }
                }
            }

            info!("Buffering actor shutting down");
        })
    }
182
183    async fn flush_buffer(
184        engine_tx: &mpsc::Sender<TableCommand>,
185        buffer: &mut Vec<TraceSpanRecord>,
186    ) {
187        if buffer.is_empty() {
188            return;
189        }
190
191        let capacity = buffer.capacity();
192        let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
193        let span_count = spans_to_write.len();
194
195        debug!("Sending write command to engine for {} spans", span_count);
196
197        let (tx, rx) = tokio::sync::oneshot::channel();
198
199        if let Err(e) = engine_tx
200            .send(TableCommand::Write {
201                spans: spans_to_write,
202                respond_to: tx,
203            })
204            .await
205        {
206            tracing::error!("Failed to send write command: {}", e);
207            return;
208        }
209
210        match rx.await {
211            Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
212            Ok(Err(e)) => tracing::error!("Write failed: {}", e),
213            Err(e) => tracing::error!("Failed to receive write response: {}", e),
214        }
215    }
216
217    /// Send spans to the buffering actor for async write to Delta Lake.
218    /// # Arguments
219    /// * `spans` - A batch of `TraceSpanRecord` to write. The buffering actor will flush to storage
220    ///   when the buffer reaches capacity or after a time interval.
221    pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
222        self.span_tx
223            .send(spans)
224            .await
225            .map_err(|_| TraceEngineError::ChannelClosed)?;
226        Ok(())
227    }
228
229    /// Write spans directly to the engine actor, bypassing the buffer.
230    ///
231    /// Unlike `write_spans()`, this method sends a single large batch as one
232    /// Delta Lake commit and awaits the result. Use for bulk seeding/migration
233    /// where you need deterministic commit boundaries and maximum throughput.
234    /// This is used in benchmarks and stress tests to simulate high-volume writes without caching effects
235    pub async fn write_spans_direct(
236        &self,
237        spans: Vec<TraceSpanRecord>,
238    ) -> Result<(), TraceEngineError> {
239        let (tx, rx) = tokio::sync::oneshot::channel();
240        self.engine_tx
241            .send(TableCommand::Write {
242                spans,
243                respond_to: tx,
244            })
245            .await
246            .map_err(|_| TraceEngineError::ChannelClosed)?;
247        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
248    }
249
250    // Admin operations — these can be called directly or via the control table for single-writer cron-style execution across
251    // Primarily used in tests and benches. In Production, we default to the control table approach to ensure only worker runs them at a time, but the direct methods remain available for manual invocation when needed.
252    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
253        let (tx, rx) = tokio::sync::oneshot::channel();
254
255        self.engine_tx
256            .send(TableCommand::Optimize { respond_to: tx })
257            .await
258            .map_err(|_| TraceEngineError::ChannelClosed)?;
259
260        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
261    }
262
263    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
264        let (tx, rx) = tokio::sync::oneshot::channel();
265
266        self.engine_tx
267            .send(TableCommand::Vacuum {
268                retention_hours,
269                respond_to: tx,
270            })
271            .await
272            .map_err(|_| TraceEngineError::ChannelClosed)?;
273
274        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
275    }
276
277    /// Delete all spans with a `partition_date` older than `retention_days` ago,
278    /// then run VACUUM to physically reclaim disk space.
279    ///
280    /// Call order matters: DELETE marks files as unreferenced in the Delta log;
281    /// VACUUM then removes the orphaned Parquet files from storage.
282    pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
283        let cutoff_date =
284            (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
285
286        // Step 1: DELETE WHERE partition_date < cutoff (marks rows removed in Delta log)
287        let (tx, rx) = tokio::sync::oneshot::channel();
288        self.engine_tx
289            .send(TableCommand::Expire {
290                cutoff_date,
291                respond_to: tx,
292            })
293            .await
294            .map_err(|_| TraceEngineError::ChannelClosed)?;
295        rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;
296
297        // Step 2: VACUUM — physically deletes orphaned files from storage.
298        // retention_hours=0 with enforce_retention_duration=false removes all
299        // post-DELETE orphans immediately.
300        self.vacuum(0).await
301    }
302
303    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSpanService>`.
304    ///
305    /// Sends the shutdown signal to the buffering actor and engine actor.
306    /// Callers that own `self` should prefer `shutdown()` to await full drain.
307    pub async fn signal_shutdown(&self) {
308        info!("TraceSpanService signaling shutdown");
309        let _ = self.shutdown_tx.send(()).await;
310        let _ = self.engine_tx.send(TableCommand::Shutdown).await;
311    }
312
313    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
314        info!("TraceSpanService shutting down");
315
316        let _ = self.shutdown_tx.send(()).await;
317
318        if let Err(e) = self.buffer_handle.await {
319            tracing::error!("Buffer handle error: {}", e);
320        }
321
322        self.engine_tx
323            .send(TableCommand::Shutdown)
324            .await
325            .map_err(|_| TraceEngineError::ChannelClosed)?;
326
327        if let Err(e) = self.engine_handle.await {
328            tracing::error!("Engine handle error: {}", e);
329        }
330
331        info!("TraceSpanService shutdown complete");
332        Ok(())
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339    use crate::parquet::tracing::queries::{
340        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
341    };
342    use arrow_array::Array;
343    use chrono::Utc;
344    use datafusion::logical_expr::col;
345    use scouter_mocks::generate_trace_with_spans;
346    use scouter_settings::ObjectStorageSettings;
347    use scouter_types::sql::TraceSpan;
348    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
349    use serde_json::Value;
350    use tracing_subscriber;
351
    /// Test helper: install a tracing subscriber (once) and wipe the local
    /// storage root so each test starts from an empty table.
    fn cleanup() {
        // try_init() fails harmlessly after the first call; ignore the result.
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            let _ = std::fs::remove_dir_all(storage_path);
        }
    }
364
    /// Build a deterministic `TraceSpanRecord` with the given IDs and attributes.
    ///
    /// Times are anchored at `Utc::now()` with a fixed 100 ms duration so the
    /// span lands inside any "around now" query window used by the tests.
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: *trace_id,
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }
401
    /// Smoke test: the service constructs, spawns its actors, and shuts down cleanly.
    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }
412
    /// Write a batch through the buffering actor and verify the spans become
    /// queryable after the flush interval elapses.
    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
        info!("Test: writing {} spans", spans.len());

        let first_trace_id = spans.first().unwrap().trace_id;
        service.write_spans(spans).await?;

        info!("Test: waiting for flush");
        // Sleep past the 2s flush interval so the buffering actor commits the batch.
        tokio::time::sleep(Duration::from_secs(5)).await;

        let trace_id_bytes = first_trace_id.as_bytes();
        let result_spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
            .await?;

        assert!(
            !result_spans.is_empty(),
            "Expected at least 1 span but got 0"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
444
    /// Verify that `build_span_tree` returns spans in DFS (depth-first) order with correct
    /// depth, path, and root_span_id fields — matching what the Postgres recursive CTE produced.
    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        // Build a deterministic tree: root → child → grandchild
        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        // Wait past the 2s flush interval so the batch is committed.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        // DFS order: root(0), child(1), grandchild(2)
        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        // All spans should share the same root_span_id
        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
537
    /// Verify `get_trace_metrics` returns time-bucketed aggregate rows.
    #[tokio::test]
    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
        service.write_spans(spans).await?;
        // Wait past the 2s flush interval so the batch is committed.
        tokio::time::sleep(Duration::from_secs(4)).await;

        // Query a ±1h window around "now" to be sure the spans are included.
        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;

        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
565
    /// Verify `get_trace_metrics` with a service_name filter excludes other services.
    #[tokio::test]
    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        // Write spans for two distinct services using deterministic IDs
        let trace_a = TraceId::from_bytes([10u8; 16]);
        let trace_b = TraceId::from_bytes([20u8; 16]);

        let span_a = make_span(
            &trace_a,
            SpanId::from_bytes([10u8; 8]),
            None,
            "service_alpha",
            "op_a",
            vec![],
        );
        let span_b = make_span(
            &trace_b,
            SpanId::from_bytes([20u8; 8]),
            None,
            "service_beta",
            "op_b",
            vec![],
        );

        service.write_spans(vec![span_a, span_b]).await?;
        // Wait past the 2s flush interval so the batch is committed.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        // Filter to service_alpha only
        let metrics_alpha = service
            .query_service
            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
            .await?;

        // Filter to service_beta only
        let metrics_beta = service
            .query_service
            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
            .await?;

        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();

        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
        assert!(beta_count > 0, "Expected non-zero count for service_beta");

        // Querying with a non-existent service returns nothing
        let metrics_none = service
            .query_service
            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
            .await?;
        assert!(
            metrics_none.is_empty(),
            "Expected no buckets for nonexistent service"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
633
    /// Verify all three DataFusion filter layers are wired correctly by inspecting the
    /// physical plan produced for a partitioned time-window query.
    ///
    /// Layer 1 — partition pruning: `partition_date` appears in the plan because Delta Lake
    /// pushes partition column filters to directory enumeration before reading any files.
    ///
    /// Layer 2 — row-group stats: `start_time` appears because typed Timestamp literals
    /// enable Parquet min/max pruning across row groups.
    ///
    /// Layer 3 — bloom filter: querying a nonexistent `trace_id` within a tight window
    /// returns 0 rows; bloom filters discard row groups instantly without page-level scanning.
    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        // Write a known batch directly so it is immediately queryable
        // Use distinct byte values that don't collide with other tests.
        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        service.write_spans_direct(spans).await?;

        // ── Layers 1 & 2: inspect physical plan ──────────────────────────────────
        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        // Build the same DataFrame the query path would produce internally
        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Partition filter — pushed to directory enumeration (Layer 1)
        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Timestamp filter — enables row-group min/max pruning (Layer 2)
        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Collect the physical plan as a string via EXPLAIN
        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // EXPLAIN output arrives as record batches; cast the "plan" column to
        // UTF-8 and join all rows into one searchable string.
        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        // Layer 1: partition filter must appear in plan
        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        // Layer 2: start_time row-group filter must appear in plan
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        // ── Layer 3: bloom filter — behavioral proof ──────────────────────────────
        // A nonexistent trace_id with bloom filters enabled returns 0 rows.
        // Without bloom filters, DataFusion would scan every row group to confirm absence.
        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
760
    /// Verify `get_trace_metrics` with attribute_filters narrows results to matching spans.
    #[tokio::test]
    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let trace_kafka = TraceId::from_bytes([30u8; 16]);
        let trace_http = TraceId::from_bytes([40u8; 16]);

        // span with component:kafka attribute
        let span_kafka = make_span(
            &trace_kafka,
            SpanId::from_bytes([30u8; 8]),
            None,
            "my_service",
            "kafka_consumer",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("kafka".to_string()),
            }],
        );
        // span with component:http attribute (should NOT match kafka filter)
        let span_http = make_span(
            &trace_http,
            SpanId::from_bytes([40u8; 8]),
            None,
            "my_service",
            "http_handler",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("http".to_string()),
            }],
        );

        service.write_spans(vec![span_kafka, span_http]).await?;
        // Wait past the 2s flush interval so the batch is committed.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let kafka_filter = vec!["component:kafka".to_string()];

        // With filter, only kafka trace should appear
        let filtered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
            .await?;

        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
        assert!(
            filtered_count > 0,
            "Expected non-zero count with kafka attribute filter"
        );

        // Without filter, both traces appear
        let unfiltered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;
        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
        assert!(
            unfiltered_count >= filtered_count,
            "Unfiltered count ({}) should be >= filtered count ({})",
            unfiltered_count,
            filtered_count
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
833
834    /// Simulate a 2-pod deployment: writer pod commits, reader pod picks it up via refresh.
835    ///
836    /// Two `TraceSpanService` instances share the same local storage directory (same as
837    /// sharing GCS/S3 in production). The writer commits spans directly; the reader's refresh
838    /// ticker (1s interval) picks up the new Delta log entry and re-registers the SessionContext.
839    ///
840    /// Note: we use `TraceSpanService::new()` directly (not `init_trace_span_service()`) because
841    /// the global singleton pattern shuts down the previous service on re-init, which would
842    /// kill the writer before the reader can see its data.
843    #[tokio::test]
844    async fn test_distributed_refresh() -> Result<(), TraceEngineError> {
845        cleanup();
846
847        let storage_settings = ObjectStorageSettings::default();
848
849        // "Writer pod" — standard refresh interval (won't need to refresh since it's the writer)
850        let writer = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
851
852        // "Reader pod" — 1s refresh interval for fast test turnaround
853        let reader = TraceSpanService::new(&storage_settings, 24, Some(2), None, 1).await?;
854
855        // Write spans via the writer using a deterministic trace ID
856        let trace_id = TraceId::from_bytes([0xDD_u8; 16]);
857        let span = make_span(
858            &trace_id,
859            SpanId::from_bytes([0xDD_u8; 8]),
860            None,
861            "distributed-svc",
862            "test-op",
863            vec![],
864        );
865        writer.write_spans_direct(vec![span]).await?;
866
867        // Wait for the reader's refresh ticker to fire (1s interval + margin)
868        tokio::time::sleep(Duration::from_secs(3)).await;
869
870        // Reader should now see the data written by the writer
871        let results = reader
872            .query_service
873            .get_trace_spans(Some(trace_id.as_bytes()), None, None, None, None)
874            .await?;
875
876        assert!(
877            !results.is_empty(),
878            "Reader pod should see spans written by writer pod after refresh"
879        );
880
881        writer.shutdown().await?;
882        reader.shutdown().await?;
883        cleanup();
884        Ok(())
885    }
886
887    /// Regression test for stale DataFusion session state after Delta Lake writes.
888    ///
889    /// Before the fix, `SessionContext` cached file metadata from the first write and
890    /// subsequent writes were invisible to queries — the session held a stale snapshot
891    /// of the Delta log. The fix calls `update_datafusion_session()` after each write
892    /// to re-register the table provider with the latest Delta log state.
893    #[tokio::test]
894    async fn test_span_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError> {
895        cleanup();
896
897        let storage_settings = ObjectStorageSettings::default();
898        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
899
900        let start = Utc::now() - chrono::Duration::hours(1);
901        let end = Utc::now() + chrono::Duration::hours(1);
902
903        // Write batch #1 (2 spans)
904        let trace1 = TraceId::from_bytes([0xB0; 16]);
905        let spans1 = vec![
906            make_span(
907                &trace1,
908                SpanId::from_bytes([0xB0; 8]),
909                None,
910                "svc_vis",
911                "op1",
912                vec![],
913            ),
914            make_span(
915                &trace1,
916                SpanId::from_bytes([0xB1; 8]),
917                Some(SpanId::from_bytes([0xB0; 8])),
918                "svc_vis",
919                "op2",
920                vec![],
921            ),
922        ];
923        service.write_spans_direct(spans1).await?;
924
925        let result = service
926            .query_service
927            .get_trace_spans(
928                Some(trace1.as_bytes()),
929                None,
930                Some(&start),
931                Some(&end),
932                None,
933            )
934            .await?;
935        assert_eq!(
936            result.len(),
937            2,
938            "After write #1: expected 2 spans, got {}",
939            result.len()
940        );
941
942        // Write batch #2 (2 more spans, different trace)
943        let trace2 = TraceId::from_bytes([0xB2; 16]);
944        let spans2 = vec![
945            make_span(
946                &trace2,
947                SpanId::from_bytes([0xB2; 8]),
948                None,
949                "svc_vis",
950                "op3",
951                vec![],
952            ),
953            make_span(
954                &trace2,
955                SpanId::from_bytes([0xB3; 8]),
956                Some(SpanId::from_bytes([0xB2; 8])),
957                "svc_vis",
958                "op4",
959                vec![],
960            ),
961        ];
962        service.write_spans_direct(spans2).await?;
963
964        let result = service
965            .query_service
966            .get_trace_spans(
967                Some(trace2.as_bytes()),
968                None,
969                Some(&start),
970                Some(&end),
971                None,
972            )
973            .await?;
974        assert_eq!(
975            result.len(),
976            2,
977            "After write #2: expected 2 spans for trace2, got {} (stale snapshot?)",
978            result.len()
979        );
980
981        // Write batch #3 (2 more spans, third trace)
982        let trace3 = TraceId::from_bytes([0xB4; 16]);
983        let spans3 = vec![
984            make_span(
985                &trace3,
986                SpanId::from_bytes([0xB4; 8]),
987                None,
988                "svc_vis",
989                "op5",
990                vec![],
991            ),
992            make_span(
993                &trace3,
994                SpanId::from_bytes([0xB5; 8]),
995                Some(SpanId::from_bytes([0xB4; 8])),
996                "svc_vis",
997                "op6",
998                vec![],
999            ),
1000        ];
1001        service.write_spans_direct(spans3).await?;
1002
1003        let result = service
1004            .query_service
1005            .get_trace_spans(
1006                Some(trace3.as_bytes()),
1007                None,
1008                Some(&start),
1009                Some(&end),
1010                None,
1011            )
1012            .await?;
1013        assert_eq!(
1014            result.len(),
1015            2,
1016            "After write #3: expected 2 spans for trace3, got {} (stale snapshot?)",
1017            result.len()
1018        );
1019
1020        service.shutdown().await?;
1021        cleanup();
1022        Ok(())
1023    }
1024}