scouter_tracing/
utils.rs

1use crate::error::TraceError;
2use crate::tracer::ActiveSpan;
3use opentelemetry::global::ObjectSafeSpan;
4use opentelemetry::trace::SpanContext;
5use opentelemetry::{trace, KeyValue};
6use opentelemetry_otlp::ExportConfig as OtlpExportConfig;
7use pyo3::types::{PyDict, PyModule, PyTuple};
8use pyo3::{prelude::*, IntoPyObjectExt};
9use scouter_types::CompressionType;
10use scouter_types::{
11    FUNCTION_MODULE, FUNCTION_NAME, FUNCTION_QUALNAME, FUNCTION_STREAMING, FUNCTION_TYPE,
12};
13use serde::Serialize;
14use std::borrow::Cow;
15use std::collections::HashMap;
16use std::fmt;
17use std::fmt::Display;
18use std::sync::OnceLock;
19use std::sync::{Arc, RwLock};
20use std::time::Duration;
21use std::time::SystemTime;
22use tracing::{debug, instrument};
23
24/// Global static instance of the context store.
25static CONTEXT_STORE: OnceLock<ContextStore> = OnceLock::new();
26
27/// Global static instance of the context variable for async context propagation. Caching the import for speed
28static OTEL_CONTEXT_VAR: OnceLock<Py<PyAny>> = OnceLock::new();
29
30// Quick access to commonly used Python modules
31static PY_IMPORTS: OnceLock<HelperImports> = OnceLock::new();
32const ASYNCIO_MODULE: &str = "asyncio";
33const INSPECT_MODULE: &str = "inspect";
34const CONTEXTVARS_MODULE: &str = "contextvars";
35
36#[pyclass(eq)]
37#[derive(PartialEq, Clone, Debug)]
38pub enum FunctionType {
39    Async,
40    AsyncGenerator,
41    SyncGenerator,
42    Sync,
43}
44
45impl Display for FunctionType {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        match self {
48            FunctionType::Async => write!(f, "Async"),
49            FunctionType::AsyncGenerator => write!(f, "AsyncGenerator"),
50            FunctionType::SyncGenerator => write!(f, "SyncGenerator"),
51            FunctionType::Sync => write!(f, "Sync"),
52        }
53    }
54}
55
56impl FunctionType {
57    pub fn as_str(&self) -> &str {
58        match self {
59            FunctionType::Async => "Async",
60            FunctionType::AsyncGenerator => "AsyncGenerator",
61            FunctionType::SyncGenerator => "SyncGenerator",
62            FunctionType::Sync => "Sync",
63        }
64    }
65}
66
67impl std::str::FromStr for FunctionType {
68    type Err = TraceError;
69    fn from_str(s: &str) -> Result<Self, Self::Err> {
70        match s {
71            "Async" => Ok(FunctionType::Async),
72            "AsyncGenerator" => Ok(FunctionType::AsyncGenerator),
73            "SyncGenerator" => Ok(FunctionType::SyncGenerator),
74            "Sync" => Ok(FunctionType::Sync),
75            _ => Err(TraceError::InvalidFunctionType(s.to_string())),
76        }
77    }
78}
79
80pub struct HelperImports {
81    pub asyncio: Py<PyModule>,
82    pub inspect: Py<PyModule>,
83}
84
85/// Initialize and get helper imports for asyncio and inspect modules
86fn get_helper_imports(py: Python<'_>) -> &'static HelperImports {
87    PY_IMPORTS.get_or_init(|| {
88        let asyncio = py
89            .import(ASYNCIO_MODULE)
90            .expect("Failed to import asyncio")
91            .unbind();
92        let inspect = py
93            .import(INSPECT_MODULE)
94            .expect("Failed to import inspect")
95            .unbind();
96        HelperImports { asyncio, inspect }
97    })
98}
99
100fn py_asyncio(py: Python<'_>) -> &Bound<'_, PyModule> {
101    let imports = get_helper_imports(py);
102    imports.asyncio.bind(py)
103}
104
105fn py_inspect(py: Python<'_>) -> &Bound<'_, PyModule> {
106    let imports = get_helper_imports(py);
107    imports.inspect.bind(py)
108}
109
110/// Function to determine if a Python function is async, async generator, or generator
111/// This is a helper util function used in tracing decorators
112#[pyfunction]
113pub fn get_function_type(
114    py: Python<'_>,
115    func: &Bound<'_, PyAny>,
116) -> Result<FunctionType, TraceError> {
117    // Check for async generator first (most specific)
118    let is_async_gen = py_inspect(py)
119        .call_method1("isasyncgenfunction", (func,))?
120        .extract::<bool>()?;
121
122    if is_async_gen {
123        return Ok(FunctionType::AsyncGenerator);
124    }
125
126    // Check for sync generator
127    let is_gen = py_inspect(py)
128        .call_method1("isgeneratorfunction", (func,))?
129        .extract::<bool>()?;
130
131    if is_gen {
132        return Ok(FunctionType::SyncGenerator);
133    }
134
135    // Check for regular async function
136    let is_async = py_asyncio(py)
137        .call_method1("iscoroutinefunction", (func,))?
138        .extract::<bool>()?;
139
140    if is_async {
141        return Ok(FunctionType::Async);
142    }
143
144    // Default to sync
145    Ok(FunctionType::Sync)
146}
147
148/// Capture function inputs by binding args and kwargs to the function signature
149/// # Arguments
150/// * `py` - The Python GIL token
151/// * `func` - The Python function object
152/// * `args` - The positional arguments passed to the function
153/// * `kwargs` - The keyword arguments passed to the function
154/// # Returns
155/// Result with Bound arguments or TraceError
156pub(crate) fn capture_function_arguments<'py>(
157    py: Python<'py>,
158    func: &Bound<'py, PyAny>,
159    args: &Bound<'py, PyTuple>,
160    kwargs: Option<&Bound<'py, PyDict>>,
161) -> Result<Bound<'py, PyAny>, TraceError> {
162    let sig = py_inspect(py).call_method1("signature", (func,))?;
163    let bound_args = sig.call_method("bind", args, kwargs)?;
164    bound_args.call_method0("apply_defaults")?;
165    Ok(bound_args.getattr("arguments")?)
166}
167
168/// Set function attributes on the span
169/// # Arguments
170/// * `func` - The Python function object
171/// * `span` - The ActiveSpan to set attributes on
172/// # Returns
173/// Result<(), TraceError>
174#[instrument(skip_all)]
175pub fn set_function_attributes(
176    func: &Bound<'_, PyAny>,
177    span: &mut ActiveSpan,
178) -> Result<(), TraceError> {
179    debug!("Setting function attributes on span");
180    let function_name = match func.getattr("__name__") {
181        Ok(name) => name.extract::<String>()?,
182        Err(_) => "<unknown>".to_string(),
183    };
184
185    let func_module = match func.getattr("__module__") {
186        Ok(module) => module.extract::<String>()?,
187        Err(_) => "<unknown>".to_string(),
188    };
189
190    let func_qualname = match func.getattr("__qualname__") {
191        Ok(qualname) => qualname.extract::<String>()?,
192        Err(_) => "<unknown>".to_string(),
193    };
194
195    span.set_attribute_static(FUNCTION_NAME, function_name)?;
196    span.set_attribute_static(FUNCTION_MODULE, func_module)?;
197    span.set_attribute_static(FUNCTION_QUALNAME, func_qualname)?;
198
199    Ok(())
200}
201
202#[instrument(skip_all)]
203pub(crate) fn set_function_type_attribute(
204    func_type: &FunctionType,
205    span: &mut ActiveSpan,
206) -> Result<(), TraceError> {
207    debug!("Setting function type attribute on span");
208    if func_type == &FunctionType::AsyncGenerator || func_type == &FunctionType::SyncGenerator {
209        span.set_attribute_static(FUNCTION_STREAMING, "true".to_string())?;
210    } else {
211        span.set_attribute_static(FUNCTION_STREAMING, "false".to_string())?;
212    }
213    span.set_attribute_static(FUNCTION_TYPE, func_type.as_str().to_string())?;
214
215    Ok(())
216}
217
218/// Global Context Store to hold SpanContexts associated with context IDs.
219#[derive(Clone)]
220pub(crate) struct ContextStore {
221    inner: Arc<RwLock<HashMap<String, SpanContext>>>,
222}
223
224impl ContextStore {
225    fn new() -> Self {
226        Self {
227            inner: Arc::new(RwLock::new(HashMap::new())),
228        }
229    }
230
231    pub(crate) fn set(&self, key: String, ctx: SpanContext) -> Result<(), TraceError> {
232        self.inner
233            .write()
234            .map_err(|e| TraceError::PoisonError(e.to_string()))?
235            .insert(key, ctx);
236        Ok(())
237    }
238
239    pub(crate) fn get(&self, key: &str) -> Result<Option<SpanContext>, TraceError> {
240        Ok(self
241            .inner
242            .read()
243            .map_err(|e| TraceError::PoisonError(e.to_string()))?
244            .get(key)
245            .cloned())
246    }
247
248    pub(crate) fn remove(&self, key: &str) -> Result<(), TraceError> {
249        self.inner
250            .write()
251            .map_err(|e| TraceError::PoisonError(e.to_string()))?
252            .remove(key);
253        Ok(())
254    }
255}
256
257pub(crate) fn get_context_store() -> &'static ContextStore {
258    CONTEXT_STORE.get_or_init(ContextStore::new)
259}
260
261/// Initialize the context variable for storing the current span context ID.
262/// This is important for async context propagation in python.
263/// This will be used to store Py<ActiveSpan> objects.
264fn init_context_var(py: Python<'_>) -> PyResult<Py<PyAny>> {
265    let contextvars = py.import(CONTEXTVARS_MODULE)?;
266    let context_var = contextvars
267        .call_method1("ContextVar", ("_otel_current_span",))?
268        .unbind();
269    Ok(context_var)
270}
271
272pub(crate) fn get_context_var(py: Python<'_>) -> PyResult<&Py<PyAny>> {
273    Ok(OTEL_CONTEXT_VAR
274        .get_or_init(|| init_context_var(py).expect("Failed to initialize context var")))
275}
276
277pub(crate) fn set_current_span(py: Python<'_>, obj: Bound<'_, ActiveSpan>) -> PyResult<Py<PyAny>> {
278    let context_var = get_context_var(py)?;
279    let token = context_var.bind(py).call_method1("set", (obj,))?;
280    Ok(token.unbind())
281}
282
283/// Get the current context ID from the context variable.
284/// Returns None if no current context is set.
285pub(crate) fn get_current_context_id(py: Python<'_>) -> PyResult<Option<String>> {
286    // Try to get the current value, returns None if not set
287    match get_context_var(py)?.bind(py).call_method0("get") {
288        Ok(val) => {
289            if val.is_none() {
290                Ok(None)
291            } else {
292                // this will be the Py<ActiveSpan> object,
293                // so we need to call context_id method to get the string
294                let val = val.getattr("context_id")?;
295                Ok(Some(val.extract::<String>()?))
296            }
297        }
298        Err(_) => Ok(None),
299    }
300}
301
302/// Get the current active span from the context variable.
303/// Returns TraceError::NoActiveSpan if no active span is set.
304pub(crate) fn get_current_active_span(py: Python<'_>) -> Result<Bound<'_, PyAny>, TraceError> {
305    match get_context_var(py)?.bind(py).call_method0("get") {
306        Ok(val) => {
307            if val.is_none() {
308                Err(TraceError::NoActiveSpan)
309            } else {
310                Ok(val)
311            }
312        }
313        Err(_) => Err(TraceError::NoActiveSpan),
314    }
315}
316
317#[pyclass(eq)]
318#[derive(PartialEq, Clone, Debug)]
319pub enum SpanKind {
320    Client,
321    Server,
322    Producer,
323    Consumer,
324    Internal,
325}
326
327impl Display for SpanKind {
328    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329        match self {
330            SpanKind::Client => write!(f, "client"),
331            SpanKind::Server => write!(f, "server"),
332            SpanKind::Producer => write!(f, "producer"),
333            SpanKind::Consumer => write!(f, "consumer"),
334            SpanKind::Internal => write!(f, "internal"),
335        }
336    }
337}
338
339impl SpanKind {
340    pub fn to_otel_span_kind(&self) -> opentelemetry::trace::SpanKind {
341        match self {
342            SpanKind::Client => opentelemetry::trace::SpanKind::Client,
343            SpanKind::Server => opentelemetry::trace::SpanKind::Server,
344            SpanKind::Producer => opentelemetry::trace::SpanKind::Producer,
345            SpanKind::Consumer => opentelemetry::trace::SpanKind::Consumer,
346            SpanKind::Internal => opentelemetry::trace::SpanKind::Internal,
347        }
348    }
349}
350
/// State behind an active span: its context ID, the wrapped OpenTelemetry span, and an
/// optional Python context token.
///
/// NOTE: Rust drops fields in declaration order, so `span` is dropped before
/// `context_token`. Keep this order unless that sequencing is known not to matter.
pub(crate) struct ActiveSpanInner {
    /// Identifier for this span's context (presumably the key used with `ContextStore` and
    /// returned by `get_current_context_id` — confirm against callers).
    pub context_id: String,
    /// The wrapped OpenTelemetry span.
    pub span: BoxedSpan,
    /// Python token, if any (presumably the one returned by `set_current_span`, kept so the
    /// `ContextVar` can be reset later — TODO confirm).
    pub context_token: Option<Py<PyAny>>,
}
356
357#[pyclass(eq)]
358#[derive(PartialEq, Clone, Debug, Default, Serialize)]
359pub enum OtelProtocol {
360    #[default]
361    HttpBinary,
362    HttpJson,
363}
364
365impl OtelProtocol {
366    pub fn to_otel_protocol(&self) -> opentelemetry_otlp::Protocol {
367        match self {
368            OtelProtocol::HttpBinary => opentelemetry_otlp::Protocol::HttpBinary,
369            OtelProtocol::HttpJson => opentelemetry_otlp::Protocol::HttpJson,
370        }
371    }
372}
373
374#[derive(Debug)]
375#[pyclass]
376pub struct ExportConfig {
377    #[pyo3(get)]
378    pub endpoint: Option<String>,
379    #[pyo3(get)]
380    pub protocol: OtelProtocol,
381    #[pyo3(get)]
382    pub timeout: Option<u64>,
383}
384
385#[pymethods]
386impl ExportConfig {
387    #[new]
388    #[pyo3(signature = (protocol=OtelProtocol::HttpBinary,endpoint=None,  timeout=None))]
389    pub fn new(protocol: OtelProtocol, endpoint: Option<String>, timeout: Option<u64>) -> Self {
390        ExportConfig {
391            endpoint,
392            protocol,
393            timeout,
394        }
395    }
396}
397
398impl ExportConfig {
399    pub fn to_otel_config(&self) -> OtlpExportConfig {
400        let timeout = self.timeout.map(Duration::from_secs);
401        OtlpExportConfig {
402            endpoint: self.endpoint.clone(),
403            protocol: self.protocol.to_otel_protocol(),
404            timeout,
405        }
406    }
407}
408
409#[derive(Debug)]
410#[pyclass]
411pub struct OtelHttpConfig {
412    #[pyo3(get)]
413    pub headers: Option<HashMap<String, String>>,
414    #[pyo3(get)]
415    pub compression: Option<CompressionType>,
416}
417
418#[pymethods]
419impl OtelHttpConfig {
420    #[new]
421    pub fn new(
422        headers: Option<HashMap<String, String>>,
423        compression: Option<CompressionType>,
424    ) -> Self {
425        OtelHttpConfig {
426            headers,
427            compression,
428        }
429    }
430}
431
432#[derive(Debug)]
433#[pyclass]
434pub struct GrpcConfig {
435    #[pyo3(get)]
436    pub compression: Option<CompressionType>,
437}
438
439#[pymethods]
440impl GrpcConfig {
441    #[new]
442    pub fn new(compression: Option<CompressionType>) -> Self {
443        GrpcConfig { compression }
444    }
445}
446
447pub fn format_traceback(py: Python, exc_tb: &Py<PyAny>) -> Result<String, TraceError> {
448    // Import the traceback module
449    let traceback_module = py.import("traceback")?;
450
451    // Use traceback.format_tb() to get a list of strings
452    let tb_lines = traceback_module.call_method1("format_tb", (exc_tb.bind(py),))?;
453
454    // Join the lines into a single string
455    let empty_string = "".into_bound_py_any(py)?;
456    let formatted = empty_string.call_method1("join", (tb_lines,))?;
457
458    Ok(formatted.extract::<String>()?)
459}
460
461/// This re-implements a boxed span from opentelemetry since BoxedSpan::new is not public
462pub struct BoxedSpan(Box<dyn ObjectSafeSpan + Send + Sync>);
463
464impl BoxedSpan {
465    pub(crate) fn new<T>(span: T) -> Self
466    where
467        T: ObjectSafeSpan + Send + Sync + 'static,
468    {
469        BoxedSpan(Box::new(span))
470    }
471}
472
473impl fmt::Debug for BoxedSpan {
474    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475        f.write_str("BoxedSpan")
476    }
477}
478
479impl trace::Span for BoxedSpan {
480    /// Records events at a specific time in the context of a given `Span`.
481    ///
482    /// Note that the OpenTelemetry project documents certain ["standard event names and
483    /// keys"](https://github.com/open-telemetry/opentelemetry-specification/tree/v0.5.0/specification/trace/semantic_conventions/README.md)
484    /// which have prescribed semantic meanings.
485    fn add_event_with_timestamp<T>(
486        &mut self,
487        name: T,
488        timestamp: SystemTime,
489        attributes: Vec<KeyValue>,
490    ) where
491        T: Into<Cow<'static, str>>,
492    {
493        self.0
494            .add_event_with_timestamp(name.into(), timestamp, attributes)
495    }
496
497    /// Returns the `SpanContext` for the given `Span`.
498    fn span_context(&self) -> &trace::SpanContext {
499        self.0.span_context()
500    }
501
502    /// Returns true if this `Span` is recording information like events with the `add_event`
503    /// operation, attributes using `set_attributes`, status with `set_status`, etc.
504    fn is_recording(&self) -> bool {
505        self.0.is_recording()
506    }
507
508    /// Sets a single `Attribute` where the attribute properties are passed as arguments.
509    ///
510    /// Note that the OpenTelemetry project documents certain ["standard
511    /// attributes"](https://github.com/open-telemetry/opentelemetry-specification/tree/v0.5.0/specification/trace/semantic_conventions/README.md)
512    /// that have prescribed semantic meanings.
513    fn set_attribute(&mut self, attribute: KeyValue) {
514        self.0.set_attribute(attribute)
515    }
516
517    /// Sets the status of the `Span`. If used, this will override the default `Span`
518    /// status, which is `Unset`.
519    fn set_status(&mut self, status: trace::Status) {
520        self.0.set_status(status)
521    }
522
523    /// Updates the `Span`'s name.
524    fn update_name<T>(&mut self, new_name: T)
525    where
526        T: Into<Cow<'static, str>>,
527    {
528        self.0.update_name(new_name.into())
529    }
530
531    /// Adds a link to this span
532    ///
533    fn add_link(&mut self, span_context: trace::SpanContext, attributes: Vec<KeyValue>) {
534        self.0.add_link(span_context, attributes)
535    }
536
537    /// Finishes the span with given timestamp.
538    fn end_with_timestamp(&mut self, timestamp: SystemTime) {
539        self.0.end_with_timestamp(timestamp);
540    }
541}