1use crate::error::TraceError;
9use crate::exporter::processor::BatchConfig;
10use crate::exporter::scouter::ScouterSpanExporter;
11use crate::exporter::SpanExporterNum;
12use crate::utils::BoxedSpan;
13use crate::utils::{
14 capture_function_arguments, format_traceback, get_context_store, get_context_var,
15 get_current_active_span, get_current_context_id, set_current_span, set_function_attributes,
16 set_function_type_attribute, ActiveSpanInner, FunctionType, SpanKind,
17};
18use chrono::{DateTime, Utc};
19use opentelemetry::baggage::BaggageExt;
20use opentelemetry::trace::Tracer as OTelTracer;
21use opentelemetry::trace::TracerProvider;
22use opentelemetry::{
23 trace::{Span, Status, TraceContextExt},
24 Context as OtelContext, KeyValue,
25};
26use opentelemetry_sdk::trace::SdkTracer;
27use opentelemetry_sdk::trace::SdkTracerProvider;
28use opentelemetry_sdk::Resource;
29use potato_head::create_uuid7;
30use pyo3::prelude::*;
31use pyo3::types::{PyDict, PyTuple};
32use pyo3::IntoPyObjectExt;
33use scouter_events::queue::types::TransportConfig;
34use scouter_settings::http::HttpConfig;
35
36use scouter_types::{
37 is_pydantic_basemodel, pydict_to_otel_keyvalue, pyobject_to_otel_value,
38 pyobject_to_tracing_json, BAGGAGE_PREFIX, SCOUTER_TAG_PREFIX, SCOUTER_TRACING_INPUT,
39 SCOUTER_TRACING_LABEL, SCOUTER_TRACING_OUTPUT, SERVICE_NAME, TRACE_START_TIME_KEY,
40};
41use std::sync::{Arc, RwLock};
42use std::{collections::HashMap, sync::OnceLock};
43use tracing::{debug, info, instrument};
44
/// Process-wide slot for the tracer provider. It holds `None` until `init_tracer` installs a
/// provider, and `shutdown_tracer` takes the provider back out (resetting it to `None`).
static TRACER_PROVIDER_STORE: OnceLock<Arc<RwLock<Option<SdkTracerProvider>>>> = OnceLock::new();

/// Process-wide per-trace bookkeeping (trace start time and open span count), keyed by the
/// trace id rendered as a string.
static TRACE_METADATA_STORE: OnceLock<TraceMetadataStore> = OnceLock::new();
50
51fn get_tracer_provider_store() -> &'static Arc<RwLock<Option<SdkTracerProvider>>> {
52 TRACER_PROVIDER_STORE.get_or_init(|| Arc::new(RwLock::new(None)))
53}
54
55fn get_tracer_provider() -> Result<Arc<RwLock<Option<SdkTracerProvider>>>, TraceError> {
56 TRACER_PROVIDER_STORE.get().cloned().ok_or_else(|| {
57 TraceError::InitializationError("Tracer provider store not initialized".to_string())
59 })
60}
61
/// Placeholder used by `init_tracer` for `space`, `name`, or `version` when the caller omits them.
const MISSING: &str = "unknown";
63
64#[derive(Clone)]
65struct TraceMetadata {
66 start_time: DateTime<Utc>,
67 span_count: u32,
68}
69
/// Thread-shareable map from trace id (as a string) to that trace's `TraceMetadata`.
///
/// Cloning shares the same underlying map (the map lives behind an `Arc`).
#[derive(Clone)]
struct TraceMetadataStore {
    inner: Arc<RwLock<HashMap<String, TraceMetadata>>>,
}
74
75impl TraceMetadataStore {
76 fn new() -> Self {
77 Self {
78 inner: Arc::new(RwLock::new(HashMap::new())),
79 }
80 }
81
82 fn set_trace_start(
83 &self,
84 trace_id: String,
85 start_time: DateTime<Utc>,
86 ) -> Result<(), TraceError> {
87 self.inner
88 .write()
89 .map_err(|e| TraceError::PoisonError(e.to_string()))?
90 .insert(
91 trace_id.clone(),
92 TraceMetadata {
93 start_time,
94 span_count: 0,
95 },
96 );
97 Ok(())
98 }
99
100 fn get_trace_metadata(&self, trace_id: &str) -> Result<Option<TraceMetadata>, TraceError> {
101 Ok(self
102 .inner
103 .read()
104 .map_err(|e| TraceError::PoisonError(e.to_string()))?
105 .get(trace_id)
106 .cloned())
107 }
108
109 fn remove_trace(&self, trace_id: &str) -> Result<(), TraceError> {
110 self.inner
111 .write()
112 .map_err(|e| TraceError::PoisonError(e.to_string()))?
113 .remove(trace_id);
114 Ok(())
115 }
116
117 fn increment_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
118 if let Some(mut metadata) = self.get_trace_metadata(trace_id)? {
119 metadata.span_count += 1;
120 self.inner
121 .write()
122 .map_err(|e| TraceError::PoisonError(e.to_string()))?
123 .insert(trace_id.to_string(), metadata);
124 }
125 Ok(())
126 }
127
128 fn decrement_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
130 if let Some(mut metadata) = self.get_trace_metadata(trace_id)? {
131 if metadata.span_count > 0 {
132 metadata.span_count -= 1;
133 }
134
135 if metadata.span_count == 0 {
136 self.remove_trace(trace_id)?;
137 } else {
138 self.inner
139 .write()
140 .map_err(|e| TraceError::PoisonError(e.to_string()))?
141 .insert(trace_id.to_string(), metadata);
142 }
143 }
144
145 Ok(())
146 }
147
148 fn clear_all(&self) -> Result<(), TraceError> {
149 self.inner
150 .write()
151 .map_err(|e| TraceError::PoisonError(e.to_string()))?
152 .clear();
153 Ok(())
154 }
155}
156
157fn get_trace_metadata_store() -> &'static TraceMetadataStore {
158 TRACE_METADATA_STORE.get_or_init(TraceMetadataStore::new)
159}
160
161#[pyfunction]
173#[pyo3(signature = (
174 service_name="scouter_service".to_string(),
175 transport_config=None,
176 exporter=None,
177 batch_config=None,
178 space=None,
179 name=None,
180 version=None
181))]
182#[instrument(skip_all)]
183#[allow(clippy::too_many_arguments)]
184pub fn init_tracer(
185 py: Python,
186 service_name: String,
187 transport_config: Option<&Bound<'_, PyAny>>,
188 exporter: Option<&Bound<'_, PyAny>>,
189 batch_config: Option<Py<BatchConfig>>,
190 space: Option<String>,
191 name: Option<String>,
192 version: Option<String>,
193) -> Result<(), TraceError> {
194 debug!("Initializing tracer");
195
196 let transport_config = match transport_config {
197 Some(config) => TransportConfig::from_py_config(config)?,
198 None => {
199 let config = HttpConfig::default();
201 TransportConfig::Http(config)
202 }
203 };
204
205 let batch_config = batch_config
206 .map(|bc| bc.extract::<BatchConfig>(py))
207 .transpose()?;
208
209 let scouter_export = ScouterSpanExporter::new(
210 space.unwrap_or_else(|| MISSING.to_string()),
211 name.unwrap_or_else(|| MISSING.to_string()),
212 version.unwrap_or_else(|| MISSING.to_string()),
213 transport_config,
214 )?;
215
216 let provider_store = get_tracer_provider_store();
217
218 let mut store_guard = provider_store
219 .write()
220 .map_err(|e| TraceError::PoisonError(e.to_string()))?;
221
222 if store_guard.is_some() {
223 return Err(TraceError::InitializationError(
224 "Tracer provider already initialized. Call shutdown_tracer() first.".to_string(),
225 ));
226 }
227
228 let resource = Resource::builder()
229 .with_attributes(vec![KeyValue::new(SERVICE_NAME, service_name.clone())])
230 .build();
231
232 let span_exporter = if let Some(exporter) = exporter {
233 SpanExporterNum::from_pyobject(exporter).expect("failed to convert exporter")
234 } else {
235 SpanExporterNum::default()
236 };
237
238 let provider = span_exporter
239 .build_provider(resource, scouter_export, batch_config)
240 .expect("failed to build tracer provider");
241
242 *store_guard = Some(provider);
243
244 Ok(())
245}
246
247fn reset_current_context(py: Python, token: &Py<PyAny>) -> PyResult<()> {
248 let context_var = get_context_var(py)?;
249 context_var.bind(py).call_method1("reset", (token,))?;
250 Ok(())
251}
252
#[pyclass]
/// Python handle to a started span.
///
/// The span state sits behind `Arc<RwLock<_>>` so that the handle returned to the caller and
/// the copy stored as the "current span" (see `BaseTracer::set_current_span`) share one span.
pub struct ActiveSpan {
    inner: Arc<RwLock<ActiveSpanInner>>,
}
259
260#[pymethods]
261impl ActiveSpan {
262 #[getter]
263 fn context_id(&self) -> Result<String, TraceError> {
264 self.with_inner(|inner| inner.context_id.clone())
265 }
266
267 #[pyo3(signature = (input, max_length=1000))]
272 #[instrument(skip_all)]
273 fn set_input(&self, input: &Bound<'_, PyAny>, max_length: usize) -> Result<(), TraceError> {
274 let value = pyobject_to_tracing_json(input, &max_length)?;
275 self.with_inner_mut(|inner| {
276 inner.span.set_attribute(KeyValue::new(
277 SCOUTER_TRACING_INPUT,
278 serde_json::to_string(&value).unwrap(),
279 ))
280 })
281 }
282
283 #[pyo3(signature = (output, max_length=1000))]
284 #[instrument(skip_all)]
285 fn set_output(&self, output: &Bound<'_, PyAny>, max_length: usize) -> Result<(), TraceError> {
286 debug!("Setting output on span");
287 let value = pyobject_to_tracing_json(output, &max_length)?;
288 self.with_inner_mut(|inner| {
289 inner.span.set_attribute(KeyValue::new(
290 SCOUTER_TRACING_OUTPUT,
291 serde_json::to_string(&value).unwrap(),
292 ))
293 })
294 }
295
296 pub fn set_attribute(&self, key: String, value: Bound<'_, PyAny>) -> Result<(), TraceError> {
301 let value = pyobject_to_otel_value(&value)?;
302 self.with_inner_mut(|inner| inner.span.set_attribute(KeyValue::new(key, value)))
303 }
304
305 fn add_event(
310 &self,
311 py: Python,
312 name: String,
313 attributes: Option<Bound<'_, PyAny>>,
314 ) -> Result<(), TraceError> {
315 let pairs: Vec<KeyValue> = if let Some(attrs) = attributes {
316 if is_pydantic_basemodel(py, &attrs)? {
317 let dumped = attrs.call_method0("model_dump")?;
318 let dict = dumped
319 .downcast::<PyDict>()
320 .map_err(|e| TraceError::DowncastError(e.to_string()))?;
321 pydict_to_otel_keyvalue(dict)?
322 } else if attrs.is_instance_of::<PyDict>() {
323 let dict = attrs
324 .downcast::<PyDict>()
325 .map_err(|e| TraceError::DowncastError(e.to_string()))?;
326 pydict_to_otel_keyvalue(dict)?
327 } else {
328 return Err(TraceError::EventMustBeDict);
329 }
330 } else {
331 vec![]
332 };
333
334 self.with_inner_mut(|inner| inner.span.add_event(name, pairs))
335 }
336
337 fn set_status(&self, status: String, description: Option<String>) -> Result<(), TraceError> {
342 let otel_status = match status.to_lowercase().as_str() {
343 "ok" => Status::Ok,
344 "error" => Status::error(description.unwrap_or_default()),
345 _ => Status::Unset,
346 };
347
348 self.with_inner_mut(|inner| inner.span.set_status(otel_status))
349 }
350
351 #[instrument(skip_all)]
353 fn __enter__<'py>(slf: PyRef<'py, Self>) -> PyResult<PyRef<'py, Self>> {
354 debug!("Entering span context: {}", slf.context_id()?);
355 Ok(slf)
356 }
357
358 #[pyo3(signature = (exc_type=None, exc_val=None, exc_tb=None))]
360 #[instrument(skip_all)]
361 fn __exit__(
362 &mut self,
363 py: Python<'_>,
364 exc_type: Option<Py<PyAny>>,
365 exc_val: Option<Py<PyAny>>,
366 exc_tb: Option<Py<PyAny>>,
367 ) -> Result<bool, TraceError> {
368 debug!("Exiting span context: {}", self.context_id()?);
369 let (context_id, trace_id, context_token) = {
370 let mut inner = self
371 .inner
372 .write()
373 .map_err(|e| TraceError::PoisonError(e.to_string()))?;
374
375 if let Some(exc_type) = exc_type {
377 inner.span.set_status(Status::error("Exception occurred"));
378 inner
379 .span
380 .set_attribute(KeyValue::new("exception.type", exc_type.to_string()));
381
382 if let Some(exc_val) = exc_val {
383 inner
384 .span
385 .set_attribute(KeyValue::new("exception.value", exc_val.to_string()));
386 }
387
388 if let Some(exc_tb) = exc_tb {
389 let tb = format_traceback(py, &exc_tb)?;
391 inner
392 .span
393 .set_attribute(KeyValue::new("exception.traceback", tb));
394 }
395 }
396
397 inner.span.end();
398
399 let context_id = inner.context_id.clone();
401 let trace_id = inner.span.span_context().trace_id().to_string();
402 let context_token = inner.context_token.take();
403
404 (context_id, trace_id, context_token)
405 };
406
407 let store = get_trace_metadata_store();
408 store.decrement_span_count(&trace_id)?;
409
410 if let Some(token) = context_token {
411 reset_current_context(py, &token)?;
412 }
413
414 get_context_store().remove(&context_id)?;
415 Ok(false)
416 }
417
418 fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
420 let slf_py: Py<PyAny> = slf.into_py_any(py)?;
421
422 pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(slf_py) })
424 }
425
426 #[pyo3(signature = (exc_type=None, exc_val=None, exc_tb=None))]
428 fn __aexit__<'py>(
429 &mut self,
430 py: Python<'py>,
431 exc_type: Option<Py<PyAny>>,
432 exc_val: Option<Py<PyAny>>,
433 exc_tb: Option<Py<PyAny>>,
434 ) -> PyResult<Bound<'py, PyAny>> {
435 let result = self.__exit__(py, exc_type, exc_val, exc_tb)?;
436 let py_result = result.into_py_any(py)?;
437
438 pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(py_result) })
440 }
441}
442
443impl ActiveSpan {
444 pub fn set_attribute_static(
445 &mut self,
446 key: &'static str,
447 value: String,
448 ) -> Result<(), TraceError> {
449 self.inner
450 .write()
451 .map_err(|e| TraceError::PoisonError(e.to_string()))?
452 .span
453 .set_attribute(KeyValue::new(key, value));
454 Ok(())
455 }
456
457 fn with_inner_mut<F, R>(&self, f: F) -> Result<R, TraceError>
458 where
459 F: FnOnce(&mut ActiveSpanInner) -> R,
460 {
461 let mut inner = self
462 .inner
463 .write()
464 .map_err(|e| TraceError::PoisonError(e.to_string()))?;
465 Ok(f(&mut inner))
466 }
467
468 fn with_inner<F, R>(&self, f: F) -> Result<R, TraceError>
469 where
470 F: FnOnce(&ActiveSpanInner) -> R,
471 {
472 let inner = self
473 .inner
474 .read()
475 .map_err(|e| TraceError::PoisonError(e.to_string()))?;
476 Ok(f(&inner))
477 }
478}
479
#[pyclass(subclass)]
/// Python-subclassable tracer wrapping an `SdkTracer` obtained from the global provider.
pub struct BaseTracer {
    tracer: SdkTracer,
}
485
486impl BaseTracer {
487 fn set_start_time(&self, span: &mut BoxedSpan) {
488 let trace_id = span.span_context().trace_id().to_string();
489 let trace_metadata_store = get_trace_metadata_store();
490
491 let start_time = match trace_metadata_store.get_trace_metadata(&trace_id) {
492 Ok(Some(metadata)) => {
493 metadata.start_time
495 }
496 Ok(None) => {
497 let current_time = Utc::now();
499 if let Err(e) = trace_metadata_store.set_trace_start(trace_id, current_time) {
500 tracing::warn!("Failed to set trace start time: {}", e);
501 }
502 current_time
503 }
504 Err(e) => {
505 tracing::warn!("Failed to get trace metadata: {}", e);
506 Utc::now()
507 }
508 };
509
510 span.set_attribute(KeyValue::new(TRACE_START_TIME_KEY, start_time.to_rfc3339()));
511 }
512
513 fn increment_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
514 let trace_metadata_store = get_trace_metadata_store();
515 trace_metadata_store.increment_span_count(trace_id)
516 }
517
518 fn setup_trace_metadata(&self, span: &mut BoxedSpan) -> Result<(), TraceError> {
519 let trace_id = span.span_context().trace_id().to_string();
520 self.set_start_time(span);
521 self.increment_span_count(&trace_id)?;
522 Ok(())
523 }
524
525 fn create_baggage_items(
526 baggage: &[HashMap<String, String>],
527 tags: &[HashMap<String, String>],
528 ) -> Vec<KeyValue> {
529 let mut keyval_baggage: Vec<KeyValue> = baggage
530 .iter()
531 .flat_map(|baggage_map| {
532 baggage_map
533 .iter()
534 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
535 .collect::<Vec<KeyValue>>()
536 })
537 .collect();
538
539 tags.iter().for_each(|tag_map| {
541 tag_map.iter().for_each(|(k, v)| {
542 keyval_baggage.push(KeyValue::new(
543 format!("{}.{}.{}", BAGGAGE_PREFIX, SCOUTER_TAG_PREFIX, k),
544 v.clone(),
545 ));
546 });
547 });
548 keyval_baggage
549 }
550}
551
552#[pymethods]
553impl BaseTracer {
554 #[new]
555 #[pyo3(signature = (name))]
556 fn new(name: String) -> Result<Self, TraceError> {
557 let tracer = get_tracer(name)?;
558 Ok(BaseTracer { tracer })
559 }
560
561 #[pyo3(signature = (name, kind=SpanKind::Internal, attributes=vec![], baggage=vec![], tags=vec![], label=None, parent_context_id=None))]
572 #[allow(clippy::too_many_arguments)]
573 #[instrument(skip_all)]
574 fn start_as_current_span(
575 &self,
576 py: Python<'_>,
577 name: String,
578 kind: SpanKind,
579 attributes: Vec<HashMap<String, String>>,
580 baggage: Vec<HashMap<String, String>>,
581 tags: Vec<HashMap<String, String>>,
582 label: Option<String>,
583 parent_context_id: Option<String>,
584 ) -> Result<ActiveSpan, TraceError> {
585 let parent_id = parent_context_id.or_else(|| get_current_context_id(py).ok().flatten());
587
588 let base_ctx = parent_id
590 .and_then(|id| get_context_store().get(&id).ok().flatten())
591 .map(|parent_span_ctx| OtelContext::current().with_remote_span_context(parent_span_ctx))
592 .unwrap_or_else(OtelContext::current);
593
594 let baggage_items = Self::create_baggage_items(&baggage, &tags);
596
597 let final_ctx = if !baggage_items.is_empty() {
598 base_ctx.with_baggage(baggage_items)
599 } else {
600 base_ctx
601 };
602
603 let mut span = BoxedSpan::new(
606 self.tracer
607 .span_builder(name.clone())
608 .with_kind(kind.to_otel_span_kind())
609 .start_with_context(&self.tracer, &final_ctx),
610 );
611
612 attributes.iter().for_each(|attr_map| {
613 attr_map.iter().for_each(|(k, v)| {
614 span.set_attribute(KeyValue::new(k.clone(), v.clone()));
615 });
616 });
617
618 if let Some(label) = label {
620 span.set_attribute(KeyValue::new(SCOUTER_TRACING_LABEL, label));
621 }
622
623 let context_id = Self::set_context_id(self, &mut span)?;
624 let inner = Arc::new(RwLock::new(ActiveSpanInner {
625 context_id,
626 span,
627 context_token: None,
628 }));
629
630 self.set_current_span(py, &inner)?;
632
633 Ok(ActiveSpan { inner })
634 }
635
636 #[pyo3(name="_start_decorated_as_current_span", signature = (
651 name,
652 func,
653 func_args,
654 kind=SpanKind::Internal,
655 attributes=vec![],
656 baggage=vec![],
657 tags=vec![],
658 label=None,
659 parent_context_id=None,
660 max_length=1000,
661 func_type=FunctionType::Sync,
662 func_kwargs=None
663 ))]
664 #[allow(clippy::too_many_arguments)]
665 fn start_decorated_as_current_span<'py>(
666 &self,
667 py: Python<'py>,
668 name: String,
669 func: &Bound<'py, PyAny>,
670 func_args: &Bound<'_, PyTuple>,
671 kind: SpanKind,
672 attributes: Vec<HashMap<String, String>>,
673 baggage: Vec<HashMap<String, String>>,
674 tags: Vec<HashMap<String, String>>,
675 label: Option<String>,
676 parent_context_id: Option<String>,
677 max_length: usize,
678 func_type: FunctionType,
679 func_kwargs: Option<&Bound<'_, PyDict>>,
680 ) -> Result<ActiveSpan, TraceError> {
681 let mut span = self.start_as_current_span(
682 py,
683 name,
684 kind,
685 attributes,
686 baggage,
687 tags,
688 label,
689 parent_context_id,
690 )?;
691
692 set_function_attributes(func, &mut span)?;
693
694 set_function_type_attribute(&func_type, &mut span)?;
695
696 span.set_input(
697 &capture_function_arguments(py, func, func_args, func_kwargs)?,
698 max_length,
699 )?;
700
701 Ok(span)
702 }
703
704 #[getter]
706 pub fn current_span<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TraceError> {
707 let span = get_current_active_span(py)?;
708 Ok(span)
709 }
710
711 pub fn shutdown(&self) -> Result<(), TraceError> {
713 shutdown_tracer()
714 }
715}
716
717impl BaseTracer {
718 fn set_current_span(
719 &self,
720 py: Python<'_>,
721 inner: &Arc<RwLock<ActiveSpanInner>>,
722 ) -> Result<(), TraceError> {
723 let py_span = Py::new(
724 py,
725 ActiveSpan {
726 inner: inner.clone(),
727 },
728 )?;
729 let token = set_current_span(py, py_span.bind(py).clone())?;
730 inner
731 .write()
732 .map_err(|e| TraceError::PoisonError(e.to_string()))?
733 .context_token = Some(token);
734 Ok(())
735 }
736
737 fn set_context_id(&self, span: &mut BoxedSpan) -> Result<String, TraceError> {
738 let context_id = format!("span_{}", create_uuid7());
739 Self::setup_trace_metadata(self, span)?;
740 get_context_store().set(context_id.clone(), span.span_context().clone())?;
741 Ok(context_id)
742 }
743}
744
745#[pyfunction]
747pub fn flush_tracer() -> Result<(), TraceError> {
748 let store = get_tracer_provider_store();
749
750 let provider_guard = store
751 .read()
752 .map_err(|e| TraceError::PoisonError(e.to_string()))?;
753
754 let provider = provider_guard.as_ref().ok_or_else(|| {
755 TraceError::InitializationError(
756 "Tracer provider not initialized or already shut down".to_string(),
757 )
758 })?;
759 provider.force_flush()?;
760
761 Ok(())
762}
763
764#[pyfunction]
765pub fn shutdown_tracer() -> Result<(), TraceError> {
766 info!("Shutting down tracer");
767
768 let store_arc = get_tracer_provider()?;
769 let mut store_guard = store_arc
770 .write()
771 .map_err(|e| TraceError::PoisonError(e.to_string()))?;
772
773 if let Some(provider) = store_guard.take() {
774 provider.shutdown()?;
775 } else {
776 tracing::warn!("Tracer provider was already shut down or never initialized.");
777 }
778
779 get_trace_metadata_store().clear_all()?;
780
781 Ok(())
782}
783
784fn get_tracer(name: String) -> Result<SdkTracer, TraceError> {
785 let store = get_tracer_provider_store();
786
787 let provider_guard = store
788 .read()
789 .map_err(|e| TraceError::PoisonError(e.to_string()))?;
790
791 let provider = provider_guard.as_ref().ok_or_else(|| {
792 TraceError::InitializationError(
793 "Tracer provider not initialized or already shut down".to_string(),
794 )
795 })?;
796
797 Ok(provider.tracer(name))
798}