1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use crate::storage::ObjectStore;
5use datafusion::prelude::SessionContext;
6use scouter_settings::ObjectStorageSettings;
7use scouter_types::TraceSpanRecord;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use tokio::time::{interval, Duration};
11use tracing::{debug, info};
12
/// Fallback period, in seconds, between periodic buffer flushes. Used by the
/// buffering actor when the caller passes `None` for `flush_interval_secs`.
const FLUSH_INTERVAL_SECS: u64 = 5;
14
/// Process-wide slot holding the currently published [`TraceSpanService`].
///
/// Starts out as `None`; it is written by `init_trace_span_service` and read
/// by `get_trace_span_service`.
static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
    std::sync::RwLock::new(None);
22
23pub async fn init_trace_span_service(
28 storage_settings: &ObjectStorageSettings,
29 compaction_interval_hours: u64,
30 flush_interval_secs: Option<u64>,
31 retention_days: Option<u32>,
32) -> Result<Arc<TraceSpanService>, TraceEngineError> {
33 let old_service = {
35 let guard = TRACE_SPAN_SERVICE.read().unwrap();
36 guard.clone()
37 };
38 if let Some(old) = old_service {
39 info!("Shutting down previous TraceSpanService before re-initialization");
40 old.signal_shutdown().await;
41 }
42
43 let service = Arc::new(
44 TraceSpanService::new(
45 storage_settings,
46 compaction_interval_hours,
47 flush_interval_secs,
48 retention_days,
49 )
50 .await?,
51 );
52
53 {
54 let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
55 *guard = Some(service.clone());
56 }
57
58 info!("TraceSpanService global singleton initialized");
59 Ok(service)
60}
61
62pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
66 TRACE_SPAN_SERVICE.read().unwrap().clone()
67}
68
/// Front-end for writing and querying trace spans.
///
/// Owns the channels and task handles for two background actors: the engine
/// actor (receives [`TableCommand`]s) and the buffering actor (batches spans
/// before handing them to the engine).
pub struct TraceSpanService {
    /// Command channel to the engine actor started in `new`.
    engine_tx: mpsc::Sender<TableCommand>,
    /// Channel carrying span batches to the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Channel used to tell the buffering actor to flush and stop.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle of the engine actor task.
    engine_handle: tokio::task::JoinHandle<()>,
    /// Join handle of the buffering actor task.
    buffer_handle: tokio::task::JoinHandle<()>,
    /// Query helper built from the engine's session context.
    pub query_service: TraceQueries,
    /// The engine's DataFusion session context.
    pub ctx: Arc<SessionContext>,
    /// The engine's object store handle.
    pub object_store: ObjectStore,
}
82
83impl TraceSpanService {
84 pub async fn new(
93 storage_settings: &ObjectStorageSettings,
94 compaction_interval_hours: u64,
95 flush_interval_secs: Option<u64>,
96 retention_days: Option<u32>,
97 ) -> Result<Self, TraceEngineError> {
98 let buffer_size = storage_settings.trace_buffer_size();
99 let engine = TraceSpanDBEngine::new(storage_settings).await?;
100
101 info!(
102 "TraceSpanService initialized with buffer_size: {}",
103 buffer_size
104 );
105
106 let ctx = engine.ctx.clone();
107 let object_store = engine.object_store.clone();
108 let (engine_tx, engine_handle) =
109 engine.start_actor(compaction_interval_hours, retention_days);
110 let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
111 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
112
113 let buffer_handle = Self::start_buffering_actor(
114 engine_tx.clone(),
115 span_rx,
116 shutdown_rx,
117 flush_interval_secs,
118 buffer_size,
119 );
120
121 Ok(TraceSpanService {
122 engine_tx,
123 span_tx,
124 shutdown_tx,
125 engine_handle,
126 buffer_handle,
127 query_service: TraceQueries::new(ctx.clone()),
128 ctx,
129 object_store,
130 })
131 }
132
133 fn start_buffering_actor(
134 engine_tx: mpsc::Sender<TableCommand>,
135 mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
136 mut shutdown_rx: mpsc::Receiver<()>,
137 flush_interval_secs: Option<u64>,
138 buffer_size: usize,
139 ) -> tokio::task::JoinHandle<()> {
140 tokio::spawn(async move {
141 let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
142 let mut flush_ticker = interval(Duration::from_secs(
143 flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
144 ));
145 flush_ticker.tick().await;
146
147 loop {
148 tokio::select! {
149 Some(spans) = span_rx.recv() => {
150 buffer.extend(spans);
151 if buffer.len() >= buffer_size {
152 Self::flush_buffer(&engine_tx, &mut buffer).await;
153 }
154 }
155 _ = flush_ticker.tick() => {
156 if !buffer.is_empty() {
157 info!("Flushing spans buffer with {} spans", buffer.len());
158 Self::flush_buffer(&engine_tx, &mut buffer).await;
159 }
160 }
161 _ = shutdown_rx.recv() => {
162 info!("Buffer actor received shutdown signal");
163 if !buffer.is_empty() {
164 info!("Flushing final {} spans before shutdown", buffer.len());
165 Self::flush_buffer(&engine_tx, &mut buffer).await;
166 }
167 break;
168 }
169 }
170 }
171
172 info!("Buffering actor shutting down");
173 })
174 }
175
176 async fn flush_buffer(
177 engine_tx: &mpsc::Sender<TableCommand>,
178 buffer: &mut Vec<TraceSpanRecord>,
179 ) {
180 if buffer.is_empty() {
181 return;
182 }
183
184 let capacity = buffer.capacity();
185 let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
186 let span_count = spans_to_write.len();
187
188 debug!("Sending write command to engine for {} spans", span_count);
189
190 let (tx, rx) = tokio::sync::oneshot::channel();
191
192 if let Err(e) = engine_tx
193 .send(TableCommand::Write {
194 spans: spans_to_write,
195 respond_to: tx,
196 })
197 .await
198 {
199 tracing::error!("Failed to send write command: {}", e);
200 return;
201 }
202
203 match rx.await {
204 Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
205 Ok(Err(e)) => tracing::error!("Write failed: {}", e),
206 Err(e) => tracing::error!("Failed to receive write response: {}", e),
207 }
208 }
209
210 pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
215 self.span_tx
216 .send(spans)
217 .await
218 .map_err(|_| TraceEngineError::ChannelClosed)?;
219 Ok(())
220 }
221
222 pub async fn write_spans_direct(
229 &self,
230 spans: Vec<TraceSpanRecord>,
231 ) -> Result<(), TraceEngineError> {
232 let (tx, rx) = tokio::sync::oneshot::channel();
233 self.engine_tx
234 .send(TableCommand::Write {
235 spans,
236 respond_to: tx,
237 })
238 .await
239 .map_err(|_| TraceEngineError::ChannelClosed)?;
240 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
241 }
242
243 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
246 let (tx, rx) = tokio::sync::oneshot::channel();
247
248 self.engine_tx
249 .send(TableCommand::Optimize { respond_to: tx })
250 .await
251 .map_err(|_| TraceEngineError::ChannelClosed)?;
252
253 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
254 }
255
256 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
257 let (tx, rx) = tokio::sync::oneshot::channel();
258
259 self.engine_tx
260 .send(TableCommand::Vacuum {
261 retention_hours,
262 respond_to: tx,
263 })
264 .await
265 .map_err(|_| TraceEngineError::ChannelClosed)?;
266
267 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
268 }
269
270 pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
276 let cutoff_date =
277 (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
278
279 let (tx, rx) = tokio::sync::oneshot::channel();
281 self.engine_tx
282 .send(TableCommand::Expire {
283 cutoff_date,
284 respond_to: tx,
285 })
286 .await
287 .map_err(|_| TraceEngineError::ChannelClosed)?;
288 rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;
289
290 self.vacuum(0).await
294 }
295
296 pub async fn signal_shutdown(&self) {
301 info!("TraceSpanService signaling shutdown");
302 let _ = self.shutdown_tx.send(()).await;
303 let _ = self.engine_tx.send(TableCommand::Shutdown).await;
304 }
305
306 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
307 info!("TraceSpanService shutting down");
308
309 let _ = self.shutdown_tx.send(()).await;
310
311 if let Err(e) = self.buffer_handle.await {
312 tracing::error!("Buffer handle error: {}", e);
313 }
314
315 self.engine_tx
316 .send(TableCommand::Shutdown)
317 .await
318 .map_err(|_| TraceEngineError::ChannelClosed)?;
319
320 if let Err(e) = self.engine_handle.await {
321 tracing::error!("Engine handle error: {}", e);
322 }
323
324 info!("TraceSpanService shutdown complete");
325 Ok(())
326 }
327}
328
329#[cfg(test)]
330mod tests {
331 use super::*;
332 use crate::parquet::tracing::queries::{
333 date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
334 };
335 use arrow_array::Array;
336 use chrono::Utc;
337 use datafusion::logical_expr::col;
338 use scouter_mocks::generate_trace_with_spans;
339 use scouter_settings::ObjectStorageSettings;
340 use scouter_types::sql::TraceSpan;
341 use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
342 use serde_json::Value;
343 use tracing_subscriber;
344
345 fn cleanup() {
346 let _ = tracing_subscriber::fmt()
347 .with_max_level(tracing::Level::INFO)
348 .try_init();
349
350 let storage_settings = ObjectStorageSettings::default();
351 let current_dir = std::env::current_dir().unwrap();
352 let storage_path = current_dir.join(storage_settings.storage_root());
353 if storage_path.exists() {
354 let _ = std::fs::remove_dir_all(storage_path);
355 }
356 }
357
358 fn make_span(
360 trace_id: &TraceId,
361 span_id: SpanId,
362 parent_span_id: Option<SpanId>,
363 service_name: &str,
364 span_name: &str,
365 attributes: Vec<Attribute>,
366 ) -> TraceSpanRecord {
367 let now = Utc::now();
368 TraceSpanRecord {
369 created_at: now,
370 trace_id: *trace_id,
371 span_id,
372 parent_span_id,
373 flags: 1,
374 trace_state: String::new(),
375 scope_name: "test.scope".to_string(),
376 scope_version: None,
377 span_name: span_name.to_string(),
378 span_kind: "INTERNAL".to_string(),
379 start_time: now,
380 end_time: now + chrono::Duration::milliseconds(100),
381 duration_ms: 100,
382 status_code: 0,
383 status_message: "OK".to_string(),
384 attributes,
385 events: vec![],
386 links: vec![],
387 label: None,
388 input: Value::Null,
389 output: Value::Null,
390 service_name: service_name.to_string(),
391 resource_attributes: vec![],
392 }
393 }
394
395 #[tokio::test]
396 async fn test_service_initialization() -> Result<(), TraceEngineError> {
397 cleanup();
398
399 let storage_settings = ObjectStorageSettings::default();
400 let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
401 service.shutdown().await?;
402 cleanup();
403 Ok(())
404 }
405
406 #[tokio::test]
407 async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
408 cleanup();
409
410 let storage_settings = ObjectStorageSettings::default();
411 let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
412
413 let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
414 info!("Test: writing {} spans", spans.len());
415
416 let first_trace_id = spans.first().unwrap().trace_id;
417 service.write_spans(spans).await?;
418
419 info!("Test: waiting for flush");
420 tokio::time::sleep(Duration::from_secs(5)).await;
421
422 let trace_id_bytes = first_trace_id.as_bytes();
423 let result_spans: Vec<TraceSpan> = service
424 .query_service
425 .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
426 .await?;
427
428 assert!(
429 !result_spans.is_empty(),
430 "Expected at least 1 span but got 0"
431 );
432
433 service.shutdown().await?;
434 cleanup();
435 Ok(())
436 }
437
438 #[tokio::test]
441 async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
442 cleanup();
443
444 let storage_settings = ObjectStorageSettings::default();
445 let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
446
447 let trace_id = TraceId::from_bytes([1u8; 16]);
449 let root_span_id = SpanId::from_bytes([1u8; 8]);
450 let child_span_id = SpanId::from_bytes([2u8; 8]);
451 let grandchild_span_id = SpanId::from_bytes([3u8; 8]);
452
453 let root = make_span(
454 &trace_id,
455 root_span_id.clone(),
456 None,
457 "svc",
458 "root_op",
459 vec![],
460 );
461 let child = make_span(
462 &trace_id,
463 child_span_id.clone(),
464 Some(root_span_id.clone()),
465 "svc",
466 "child_op",
467 vec![],
468 );
469 let grandchild = make_span(
470 &trace_id,
471 grandchild_span_id.clone(),
472 Some(child_span_id.clone()),
473 "svc",
474 "grandchild_op",
475 vec![],
476 );
477
478 service.write_spans(vec![root, child, grandchild]).await?;
479 tokio::time::sleep(Duration::from_secs(4)).await;
480
481 let spans: Vec<TraceSpan> = service
482 .query_service
483 .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
484 .await?;
485
486 assert_eq!(spans.len(), 3, "Expected 3 spans");
487
488 let by_order: Vec<&TraceSpan> = {
490 let mut v: Vec<&TraceSpan> = spans.iter().collect();
491 v.sort_by_key(|s| s.span_order);
492 v
493 };
494
495 assert_eq!(
496 by_order[0].span_name, "root_op",
497 "span_order=0 should be root"
498 );
499 assert_eq!(by_order[0].depth, 0);
500 assert_eq!(by_order[0].path.len(), 1);
501
502 assert_eq!(
503 by_order[1].span_name, "child_op",
504 "span_order=1 should be child"
505 );
506 assert_eq!(by_order[1].depth, 1);
507 assert_eq!(by_order[1].path.len(), 2);
508
509 assert_eq!(
510 by_order[2].span_name, "grandchild_op",
511 "span_order=2 should be grandchild"
512 );
513 assert_eq!(by_order[2].depth, 2);
514 assert_eq!(by_order[2].path.len(), 3);
515
516 let root_sid = root_span_id.to_hex();
518 for span in &spans {
519 assert_eq!(
520 span.root_span_id, root_sid,
521 "root_span_id mismatch for {}",
522 span.span_name
523 );
524 }
525
526 service.shutdown().await?;
527 cleanup();
528 Ok(())
529 }
530
531 #[tokio::test]
533 async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
534 cleanup();
535
536 let storage_settings = ObjectStorageSettings::default();
537 let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
538
539 let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
540 service.write_spans(spans).await?;
541 tokio::time::sleep(Duration::from_secs(4)).await;
542
543 let start = Utc::now() - chrono::Duration::hours(1);
544 let end = Utc::now() + chrono::Duration::hours(1);
545
546 let metrics = service
547 .query_service
548 .get_trace_metrics(None, start, end, "hour", None, None)
549 .await?;
550
551 assert!(!metrics.is_empty(), "Expected at least one metric bucket");
552 assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");
553
554 service.shutdown().await?;
555 cleanup();
556 Ok(())
557 }
558
559 #[tokio::test]
561 async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
562 cleanup();
563
564 let storage_settings = ObjectStorageSettings::default();
565 let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
566
567 let trace_a = TraceId::from_bytes([10u8; 16]);
569 let trace_b = TraceId::from_bytes([20u8; 16]);
570
571 let span_a = make_span(
572 &trace_a,
573 SpanId::from_bytes([10u8; 8]),
574 None,
575 "service_alpha",
576 "op_a",
577 vec![],
578 );
579 let span_b = make_span(
580 &trace_b,
581 SpanId::from_bytes([20u8; 8]),
582 None,
583 "service_beta",
584 "op_b",
585 vec![],
586 );
587
588 service.write_spans(vec![span_a, span_b]).await?;
589 tokio::time::sleep(Duration::from_secs(4)).await;
590
591 let start = Utc::now() - chrono::Duration::hours(1);
592 let end = Utc::now() + chrono::Duration::hours(1);
593
594 let metrics_alpha = service
596 .query_service
597 .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
598 .await?;
599
600 let metrics_beta = service
602 .query_service
603 .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
604 .await?;
605
606 let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
607 let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();
608
609 assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
610 assert!(beta_count > 0, "Expected non-zero count for service_beta");
611
612 let metrics_none = service
614 .query_service
615 .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
616 .await?;
617 assert!(
618 metrics_none.is_empty(),
619 "Expected no buckets for nonexistent service"
620 );
621
622 service.shutdown().await?;
623 cleanup();
624 Ok(())
625 }
626
627 #[tokio::test]
639 async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
640 cleanup();
641
642 let storage_settings = ObjectStorageSettings::default();
643 let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
644
645 let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
648 let root_id = SpanId::from_bytes([0xAA_u8; 8]);
649 let child_id = SpanId::from_bytes([0xBB_u8; 8]);
650 let spans = vec![
651 make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
652 make_span(
653 &trace_id,
654 child_id.clone(),
655 Some(root_id),
656 "svc-a",
657 "child-op",
658 vec![],
659 ),
660 ];
661 service.write_spans_direct(spans).await?;
662
663 let now = Utc::now();
665 let start = now - chrono::Duration::hours(1);
666 let end = now + chrono::Duration::hours(1);
667
668 let df = service
670 .ctx
671 .table(SPAN_TABLE_NAME)
672 .await
673 .map_err(TraceEngineError::DatafusionError)?;
674
675 let df = df
677 .filter(
678 col(PARTITION_DATE_COL)
679 .gt_eq(date_lit(&start))
680 .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
681 )
682 .map_err(TraceEngineError::DatafusionError)?;
683
684 let df = df
686 .filter(
687 col(START_TIME_COL)
688 .gt_eq(ts_lit(&start))
689 .and(col(START_TIME_COL).lt(ts_lit(&end))),
690 )
691 .map_err(TraceEngineError::DatafusionError)?;
692
693 let explain_df = df
695 .explain(false, false)
696 .map_err(TraceEngineError::DatafusionError)?;
697 let batches = explain_df
698 .collect()
699 .await
700 .map_err(TraceEngineError::DatafusionError)?;
701
702 let plan_text: String = batches
703 .iter()
704 .flat_map(|b| {
705 let plan_col = b.column_by_name("plan").unwrap();
706 let arr =
707 arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
708 let s = arr
709 .as_any()
710 .downcast_ref::<arrow::array::StringArray>()
711 .unwrap();
712 (0..s.len())
713 .map(|i| s.value(i).to_string())
714 .collect::<Vec<_>>()
715 })
716 .collect::<Vec<_>>()
717 .join("\n");
718
719 assert!(
721 plan_text.contains("partition_date"),
722 "Partition filter not found in physical plan:\n{plan_text}"
723 );
724 assert!(
726 plan_text.contains("start_time"),
727 "Row-group time filter not found in physical plan:\n{plan_text}"
728 );
729
730 let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
734 let result = service
735 .query_service
736 .get_trace_spans(
737 Some(fake_id.as_bytes()),
738 None,
739 Some(&start),
740 Some(&end),
741 None,
742 )
743 .await?;
744 assert!(
745 result.is_empty(),
746 "Expected 0 spans for nonexistent trace_id"
747 );
748
749 service.shutdown().await?;
750 cleanup();
751 Ok(())
752 }
753
754 #[tokio::test]
756 async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
757 cleanup();
758
759 let storage_settings = ObjectStorageSettings::default();
760 let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
761
762 let trace_kafka = TraceId::from_bytes([30u8; 16]);
763 let trace_http = TraceId::from_bytes([40u8; 16]);
764
765 let span_kafka = make_span(
767 &trace_kafka,
768 SpanId::from_bytes([30u8; 8]),
769 None,
770 "my_service",
771 "kafka_consumer",
772 vec![Attribute {
773 key: "component".to_string(),
774 value: Value::String("kafka".to_string()),
775 }],
776 );
777 let span_http = make_span(
779 &trace_http,
780 SpanId::from_bytes([40u8; 8]),
781 None,
782 "my_service",
783 "http_handler",
784 vec![Attribute {
785 key: "component".to_string(),
786 value: Value::String("http".to_string()),
787 }],
788 );
789
790 service.write_spans(vec![span_kafka, span_http]).await?;
791 tokio::time::sleep(Duration::from_secs(4)).await;
792
793 let start = Utc::now() - chrono::Duration::hours(1);
794 let end = Utc::now() + chrono::Duration::hours(1);
795 let kafka_filter = vec!["component:kafka".to_string()];
796
797 let filtered = service
799 .query_service
800 .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
801 .await?;
802
803 let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
804 assert!(
805 filtered_count > 0,
806 "Expected non-zero count with kafka attribute filter"
807 );
808
809 let unfiltered = service
811 .query_service
812 .get_trace_metrics(None, start, end, "hour", None, None)
813 .await?;
814 let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
815 assert!(
816 unfiltered_count >= filtered_count,
817 "Unfiltered count ({}) should be >= filtered count ({})",
818 unfiltered_count,
819 filtered_count
820 );
821
822 service.shutdown().await?;
823 cleanup();
824 Ok(())
825 }
826
827 #[tokio::test]
834 async fn test_span_write_visibility_across_multiple_writes() -> Result<(), TraceEngineError> {
835 cleanup();
836
837 let storage_settings = ObjectStorageSettings::default();
838 let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
839
840 let start = Utc::now() - chrono::Duration::hours(1);
841 let end = Utc::now() + chrono::Duration::hours(1);
842
843 let trace1 = TraceId::from_bytes([0xB0; 16]);
845 let spans1 = vec![
846 make_span(
847 &trace1,
848 SpanId::from_bytes([0xB0; 8]),
849 None,
850 "svc_vis",
851 "op1",
852 vec![],
853 ),
854 make_span(
855 &trace1,
856 SpanId::from_bytes([0xB1; 8]),
857 Some(SpanId::from_bytes([0xB0; 8])),
858 "svc_vis",
859 "op2",
860 vec![],
861 ),
862 ];
863 service.write_spans_direct(spans1).await?;
864
865 let result = service
866 .query_service
867 .get_trace_spans(
868 Some(trace1.as_bytes()),
869 None,
870 Some(&start),
871 Some(&end),
872 None,
873 )
874 .await?;
875 assert_eq!(
876 result.len(),
877 2,
878 "After write #1: expected 2 spans, got {}",
879 result.len()
880 );
881
882 let trace2 = TraceId::from_bytes([0xB2; 16]);
884 let spans2 = vec![
885 make_span(
886 &trace2,
887 SpanId::from_bytes([0xB2; 8]),
888 None,
889 "svc_vis",
890 "op3",
891 vec![],
892 ),
893 make_span(
894 &trace2,
895 SpanId::from_bytes([0xB3; 8]),
896 Some(SpanId::from_bytes([0xB2; 8])),
897 "svc_vis",
898 "op4",
899 vec![],
900 ),
901 ];
902 service.write_spans_direct(spans2).await?;
903
904 let result = service
905 .query_service
906 .get_trace_spans(
907 Some(trace2.as_bytes()),
908 None,
909 Some(&start),
910 Some(&end),
911 None,
912 )
913 .await?;
914 assert_eq!(
915 result.len(),
916 2,
917 "After write #2: expected 2 spans for trace2, got {} (stale snapshot?)",
918 result.len()
919 );
920
921 let trace3 = TraceId::from_bytes([0xB4; 16]);
923 let spans3 = vec![
924 make_span(
925 &trace3,
926 SpanId::from_bytes([0xB4; 8]),
927 None,
928 "svc_vis",
929 "op5",
930 vec![],
931 ),
932 make_span(
933 &trace3,
934 SpanId::from_bytes([0xB5; 8]),
935 Some(SpanId::from_bytes([0xB4; 8])),
936 "svc_vis",
937 "op6",
938 vec![],
939 ),
940 ];
941 service.write_spans_direct(spans3).await?;
942
943 let result = service
944 .query_service
945 .get_trace_spans(
946 Some(trace3.as_bytes()),
947 None,
948 Some(&start),
949 Some(&end),
950 None,
951 )
952 .await?;
953 assert_eq!(
954 result.len(),
955 2,
956 "After write #3: expected 2 spans for trace3, got {} (stale snapshot?)",
957 result.len()
958 );
959
960 service.shutdown().await?;
961 cleanup();
962 Ok(())
963 }
964}