1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use datafusion::prelude::SessionContext;
5use scouter_settings::ObjectStorageSettings;
6use scouter_types::TraceSpanRecord;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::time::{interval, Duration};
10use tracing::{debug, info};
11
/// Fallback period, in seconds, between periodic buffer flushes; used when the caller does not
/// supply a flush interval.
const FLUSH_INTERVAL_SECS: u64 = 5;
13
/// Process-wide handle to the currently registered [`TraceSpanService`].
///
/// Starts out as `None`; `init_trace_span_service` stores a service here and
/// `get_trace_span_service` hands out clones of the stored `Arc`.
static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
    std::sync::RwLock::new(None);
21
22pub async fn init_trace_span_service(
27 storage_settings: &ObjectStorageSettings,
28 compaction_interval_hours: u64,
29 flush_interval_secs: Option<u64>,
30 retention_days: Option<u32>,
31) -> Result<Arc<TraceSpanService>, TraceEngineError> {
32 let old_service = {
34 let guard = TRACE_SPAN_SERVICE.read().unwrap();
35 guard.clone()
36 };
37 if let Some(old) = old_service {
38 info!("Shutting down previous TraceSpanService before re-initialization");
39 old.signal_shutdown().await;
40 }
41
42 let service = Arc::new(
43 TraceSpanService::new(
44 storage_settings,
45 compaction_interval_hours,
46 flush_interval_secs,
47 retention_days,
48 )
49 .await?,
50 );
51
52 {
53 let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
54 *guard = Some(service.clone());
55 }
56
57 info!("TraceSpanService global singleton initialized");
58 Ok(service)
59}
60
61pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
65 TRACE_SPAN_SERVICE.read().unwrap().clone()
66}
67
/// Front end for trace-span storage: owns the channels and task handles of the engine actor
/// and of the buffering actor, and exposes the query layer.
pub struct TraceSpanService {
    /// Command channel to the engine actor (writes, optimize, vacuum, expire, shutdown).
    engine_tx: mpsc::Sender<TableCommand>,
    /// Channel carrying span batches to the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Channel used to tell the buffering actor to stop.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle of the engine actor task.
    engine_handle: tokio::task::JoinHandle<()>,
    /// Join handle of the buffering actor task.
    buffer_handle: tokio::task::JoinHandle<()>,
    /// Query helper built on the same session context as `ctx`.
    pub query_service: TraceQueries,
    /// DataFusion session context cloned from the engine.
    pub ctx: Arc<SessionContext>,
}
78
79impl TraceSpanService {
80 pub async fn new(
89 storage_settings: &ObjectStorageSettings,
90 compaction_interval_hours: u64,
91 flush_interval_secs: Option<u64>,
92 retention_days: Option<u32>,
93 ) -> Result<Self, TraceEngineError> {
94 let buffer_size = storage_settings.trace_buffer_size();
95 let engine = TraceSpanDBEngine::new(storage_settings).await?;
96
97 info!(
98 "TraceSpanService initialized with buffer_size: {}",
99 buffer_size
100 );
101
102 let ctx = engine.ctx.clone();
103 let (engine_tx, engine_handle) =
104 engine.start_actor(compaction_interval_hours, retention_days);
105 let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
106 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
107
108 let buffer_handle = Self::start_buffering_actor(
109 engine_tx.clone(),
110 span_rx,
111 shutdown_rx,
112 flush_interval_secs,
113 buffer_size,
114 );
115
116 Ok(TraceSpanService {
117 engine_tx,
118 span_tx,
119 shutdown_tx,
120 engine_handle,
121 buffer_handle,
122 query_service: TraceQueries::new(ctx.clone()),
123 ctx,
124 })
125 }
126
127 fn start_buffering_actor(
128 engine_tx: mpsc::Sender<TableCommand>,
129 mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
130 mut shutdown_rx: mpsc::Receiver<()>,
131 flush_interval_secs: Option<u64>,
132 buffer_size: usize,
133 ) -> tokio::task::JoinHandle<()> {
134 tokio::spawn(async move {
135 let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
136 let mut flush_ticker = interval(Duration::from_secs(
137 flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
138 ));
139 flush_ticker.tick().await;
140
141 loop {
142 tokio::select! {
143 Some(spans) = span_rx.recv() => {
144 buffer.extend(spans);
145 if buffer.len() >= buffer_size {
146 Self::flush_buffer(&engine_tx, &mut buffer).await;
147 }
148 }
149 _ = flush_ticker.tick() => {
150 if !buffer.is_empty() {
151 info!("Flushing spans buffer with {} spans", buffer.len());
152 Self::flush_buffer(&engine_tx, &mut buffer).await;
153 }
154 }
155 _ = shutdown_rx.recv() => {
156 info!("Buffer actor received shutdown signal");
157 if !buffer.is_empty() {
158 info!("Flushing final {} spans before shutdown", buffer.len());
159 Self::flush_buffer(&engine_tx, &mut buffer).await;
160 }
161 break;
162 }
163 }
164 }
165
166 info!("Buffering actor shutting down");
167 })
168 }
169
170 async fn flush_buffer(
171 engine_tx: &mpsc::Sender<TableCommand>,
172 buffer: &mut Vec<TraceSpanRecord>,
173 ) {
174 if buffer.is_empty() {
175 return;
176 }
177
178 let capacity = buffer.capacity();
179 let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
180 let span_count = spans_to_write.len();
181
182 debug!("Sending write command to engine for {} spans", span_count);
183
184 let (tx, rx) = tokio::sync::oneshot::channel();
185
186 if let Err(e) = engine_tx
187 .send(TableCommand::Write {
188 spans: spans_to_write,
189 respond_to: tx,
190 })
191 .await
192 {
193 tracing::error!("Failed to send write command: {}", e);
194 return;
195 }
196
197 match rx.await {
198 Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
199 Ok(Err(e)) => tracing::error!("Write failed: {}", e),
200 Err(e) => tracing::error!("Failed to receive write response: {}", e),
201 }
202 }
203
204 pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
209 self.span_tx
210 .send(spans)
211 .await
212 .map_err(|_| TraceEngineError::ChannelClosed)?;
213 Ok(())
214 }
215
216 pub async fn write_spans_direct(
223 &self,
224 spans: Vec<TraceSpanRecord>,
225 ) -> Result<(), TraceEngineError> {
226 let (tx, rx) = tokio::sync::oneshot::channel();
227 self.engine_tx
228 .send(TableCommand::Write {
229 spans,
230 respond_to: tx,
231 })
232 .await
233 .map_err(|_| TraceEngineError::ChannelClosed)?;
234 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
235 }
236
237 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
240 let (tx, rx) = tokio::sync::oneshot::channel();
241
242 self.engine_tx
243 .send(TableCommand::Optimize { respond_to: tx })
244 .await
245 .map_err(|_| TraceEngineError::ChannelClosed)?;
246
247 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
248 }
249
250 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
251 let (tx, rx) = tokio::sync::oneshot::channel();
252
253 self.engine_tx
254 .send(TableCommand::Vacuum {
255 retention_hours,
256 respond_to: tx,
257 })
258 .await
259 .map_err(|_| TraceEngineError::ChannelClosed)?;
260
261 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
262 }
263
264 pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
270 let cutoff_date =
271 (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
272
273 let (tx, rx) = tokio::sync::oneshot::channel();
275 self.engine_tx
276 .send(TableCommand::Expire {
277 cutoff_date,
278 respond_to: tx,
279 })
280 .await
281 .map_err(|_| TraceEngineError::ChannelClosed)?;
282 rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;
283
284 self.vacuum(0).await
288 }
289
290 pub async fn signal_shutdown(&self) {
295 info!("TraceSpanService signaling shutdown");
296 let _ = self.shutdown_tx.send(()).await;
297 let _ = self.engine_tx.send(TableCommand::Shutdown).await;
298 }
299
300 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
301 info!("TraceSpanService shutting down");
302
303 let _ = self.shutdown_tx.send(()).await;
304
305 if let Err(e) = self.buffer_handle.await {
306 tracing::error!("Buffer handle error: {}", e);
307 }
308
309 self.engine_tx
310 .send(TableCommand::Shutdown)
311 .await
312 .map_err(|_| TraceEngineError::ChannelClosed)?;
313
314 if let Err(e) = self.engine_handle.await {
315 tracing::error!("Engine handle error: {}", e);
316 }
317
318 info!("TraceSpanService shutdown complete");
319 Ok(())
320 }
321}
322
#[cfg(test)]
mod tests {
    //! Integration-style tests that run a real `TraceSpanService` against the default storage
    //! root.
    //!
    //! NOTE(review): every test uses `ObjectStorageSettings::default()`, so they appear to share
    //! one storage directory — confirm the suite is run serially or otherwise isolated.

    use super::*;
    use crate::parquet::tracing::queries::{
        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
    };
    use arrow_array::Array;
    use chrono::Utc;
    use datafusion::logical_expr::col;
    use scouter_mocks::generate_trace_with_spans;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceSpan;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use serde_json::Value;
    use tracing_subscriber;

    /// Installs the test log subscriber (ignored if one is already set) and removes the default
    /// storage directory under the current working directory.
    ///
    /// A missing directory is not an error: the removal is attempted directly rather than after
    /// an `exists()` check, so a directory that disappears in between no longer causes a panic.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        match std::fs::remove_dir_all(&storage_path) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => panic!("failed to remove {}: {e}", storage_path.display()),
        }
    }

    /// Builds a 100 ms `INTERNAL` span starting now, with the given identity, parent, service,
    /// name and attributes; all remaining fields get fixed placeholder values.
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: trace_id.clone(),
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    /// The service can be created and shut down cleanly.
    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Spans written through the buffered path become queryable by trace id after a flush.
    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
        info!("Test: writing {} spans", spans.len());

        let first_trace_id = spans.first().unwrap().trace_id.clone();
        service.write_spans(spans).await?;

        // Wait longer than the 2-second flush interval configured above.
        info!("Test: waiting for flush");
        tokio::time::sleep(Duration::from_secs(5)).await;

        let trace_id_bytes = first_trace_id.as_bytes();
        let result_spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
            .await?;

        assert!(
            !result_spans.is_empty(),
            "Expected at least 1 span but got 0"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// A root -> child -> grandchild chain comes back with matching order, depth, path length
    /// and a common root span id.
    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        // Wait longer than the 2-second flush interval configured above.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        // Index the result by `span_order` so the assertions do not depend on row order.
        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Hourly trace metrics over a window around "now" contain the freshly written trace.
    #[tokio::test]
    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
        service.write_spans(spans).await?;
        // Wait longer than the 2-second flush interval configured above.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;

        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// The service-name filter on trace metrics matches each written service and yields no
    /// buckets for an unknown one.
    #[tokio::test]
    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_a = TraceId::from_bytes([10u8; 16]);
        let trace_b = TraceId::from_bytes([20u8; 16]);

        let span_a = make_span(
            &trace_a,
            SpanId::from_bytes([10u8; 8]),
            None,
            "service_alpha",
            "op_a",
            vec![],
        );
        let span_b = make_span(
            &trace_b,
            SpanId::from_bytes([20u8; 8]),
            None,
            "service_beta",
            "op_b",
            vec![],
        );

        service.write_spans(vec![span_a, span_b]).await?;
        // Wait longer than the 2-second flush interval configured above.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics_alpha = service
            .query_service
            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
            .await?;

        let metrics_beta = service
            .query_service
            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
            .await?;

        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();

        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
        assert!(beta_count > 0, "Expected non-zero count for service_beta");

        let metrics_none = service
            .query_service
            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
            .await?;
        assert!(
            metrics_none.is_empty(),
            "Expected no buckets for nonexistent service"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// The explained plan for a partition-date plus start-time filter mentions both columns,
    /// and a lookup for an unknown trace id within the window returns nothing.
    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        // Direct write: the call returns once the engine has answered, so no sleep is needed.
        service.write_spans_direct(spans).await?;

        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // First filter layer: partition date range.
        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        // Second filter layer: start-time range.
        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Concatenate every row of the `plan` column into one searchable string.
        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// An attribute filter (`component:kafka`) yields a non-zero count that never exceeds the
    /// unfiltered count.
    #[tokio::test]
    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_kafka = TraceId::from_bytes([30u8; 16]);
        let trace_http = TraceId::from_bytes([40u8; 16]);

        let span_kafka = make_span(
            &trace_kafka,
            SpanId::from_bytes([30u8; 8]),
            None,
            "my_service",
            "kafka_consumer",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("kafka".to_string()),
            }],
        );
        let span_http = make_span(
            &trace_http,
            SpanId::from_bytes([40u8; 8]),
            None,
            "my_service",
            "http_handler",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("http".to_string()),
            }],
        );

        service.write_spans(vec![span_kafka, span_http]).await?;
        // Wait longer than the 2-second flush interval configured above.
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let kafka_filter = vec!["component:kafka".to_string()];

        let filtered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
            .await?;

        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
        assert!(
            filtered_count > 0,
            "Expected non-zero count with kafka attribute filter"
        );

        let unfiltered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;
        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
        assert!(
            unfiltered_count >= filtered_count,
            "Unfiltered count ({}) should be >= filtered count ({})",
            unfiltered_count,
            filtered_count
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}