scouter_tracing/
tracer.rs

1// This file contains the core implementation logic for tracing within Scouter.
2// The goal is not to re-invent the wheel here as the opentelemetry-rust crate provides a solid implementation for tracing.
3// The use case we're aiming to address is users who save models, drift profiles, llm events and want to correlate them via traces/spans.
4// The only way to do that in our system is to reproduce a tracer and have it be OTEL compatible so that traces are produced
5// to a collector as normal, but also produced to the Scouter backend with the relevant metadata.
6// This data can then be pulled inside of OpsML's UI for trace correlation and analysis.
7
8use crate::error::TraceError;
9use crate::exporter::processor::BatchConfig;
10use crate::exporter::scouter::ScouterSpanExporter;
11use crate::exporter::SpanExporterNum;
12use crate::utils::BoxedSpan;
13use crate::utils::{
14    capture_function_arguments, format_traceback, get_context_store, get_context_var,
15    get_current_active_span, get_current_context_id, set_current_span, set_function_attributes,
16    set_function_type_attribute, ActiveSpanInner, FunctionType, SpanKind,
17};
18use chrono::{DateTime, Utc};
19use opentelemetry::baggage::BaggageExt;
20use opentelemetry::trace::Tracer as OTelTracer;
21use opentelemetry::trace::TracerProvider;
22use opentelemetry::{
23    trace::{Span, Status, TraceContextExt},
24    Context as OtelContext, KeyValue,
25};
26use opentelemetry_sdk::trace::SdkTracer;
27use opentelemetry_sdk::trace::SdkTracerProvider;
28use opentelemetry_sdk::Resource;
29use potato_head::create_uuid7;
30use pyo3::prelude::*;
31use pyo3::types::{PyDict, PyTuple};
32use pyo3::IntoPyObjectExt;
33use scouter_events::queue::types::TransportConfig;
34use scouter_settings::http::HttpConfig;
35
36use scouter_types::{
37    is_pydantic_basemodel, pydict_to_otel_keyvalue, pyobject_to_otel_value,
38    pyobject_to_tracing_json, BAGGAGE_PREFIX, SCOUTER_TAG_PREFIX, SCOUTER_TRACING_INPUT,
39    SCOUTER_TRACING_LABEL, SCOUTER_TRACING_OUTPUT, SERVICE_NAME, TRACE_START_TIME_KEY,
40};
41use std::sync::{Arc, RwLock};
42use std::{collections::HashMap, sync::OnceLock};
43use tracing::{debug, info, instrument};
44
45/// Global static instance of the tracer provider.
46static TRACER_PROVIDER_STORE: OnceLock<Arc<RwLock<Option<SdkTracerProvider>>>> = OnceLock::new();
47
48// Add trace metadata store
49static TRACE_METADATA_STORE: OnceLock<TraceMetadataStore> = OnceLock::new();
50
51fn get_tracer_provider_store() -> &'static Arc<RwLock<Option<SdkTracerProvider>>> {
52    TRACER_PROVIDER_STORE.get_or_init(|| Arc::new(RwLock::new(None)))
53}
54
55fn get_tracer_provider() -> Result<Arc<RwLock<Option<SdkTracerProvider>>>, TraceError> {
56    TRACER_PROVIDER_STORE.get().cloned().ok_or_else(|| {
57        // This should only happen if the store itself hasn't been initialized by get_tracer_provider_store
58        TraceError::InitializationError("Tracer provider store not initialized".to_string())
59    })
60}
61
62const MISSING: &str = "unknown";
63
64#[derive(Clone)]
65struct TraceMetadata {
66    start_time: DateTime<Utc>,
67    span_count: u32,
68}
69
70#[derive(Clone)]
71struct TraceMetadataStore {
72    inner: Arc<RwLock<HashMap<String, TraceMetadata>>>,
73}
74
75impl TraceMetadataStore {
76    fn new() -> Self {
77        Self {
78            inner: Arc::new(RwLock::new(HashMap::new())),
79        }
80    }
81
82    fn set_trace_start(
83        &self,
84        trace_id: String,
85        start_time: DateTime<Utc>,
86    ) -> Result<(), TraceError> {
87        self.inner
88            .write()
89            .map_err(|e| TraceError::PoisonError(e.to_string()))?
90            .insert(
91                trace_id.clone(),
92                TraceMetadata {
93                    start_time,
94                    span_count: 0,
95                },
96            );
97        Ok(())
98    }
99
100    fn get_trace_metadata(&self, trace_id: &str) -> Result<Option<TraceMetadata>, TraceError> {
101        Ok(self
102            .inner
103            .read()
104            .map_err(|e| TraceError::PoisonError(e.to_string()))?
105            .get(trace_id)
106            .cloned())
107    }
108
109    fn remove_trace(&self, trace_id: &str) -> Result<(), TraceError> {
110        self.inner
111            .write()
112            .map_err(|e| TraceError::PoisonError(e.to_string()))?
113            .remove(trace_id);
114        Ok(())
115    }
116
117    fn increment_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
118        if let Some(mut metadata) = self.get_trace_metadata(trace_id)? {
119            metadata.span_count += 1;
120            self.inner
121                .write()
122                .map_err(|e| TraceError::PoisonError(e.to_string()))?
123                .insert(trace_id.to_string(), metadata);
124        }
125        Ok(())
126    }
127
128    /// Decrements the span count for the given trace ID. If the span count reaches zero, the trace metadata is removed.
129    fn decrement_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
130        if let Some(mut metadata) = self.get_trace_metadata(trace_id)? {
131            if metadata.span_count > 0 {
132                metadata.span_count -= 1;
133            }
134
135            if metadata.span_count == 0 {
136                self.remove_trace(trace_id)?;
137            } else {
138                self.inner
139                    .write()
140                    .map_err(|e| TraceError::PoisonError(e.to_string()))?
141                    .insert(trace_id.to_string(), metadata);
142            }
143        }
144
145        Ok(())
146    }
147
148    fn clear_all(&self) -> Result<(), TraceError> {
149        self.inner
150            .write()
151            .map_err(|e| TraceError::PoisonError(e.to_string()))?
152            .clear();
153        Ok(())
154    }
155}
156
157fn get_trace_metadata_store() -> &'static TraceMetadataStore {
158    TRACE_METADATA_STORE.get_or_init(TraceMetadataStore::new)
159}
160
161/// Global initialization function for the tracer.
162/// This sets up the tracer provider with the specified service name, endpoint, and sampling ratio.
163/// If no endpoint is provided, spans will be exported to stdout for debugging purposes.
164/// # Arguments
165/// * `service_name` - Optional service name for the tracer. Defaults to "scouter_service
166/// * `transport_config` - Optional transport configuration for the Scouter exporter
167/// * `exporter` - Optional span exporter to use instead of the default HTTP exporter
168/// * `batch_config` - Optional batch configuration for span exporting
169/// * `space` - Optional space name for Scouter
170/// * `name` - Optional name for Scouter
171/// * `version` - Optional version for Scouter
172#[pyfunction]
173#[pyo3(signature = (
174    service_name="scouter_service".to_string(),
175    transport_config=None,
176    exporter=None,
177    batch_config=None,
178    space=None,
179    name=None,
180    version=None
181))]
182#[instrument(skip_all)]
183#[allow(clippy::too_many_arguments)]
184pub fn init_tracer(
185    py: Python,
186    service_name: String,
187    transport_config: Option<&Bound<'_, PyAny>>,
188    exporter: Option<&Bound<'_, PyAny>>,
189    batch_config: Option<Py<BatchConfig>>,
190    space: Option<String>,
191    name: Option<String>,
192    version: Option<String>,
193) -> Result<(), TraceError> {
194    debug!("Initializing tracer");
195
196    let transport_config = match transport_config {
197        Some(config) => TransportConfig::from_py_config(config)?,
198        None => {
199            // default to http transport config
200            let config = HttpConfig::default();
201            TransportConfig::Http(config)
202        }
203    };
204
205    let batch_config = batch_config
206        .map(|bc| bc.extract::<BatchConfig>(py))
207        .transpose()?;
208
209    let scouter_export = ScouterSpanExporter::new(
210        space.unwrap_or_else(|| MISSING.to_string()),
211        name.unwrap_or_else(|| MISSING.to_string()),
212        version.unwrap_or_else(|| MISSING.to_string()),
213        transport_config,
214    )?;
215
216    let provider_store = get_tracer_provider_store();
217
218    let mut store_guard = provider_store
219        .write()
220        .map_err(|e| TraceError::PoisonError(e.to_string()))?;
221
222    if store_guard.is_some() {
223        return Err(TraceError::InitializationError(
224            "Tracer provider already initialized. Call shutdown_tracer() first.".to_string(),
225        ));
226    }
227
228    let resource = Resource::builder()
229        .with_attributes(vec![KeyValue::new(SERVICE_NAME, service_name.clone())])
230        .build();
231
232    let span_exporter = if let Some(exporter) = exporter {
233        SpanExporterNum::from_pyobject(exporter).expect("failed to convert exporter")
234    } else {
235        SpanExporterNum::default()
236    };
237
238    let provider = span_exporter
239        .build_provider(resource, scouter_export, batch_config)
240        .expect("failed to build tracer provider");
241
242    *store_guard = Some(provider);
243
244    Ok(())
245}
246
247fn reset_current_context(py: Python, token: &Py<PyAny>) -> PyResult<()> {
248    let context_var = get_context_var(py)?;
249    context_var.bind(py).call_method1("reset", (token,))?;
250    Ok(())
251}
252
253/// ActiveSpan where all the magic happens
254/// The active Span attempts to maintain compatibility with the OpenTelemetry Span API
255#[pyclass]
256pub struct ActiveSpan {
257    inner: Arc<RwLock<ActiveSpanInner>>,
258}
259
260#[pymethods]
261impl ActiveSpan {
262    #[getter]
263    fn context_id(&self) -> Result<String, TraceError> {
264        self.with_inner(|inner| inner.context_id.clone())
265    }
266
267    /// Set the input attribute on the span
268    /// # Arguments
269    /// * `input` - The input value (any Python object, but is often a dict)
270    /// * `max_length` - Maximum length of the serialized input (default: 1000)
271    #[pyo3(signature = (input, max_length=1000))]
272    #[instrument(skip_all)]
273    fn set_input(&self, input: &Bound<'_, PyAny>, max_length: usize) -> Result<(), TraceError> {
274        let value = pyobject_to_tracing_json(input, &max_length)?;
275        self.with_inner_mut(|inner| {
276            inner.span.set_attribute(KeyValue::new(
277                SCOUTER_TRACING_INPUT,
278                serde_json::to_string(&value).unwrap(),
279            ))
280        })
281    }
282
283    #[pyo3(signature = (output, max_length=1000))]
284    #[instrument(skip_all)]
285    fn set_output(&self, output: &Bound<'_, PyAny>, max_length: usize) -> Result<(), TraceError> {
286        debug!("Setting output on span");
287        let value = pyobject_to_tracing_json(output, &max_length)?;
288        self.with_inner_mut(|inner| {
289            inner.span.set_attribute(KeyValue::new(
290                SCOUTER_TRACING_OUTPUT,
291                serde_json::to_string(&value).unwrap(),
292            ))
293        })
294    }
295
296    /// Set an attribute on the span
297    /// # Arguments
298    /// * `key` - The attribute key
299    /// * `value` - The attribute value
300    pub fn set_attribute(&self, key: String, value: Bound<'_, PyAny>) -> Result<(), TraceError> {
301        let value = pyobject_to_otel_value(&value)?;
302        self.with_inner_mut(|inner| inner.span.set_attribute(KeyValue::new(key, value)))
303    }
304
305    /// Add an event to the span
306    /// # Arguments
307    /// * `name` - The event name
308    /// * `attributes` - The event attributes as a dictionary or pydantic BaseModel
309    fn add_event(
310        &self,
311        py: Python,
312        name: String,
313        attributes: Option<Bound<'_, PyAny>>,
314    ) -> Result<(), TraceError> {
315        let pairs: Vec<KeyValue> = if let Some(attrs) = attributes {
316            if is_pydantic_basemodel(py, &attrs)? {
317                let dumped = attrs.call_method0("model_dump")?;
318                let dict = dumped
319                    .downcast::<PyDict>()
320                    .map_err(|e| TraceError::DowncastError(e.to_string()))?;
321                pydict_to_otel_keyvalue(dict)?
322            } else if attrs.is_instance_of::<PyDict>() {
323                let dict = attrs
324                    .downcast::<PyDict>()
325                    .map_err(|e| TraceError::DowncastError(e.to_string()))?;
326                pydict_to_otel_keyvalue(dict)?
327            } else {
328                return Err(TraceError::EventMustBeDict);
329            }
330        } else {
331            vec![]
332        };
333
334        self.with_inner_mut(|inner| inner.span.add_event(name, pairs))
335    }
336
337    /// Set the status of the span
338    /// # Arguments
339    /// * `status` - The status string ("ok", "error", or "unset")
340    /// * `description` - Optional description for the status (typically used with error)
341    fn set_status(&self, status: String, description: Option<String>) -> Result<(), TraceError> {
342        let otel_status = match status.to_lowercase().as_str() {
343            "ok" => Status::Ok,
344            "error" => Status::error(description.unwrap_or_default()),
345            _ => Status::Unset,
346        };
347
348        self.with_inner_mut(|inner| inner.span.set_status(otel_status))
349    }
350
351    /// Sync context manager enter
352    #[instrument(skip_all)]
353    fn __enter__<'py>(slf: PyRef<'py, Self>) -> PyResult<PyRef<'py, Self>> {
354        debug!("Entering span context: {}", slf.context_id()?);
355        Ok(slf)
356    }
357
358    /// Sync context manager exit
359    #[pyo3(signature = (exc_type=None, exc_val=None, exc_tb=None))]
360    #[instrument(skip_all)]
361    fn __exit__(
362        &mut self,
363        py: Python<'_>,
364        exc_type: Option<Py<PyAny>>,
365        exc_val: Option<Py<PyAny>>,
366        exc_tb: Option<Py<PyAny>>,
367    ) -> Result<bool, TraceError> {
368        debug!("Exiting span context: {}", self.context_id()?);
369        let (context_id, trace_id, context_token) = {
370            let mut inner = self
371                .inner
372                .write()
373                .map_err(|e| TraceError::PoisonError(e.to_string()))?;
374
375            // Handle exceptions and end span
376            if let Some(exc_type) = exc_type {
377                inner.span.set_status(Status::error("Exception occurred"));
378                inner
379                    .span
380                    .set_attribute(KeyValue::new("exception.type", exc_type.to_string()));
381
382                if let Some(exc_val) = exc_val {
383                    inner
384                        .span
385                        .set_attribute(KeyValue::new("exception.value", exc_val.to_string()));
386                }
387
388                if let Some(exc_tb) = exc_tb {
389                    // need to unpack the traceback object to string
390                    let tb = format_traceback(py, &exc_tb)?;
391                    inner
392                        .span
393                        .set_attribute(KeyValue::new("exception.traceback", tb));
394                }
395            }
396
397            inner.span.end();
398
399            // Extract values before dropping the lock
400            let context_id = inner.context_id.clone();
401            let trace_id = inner.span.span_context().trace_id().to_string();
402            let context_token = inner.context_token.take();
403
404            (context_id, trace_id, context_token)
405        };
406
407        let store = get_trace_metadata_store();
408        store.decrement_span_count(&trace_id)?;
409
410        if let Some(token) = context_token {
411            reset_current_context(py, &token)?;
412        }
413
414        get_context_store().remove(&context_id)?;
415        Ok(false)
416    }
417
418    /// Async context manager enter
419    fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
420        let slf_py: Py<PyAny> = slf.into_py_any(py)?;
421
422        // We need to return a Future that resolves to slf_py (__aenter__ is expected to return an awaitable)
423        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(slf_py) })
424    }
425
426    /// Async context manager exit
427    #[pyo3(signature = (exc_type=None, exc_val=None, exc_tb=None))]
428    fn __aexit__<'py>(
429        &mut self,
430        py: Python<'py>,
431        exc_type: Option<Py<PyAny>>,
432        exc_val: Option<Py<PyAny>>,
433        exc_tb: Option<Py<PyAny>>,
434    ) -> PyResult<Bound<'py, PyAny>> {
435        let result = self.__exit__(py, exc_type, exc_val, exc_tb)?;
436        let py_result = result.into_py_any(py)?;
437
438        // We need to return a Future that resolves to py_result (__aexit__ is expected to return an awaitable)
439        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(py_result) })
440    }
441}
442
443impl ActiveSpan {
444    pub fn set_attribute_static(
445        &mut self,
446        key: &'static str,
447        value: String,
448    ) -> Result<(), TraceError> {
449        self.inner
450            .write()
451            .map_err(|e| TraceError::PoisonError(e.to_string()))?
452            .span
453            .set_attribute(KeyValue::new(key, value));
454        Ok(())
455    }
456
457    fn with_inner_mut<F, R>(&self, f: F) -> Result<R, TraceError>
458    where
459        F: FnOnce(&mut ActiveSpanInner) -> R,
460    {
461        let mut inner = self
462            .inner
463            .write()
464            .map_err(|e| TraceError::PoisonError(e.to_string()))?;
465        Ok(f(&mut inner))
466    }
467
468    fn with_inner<F, R>(&self, f: F) -> Result<R, TraceError>
469    where
470        F: FnOnce(&ActiveSpanInner) -> R,
471    {
472        let inner = self
473            .inner
474            .read()
475            .map_err(|e| TraceError::PoisonError(e.to_string()))?;
476        Ok(f(&inner))
477    }
478}
479
480/// The main Tracer class
481#[pyclass(subclass)]
482pub struct BaseTracer {
483    tracer: SdkTracer,
484}
485
486impl BaseTracer {
487    fn set_start_time(&self, span: &mut BoxedSpan) {
488        let trace_id = span.span_context().trace_id().to_string();
489        let trace_metadata_store = get_trace_metadata_store();
490
491        let start_time = match trace_metadata_store.get_trace_metadata(&trace_id) {
492            Ok(Some(metadata)) => {
493                // Use existing trace start time
494                metadata.start_time
495            }
496            Ok(None) => {
497                // Create new trace metadata with current time
498                let current_time = Utc::now();
499                if let Err(e) = trace_metadata_store.set_trace_start(trace_id, current_time) {
500                    tracing::warn!("Failed to set trace start time: {}", e);
501                }
502                current_time
503            }
504            Err(e) => {
505                tracing::warn!("Failed to get trace metadata: {}", e);
506                Utc::now()
507            }
508        };
509
510        span.set_attribute(KeyValue::new(TRACE_START_TIME_KEY, start_time.to_rfc3339()));
511    }
512
513    fn increment_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
514        let trace_metadata_store = get_trace_metadata_store();
515        trace_metadata_store.increment_span_count(trace_id)
516    }
517
518    fn setup_trace_metadata(&self, span: &mut BoxedSpan) -> Result<(), TraceError> {
519        let trace_id = span.span_context().trace_id().to_string();
520        self.set_start_time(span);
521        self.increment_span_count(&trace_id)?;
522        Ok(())
523    }
524
525    fn create_baggage_items(
526        baggage: &[HashMap<String, String>],
527        tags: &[HashMap<String, String>],
528    ) -> Vec<KeyValue> {
529        let mut keyval_baggage: Vec<KeyValue> = baggage
530            .iter()
531            .flat_map(|baggage_map| {
532                baggage_map
533                    .iter()
534                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
535                    .collect::<Vec<KeyValue>>()
536            })
537            .collect();
538
539        // add tags to baggage
540        tags.iter().for_each(|tag_map| {
541            tag_map.iter().for_each(|(k, v)| {
542                keyval_baggage.push(KeyValue::new(
543                    format!("{}.{}.{}", BAGGAGE_PREFIX, SCOUTER_TAG_PREFIX, k),
544                    v.clone(),
545                ));
546            });
547        });
548        keyval_baggage
549    }
550}
551
552#[pymethods]
553impl BaseTracer {
554    #[new]
555    #[pyo3(signature = (name))]
556    fn new(name: String) -> Result<Self, TraceError> {
557        let tracer = get_tracer(name)?;
558        Ok(BaseTracer { tracer })
559    }
560
561    /// Start a span and set it as the current span
562    /// # Arguments
563    /// * `name` - The name of the span
564    /// * `kind` - Optional kind of the span ("server", "client", "
565    /// producer", "consumer", "internal")
566    /// * `label` - Optional label for the span
567    /// * `attributes` - Optional attributes as a dictionary
568    /// * `baggage` - Optional baggage items as a dictionary
569    /// * `tags` - Optional tags to prefix baggage items with as a dictionary
570    /// * `parent_context_id` - Optional parent context ID to link the span to (this is automatically set if not provided)
571    #[pyo3(signature = (name, kind=SpanKind::Internal, attributes=vec![], baggage=vec![], tags=vec![], label=None,  parent_context_id=None))]
572    #[allow(clippy::too_many_arguments)]
573    #[instrument(skip_all)]
574    fn start_as_current_span(
575        &self,
576        py: Python<'_>,
577        name: String,
578        kind: SpanKind,
579        attributes: Vec<HashMap<String, String>>,
580        baggage: Vec<HashMap<String, String>>,
581        tags: Vec<HashMap<String, String>>,
582        label: Option<String>,
583        parent_context_id: Option<String>,
584    ) -> Result<ActiveSpan, TraceError> {
585        // Get parent context if available
586        let parent_id = parent_context_id.or_else(|| get_current_context_id(py).ok().flatten());
587
588        // Build the base context first
589        let base_ctx = parent_id
590            .and_then(|id| get_context_store().get(&id).ok().flatten())
591            .map(|parent_span_ctx| OtelContext::current().with_remote_span_context(parent_span_ctx))
592            .unwrap_or_else(OtelContext::current);
593
594        // convert baggage items to vec of KeyValue
595        let baggage_items = Self::create_baggage_items(&baggage, &tags);
596
597        let final_ctx = if !baggage_items.is_empty() {
598            base_ctx.with_baggage(baggage_items)
599        } else {
600            base_ctx
601        };
602
603        // Create span with the final context (this consumes final_ctx)
604
605        let mut span = BoxedSpan::new(
606            self.tracer
607                .span_builder(name.clone())
608                .with_kind(kind.to_otel_span_kind())
609                .start_with_context(&self.tracer, &final_ctx),
610        );
611
612        attributes.iter().for_each(|attr_map| {
613            attr_map.iter().for_each(|(k, v)| {
614                span.set_attribute(KeyValue::new(k.clone(), v.clone()));
615            });
616        });
617
618        // set label if provided
619        if let Some(label) = label {
620            span.set_attribute(KeyValue::new(SCOUTER_TRACING_LABEL, label));
621        }
622
623        let context_id = Self::set_context_id(self, &mut span)?;
624        let inner = Arc::new(RwLock::new(ActiveSpanInner {
625            context_id,
626            span,
627            context_token: None,
628        }));
629
630        // set as current span
631        self.set_current_span(py, &inner)?;
632
633        Ok(ActiveSpan { inner })
634    }
635
636    /// Special method that is used as a decorator to start a span around a function call
637    /// This captures the function arguments and sets them as span attributes
638    /// # Arguments
639    /// * `func` - The function to be decorated
640    /// * `name` - The name of the span
641    /// * `kind` - Optional kind of the span ("server", "client", "
642    /// producer", "consumer", "internal")
643    /// * `label` - Optional label for the span
644    /// * `attributes` - Optional attributes as a dictionary
645    /// * `baggage` - Optional baggage items as a dictionary
646    /// * `tags` - Optional tags to prefix baggage items with as a dictionary
647    /// * `parent_context_id` - Optional parent context ID to link the span to (this is automatically set if not provided)
648    /// * `max_length` - Maximum length of the serialized input (default: 1000)
649    /// * `func_type` - Function type (sync or async)
650    #[pyo3(name="_start_decorated_as_current_span", signature = (
651        name,
652        func,
653        func_args,
654        kind=SpanKind::Internal,
655        attributes=vec![],
656        baggage=vec![],
657        tags=vec![],
658        label=None,
659        parent_context_id=None,
660        max_length=1000,
661        func_type=FunctionType::Sync,
662        func_kwargs=None
663    ))]
664    #[allow(clippy::too_many_arguments)]
665    fn start_decorated_as_current_span<'py>(
666        &self,
667        py: Python<'py>,
668        name: String,
669        func: &Bound<'py, PyAny>,
670        func_args: &Bound<'_, PyTuple>,
671        kind: SpanKind,
672        attributes: Vec<HashMap<String, String>>,
673        baggage: Vec<HashMap<String, String>>,
674        tags: Vec<HashMap<String, String>>,
675        label: Option<String>,
676        parent_context_id: Option<String>,
677        max_length: usize,
678        func_type: FunctionType,
679        func_kwargs: Option<&Bound<'_, PyDict>>,
680    ) -> Result<ActiveSpan, TraceError> {
681        let mut span = self.start_as_current_span(
682            py,
683            name,
684            kind,
685            attributes,
686            baggage,
687            tags,
688            label,
689            parent_context_id,
690        )?;
691
692        set_function_attributes(func, &mut span)?;
693
694        set_function_type_attribute(&func_type, &mut span)?;
695
696        span.set_input(
697            &capture_function_arguments(py, func, func_args, func_kwargs)?,
698            max_length,
699        )?;
700
701        Ok(span)
702    }
703
704    /// Get the current active span from context
705    #[getter]
706    pub fn current_span<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TraceError> {
707        let span = get_current_active_span(py)?;
708        Ok(span)
709    }
710
711    // remove?
712    pub fn shutdown(&self) -> Result<(), TraceError> {
713        shutdown_tracer()
714    }
715}
716
717impl BaseTracer {
718    fn set_current_span(
719        &self,
720        py: Python<'_>,
721        inner: &Arc<RwLock<ActiveSpanInner>>,
722    ) -> Result<(), TraceError> {
723        let py_span = Py::new(
724            py,
725            ActiveSpan {
726                inner: inner.clone(),
727            },
728        )?;
729        let token = set_current_span(py, py_span.bind(py).clone())?;
730        inner
731            .write()
732            .map_err(|e| TraceError::PoisonError(e.to_string()))?
733            .context_token = Some(token);
734        Ok(())
735    }
736
737    fn set_context_id(&self, span: &mut BoxedSpan) -> Result<String, TraceError> {
738        let context_id = format!("span_{}", create_uuid7());
739        Self::setup_trace_metadata(self, span)?;
740        get_context_store().set(context_id.clone(), span.span_context().clone())?;
741        Ok(context_id)
742    }
743}
744
745/// Helper function to force flush the tracer provider
746#[pyfunction]
747pub fn flush_tracer() -> Result<(), TraceError> {
748    let store = get_tracer_provider_store();
749
750    let provider_guard = store
751        .read()
752        .map_err(|e| TraceError::PoisonError(e.to_string()))?;
753
754    let provider = provider_guard.as_ref().ok_or_else(|| {
755        TraceError::InitializationError(
756            "Tracer provider not initialized or already shut down".to_string(),
757        )
758    })?;
759    provider.force_flush()?;
760
761    Ok(())
762}
763
764#[pyfunction]
765pub fn shutdown_tracer() -> Result<(), TraceError> {
766    info!("Shutting down tracer");
767
768    let store_arc = get_tracer_provider()?;
769    let mut store_guard = store_arc
770        .write()
771        .map_err(|e| TraceError::PoisonError(e.to_string()))?;
772
773    if let Some(provider) = store_guard.take() {
774        provider.shutdown()?;
775    } else {
776        tracing::warn!("Tracer provider was already shut down or never initialized.");
777    }
778
779    get_trace_metadata_store().clear_all()?;
780
781    Ok(())
782}
783
784fn get_tracer(name: String) -> Result<SdkTracer, TraceError> {
785    let store = get_tracer_provider_store();
786
787    let provider_guard = store
788        .read()
789        .map_err(|e| TraceError::PoisonError(e.to_string()))?;
790
791    let provider = provider_guard.as_ref().ok_or_else(|| {
792        TraceError::InitializationError(
793            "Tracer provider not initialized or already shut down".to_string(),
794        )
795    })?;
796
797    Ok(provider.tracer(name))
798}