1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use crate::storage::ObjectStore;
5use datafusion::prelude::SessionContext;
6use scouter_settings::ObjectStorageSettings;
7use scouter_types::TraceSpanRecord;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use tokio::time::{interval, Duration};
11use tracing::{debug, info};
12
/// Default period, in seconds, of the buffering actor's periodic flush when the
/// caller does not supply a flush interval.
const FLUSH_INTERVAL_SECS: u64 = 5;

/// Process-wide `TraceSpanService` slot, written by `init_trace_span_service`
/// and read by `get_trace_span_service`. `None` until the first initialization.
static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
    std::sync::RwLock::new(None);
22
23pub async fn init_trace_span_service(
28 storage_settings: &ObjectStorageSettings,
29 compaction_interval_hours: u64,
30 flush_interval_secs: Option<u64>,
31 retention_days: Option<u32>,
32 refresh_interval_secs: u64,
33) -> Result<Arc<TraceSpanService>, TraceEngineError> {
34 let old_service = {
36 let guard = TRACE_SPAN_SERVICE.read().unwrap();
37 guard.clone()
38 };
39 if let Some(old) = old_service {
40 info!("Shutting down previous TraceSpanService before re-initialization");
41 old.signal_shutdown().await;
42 }
43
44 let service = Arc::new(
45 TraceSpanService::new(
46 storage_settings,
47 compaction_interval_hours,
48 flush_interval_secs,
49 retention_days,
50 refresh_interval_secs,
51 )
52 .await?,
53 );
54
55 {
56 let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
57 *guard = Some(service.clone());
58 }
59
60 info!("TraceSpanService global singleton initialized");
61 Ok(service)
62}
63
64pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
68 TRACE_SPAN_SERVICE.read().unwrap().clone()
69}
70
/// Handle to the trace span storage pipeline.
///
/// The pipeline has two background tasks:
/// - a buffering actor that batches incoming spans;
/// - an engine actor that receives `TableCommand`s.
///
/// The handle also exposes query access over the engine's DataFusion context.
pub struct TraceSpanService {
    /// Command channel to the engine actor (writes, optimize, vacuum, expire, shutdown).
    engine_tx: mpsc::Sender<TableCommand>,
    /// Span batches destined for the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Tells the buffering actor to flush what it holds and exit.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle of the engine actor task.
    engine_handle: tokio::task::JoinHandle<()>,
    /// Join handle of the buffering actor task.
    buffer_handle: tokio::task::JoinHandle<()>,
    /// Read-side query helpers built on `ctx`.
    pub query_service: TraceQueries,
    /// DataFusion session shared with the engine.
    pub ctx: Arc<SessionContext>,
    /// Object store shared with the engine.
    pub object_store: ObjectStore,
}
84
85impl TraceSpanService {
86 pub async fn new(
96 storage_settings: &ObjectStorageSettings,
97 compaction_interval_hours: u64,
98 flush_interval_secs: Option<u64>,
99 retention_days: Option<u32>,
100 refresh_interval_secs: u64,
101 ) -> Result<Self, TraceEngineError> {
102 let buffer_size = storage_settings.trace_buffer_size();
103 let engine = TraceSpanDBEngine::new(storage_settings).await?;
104
105 info!(
106 "TraceSpanService initialized with buffer_size: {}",
107 buffer_size
108 );
109
110 let ctx = engine.ctx.clone();
111 let object_store = engine.object_store.clone();
112 let (engine_tx, engine_handle) = engine.start_actor(
113 compaction_interval_hours,
114 retention_days,
115 refresh_interval_secs,
116 );
117 let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
118 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
119
120 let buffer_handle = Self::start_buffering_actor(
121 engine_tx.clone(),
122 span_rx,
123 shutdown_rx,
124 flush_interval_secs,
125 buffer_size,
126 );
127
128 Ok(TraceSpanService {
129 engine_tx,
130 span_tx,
131 shutdown_tx,
132 engine_handle,
133 buffer_handle,
134 query_service: TraceQueries::new(ctx.clone()),
135 ctx,
136 object_store,
137 })
138 }
139
140 fn start_buffering_actor(
141 engine_tx: mpsc::Sender<TableCommand>,
142 mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
143 mut shutdown_rx: mpsc::Receiver<()>,
144 flush_interval_secs: Option<u64>,
145 buffer_size: usize,
146 ) -> tokio::task::JoinHandle<()> {
147 tokio::spawn(async move {
148 let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
149 let mut flush_ticker = interval(Duration::from_secs(
150 flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
151 ));
152 flush_ticker.tick().await;
153
154 loop {
155 tokio::select! {
156 Some(spans) = span_rx.recv() => {
157 buffer.extend(spans);
158 if buffer.len() >= buffer_size {
159 Self::flush_buffer(&engine_tx, &mut buffer).await;
160 }
161 }
162 _ = flush_ticker.tick() => {
163 if !buffer.is_empty() {
164 info!("Flushing spans buffer with {} spans", buffer.len());
165 Self::flush_buffer(&engine_tx, &mut buffer).await;
166 }
167 }
168 _ = shutdown_rx.recv() => {
169 info!("Buffer actor received shutdown signal");
170 if !buffer.is_empty() {
171 info!("Flushing final {} spans before shutdown", buffer.len());
172 Self::flush_buffer(&engine_tx, &mut buffer).await;
173 }
174 break;
175 }
176 }
177 }
178
179 info!("Buffering actor shutting down");
180 })
181 }
182
183 async fn flush_buffer(
184 engine_tx: &mpsc::Sender<TableCommand>,
185 buffer: &mut Vec<TraceSpanRecord>,
186 ) {
187 if buffer.is_empty() {
188 return;
189 }
190
191 let capacity = buffer.capacity();
192 let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
193 let span_count = spans_to_write.len();
194
195 debug!("Sending write command to engine for {} spans", span_count);
196
197 let (tx, rx) = tokio::sync::oneshot::channel();
198
199 if let Err(e) = engine_tx
200 .send(TableCommand::Write {
201 spans: spans_to_write,
202 respond_to: tx,
203 })
204 .await
205 {
206 tracing::error!("Failed to send write command: {}", e);
207 return;
208 }
209
210 match rx.await {
211 Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
212 Ok(Err(e)) => tracing::error!("Write failed: {}", e),
213 Err(e) => tracing::error!("Failed to receive write response: {}", e),
214 }
215 }
216
217 pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
222 self.span_tx
223 .send(spans)
224 .await
225 .map_err(|_| TraceEngineError::ChannelClosed)?;
226 Ok(())
227 }
228
229 pub async fn write_spans_direct(
236 &self,
237 spans: Vec<TraceSpanRecord>,
238 ) -> Result<(), TraceEngineError> {
239 let (tx, rx) = tokio::sync::oneshot::channel();
240 self.engine_tx
241 .send(TableCommand::Write {
242 spans,
243 respond_to: tx,
244 })
245 .await
246 .map_err(|_| TraceEngineError::ChannelClosed)?;
247 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
248 }
249
250 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
253 let (tx, rx) = tokio::sync::oneshot::channel();
254
255 self.engine_tx
256 .send(TableCommand::Optimize { respond_to: tx })
257 .await
258 .map_err(|_| TraceEngineError::ChannelClosed)?;
259
260 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
261 }
262
263 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
264 let (tx, rx) = tokio::sync::oneshot::channel();
265
266 self.engine_tx
267 .send(TableCommand::Vacuum {
268 retention_hours,
269 respond_to: tx,
270 })
271 .await
272 .map_err(|_| TraceEngineError::ChannelClosed)?;
273
274 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
275 }
276
277 pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
283 let cutoff_date =
284 (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
285
286 let (tx, rx) = tokio::sync::oneshot::channel();
288 self.engine_tx
289 .send(TableCommand::Expire {
290 cutoff_date,
291 respond_to: tx,
292 })
293 .await
294 .map_err(|_| TraceEngineError::ChannelClosed)?;
295 rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;
296
297 self.vacuum(0).await
301 }
302
303 pub async fn signal_shutdown(&self) {
308 info!("TraceSpanService signaling shutdown");
309 let _ = self.shutdown_tx.send(()).await;
310 let _ = self.engine_tx.send(TableCommand::Shutdown).await;
311 }
312
313 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
314 info!("TraceSpanService shutting down");
315
316 let _ = self.shutdown_tx.send(()).await;
317
318 if let Err(e) = self.buffer_handle.await {
319 tracing::error!("Buffer handle error: {}", e);
320 }
321
322 self.engine_tx
323 .send(TableCommand::Shutdown)
324 .await
325 .map_err(|_| TraceEngineError::ChannelClosed)?;
326
327 if let Err(e) = self.engine_handle.await {
328 tracing::error!("Engine handle error: {}", e);
329 }
330
331 info!("TraceSpanService shutdown complete");
332 Ok(())
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::tracing::queries::{
        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
    };
    use arrow_array::Array;
    use chrono::Utc;
    use datafusion::logical_expr::col;
    use scouter_mocks::generate_trace_with_spans;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceSpan;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use serde_json::Value;
    use tracing_subscriber;

    /// Installs an INFO-level fmt subscriber (ignored if one is already set) and
    /// deletes the default storage root under the current directory.
    ///
    /// NOTE(review): every test below uses `ObjectStorageSettings::default()`,
    /// so they all share this directory. If tests run in parallel, one test's
    /// cleanup can delete another's data. Consider `--test-threads=1` or a
    /// per-test storage root.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            let _ = std::fs::remove_dir_all(storage_path);
        }
    }

    /// Builds a 100 ms INTERNAL span that starts "now", with the given ids,
    /// service name, span name and attributes. Other fields take fixed
    /// placeholder values.
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: *trace_id,
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    /// The service can be constructed and shut down cleanly.
    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Spans sent through the buffered path become queryable after the periodic
    /// flush.
    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
        info!("Test: writing {} spans", spans.len());

        let first_trace_id = spans.first().unwrap().trace_id;
        service.write_spans(spans).await?;

        // Wait longer than the 2 s flush interval so the ticker fires.
        info!("Test: waiting for flush");
        tokio::time::sleep(Duration::from_secs(5)).await;

        let trace_id_bytes = first_trace_id.as_bytes();
        let result_spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
            .await?;

        assert!(
            !result_spans.is_empty(),
            "Expected at least 1 span but got 0"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// A root -> child -> grandchild chain comes back with matching
    /// `span_order`, `depth`, `path` length and a shared `root_span_id`.
    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        // Wait past the 2 s flush interval.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        // Order by the computed span_order rather than relying on result order.
        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        // Every span in the trace reports the same root (as a hex string).
        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Hourly trace metrics over a +/-1h window contain at least one non-empty
    /// bucket.
    #[tokio::test]
    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
        service.write_spans(spans).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;

        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// The service-name filter selects each service's traces and yields no
    /// buckets for an unknown service.
    #[tokio::test]
    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let trace_a = TraceId::from_bytes([10u8; 16]);
        let trace_b = TraceId::from_bytes([20u8; 16]);

        let span_a = make_span(
            &trace_a,
            SpanId::from_bytes([10u8; 8]),
            None,
            "service_alpha",
            "op_a",
            vec![],
        );
        let span_b = make_span(
            &trace_b,
            SpanId::from_bytes([20u8; 8]),
            None,
            "service_beta",
            "op_b",
            vec![],
        );

        service.write_spans(vec![span_a, span_b]).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics_alpha = service
            .query_service
            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
            .await?;

        let metrics_beta = service
            .query_service
            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
            .await?;

        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();

        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
        assert!(beta_count > 0, "Expected non-zero count for service_beta");

        let metrics_none = service
            .query_service
            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
            .await?;
        assert!(
            metrics_none.is_empty(),
            "Expected no buckets for nonexistent service"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// The EXPLAIN output of a date-partition + start_time filtered scan
    /// mentions both columns. A lookup of an unknown trace id in the same
    /// window returns nothing.
    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        // Written directly (unbuffered), so no sleep is needed before querying.
        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        service.write_spans_direct(spans).await?;

        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Layer 1: partition-date filter.
        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Layer 2: start_time range filter.
        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Concatenate the "plan" column of every EXPLAIN batch into one string.
        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// An attribute filter ("component:kafka") yields a non-zero count that
    /// does not exceed the unfiltered count.
    #[tokio::test]
    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let trace_kafka = TraceId::from_bytes([30u8; 16]);
        let trace_http = TraceId::from_bytes([40u8; 16]);

        let span_kafka = make_span(
            &trace_kafka,
            SpanId::from_bytes([30u8; 8]),
            None,
            "my_service",
            "kafka_consumer",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("kafka".to_string()),
            }],
        );
        let span_http = make_span(
            &trace_http,
            SpanId::from_bytes([40u8; 8]),
            None,
            "my_service",
            "http_handler",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("http".to_string()),
            }],
        );

        service.write_spans(vec![span_kafka, span_http]).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let kafka_filter = vec!["component:kafka".to_string()];

        let filtered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
            .await?;

        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
        assert!(
            filtered_count > 0,
            "Expected non-zero count with kafka attribute filter"
        );

        let unfiltered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;
        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
        assert!(
            unfiltered_count >= filtered_count,
            "Unfiltered count ({}) should be >= filtered count ({})",
            unfiltered_count,
            filtered_count
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Two services share one storage root. A span written by the "writer" is
    /// visible to the "reader" (1 s refresh interval) after a short wait.
    #[tokio::test]
    async fn test_distributed_refresh() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();

        let writer = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let reader = TraceSpanService::new(&storage_settings, 24, Some(2), None, 1).await?;

        let trace_id = TraceId::from_bytes([0xDD_u8; 16]);
        let span = make_span(
            &trace_id,
            SpanId::from_bytes([0xDD_u8; 8]),
            None,
            "distributed-svc",
            "test-op",
            vec![],
        );
        writer.write_spans_direct(vec![span]).await?;

        // Give the reader's refresh (every 1 s) time to pick up the new data.
        tokio::time::sleep(Duration::from_secs(3)).await;

        let results = reader
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes()), None, None, None, None)
            .await?;

        assert!(
            !results.is_empty(),
            "Reader pod should see spans written by writer pod after refresh"
        );

        writer.shutdown().await?;
        reader.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Each of three consecutive direct writes is visible to a query issued
    /// immediately afterwards on the same service.
    #[tokio::test]
    async fn test_span_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None, 10).await?;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        // Write #1
        let trace1 = TraceId::from_bytes([0xB0; 16]);
        let spans1 = vec![
            make_span(
                &trace1,
                SpanId::from_bytes([0xB0; 8]),
                None,
                "svc_vis",
                "op1",
                vec![],
            ),
            make_span(
                &trace1,
                SpanId::from_bytes([0xB1; 8]),
                Some(SpanId::from_bytes([0xB0; 8])),
                "svc_vis",
                "op2",
                vec![],
            ),
        ];
        service.write_spans_direct(spans1).await?;

        let result = service
            .query_service
            .get_trace_spans(
                Some(trace1.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert_eq!(
            result.len(),
            2,
            "After write #1: expected 2 spans, got {}",
            result.len()
        );

        // Write #2
        let trace2 = TraceId::from_bytes([0xB2; 16]);
        let spans2 = vec![
            make_span(
                &trace2,
                SpanId::from_bytes([0xB2; 8]),
                None,
                "svc_vis",
                "op3",
                vec![],
            ),
            make_span(
                &trace2,
                SpanId::from_bytes([0xB3; 8]),
                Some(SpanId::from_bytes([0xB2; 8])),
                "svc_vis",
                "op4",
                vec![],
            ),
        ];
        service.write_spans_direct(spans2).await?;

        let result = service
            .query_service
            .get_trace_spans(
                Some(trace2.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert_eq!(
            result.len(),
            2,
            "After write #2: expected 2 spans for trace2, got {} (stale snapshot?)",
            result.len()
        );

        // Write #3
        let trace3 = TraceId::from_bytes([0xB4; 16]);
        let spans3 = vec![
            make_span(
                &trace3,
                SpanId::from_bytes([0xB4; 8]),
                None,
                "svc_vis",
                "op5",
                vec![],
            ),
            make_span(
                &trace3,
                SpanId::from_bytes([0xB5; 8]),
                Some(SpanId::from_bytes([0xB4; 8])),
                "svc_vis",
                "op6",
                vec![],
            ),
        ];
        service.write_spans_direct(spans3).await?;

        let result = service
            .query_service
            .get_trace_spans(
                Some(trace3.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert_eq!(
            result.len(),
            2,
            "After write #3: expected 2 spans for trace3, got {} (stale snapshot?)",
            result.len()
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}