1use crate::error::TraceEngineError;
2use crate::parquet::tracing::catalog::TraceCatalogProvider;
3use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
4use crate::parquet::tracing::queries::TraceQueries;
5use crate::storage::ObjectStore;
6use datafusion::prelude::SessionContext;
7use scouter_settings::ObjectStorageSettings;
8use scouter_types::TraceSpanRecord;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tokio::time::{interval, Duration};
12use tracing::{debug, info};
13
14const FLUSH_INTERVAL_SECS: u64 = 5;
15
16static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
22 std::sync::RwLock::new(None);
23
24pub async fn init_trace_span_service(
29 storage_settings: &ObjectStorageSettings,
30 compaction_interval_hours: u64,
31 flush_interval_secs: Option<u64>,
32 retention_days: Option<u32>,
33 refresh_interval_secs: u64,
34) -> Result<Arc<TraceSpanService>, TraceEngineError> {
35 let old_service = {
37 let guard = TRACE_SPAN_SERVICE.read().unwrap();
38 guard.clone()
39 };
40 if let Some(old) = old_service {
41 info!("Shutting down previous TraceSpanService before re-initialization");
42 old.signal_shutdown().await;
43 }
44
45 let service = Arc::new(
46 TraceSpanService::new(
47 storage_settings,
48 compaction_interval_hours,
49 flush_interval_secs,
50 retention_days,
51 refresh_interval_secs,
52 )
53 .await?,
54 );
55
56 {
57 let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
58 *guard = Some(service.clone());
59 }
60
61 info!("TraceSpanService global singleton initialized");
62 Ok(service)
63}
64
65pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
69 TRACE_SPAN_SERVICE.read().unwrap().clone()
70}
71
/// Handle to the trace-span storage service.
///
/// Writes go either through a buffering actor (`write_spans`) or directly to the engine
/// actor (`write_spans_direct`). Reads go through `query_service` or `ctx`.
// NOTE: fields are dropped in declaration order; keep the order stable.
pub struct TraceSpanService {
    /// Command channel into the engine actor returned by `TraceSpanDBEngine::start_actor`.
    engine_tx: mpsc::Sender<TableCommand>,
    /// Feeds span batches into the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Tells the buffering actor to flush what it holds and stop.
    shutdown_tx: mpsc::Sender<()>,
    /// Task running the engine actor; awaited by `shutdown`.
    engine_handle: tokio::task::JoinHandle<()>,
    /// Task running the buffering actor; awaited by `shutdown`.
    buffer_handle: tokio::task::JoinHandle<()>,
    /// Query helper built over the same `SessionContext` as `ctx`.
    pub query_service: TraceQueries,
    /// DataFusion session obtained from the engine (`engine.ctx()`).
    pub ctx: Arc<SessionContext>,
    /// Catalog provider cloned from the engine.
    pub catalog: Arc<TraceCatalogProvider>,
    /// Object store handle cloned from the engine.
    pub object_store: ObjectStore,
}
89
90impl TraceSpanService {
91 pub async fn new(
101 storage_settings: &ObjectStorageSettings,
102 compaction_interval_hours: u64,
103 flush_interval_secs: Option<u64>,
104 retention_days: Option<u32>,
105 refresh_interval_secs: u64,
106 ) -> Result<Self, TraceEngineError> {
107 let buffer_size = storage_settings.trace_buffer_size();
108 let engine = TraceSpanDBEngine::new(storage_settings).await?;
109
110 info!(
111 "TraceSpanService initialized with buffer_size: {}",
112 buffer_size
113 );
114
115 let ctx = engine.ctx();
116 let catalog = engine.catalog.clone();
117 let object_store = engine.object_store.clone();
118 let (engine_tx, engine_handle) = engine.start_actor(
119 compaction_interval_hours,
120 retention_days,
121 refresh_interval_secs,
122 );
123 let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
124 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
125
126 let buffer_handle = Self::start_buffering_actor(
127 engine_tx.clone(),
128 span_rx,
129 shutdown_rx,
130 flush_interval_secs,
131 buffer_size,
132 );
133
134 Ok(TraceSpanService {
135 engine_tx,
136 span_tx,
137 shutdown_tx,
138 engine_handle,
139 buffer_handle,
140 query_service: TraceQueries::new(ctx.clone()),
141 ctx,
142 catalog,
143 object_store,
144 })
145 }
146
147 fn start_buffering_actor(
148 engine_tx: mpsc::Sender<TableCommand>,
149 mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
150 mut shutdown_rx: mpsc::Receiver<()>,
151 flush_interval_secs: Option<u64>,
152 buffer_size: usize,
153 ) -> tokio::task::JoinHandle<()> {
154 tokio::spawn(async move {
155 let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
156 let mut flush_ticker = interval(Duration::from_secs(
157 flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
158 ));
159 flush_ticker.tick().await;
160
161 loop {
162 tokio::select! {
163 Some(spans) = span_rx.recv() => {
164 buffer.extend(spans);
165 if buffer.len() >= buffer_size {
166 Self::flush_buffer(&engine_tx, &mut buffer).await;
167 }
168 }
169 _ = flush_ticker.tick() => {
170 if !buffer.is_empty() {
171 info!("Flushing spans buffer with {} spans", buffer.len());
172 Self::flush_buffer(&engine_tx, &mut buffer).await;
173 }
174 }
175 _ = shutdown_rx.recv() => {
176 info!("Buffer actor received shutdown signal");
177 if !buffer.is_empty() {
178 info!("Flushing final {} spans before shutdown", buffer.len());
179 Self::flush_buffer(&engine_tx, &mut buffer).await;
180 }
181 break;
182 }
183 }
184 }
185
186 info!("Buffering actor shutting down");
187 })
188 }
189
190 async fn flush_buffer(
191 engine_tx: &mpsc::Sender<TableCommand>,
192 buffer: &mut Vec<TraceSpanRecord>,
193 ) {
194 if buffer.is_empty() {
195 return;
196 }
197
198 let capacity = buffer.capacity();
199 let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
200 let span_count = spans_to_write.len();
201
202 debug!("Sending write command to engine for {} spans", span_count);
203
204 let (tx, rx) = tokio::sync::oneshot::channel();
205
206 if let Err(e) = engine_tx
207 .send(TableCommand::Write {
208 spans: spans_to_write,
209 respond_to: tx,
210 })
211 .await
212 {
213 tracing::error!("Failed to send write command: {}", e);
214 return;
215 }
216
217 match rx.await {
218 Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
219 Ok(Err(e)) => tracing::error!("Write failed: {}", e),
220 Err(e) => tracing::error!("Failed to receive write response: {}", e),
221 }
222 }
223
224 pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
229 self.span_tx
230 .send(spans)
231 .await
232 .map_err(|_| TraceEngineError::ChannelClosed)?;
233 Ok(())
234 }
235
236 pub async fn write_spans_direct(
243 &self,
244 spans: Vec<TraceSpanRecord>,
245 ) -> Result<(), TraceEngineError> {
246 let (tx, rx) = tokio::sync::oneshot::channel();
247 self.engine_tx
248 .send(TableCommand::Write {
249 spans,
250 respond_to: tx,
251 })
252 .await
253 .map_err(|_| TraceEngineError::ChannelClosed)?;
254 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
255 }
256
257 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
260 let (tx, rx) = tokio::sync::oneshot::channel();
261
262 self.engine_tx
263 .send(TableCommand::Optimize { respond_to: tx })
264 .await
265 .map_err(|_| TraceEngineError::ChannelClosed)?;
266
267 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
268 }
269
270 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
271 let (tx, rx) = tokio::sync::oneshot::channel();
272
273 self.engine_tx
274 .send(TableCommand::Vacuum {
275 retention_hours,
276 respond_to: tx,
277 })
278 .await
279 .map_err(|_| TraceEngineError::ChannelClosed)?;
280
281 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
282 }
283
284 pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
290 let cutoff_date =
291 (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
292
293 let (tx, rx) = tokio::sync::oneshot::channel();
295 self.engine_tx
296 .send(TableCommand::Expire {
297 cutoff_date,
298 respond_to: tx,
299 })
300 .await
301 .map_err(|_| TraceEngineError::ChannelClosed)?;
302 rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;
303
304 self.vacuum(0).await
308 }
309
310 pub async fn signal_shutdown(&self) {
315 info!("TraceSpanService signaling shutdown");
316 let _ = self.shutdown_tx.send(()).await;
317 let _ = self.engine_tx.send(TableCommand::Shutdown).await;
318 }
319
320 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
321 info!("TraceSpanService shutting down");
322
323 let _ = self.shutdown_tx.send(()).await;
324
325 if let Err(e) = self.buffer_handle.await {
326 tracing::error!("Buffer handle error: {}", e);
327 }
328
329 self.engine_tx
330 .send(TableCommand::Shutdown)
331 .await
332 .map_err(|_| TraceEngineError::ChannelClosed)?;
333
334 if let Err(e) = self.engine_handle.await {
335 tracing::error!("Engine handle error: {}", e);
336 }
337
338 info!("TraceSpanService shutdown complete");
339 Ok(())
340 }
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::tracing::queries::{
        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
    };
    use arrow_array::Array;
    use chrono::Utc;
    use datafusion::logical_expr::col;
    use scouter_mocks::generate_trace_with_spans;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceSpan;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use serde_json::Value;

    /// Serializes tests that share the default storage root.
    ///
    /// Every test builds its service from `ObjectStorageSettings::default()`, and `cleanup()`
    /// deletes that directory. Tests running in parallel (the `cargo test` default) could
    /// therefore wipe each other's data mid-run. Each test holds this lock for its whole body.
    /// It is an async mutex, so a panicking test releases it without poisoning.
    fn storage_lock() -> &'static tokio::sync::Mutex<()> {
        static LOCK: std::sync::OnceLock<tokio::sync::Mutex<()>> = std::sync::OnceLock::new();
        LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
    }

    /// Installs a test log subscriber (first call only) and removes the default storage root.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            let _ = std::fs::remove_dir_all(storage_path);
        }
    }

    /// Builds a 100 ms span timestamped "now", with the given identity, names and attributes.
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: *trace_id,
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
        info!("Test: writing {} spans", spans.len());

        let first_trace_id = spans.first().unwrap().trace_id;
        service.write_spans(spans).await?;

        info!("Test: waiting for flush");
        tokio::time::sleep(Duration::from_secs(5)).await;

        let trace_id_bytes = first_trace_id.as_bytes();
        let result_spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
            .await?;

        assert!(
            !result_spans.is_empty(),
            "Expected at least 1 span but got 0"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
        service.write_spans(spans).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;

        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let trace_a = TraceId::from_bytes([10u8; 16]);
        let trace_b = TraceId::from_bytes([20u8; 16]);

        let span_a = make_span(
            &trace_a,
            SpanId::from_bytes([10u8; 8]),
            None,
            "service_alpha",
            "op_a",
            vec![],
        );
        let span_b = make_span(
            &trace_b,
            SpanId::from_bytes([20u8; 8]),
            None,
            "service_beta",
            "op_b",
            vec![],
        );

        service.write_spans(vec![span_a, span_b]).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics_alpha = service
            .query_service
            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
            .await?;

        let metrics_beta = service
            .query_service
            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
            .await?;

        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();

        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
        assert!(beta_count > 0, "Expected non-zero count for service_beta");

        let metrics_none = service
            .query_service
            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
            .await?;
        assert!(
            metrics_none.is_empty(),
            "Expected no buckets for nonexistent service"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        service.write_spans_direct(spans).await?;

        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let trace_kafka = TraceId::from_bytes([30u8; 16]);
        let trace_http = TraceId::from_bytes([40u8; 16]);

        let span_kafka = make_span(
            &trace_kafka,
            SpanId::from_bytes([30u8; 8]),
            None,
            "my_service",
            "kafka_consumer",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("kafka".to_string()),
            }],
        );
        let span_http = make_span(
            &trace_http,
            SpanId::from_bytes([40u8; 8]),
            None,
            "my_service",
            "http_handler",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("http".to_string()),
            }],
        );

        service.write_spans(vec![span_kafka, span_http]).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let kafka_filter = vec!["component:kafka".to_string()];

        let filtered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
            .await?;

        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
        assert!(
            filtered_count > 0,
            "Expected non-zero count with kafka attribute filter"
        );

        let unfiltered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;
        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
        assert!(
            unfiltered_count >= filtered_count,
            "Unfiltered count ({}) should be >= filtered count ({})",
            unfiltered_count,
            filtered_count
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_distributed_refresh() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();

        let writer = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        // The reader refreshes every second so it picks up the writer's commit quickly.
        let reader = TraceSpanService::new(&storage_settings, 24, Some(2), None, 1).await?;

        let trace_id = TraceId::from_bytes([0xDD_u8; 16]);
        let span = make_span(
            &trace_id,
            SpanId::from_bytes([0xDD_u8; 8]),
            None,
            "distributed-svc",
            "test-op",
            vec![],
        );
        writer.write_spans_direct(vec![span]).await?;

        tokio::time::sleep(Duration::from_secs(3)).await;

        let results = reader
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes()), None, None, None, None)
            .await?;

        assert!(
            !results.is_empty(),
            "Reader pod should see spans written by writer pod after refresh"
        );

        writer.shutdown().await?;
        reader.shutdown().await?;
        cleanup();
        Ok(())
    }

    #[tokio::test]
    async fn test_span_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError> {
        let _guard = storage_lock().lock().await;
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let trace1 = TraceId::from_bytes([0xB0; 16]);
        let spans1 = vec![
            make_span(
                &trace1,
                SpanId::from_bytes([0xB0; 8]),
                None,
                "svc_vis",
                "op1",
                vec![],
            ),
            make_span(
                &trace1,
                SpanId::from_bytes([0xB1; 8]),
                Some(SpanId::from_bytes([0xB0; 8])),
                "svc_vis",
                "op2",
                vec![],
            ),
        ];
        service.write_spans_direct(spans1).await?;

        let result = service
            .query_service
            .get_trace_spans(
                Some(trace1.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert_eq!(
            result.len(),
            2,
            "After write #1: expected 2 spans, got {}",
            result.len()
        );

        let trace2 = TraceId::from_bytes([0xB2; 16]);
        let spans2 = vec![
            make_span(
                &trace2,
                SpanId::from_bytes([0xB2; 8]),
                None,
                "svc_vis",
                "op3",
                vec![],
            ),
            make_span(
                &trace2,
                SpanId::from_bytes([0xB3; 8]),
                Some(SpanId::from_bytes([0xB2; 8])),
                "svc_vis",
                "op4",
                vec![],
            ),
        ];
        service.write_spans_direct(spans2).await?;

        let result = service
            .query_service
            .get_trace_spans(
                Some(trace2.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert_eq!(
            result.len(),
            2,
            "After write #2: expected 2 spans for trace2, got {} (stale snapshot?)",
            result.len()
        );

        let trace3 = TraceId::from_bytes([0xB4; 16]);
        let spans3 = vec![
            make_span(
                &trace3,
                SpanId::from_bytes([0xB4; 8]),
                None,
                "svc_vis",
                "op5",
                vec![],
            ),
            make_span(
                &trace3,
                SpanId::from_bytes([0xB5; 8]),
                Some(SpanId::from_bytes([0xB4; 8])),
                "svc_vis",
                "op6",
                vec![],
            ),
        ];
        service.write_spans_direct(spans3).await?;

        let result = service
            .query_service
            .get_trace_spans(
                Some(trace3.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert_eq!(
            result.len(),
            2,
            "After write #3: expected 2 spans for trace3, got {} (stale snapshot?)",
            result.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}