1use crate::error::TraceEngineError;
2use crate::parquet::tracing::engine::{TableCommand, TraceSpanDBEngine};
3use crate::parquet::tracing::queries::TraceQueries;
4use datafusion::prelude::SessionContext;
5use scouter_settings::ObjectStorageSettings;
6use scouter_types::TraceSpanRecord;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::time::{interval, Duration};
10use tracing::{debug, info};
11
12const FLUSH_INTERVAL_SECS: u64 = 5;
13
14static TRACE_SPAN_SERVICE: std::sync::RwLock<Option<Arc<TraceSpanService>>> =
20 std::sync::RwLock::new(None);
21
22pub async fn init_trace_span_service(
27 storage_settings: &ObjectStorageSettings,
28 compaction_interval_hours: u64,
29 flush_interval_secs: Option<u64>,
30 retention_days: Option<u32>,
31) -> Result<Arc<TraceSpanService>, TraceEngineError> {
32 let old_service = {
34 let guard = TRACE_SPAN_SERVICE.read().unwrap();
35 guard.clone()
36 };
37 if let Some(old) = old_service {
38 info!("Shutting down previous TraceSpanService before re-initialization");
39 old.signal_shutdown().await;
40 }
41
42 let service = Arc::new(
43 TraceSpanService::new(
44 storage_settings,
45 compaction_interval_hours,
46 flush_interval_secs,
47 retention_days,
48 )
49 .await?,
50 );
51
52 {
53 let mut guard = TRACE_SPAN_SERVICE.write().unwrap();
54 *guard = Some(service.clone());
55 }
56
57 info!("TraceSpanService global singleton initialized");
58 Ok(service)
59}
60
61pub fn get_trace_span_service() -> Option<Arc<TraceSpanService>> {
65 TRACE_SPAN_SERVICE.read().unwrap().clone()
66}
67
/// Handle to the trace-span write/query pipeline.
///
/// Spans submitted through `write_spans` go to a buffering actor (`span_tx`).
/// That actor batches them and forwards `TableCommand::Write` to the engine
/// actor (`engine_tx`). Direct writes and maintenance commands (optimize,
/// vacuum, expire) are sent straight to the engine actor.
pub struct TraceSpanService {
    /// Command channel to the engine actor returned by `TraceSpanDBEngine::start_actor`.
    engine_tx: mpsc::Sender<TableCommand>,
    /// Channel feeding span batches to the buffering actor.
    span_tx: mpsc::Sender<Vec<TraceSpanRecord>>,
    /// Tells the buffering actor to flush its buffer and stop.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle of the engine actor task; awaited by `shutdown`.
    engine_handle: tokio::task::JoinHandle<()>,
    /// Join handle of the buffering actor task; awaited by `shutdown`.
    buffer_handle: tokio::task::JoinHandle<()>,
    /// Query helper constructed over the same `SessionContext` as `ctx`.
    pub query_service: TraceQueries,
    /// DataFusion session context cloned from the engine.
    pub ctx: Arc<SessionContext>,
}
78
79impl TraceSpanService {
80 pub async fn new(
89 storage_settings: &ObjectStorageSettings,
90 compaction_interval_hours: u64,
91 flush_interval_secs: Option<u64>,
92 retention_days: Option<u32>,
93 ) -> Result<Self, TraceEngineError> {
94 let buffer_size = storage_settings.trace_buffer_size();
95 let engine = TraceSpanDBEngine::new(storage_settings).await?;
96
97 info!(
98 "TraceSpanService initialized with buffer_size: {}",
99 buffer_size
100 );
101
102 let ctx = engine.ctx.clone();
103 let (engine_tx, engine_handle) =
104 engine.start_actor(compaction_interval_hours, retention_days);
105 let (span_tx, span_rx) = mpsc::channel::<Vec<TraceSpanRecord>>(100);
106 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
107
108 let buffer_handle = Self::start_buffering_actor(
109 engine_tx.clone(),
110 span_rx,
111 shutdown_rx,
112 flush_interval_secs,
113 buffer_size,
114 );
115
116 Ok(TraceSpanService {
117 engine_tx,
118 span_tx,
119 shutdown_tx,
120 engine_handle,
121 buffer_handle,
122 query_service: TraceQueries::new(ctx.clone()),
123 ctx,
124 })
125 }
126
127 fn start_buffering_actor(
128 engine_tx: mpsc::Sender<TableCommand>,
129 mut span_rx: mpsc::Receiver<Vec<TraceSpanRecord>>,
130 mut shutdown_rx: mpsc::Receiver<()>,
131 flush_interval_secs: Option<u64>,
132 buffer_size: usize,
133 ) -> tokio::task::JoinHandle<()> {
134 tokio::spawn(async move {
135 let mut buffer: Vec<TraceSpanRecord> = Vec::with_capacity(buffer_size);
136 let mut flush_ticker = interval(Duration::from_secs(
137 flush_interval_secs.unwrap_or(FLUSH_INTERVAL_SECS),
138 ));
139 flush_ticker.tick().await;
140
141 loop {
142 tokio::select! {
143 Some(spans) = span_rx.recv() => {
144 buffer.extend(spans);
145 if buffer.len() >= buffer_size {
146 Self::flush_buffer(&engine_tx, &mut buffer).await;
147 }
148 }
149 _ = flush_ticker.tick() => {
150 if !buffer.is_empty() {
151 info!("Flushing spans buffer with {} spans", buffer.len());
152 Self::flush_buffer(&engine_tx, &mut buffer).await;
153 }
154 }
155 _ = shutdown_rx.recv() => {
156 info!("Buffer actor received shutdown signal");
157 if !buffer.is_empty() {
158 info!("Flushing final {} spans before shutdown", buffer.len());
159 Self::flush_buffer(&engine_tx, &mut buffer).await;
160 }
161 break;
162 }
163 }
164 }
165
166 info!("Buffering actor shutting down");
167 })
168 }
169
170 async fn flush_buffer(
171 engine_tx: &mpsc::Sender<TableCommand>,
172 buffer: &mut Vec<TraceSpanRecord>,
173 ) {
174 if buffer.is_empty() {
175 return;
176 }
177
178 let capacity = buffer.capacity();
179 let spans_to_write = std::mem::replace(buffer, Vec::with_capacity(capacity));
180 let span_count = spans_to_write.len();
181
182 debug!("Sending write command to engine for {} spans", span_count);
183
184 let (tx, rx) = tokio::sync::oneshot::channel();
185
186 if let Err(e) = engine_tx
187 .send(TableCommand::Write {
188 spans: spans_to_write,
189 respond_to: tx,
190 })
191 .await
192 {
193 tracing::error!("Failed to send write command: {}", e);
194 return;
195 }
196
197 match rx.await {
198 Ok(Ok(())) => info!("Successfully flushed {} spans", span_count),
199 Ok(Err(e)) => tracing::error!("Write failed: {}", e),
200 Err(e) => tracing::error!("Failed to receive write response: {}", e),
201 }
202 }
203
204 pub async fn write_spans(&self, spans: Vec<TraceSpanRecord>) -> Result<(), TraceEngineError> {
209 self.span_tx
210 .send(spans)
211 .await
212 .map_err(|_| TraceEngineError::ChannelClosed)?;
213 Ok(())
214 }
215
216 pub async fn write_spans_direct(
223 &self,
224 spans: Vec<TraceSpanRecord>,
225 ) -> Result<(), TraceEngineError> {
226 let (tx, rx) = tokio::sync::oneshot::channel();
227 self.engine_tx
228 .send(TableCommand::Write {
229 spans,
230 respond_to: tx,
231 })
232 .await
233 .map_err(|_| TraceEngineError::ChannelClosed)?;
234 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
235 }
236
237 pub async fn optimize(&self) -> Result<(), TraceEngineError> {
238 let (tx, rx) = tokio::sync::oneshot::channel();
239
240 self.engine_tx
241 .send(TableCommand::Optimize { respond_to: tx })
242 .await
243 .map_err(|_| TraceEngineError::ChannelClosed)?;
244
245 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
246 }
247
248 pub async fn vacuum(&self, retention_hours: u64) -> Result<(), TraceEngineError> {
249 let (tx, rx) = tokio::sync::oneshot::channel();
250
251 self.engine_tx
252 .send(TableCommand::Vacuum {
253 retention_hours,
254 respond_to: tx,
255 })
256 .await
257 .map_err(|_| TraceEngineError::ChannelClosed)?;
258
259 rx.await.map_err(|_| TraceEngineError::ChannelClosed)?
260 }
261
262 pub async fn expire(&self, retention_days: u32) -> Result<(), TraceEngineError> {
268 let cutoff_date =
269 (chrono::Utc::now() - chrono::Duration::days(retention_days as i64)).date_naive();
270
271 let (tx, rx) = tokio::sync::oneshot::channel();
273 self.engine_tx
274 .send(TableCommand::Expire {
275 cutoff_date,
276 respond_to: tx,
277 })
278 .await
279 .map_err(|_| TraceEngineError::ChannelClosed)?;
280 rx.await.map_err(|_| TraceEngineError::ChannelClosed)??;
281
282 self.vacuum(0).await
286 }
287
288 pub async fn signal_shutdown(&self) {
293 info!("TraceSpanService signaling shutdown");
294 let _ = self.shutdown_tx.send(()).await;
295 let _ = self.engine_tx.send(TableCommand::Shutdown).await;
296 }
297
298 pub async fn shutdown(self) -> Result<(), TraceEngineError> {
299 info!("TraceSpanService shutting down");
300
301 let _ = self.shutdown_tx.send(()).await;
302
303 if let Err(e) = self.buffer_handle.await {
304 tracing::error!("Buffer handle error: {}", e);
305 }
306
307 self.engine_tx
308 .send(TableCommand::Shutdown)
309 .await
310 .map_err(|_| TraceEngineError::ChannelClosed)?;
311
312 if let Err(e) = self.engine_handle.await {
313 tracing::error!("Engine handle error: {}", e);
314 }
315
316 info!("TraceSpanService shutdown complete");
317 Ok(())
318 }
319}
320
#[cfg(test)]
// The tests below deliberately hold `STORAGE_LOCK` across `.await`. Each
// `#[tokio::test]` runs its own runtime on its own thread, so the lock only
// serializes tests and cannot deadlock a shared executor.
#[allow(clippy::await_holding_lock)]
mod tests {
    use super::*;
    use crate::parquet::tracing::queries::{
        date_lit, ts_lit, PARTITION_DATE_COL, SPAN_TABLE_NAME, START_TIME_COL,
    };
    use arrow_array::Array;
    use chrono::Utc;
    use datafusion::logical_expr::col;
    use scouter_mocks::generate_trace_with_spans;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::sql::TraceSpan;
    use scouter_types::{Attribute, SpanId, TraceId, TraceSpanRecord};
    use serde_json::Value;

    /// Serializes the tests in this module.
    ///
    /// Every test stores data under `ObjectStorageSettings::default()`, and
    /// `cleanup()` deletes that shared storage root. Running tests
    /// concurrently (the `cargo test` default) would let one test wipe
    /// another's data mid-run.
    static STORAGE_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires `STORAGE_LOCK` for the lifetime of the returned guard.
    ///
    /// A poisoned lock (a previous test panicked while holding it) is
    /// recovered, because the protected data is `()` and every test starts
    /// with `cleanup()` anyway.
    fn storage_guard() -> std::sync::MutexGuard<'static, ()> {
        STORAGE_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Installs a best-effort INFO-level subscriber and removes the default
    /// storage root (relative to the current directory) if it exists.
    fn cleanup() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .try_init();

        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Builds a 100 ms `INTERNAL` span starting now, with no events, links or
    /// resource attributes.
    fn make_span(
        trace_id: &TraceId,
        span_id: SpanId,
        parent_span_id: Option<SpanId>,
        service_name: &str,
        span_name: &str,
        attributes: Vec<Attribute>,
    ) -> TraceSpanRecord {
        let now = Utc::now();
        TraceSpanRecord {
            created_at: now,
            trace_id: trace_id.clone(),
            span_id,
            parent_span_id,
            flags: 1,
            trace_state: String::new(),
            scope_name: "test.scope".to_string(),
            scope_version: None,
            span_name: span_name.to_string(),
            span_kind: "INTERNAL".to_string(),
            start_time: now,
            end_time: now + chrono::Duration::milliseconds(100),
            duration_ms: 100,
            status_code: 0,
            status_message: "OK".to_string(),
            attributes,
            events: vec![],
            links: vec![],
            label: None,
            input: Value::Null,
            output: Value::Null,
            service_name: service_name.to_string(),
            resource_attributes: vec![],
        }
    }

    /// The service can be created and shut down cleanly.
    #[tokio::test]
    async fn test_service_initialization() -> Result<(), TraceEngineError> {
        let _guard = storage_guard();
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;
        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Spans written through the buffer are queryable after a time-based flush.
    #[tokio::test]
    async fn test_dataframe_trace_write_single_batch() -> Result<(), TraceEngineError> {
        let _guard = storage_guard();
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_trace_record, spans, _tags) = generate_trace_with_spans(3, 0);
        info!("Test: writing {} spans", spans.len());

        let first_trace_id = spans.first().unwrap().trace_id.clone();
        service.write_spans(spans).await?;

        info!("Test: waiting for flush");
        tokio::time::sleep(Duration::from_secs(5)).await;

        let trace_id_bytes = first_trace_id.as_bytes();
        let result_spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id_bytes.as_slice()), None, None, None, None)
            .await?;

        assert!(
            !result_spans.is_empty(),
            "Expected at least 1 span but got 0"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// A root -> child -> grandchild chain comes back with increasing
    /// `span_order`, matching `depth`/`path` lengths and a shared `root_span_id`.
    #[tokio::test]
    async fn test_span_tree_sort_order() -> Result<(), TraceEngineError> {
        let _guard = storage_guard();
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_id = TraceId::from_bytes([1u8; 16]);
        let root_span_id = SpanId::from_bytes([1u8; 8]);
        let child_span_id = SpanId::from_bytes([2u8; 8]);
        let grandchild_span_id = SpanId::from_bytes([3u8; 8]);

        let root = make_span(
            &trace_id,
            root_span_id.clone(),
            None,
            "svc",
            "root_op",
            vec![],
        );
        let child = make_span(
            &trace_id,
            child_span_id.clone(),
            Some(root_span_id.clone()),
            "svc",
            "child_op",
            vec![],
        );
        let grandchild = make_span(
            &trace_id,
            grandchild_span_id.clone(),
            Some(child_span_id.clone()),
            "svc",
            "grandchild_op",
            vec![],
        );

        service.write_spans(vec![root, child, grandchild]).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let spans: Vec<TraceSpan> = service
            .query_service
            .get_trace_spans(Some(trace_id.as_bytes().as_slice()), None, None, None, None)
            .await?;

        assert_eq!(spans.len(), 3, "Expected 3 spans");

        let by_order: Vec<&TraceSpan> = {
            let mut v: Vec<&TraceSpan> = spans.iter().collect();
            v.sort_by_key(|s| s.span_order);
            v
        };

        assert_eq!(
            by_order[0].span_name, "root_op",
            "span_order=0 should be root"
        );
        assert_eq!(by_order[0].depth, 0);
        assert_eq!(by_order[0].path.len(), 1);

        assert_eq!(
            by_order[1].span_name, "child_op",
            "span_order=1 should be child"
        );
        assert_eq!(by_order[1].depth, 1);
        assert_eq!(by_order[1].path.len(), 2);

        assert_eq!(
            by_order[2].span_name, "grandchild_op",
            "span_order=2 should be grandchild"
        );
        assert_eq!(by_order[2].depth, 2);
        assert_eq!(by_order[2].path.len(), 3);

        let root_sid = root_span_id.to_hex();
        for span in &spans {
            assert_eq!(
                span.root_span_id, root_sid,
                "root_span_id mismatch for {}",
                span.span_name
            );
        }

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Hourly trace metrics over a window around "now" report a non-zero count.
    #[tokio::test]
    async fn test_trace_metrics_basic() -> Result<(), TraceEngineError> {
        let _guard = storage_guard();
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let (_record, spans, _tags) = generate_trace_with_spans(5, 0);
        service.write_spans(spans).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;

        assert!(!metrics.is_empty(), "Expected at least one metric bucket");
        assert!(metrics[0].trace_count > 0, "Expected non-zero trace count");

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// Metrics filtered by service name count only that service's traces; an
    /// unknown service yields no buckets.
    #[tokio::test]
    async fn test_trace_metrics_service_filter() -> Result<(), TraceEngineError> {
        let _guard = storage_guard();
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_a = TraceId::from_bytes([10u8; 16]);
        let trace_b = TraceId::from_bytes([20u8; 16]);

        let span_a = make_span(
            &trace_a,
            SpanId::from_bytes([10u8; 8]),
            None,
            "service_alpha",
            "op_a",
            vec![],
        );
        let span_b = make_span(
            &trace_b,
            SpanId::from_bytes([20u8; 8]),
            None,
            "service_beta",
            "op_b",
            vec![],
        );

        service.write_spans(vec![span_a, span_b]).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);

        let metrics_alpha = service
            .query_service
            .get_trace_metrics(Some("service_alpha"), start, end, "hour", None, None)
            .await?;

        let metrics_beta = service
            .query_service
            .get_trace_metrics(Some("service_beta"), start, end, "hour", None, None)
            .await?;

        let alpha_count: i64 = metrics_alpha.iter().map(|m| m.trace_count).sum();
        let beta_count: i64 = metrics_beta.iter().map(|m| m.trace_count).sum();

        assert!(alpha_count > 0, "Expected non-zero count for service_alpha");
        assert!(beta_count > 0, "Expected non-zero count for service_beta");

        let metrics_none = service
            .query_service
            .get_trace_metrics(Some("nonexistent_svc"), start, end, "hour", None, None)
            .await?;
        assert!(
            metrics_none.is_empty(),
            "Expected no buckets for nonexistent service"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// The physical plan for a partition-date + start-time filtered scan
    /// mentions both filter columns, and a lookup for an unknown trace id
    /// returns nothing.
    #[tokio::test]
    async fn test_query_plan_shows_filter_layers() -> Result<(), TraceEngineError> {
        let _guard = storage_guard();
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_id = TraceId::from_bytes([0xAA_u8; 16]);
        let root_id = SpanId::from_bytes([0xAA_u8; 8]);
        let child_id = SpanId::from_bytes([0xBB_u8; 8]);
        let spans = vec![
            make_span(&trace_id, root_id.clone(), None, "svc-a", "root-op", vec![]),
            make_span(
                &trace_id,
                child_id.clone(),
                Some(root_id),
                "svc-a",
                "child-op",
                vec![],
            ),
        ];
        // Direct write: data is committed before querying, no flush wait needed.
        service.write_spans_direct(spans).await?;

        let now = Utc::now();
        let start = now - chrono::Duration::hours(1);
        let end = now + chrono::Duration::hours(1);

        let df = service
            .ctx
            .table(SPAN_TABLE_NAME)
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        let df = df
            .filter(
                col(PARTITION_DATE_COL)
                    .gt_eq(date_lit(&start))
                    .and(col(PARTITION_DATE_COL).lt_eq(date_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        let df = df
            .filter(
                col(START_TIME_COL)
                    .gt_eq(ts_lit(&start))
                    .and(col(START_TIME_COL).lt(ts_lit(&end))),
            )
            .map_err(TraceEngineError::DatafusionError)?;

        let explain_df = df
            .explain(false, false)
            .map_err(TraceEngineError::DatafusionError)?;
        let batches = explain_df
            .collect()
            .await
            .map_err(TraceEngineError::DatafusionError)?;

        // Concatenate every row of the EXPLAIN output's "plan" column.
        let plan_text: String = batches
            .iter()
            .flat_map(|b| {
                let plan_col = b.column_by_name("plan").unwrap();
                let arr =
                    arrow::compute::cast(plan_col, &arrow::datatypes::DataType::Utf8).unwrap();
                let s = arr
                    .as_any()
                    .downcast_ref::<arrow::array::StringArray>()
                    .unwrap();
                (0..s.len())
                    .map(|i| s.value(i).to_string())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>()
            .join("\n");

        assert!(
            plan_text.contains("partition_date"),
            "Partition filter not found in physical plan:\n{plan_text}"
        );
        assert!(
            plan_text.contains("start_time"),
            "Row-group time filter not found in physical plan:\n{plan_text}"
        );

        let fake_id = TraceId::from_bytes([0xFF_u8; 16]);
        let result = service
            .query_service
            .get_trace_spans(
                Some(fake_id.as_bytes()),
                None,
                Some(&start),
                Some(&end),
                None,
            )
            .await?;
        assert!(
            result.is_empty(),
            "Expected 0 spans for nonexistent trace_id"
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }

    /// An attribute filter (`component:kafka`) yields a non-zero count that
    /// does not exceed the unfiltered count.
    #[tokio::test]
    async fn test_trace_metrics_with_attribute_filter() -> Result<(), TraceEngineError> {
        let _guard = storage_guard();
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let service = TraceSpanService::new(&storage_settings, 24, Some(2), None).await?;

        let trace_kafka = TraceId::from_bytes([30u8; 16]);
        let trace_http = TraceId::from_bytes([40u8; 16]);

        let span_kafka = make_span(
            &trace_kafka,
            SpanId::from_bytes([30u8; 8]),
            None,
            "my_service",
            "kafka_consumer",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("kafka".to_string()),
            }],
        );
        let span_http = make_span(
            &trace_http,
            SpanId::from_bytes([40u8; 8]),
            None,
            "my_service",
            "http_handler",
            vec![Attribute {
                key: "component".to_string(),
                value: Value::String("http".to_string()),
            }],
        );

        service.write_spans(vec![span_kafka, span_http]).await?;
        tokio::time::sleep(Duration::from_secs(4)).await;

        let start = Utc::now() - chrono::Duration::hours(1);
        let end = Utc::now() + chrono::Duration::hours(1);
        let kafka_filter = vec!["component:kafka".to_string()];

        let filtered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", Some(&kafka_filter), None)
            .await?;

        let filtered_count: i64 = filtered.iter().map(|m| m.trace_count).sum();
        assert!(
            filtered_count > 0,
            "Expected non-zero count with kafka attribute filter"
        );

        let unfiltered = service
            .query_service
            .get_trace_metrics(None, start, end, "hour", None, None)
            .await?;
        let unfiltered_count: i64 = unfiltered.iter().map(|m| m.trace_count).sum();
        assert!(
            unfiltered_count >= filtered_count,
            "Unfiltered count ({}) should be >= filtered count ({})",
            unfiltered_count,
            filtered_count
        );

        service.shutdown().await?;
        cleanup();
        Ok(())
    }
}