Skip to main content

scouter_dataframe/parquet/tracing/
service.rs

1use crate::error::TraceEngineError;
2use crate::parquet::tracing::catalog::TraceCatalogProvider;
3use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
4use crate::parquet::tracing::queries::TraceQueries;
5use crate::storage::ObjectStore;
6use datafusion::prelude::SessionContext;
7use scouter_settings::ObjectStorageSettings;
8use scouter_types::TraceSpanRecord;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tokio::time::{interval, Duration};
12use tracing::{debug, info};
13
/// Default buffer flush cadence (seconds), used when callers pass `None`
/// for `flush_interval_secs` to `TraceSpanService::new`.
const FLUSH_INTERVAL_SECS: u64 = 5;

/// Global singleton for the TraceSpanService.
///
/// Initialized via `init_trace_span_service()` in server setup.
/// Consumer workers call `get_trace_span_service()` to obtain the Arc for writing.
/// Uses `RwLock<Option<...>>` so tests can re-initialize with a fresh service.
static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
    std::sync::RwLock::new(None);
23
24/// Initialize the global `TraceSpanService`.
25///
26/// If a previous service exists (e.g. in test re-initialization), it is signaled to
27/// shut down before being replaced.
28pub async fn init_trace_span_service(
29    storage_settings: &ObjectStorageSettings,
30    compaction_interval_hours: u64,
31    flush_interval_secs: Option<u64>,
32    retention_days: Option<u32>,
33    refresh_interval_secs: u64,
34) -> Result<Arc<TraceSpanService>, TraceEngineError> {
35    // Shut down any existing service before replacing
36    let old_service = {
37        let guard = TRACE_SPAN_SERVICE.read().unwrap();
38        guard.clone()
39    };
40    if let Some(old) = old_service {
41        info!("Shutting down previous TraceSpanService before re-initialization");
42        old.signal_shutdown().await;
43    }
44
45    let service = Arc::new(
46        TraceSpanService::new(
47            storage_settings,
48            compaction_interval_hours,
49            flush_interval_secs,
50            retention_days,
51            refresh_interval_secs,
52        )
53        .await?,
54    );
55
56    {
57        let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
58        *guard = Some(service.clone());
59    }
60
61    info!("TraceSpanService global singleton initialized");
62    Ok(service)
63}
64
65/// Retrieve the global `TraceSpanService` initialized during startup.
66///
67/// Returns `None` if called before `init_trace_span_service()`.
68pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
69    TRACE_SPAN_SERVICE.read().unwrap().clone()
70}
71
/// Buffered writer service for trace spans.
///
/// Owns two background tasks: the DB engine actor (handles `TableCommand`s —
/// writes, optimize, vacuum, expire) and a buffering actor that batches
/// incoming spans before forwarding them to the engine.
pub struct TraceSpanService {
    /// Command channel to the engine actor (write/optimize/vacuum/expire/shutdown).
    engine_tx: mpsc::Sender<TableCommand>,
    /// Channel feeding span batches to the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Signals the buffering actor to flush remaining spans and exit.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle for the engine actor task (awaited in `shutdown()`).
    engine_handle: tokio::task::JoinHandle<()>,
    /// Join handle for the buffering actor task (awaited in `shutdown()`).
    buffer_handle: tokio::task::JoinHandle<()>,
    // Read-side query API built over the shared SessionContext.
    pub query_service: TraceQueries,
    /// Shared SessionContext — passed to TraceSummaryService so both engines share the same
    /// context (and thus the same UDF registrations and object-store bindings).
    pub ctx: Arc<SessionContext>,
    /// Shared catalog — passed to TraceSummaryService so summary engine can call
    /// `catalog.swap()` for atomic `TableProvider` updates.
    pub catalog: Arc<TraceCatalogProvider>,
    /// Shared ObjectStore — passed to TraceSummaryService so both engines use the same
    /// CachingStore instance, preventing stale reads on cloud backends (GCS/S3).
    pub object_store: ObjectStore,
}
89
90impl TraceSpanService {
    /// Create a new `TraceSpanService` with the given storage settings and start the engine and buffering actors.
    /// The buffering actor will flush spans to storage when the buffer reaches capacity or after a time interval.
    /// # Arguments
    /// * `storage_settings` - Configuration for object storage where trace spans will be persisted.
    /// * `compaction_interval_hours` - How often the engine should perform compaction
    ///   (merging small files into larger ones). Longer intervals reduce write amplification
    ///   but may increase read latency.
    /// * `flush_interval_secs` - Optional interval in seconds for flushing the buffer to storage
    ///   (defaults to `FLUSH_INTERVAL_SECS` when `None`).
    /// * `retention_days` - Optional retention window forwarded to the engine actor
    ///   (presumably drives periodic span expiry — see `expire()`; confirm against the engine).
    /// * `refresh_interval_secs` - How often to refresh the in-memory Delta table snapshot from shared object storage. Useful for multi-pod deployments to pick up new commits.
    pub async fn new(
        storage_settings: &ObjectStorageSettings,
        compaction_interval_hours: u64,
        flush_interval_secs: Option<u64>,
        retention_days: Option<u32>,
        refresh_interval_secs: u64,
    ) -> Result<Self, TraceEngineError> {
        let buffer_size = storage_settings.trace_buffer_size();
        let engine = TraceSpanDBEngine::new(storage_settings).await?;

        info!(
            "TraceSpanService initialized with buffer_size: {}",
            buffer_size
        );

        // Grab shared handles before `start_actor` consumes the engine.
        let ctx = engine.ctx();
        let catalog = engine.catalog.clone();
        let object_store = engine.object_store.clone();
        let (engine_tx, engine_handle) = engine.start_actor(
            compaction_interval_hours,
            retention_days,
            refresh_interval_secs,
        );
        // Up to 100 span batches may queue between producers and the buffering actor.
        let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);

        let buffer_handle = Self::start_buffering_actor(
            engine_tx.clone(),
            span_rx,
            shutdown_rx,
            flush_interval_secs,
            buffer_size,
        );

        Ok(TraceSpanService {
            engine_tx,
            span_tx,
            shutdown_tx,
            engine_handle,
            buffer_handle,
            query_service: TraceQueries::new(ctx.clone()),
            ctx,
            catalog,
            object_store,
        })
    }
146
147    fn start_buffering_actor(
148        engine_tx: mpsc::Sender<TableCommand>,
149        mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
150        mut shutdown_rx: mpsc::Receiver<()>,
151        flush_interval_secs: Option<u64>,
152        buffer_size: usize,
153    ) -> tokio::task::JoinHandle<()> {
154        tokio::spawn(async move {
155            let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
156            let mut flush_ticker = interval(Duration::from_secs(
157                flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
158            ));
159            flush_ticker.tick().await;
160
161            loop {
162                tokio::select! {
163                    Some(spans) = span_rx.recv() => {
164                        buffer.extend(spans);
165                        if buffer.len() >= buffer_size {
166                            Self::flush_buffer(&engine_tx, &mut buffer).await;
167                        }
168                    }
169                    _ = flush_ticker.tick() => {
170                        if !buffer.is_empty() {
171                            info!("Flushing spans buffer with {} spans", buffer.len());
172                            Self::flush_buffer(&engine_tx, &mut buffer).await;
173                        }
174                    }
175                    _ = shutdown_rx.recv() => {
176                        info!("Buffer actor received shutdown signal");
177                        if !buffer.is_empty() {
178                            info!("Flushing final {} spans before shutdown", buffer.len());
179                            Self::flush_buffer(&engine_tx, &mut buffer).await;
180                        }
181                        break;
182                    }
183                }
184            }
185
186            info!("Buffering actor shutting down");
187        })
188    }
189
190    async fn flush_buffer(
191        engine_tx: &mpsc::Sender<TableCommand>,
192        buffer: &mut Vec<TraceSpanRecord>,
193    ) {
194        if buffer.is_empty() {
195            return;
196        }
197
198        let capacity = buffer.capacity();
199        let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
200        let span_count = spans_to_write.len();
201
202        debug!("Sending write command to engine for {} spans", span_count);
203
204        let (tx, rx) = tokio::sync::oneshot::channel();
205
206        if let Err(e) = engine_tx
207            .send(TableCommand::Write {
208                spans: spans_to_write,
209                respond_to: tx,
210            })
211            .await
212        {
213            tracing::error!("Failed to send write command: {}", e);
214            return;
215        }
216
217        match rx.await {
218            Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
219            Ok(Err(e)) => tracing::error!("Write failed: {}", e),
220            Err(e) => tracing::error!("Failed to receive write response: {}", e),
221        }
222    }
223
224    /// Send spans to the buffering actor for async write to Delta Lake.
225    /// # Arguments
226    /// * `spans` - A batch of `TraceSpanRecord` to write. The buffering actor will flush to storage
227    ///   when the buffer reaches capacity or after a time interval.
228    pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
229        self.span_tx
230            .send(spans)
231            .await
232            .map_err(|_| TraceEngineError::ChannelClosed)?;
233        Ok(())
234    }
235
236    /// Write spans directly to the engine actor, bypassing the buffer.
237    ///
238    /// Unlike `write_spans()`, this method sends a single large batch as one
239    /// Delta Lake commit and awaits the result. Use for bulk seeding/migration
240    /// where you need deterministic commit boundaries and maximum throughput.
241    /// This is used in benchmarks and stress tests to simulate high-volume writes without caching effects
242    pub async fn write_spans_direct(
243        &self,
244        spans: Vec<TraceSpanRecord>,
245    ) -> Result<(), TraceEngineError> {
246        let (tx, rx) = tokio::sync::oneshot::channel();
247        self.engine_tx
248            .send(TableCommand::Write {
249                spans,
250                respond_to: tx,
251            })
252            .await
253            .map_err(|_| TraceEngineError::ChannelClosed)?;
254        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
255    }
256
257    // Admin operations — these can be called directly or via the control table for single-writer cron-style execution across
258    // Primarily used in tests and benches. In Production, we default to the control table approach to ensure only worker runs them at a time, but the direct methods remain available for manual invocation when needed.
259    pub async fn optimize(&self) -> Result<(), TraceEngineError> {
260        let (tx, rx) = tokio::sync::oneshot::channel();
261
262        self.engine_tx
263            .send(TableCommand::Optimize { respond_to: tx })
264            .await
265            .map_err(|_| TraceEngineError::ChannelClosed)?;
266
267        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
268    }
269
270    pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
271        let (tx, rx) = tokio::sync::oneshot::channel();
272
273        self.engine_tx
274            .send(TableCommand::Vacuum {
275                retention_hours,
276                respond_to: tx,
277            })
278            .await
279            .map_err(|_| TraceEngineError::ChannelClosed)?;
280
281        rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
282    }
283
284    /// Delete all spans with a `partition_date` older than `retention_days` ago,
285    /// then run VACUUM to physically reclaim disk space.
286    ///
287    /// Call order matters: DELETE marks files as unreferenced in the Delta log;
288    /// VACUUM then removes the orphaned Parquet files from storage.
289    pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
290        let cutoff_date =
291            (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
292
293        // Step 1: DELETE WHERE partition_date < cutoff (marks rows removed in Delta log)
294        let (tx, rx) = tokio::sync::oneshot::channel();
295        self.engine_tx
296            .send(TableCommand::Expire {
297                cutoff_date,
298                respond_to: tx,
299            })
300            .await
301            .map_err(|_| TraceEngineError::ChannelClosed)?;
302        rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;
303
304        // Step 2: VACUUM — physically deletes orphaned files from storage.
305        // retention_hours=0 with enforce_retention_duration=false removes all
306        // post-DELETE orphans immediately.
307        self.vacuum(0).await
308    }
309
310    /// Signal shutdown without consuming `self` — safe to call from `Arc<TraceSpanService>`.
311    ///
312    /// Sends the shutdown signal to the buffering actor and engine actor.
313    /// Callers that own `self` should prefer `shutdown()` to await full drain.
314    pub async fn signal_shutdown(&self) {
315        info!("TraceSpanService signaling shutdown");
316        let _ = self.shutdown_tx.send(()).await;
317        let _ = self.engine_tx.send(TableCommand::Shutdown).await;
318    }
319
320    pub async fn shutdown(self) -> Result<(), TraceEngineError> {
321        info!("TraceSpanService shutting down");
322
323        let _ = self.shutdown_tx.send(()).await;
324
325        if let Err(e) = self.buffer_handle.await {
326            tracing::error!("Buffer handle error: {}", e);
327        }
328
329        self.engine_tx
330            .send(TableCommand::Shutdown)
331            .await
332            .map_err(|_| TraceEngineError::ChannelClosed)?;
333
334        if let Err(e) = self.engine_handle.await {
335            tracing::error!("Engine handle error: {}", e);
336        }
337
338        info!("TraceSpanService shutdown complete");
339        Ok(())
340    }
341}
342
343#[cfg(test)]
344mod tests {
345    use super::*;
346    use crate::parquet::tracing::queries::{
347        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
348    };
349    use arrow_array::Array;
350    use chrono::Utc;
351    use datafusion::logical_expr::col;
352    use scouter_mocks::generate_trace_with_spans;
353    use scouter_settings::ObjectStorageSettings;
354    use scouter_types::sql::TraceSpan;
355    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
356    use serde_json::Value;
357    use tracing_subscriber;
358
359    fn cleanup() {
360        let _ = tracing_subscriber::fmt()
361            .with_max_level(tracing::Level::INFO)
362            .try_init();
363
364        let storage_settings = ObjectStorageSettings::default();
365        let current_dir = std::env::current_dir().unwrap();
366        let storage_path = current_dir.join(storage_settings.storage_root());
367        if storage_path.exists() {
368            let _ = std::fs::remove_dir_all(storage_path);
369        }
370    }
371
    /// Build a deterministic `TraceSpanRecord` with the given IDs and attributes.
    ///
    /// Every span gets a fixed 100 ms duration, `status_code` 0 with message
    /// "OK", and empty events/links, so query assertions stay deterministic.
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: *trace_id,
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            // Fixed duration: end_time is always start_time + 100 ms.
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }
408
409    #[tokio::test]
410    async fn test_service_initialization() -> Result<(), TraceEngineError> {
411        cleanup();
412
413        let storage_settings = ObjectStorageSettings::default();
414        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
415        service.shutdown().await?;
416        cleanup();
417        Ok(())
418    }
419
420    #[tokio::test]
421    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
422        cleanup();
423
424        let storage_settings = ObjectStorageSettings::default();
425        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
426
427        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
428        info!("Test: writing {} spans", spans.len());
429
430        let first_trace_id = spans.first().unwrap().trace_id;
431        service.write_spans(spans).await?;
432
433        info!("Test: waiting for flush");
434        tokio::time::sleep(Duration::from_secs(5)).await;
435
436        let trace_id_bytes = first_trace_id.as_bytes();
437        let result_spans: Vec<TraceSpan> = service
438            .query_service
439            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
440            .await?;
441
442        assert!(
443            !result_spans.is_empty(),
444            "Expected at least 1 span but got 0"
445        );
446
447        service.shutdown().await?;
448        cleanup();
449        Ok(())
450    }
451
    /// Verify that `build_span_tree` returns spans in DFS (depth-first) order with correct
    /// depth, path, and root_span_id fields — matching what the Postgres recursive CTE produced.
    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        // Build a deterministic tree: root → child → grandchild
        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        // Flush interval is 2s; wait long enough for the batch to land.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        // DFS order: root(0), child(1), grandchild(2)
        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        // All spans should share the same root_span_id
        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
544
545    /// Verify `get_trace_metrics` returns time-bucketed aggregate rows.
546    #[tokio::test]
547    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
548        cleanup();
549
550        let storage_settings = ObjectStorageSettings::default();
551        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
552
553        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
554        service.write_spans(spans).await?;
555        tokio::time::sleep(Duration::from_secs(4)).await;
556
557        let start = Utc::now() - chrono::Duration::hours(1);
558        let end = Utc::now() + chrono::Duration::hours(1);
559
560        let metrics = service
561            .query_service
562            .get_trace_metrics(None, start, end, "hour", None, None)
563            .await?;
564
565        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
566        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");
567
568        service.shutdown().await?;
569        cleanup();
570        Ok(())
571    }
572
573    /// Verify `get_trace_metrics` with a service_name filter excludes other services.
574    #[tokio::test]
575    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
576        cleanup();
577
578        let storage_settings = ObjectStorageSettings::default();
579        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
580
581        // Write spans for two distinct services using deterministic IDs
582        let trace_a = TraceId::from_bytes([10u8; 16]);
583        let trace_b = TraceId::from_bytes([20u8; 16]);
584
585        let span_a = make_span(
586            &trace_a,
587            SpanId::from_bytes([10u8; 8]),
588            None,
589            "service_alpha",
590            "op_a",
591            vec![],
592        );
593        let span_b = make_span(
594            &trace_b,
595            SpanId::from_bytes([20u8; 8]),
596            None,
597            "service_beta",
598            "op_b",
599            vec![],
600        );
601
602        service.write_spans(vec![span_a, span_b]).await?;
603        tokio::time::sleep(Duration::from_secs(4)).await;
604
605        let start = Utc::now() - chrono::Duration::hours(1);
606        let end = Utc::now() + chrono::Duration::hours(1);
607
608        // Filter to service_alpha only
609        let metrics_alpha = service
610            .query_service
611            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
612            .await?;
613
614        // Filter to service_beta only
615        let metrics_beta = service
616            .query_service
617            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
618            .await?;
619
620        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
621        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();
622
623        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
624        assert!(beta_count > 0, "Expected non-zero count for service_beta");
625
626        // Querying with a non-existent service returns nothing
627        let metrics_none = service
628            .query_service
629            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
630            .await?;
631        assert!(
632            metrics_none.is_empty(),
633            "Expected no buckets for nonexistent service"
634        );
635
636        service.shutdown().await?;
637        cleanup();
638        Ok(())
639    }
640
    /// Verify all three DataFusion filter layers are wired correctly by inspecting the
    /// physical plan produced for a partitioned time-window query.
    ///
    /// Layer 1 — partition pruning: `partition_date` appears in the plan because Delta Lake
    /// pushes partition column filters to directory enumeration before reading any files.
    ///
    /// Layer 2 — row-group stats: `start_time` appears because typed Timestamp literals
    /// enable Parquet min/max pruning across row groups.
    ///
    /// Layer 3 — bloom filter: querying a nonexistent `trace_id` within a tight window
    /// returns 0 rows; bloom filters discard row groups instantly without page-level scanning.
    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        // Write a known batch directly so it is immediately queryable
        // Use distinct byte values that don't collide with other tests.
        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        service.write_spans_direct(spans).await?;

        // ── Layers 1 & 2: inspect physical plan ──────────────────────────────────
        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        // Build the same DataFrame the query path would produce internally
        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Partition filter — pushed to directory enumeration (Layer 1)
        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Timestamp filter — enables row-group min/max pruning (Layer 2)
        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Collect the physical plan as a string via EXPLAIN
        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // EXPLAIN output arrives as record batches with a `plan` column; cast it
        // to Utf8 and join every row into one searchable string.
        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        // Layer 1: partition filter must appear in plan
        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        // Layer 2: start_time row-group filter must appear in plan
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        // ── Layer 3: bloom filter — behavioral proof ──────────────────────────────
        // A nonexistent trace_id with bloom filters enabled returns 0 rows.
        // Without bloom filters, DataFusion would scan every row group to confirm absence.
        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
767
768    /// Verify `get_trace_metrics` with attribute_filters narrows results to matching spans.
769    #[tokio::test]
770    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
771        cleanup();
772
773        let storage_settings = ObjectStorageSettings::default();
774        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
775
776        let trace_kafka = TraceId::from_bytes([30u8; 16]);
777        let trace_http = TraceId::from_bytes([40u8; 16]);
778
779        // span with component:kafka attribute
780        let span_kafka = make_span(
781            &trace_kafka,
782            SpanId::from_bytes([30u8; 8]),
783            None,
784            "my_service",
785            "kafka_consumer",
786            vec![Attribute {
787                key: "component".to_string(),
788                value: Value::String("kafka".to_string()),
789            }],
790        );
791        // span with component:http attribute (should NOT match kafka filter)
792        let span_http = make_span(
793            &trace_http,
794            SpanId::from_bytes([40u8; 8]),
795            None,
796            "my_service",
797            "http_handler",
798            vec![Attribute {
799                key: "component".to_string(),
800                value: Value::String("http".to_string()),
801            }],
802        );
803
804        service.write_spans(vec![span_kafka, span_http]).await?;
805        tokio::time::sleep(Duration::from_secs(4)).await;
806
807        let start = Utc::now() - chrono::Duration::hours(1);
808        let end = Utc::now() + chrono::Duration::hours(1);
809        let kafka_filter = vec!["component:kafka".to_string()];
810
811        // With filter, only kafka trace should appear
812        let filtered = service
813            .query_service
814            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
815            .await?;
816
817        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
818        assert!(
819            filtered_count > 0,
820            "Expected non-zero count with kafka attribute filter"
821        );
822
823        // Without filter, both traces appear
824        let unfiltered = service
825            .query_service
826            .get_trace_metrics(None, start, end, "hour", None, None)
827            .await?;
828        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
829        assert!(
830            unfiltered_count >= filtered_count,
831            "Unfiltered count ({}) should be >= filtered count ({})",
832            unfiltered_count,
833            filtered_count
834        );
835
836        service.shutdown().await?;
837        cleanup();
838        Ok(())
839    }
840
841    /// Simulate a 2-pod deployment: writer pod commits, reader pod picks it up via refresh.
842    ///
843    /// Two `TraceSpanService` instances share the same local storage directory (same as
844    /// sharing GCS/S3 in production). The writer commits spans directly; the reader's refresh
845    /// ticker (1s interval) picks up the new Delta log entry and re-registers the SessionContext.
846    ///
847    /// Note: we use `TraceSpanService::new()` directly (not `init_trace_span_service()`) because
848    /// the global singleton pattern shuts down the previous service on re-init, which would
849    /// kill the writer before the reader can see its data.
850    #[tokio::test]
851    async fn test_distributed_refresh() -> Result<(), TraceEngineError> {
852        cleanup();
853
854        let storage_settings = ObjectStorageSettings::default();
855
856        // "Writer pod" — standard refresh interval (won't need to refresh since it's the writer)
857        let writer = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
858
859        // "Reader pod" — 1s refresh interval for fast test turnaround
860        let reader = TraceSpanService::new(&storage_settings, 24, Some(2), None, 1).await?;
861
862        // Write spans via the writer using a deterministic trace ID
863        let trace_id = TraceId::from_bytes([0xDD_u8; 16]);
864        let span = make_span(
865            &trace_id,
866            SpanId::from_bytes([0xDD_u8; 8]),
867            None,
868            "distributed-svc",
869            "test-op",
870            vec![],
871        );
872        writer.write_spans_direct(vec![span]).await?;
873
874        // Wait for the reader's refresh ticker to fire (1s interval + margin)
875        tokio::time::sleep(Duration::from_secs(3)).await;
876
877        // Reader should now see the data written by the writer
878        let results = reader
879            .query_service
880            .get_trace_spans(Some(trace_id.as_bytes()), None, None, None, None)
881            .await?;
882
883        assert!(
884            !results.is_empty(),
885            "Reader pod should see spans written by writer pod after refresh"
886        );
887
888        writer.shutdown().await?;
889        reader.shutdown().await?;
890        cleanup();
891        Ok(())
892    }
893
894    /// Regression test for stale DataFusion session state after Delta Lake writes.
895    ///
896    /// Before the fix, `SessionContext` cached file metadata from the first write and
897    /// subsequent writes were invisible to queries — the session held a stale snapshot
898    /// of the Delta log. The fix calls `update_datafusion_session()` after each write
899    /// to re-register the table provider with the latest Delta log state.
900    #[tokio::test]
901    async fn test_span_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError> {
902        cleanup();
903
904        let storage_settings = ObjectStorageSettings::default();
905        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
906
907        let start = Utc::now() - chrono::Duration::hours(1);
908        let end = Utc::now() + chrono::Duration::hours(1);
909
910        // Write batch #1 (2 spans)
911        let trace1 = TraceId::from_bytes([0xB0; 16]);
912        let spans1 = vec![
913            make_span(
914                &trace1,
915                SpanId::from_bytes([0xB0; 8]),
916                None,
917                "svc_vis",
918                "op1",
919                vec![],
920            ),
921            make_span(
922                &trace1,
923                SpanId::from_bytes([0xB1; 8]),
924                Some(SpanId::from_bytes([0xB0; 8])),
925                "svc_vis",
926                "op2",
927                vec![],
928            ),
929        ];
930        service.write_spans_direct(spans1).await?;
931
932        let result = service
933            .query_service
934            .get_trace_spans(
935                Some(trace1.as_bytes()),
936                None,
937                Some(&start),
938                Some(&end),
939                None,
940            )
941            .await?;
942        assert_eq!(
943            result.len(),
944            2,
945            "After write #1: expected 2 spans, got {}",
946            result.len()
947        );
948
949        // Write batch #2 (2 more spans, different trace)
950        let trace2 = TraceId::from_bytes([0xB2; 16]);
951        let spans2 = vec![
952            make_span(
953                &trace2,
954                SpanId::from_bytes([0xB2; 8]),
955                None,
956                "svc_vis",
957                "op3",
958                vec![],
959            ),
960            make_span(
961                &trace2,
962                SpanId::from_bytes([0xB3; 8]),
963                Some(SpanId::from_bytes([0xB2; 8])),
964                "svc_vis",
965                "op4",
966                vec![],
967            ),
968        ];
969        service.write_spans_direct(spans2).await?;
970
971        let result = service
972            .query_service
973            .get_trace_spans(
974                Some(trace2.as_bytes()),
975                None,
976                Some(&start),
977                Some(&end),
978                None,
979            )
980            .await?;
981        assert_eq!(
982            result.len(),
983            2,
984            "After write #2: expected 2 spans for trace2, got {} (stale snapshot?)",
985            result.len()
986        );
987
988        // Write batch #3 (2 more spans, third trace)
989        let trace3 = TraceId::from_bytes([0xB4; 16]);
990        let spans3 = vec![
991            make_span(
992                &trace3,
993                SpanId::from_bytes([0xB4; 8]),
994                None,
995                "svc_vis",
996                "op5",
997                vec![],
998            ),
999            make_span(
1000                &trace3,
1001                SpanId::from_bytes([0xB5; 8]),
1002                Some(SpanId::from_bytes([0xB4; 8])),
1003                "svc_vis",
1004                "op6",
1005                vec![],
1006            ),
1007        ];
1008        service.write_spans_direct(spans3).await?;
1009
1010        let result = service
1011            .query_service
1012            .get_trace_spans(
1013                Some(trace3.as_bytes()),
1014                None,
1015                Some(&start),
1016                Some(&end),
1017                None,
1018            )
1019            .await?;
1020        assert_eq!(
1021            result.len(),
1022            2,
1023            "After write #3: expected 2 spans for trace3, got {} (stale snapshot?)",
1024            result.len()
1025        );
1026
1027        service.shutdown().await?;
1028        cleanup();
1029        Ok(())
1030    }
1031}