1use crate::error::TraceError;
2use crate::tracer::ActiveSpan;
3use opentelemetry::global::ObjectSafeSpan;
4use opentelemetry::trace::SpanContext;
5use opentelemetry::{trace, KeyValue};
6use opentelemetry_otlp::ExportConfig as OtlpExportConfig;
7use pyo3::types::{PyDict, PyModule, PyTuple};
8use pyo3::{prelude::*, IntoPyObjectExt};
9use scouter_types::CompressionType;
10use scouter_types::{
11 FUNCTION_MODULE, FUNCTION_NAME, FUNCTION_QUALNAME, FUNCTION_STREAMING, FUNCTION_TYPE,
12};
13use serde::Serialize;
14use std::borrow::Cow;
15use std::collections::HashMap;
16use std::fmt;
17use std::fmt::Display;
18use std::sync::OnceLock;
19use std::sync::{Arc, RwLock};
20use std::time::Duration;
21use std::time::SystemTime;
22use tracing::{debug, instrument};
23
24static CONTEXT_STORE: OnceLock<ContextStore> = OnceLock::new();
26
27static OTEL_CONTEXT_VAR: OnceLock<Py<PyAny>> = OnceLock::new();
29
30static PY_IMPORTS: OnceLock<HelperImports> = OnceLock::new();
32const ASYNCIO_MODULE: &str = "asyncio";
33const INSPECT_MODULE: &str = "inspect";
34const CONTEXTVARS_MODULE: &str = "contextvars";
35
36#[pyclass(eq)]
37#[derive(PartialEq, Clone, Debug)]
38pub enum FunctionType {
39 Async,
40 AsyncGenerator,
41 SyncGenerator,
42 Sync,
43}
44
45impl Display for FunctionType {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 match self {
48 FunctionType::Async => write!(f, "Async"),
49 FunctionType::AsyncGenerator => write!(f, "AsyncGenerator"),
50 FunctionType::SyncGenerator => write!(f, "SyncGenerator"),
51 FunctionType::Sync => write!(f, "Sync"),
52 }
53 }
54}
55
56impl FunctionType {
57 pub fn as_str(&self) -> &str {
58 match self {
59 FunctionType::Async => "Async",
60 FunctionType::AsyncGenerator => "AsyncGenerator",
61 FunctionType::SyncGenerator => "SyncGenerator",
62 FunctionType::Sync => "Sync",
63 }
64 }
65}
66
67impl std::str::FromStr for FunctionType {
68 type Err = TraceError;
69 fn from_str(s: &str) -> Result<Self, Self::Err> {
70 match s {
71 "Async" => Ok(FunctionType::Async),
72 "AsyncGenerator" => Ok(FunctionType::AsyncGenerator),
73 "SyncGenerator" => Ok(FunctionType::SyncGenerator),
74 "Sync" => Ok(FunctionType::Sync),
75 _ => Err(TraceError::InvalidFunctionType(s.to_string())),
76 }
77 }
78}
79
80pub struct HelperImports {
81 pub asyncio: Py<PyModule>,
82 pub inspect: Py<PyModule>,
83}
84
85fn get_helper_imports(py: Python<'_>) -> &'static HelperImports {
87 PY_IMPORTS.get_or_init(|| {
88 let asyncio = py
89 .import(ASYNCIO_MODULE)
90 .expect("Failed to import asyncio")
91 .unbind();
92 let inspect = py
93 .import(INSPECT_MODULE)
94 .expect("Failed to import inspect")
95 .unbind();
96 HelperImports { asyncio, inspect }
97 })
98}
99
100fn py_asyncio(py: Python<'_>) -> &Bound<'_, PyModule> {
101 let imports = get_helper_imports(py);
102 imports.asyncio.bind(py)
103}
104
105fn py_inspect(py: Python<'_>) -> &Bound<'_, PyModule> {
106 let imports = get_helper_imports(py);
107 imports.inspect.bind(py)
108}
109
110#[pyfunction]
113pub fn get_function_type(
114 py: Python<'_>,
115 func: &Bound<'_, PyAny>,
116) -> Result<FunctionType, TraceError> {
117 let is_async_gen = py_inspect(py)
119 .call_method1("isasyncgenfunction", (func,))?
120 .extract::<bool>()?;
121
122 if is_async_gen {
123 return Ok(FunctionType::AsyncGenerator);
124 }
125
126 let is_gen = py_inspect(py)
128 .call_method1("isgeneratorfunction", (func,))?
129 .extract::<bool>()?;
130
131 if is_gen {
132 return Ok(FunctionType::SyncGenerator);
133 }
134
135 let is_async = py_asyncio(py)
137 .call_method1("iscoroutinefunction", (func,))?
138 .extract::<bool>()?;
139
140 if is_async {
141 return Ok(FunctionType::Async);
142 }
143
144 Ok(FunctionType::Sync)
146}
147
148pub(crate) fn capture_function_arguments<'py>(
157 py: Python<'py>,
158 func: &Bound<'py, PyAny>,
159 args: &Bound<'py, PyTuple>,
160 kwargs: Option<&Bound<'py, PyDict>>,
161) -> Result<Bound<'py, PyAny>, TraceError> {
162 let sig = py_inspect(py).call_method1("signature", (func,))?;
163 let bound_args = sig.call_method("bind", args, kwargs)?;
164 bound_args.call_method0("apply_defaults")?;
165 Ok(bound_args.getattr("arguments")?)
166}
167
168#[instrument(skip_all)]
175pub fn set_function_attributes(
176 func: &Bound<'_, PyAny>,
177 span: &mut ActiveSpan,
178) -> Result<(), TraceError> {
179 debug!("Setting function attributes on span");
180 let function_name = match func.getattr("__name__") {
181 Ok(name) => name.extract::<String>()?,
182 Err(_) => "<unknown>".to_string(),
183 };
184
185 let func_module = match func.getattr("__module__") {
186 Ok(module) => module.extract::<String>()?,
187 Err(_) => "<unknown>".to_string(),
188 };
189
190 let func_qualname = match func.getattr("__qualname__") {
191 Ok(qualname) => qualname.extract::<String>()?,
192 Err(_) => "<unknown>".to_string(),
193 };
194
195 span.set_attribute_static(FUNCTION_NAME, function_name)?;
196 span.set_attribute_static(FUNCTION_MODULE, func_module)?;
197 span.set_attribute_static(FUNCTION_QUALNAME, func_qualname)?;
198
199 Ok(())
200}
201
202#[instrument(skip_all)]
203pub(crate) fn set_function_type_attribute(
204 func_type: &FunctionType,
205 span: &mut ActiveSpan,
206) -> Result<(), TraceError> {
207 debug!("Setting function type attribute on span");
208 if func_type == &FunctionType::AsyncGenerator || func_type == &FunctionType::SyncGenerator {
209 span.set_attribute_static(FUNCTION_STREAMING, "true".to_string())?;
210 } else {
211 span.set_attribute_static(FUNCTION_STREAMING, "false".to_string())?;
212 }
213 span.set_attribute_static(FUNCTION_TYPE, func_type.as_str().to_string())?;
214
215 Ok(())
216}
217
218#[derive(Clone)]
220pub(crate) struct ContextStore {
221 inner: Arc<RwLock<HashMap<String, SpanContext>>>,
222}
223
224impl ContextStore {
225 fn new() -> Self {
226 Self {
227 inner: Arc::new(RwLock::new(HashMap::new())),
228 }
229 }
230
231 pub(crate) fn set(&self, key: String, ctx: SpanContext) -> Result<(), TraceError> {
232 self.inner
233 .write()
234 .map_err(|e| TraceError::PoisonError(e.to_string()))?
235 .insert(key, ctx);
236 Ok(())
237 }
238
239 pub(crate) fn get(&self, key: &str) -> Result<Option<SpanContext>, TraceError> {
240 Ok(self
241 .inner
242 .read()
243 .map_err(|e| TraceError::PoisonError(e.to_string()))?
244 .get(key)
245 .cloned())
246 }
247
248 pub(crate) fn remove(&self, key: &str) -> Result<(), TraceError> {
249 self.inner
250 .write()
251 .map_err(|e| TraceError::PoisonError(e.to_string()))?
252 .remove(key);
253 Ok(())
254 }
255}
256
257pub(crate) fn get_context_store() -> &'static ContextStore {
258 CONTEXT_STORE.get_or_init(ContextStore::new)
259}
260
261fn init_context_var(py: Python<'_>) -> PyResult<Py<PyAny>> {
265 let contextvars = py.import(CONTEXTVARS_MODULE)?;
266 let context_var = contextvars
267 .call_method1("ContextVar", ("_otel_current_span",))?
268 .unbind();
269 Ok(context_var)
270}
271
272pub(crate) fn get_context_var(py: Python<'_>) -> PyResult<&Py<PyAny>> {
273 Ok(OTEL_CONTEXT_VAR
274 .get_or_init(|| init_context_var(py).expect("Failed to initialize context var")))
275}
276
277pub(crate) fn set_current_span(py: Python<'_>, obj: Bound<'_, ActiveSpan>) -> PyResult<Py<PyAny>> {
278 let context_var = get_context_var(py)?;
279 let token = context_var.bind(py).call_method1("set", (obj,))?;
280 Ok(token.unbind())
281}
282
283pub(crate) fn get_current_context_id(py: Python<'_>) -> PyResult<Option<String>> {
286 match get_context_var(py)?.bind(py).call_method0("get") {
288 Ok(val) => {
289 if val.is_none() {
290 Ok(None)
291 } else {
292 let val = val.getattr("context_id")?;
295 Ok(Some(val.extract::<String>()?))
296 }
297 }
298 Err(_) => Ok(None),
299 }
300}
301
302pub(crate) fn get_current_active_span(py: Python<'_>) -> Result<Bound<'_, PyAny>, TraceError> {
305 match get_context_var(py)?.bind(py).call_method0("get") {
306 Ok(val) => {
307 if val.is_none() {
308 Err(TraceError::NoActiveSpan)
309 } else {
310 Ok(val)
311 }
312 }
313 Err(_) => Err(TraceError::NoActiveSpan),
314 }
315}
316
317#[pyclass(eq)]
318#[derive(PartialEq, Clone, Debug)]
319pub enum SpanKind {
320 Client,
321 Server,
322 Producer,
323 Consumer,
324 Internal,
325}
326
327impl Display for SpanKind {
328 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329 match self {
330 SpanKind::Client => write!(f, "client"),
331 SpanKind::Server => write!(f, "server"),
332 SpanKind::Producer => write!(f, "producer"),
333 SpanKind::Consumer => write!(f, "consumer"),
334 SpanKind::Internal => write!(f, "internal"),
335 }
336 }
337}
338
339impl SpanKind {
340 pub fn to_otel_span_kind(&self) -> opentelemetry::trace::SpanKind {
341 match self {
342 SpanKind::Client => opentelemetry::trace::SpanKind::Client,
343 SpanKind::Server => opentelemetry::trace::SpanKind::Server,
344 SpanKind::Producer => opentelemetry::trace::SpanKind::Producer,
345 SpanKind::Consumer => opentelemetry::trace::SpanKind::Consumer,
346 SpanKind::Internal => opentelemetry::trace::SpanKind::Internal,
347 }
348 }
349}
350
351pub(crate) struct ActiveSpanInner {
352 pub context_id: String,
353 pub span: BoxedSpan,
354 pub context_token: Option<Py<PyAny>>,
355}
356
357#[pyclass(eq)]
358#[derive(PartialEq, Clone, Debug, Default, Serialize)]
359pub enum OtelProtocol {
360 #[default]
361 HttpBinary,
362 HttpJson,
363}
364
365impl OtelProtocol {
366 pub fn to_otel_protocol(&self) -> opentelemetry_otlp::Protocol {
367 match self {
368 OtelProtocol::HttpBinary => opentelemetry_otlp::Protocol::HttpBinary,
369 OtelProtocol::HttpJson => opentelemetry_otlp::Protocol::HttpJson,
370 }
371 }
372}
373
374#[derive(Debug)]
375#[pyclass]
376pub struct ExportConfig {
377 #[pyo3(get)]
378 pub endpoint: Option<String>,
379 #[pyo3(get)]
380 pub protocol: OtelProtocol,
381 #[pyo3(get)]
382 pub timeout: Option<u64>,
383}
384
385#[pymethods]
386impl ExportConfig {
387 #[new]
388 #[pyo3(signature = (protocol=OtelProtocol::HttpBinary,endpoint=None, timeout=None))]
389 pub fn new(protocol: OtelProtocol, endpoint: Option<String>, timeout: Option<u64>) -> Self {
390 ExportConfig {
391 endpoint,
392 protocol,
393 timeout,
394 }
395 }
396}
397
398impl ExportConfig {
399 pub fn to_otel_config(&self) -> OtlpExportConfig {
400 let timeout = self.timeout.map(Duration::from_secs);
401 OtlpExportConfig {
402 endpoint: self.endpoint.clone(),
403 protocol: self.protocol.to_otel_protocol(),
404 timeout,
405 }
406 }
407}
408
409#[derive(Debug)]
410#[pyclass]
411pub struct OtelHttpConfig {
412 #[pyo3(get)]
413 pub headers: Option<HashMap<String, String>>,
414 #[pyo3(get)]
415 pub compression: Option<CompressionType>,
416}
417
418#[pymethods]
419impl OtelHttpConfig {
420 #[new]
421 pub fn new(
422 headers: Option<HashMap<String, String>>,
423 compression: Option<CompressionType>,
424 ) -> Self {
425 OtelHttpConfig {
426 headers,
427 compression,
428 }
429 }
430}
431
432#[derive(Debug)]
433#[pyclass]
434pub struct GrpcConfig {
435 #[pyo3(get)]
436 pub compression: Option<CompressionType>,
437}
438
439#[pymethods]
440impl GrpcConfig {
441 #[new]
442 pub fn new(compression: Option<CompressionType>) -> Self {
443 GrpcConfig { compression }
444 }
445}
446
447pub fn format_traceback(py: Python, exc_tb: &Py<PyAny>) -> Result<String, TraceError> {
448 let traceback_module = py.import("traceback")?;
450
451 let tb_lines = traceback_module.call_method1("format_tb", (exc_tb.bind(py),))?;
453
454 let empty_string = "".into_bound_py_any(py)?;
456 let formatted = empty_string.call_method1("join", (tb_lines,))?;
457
458 Ok(formatted.extract::<String>()?)
459}
460
461pub struct BoxedSpan(Box<dyn ObjectSafeSpan + Send + Sync>);
463
464impl BoxedSpan {
465 pub(crate) fn new<T>(span: T) -> Self
466 where
467 T: ObjectSafeSpan + Send + Sync + 'static,
468 {
469 BoxedSpan(Box::new(span))
470 }
471}
472
473impl fmt::Debug for BoxedSpan {
474 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475 f.write_str("BoxedSpan")
476 }
477}
478
479impl trace::Span for BoxedSpan {
480 fn add_event_with_timestamp<T>(
486 &mut self,
487 name: T,
488 timestamp: SystemTime,
489 attributes: Vec<KeyValue>,
490 ) where
491 T: Into<Cow<'static, str>>,
492 {
493 self.0
494 .add_event_with_timestamp(name.into(), timestamp, attributes)
495 }
496
497 fn span_context(&self) -> &trace::SpanContext {
499 self.0.span_context()
500 }
501
502 fn is_recording(&self) -> bool {
505 self.0.is_recording()
506 }
507
508 fn set_attribute(&mut self, attribute: KeyValue) {
514 self.0.set_attribute(attribute)
515 }
516
517 fn set_status(&mut self, status: trace::Status) {
520 self.0.set_status(status)
521 }
522
523 fn update_name<T>(&mut self, new_name: T)
525 where
526 T: Into<Cow<'static, str>>,
527 {
528 self.0.update_name(new_name.into())
529 }
530
531 fn add_link(&mut self, span_context: trace::SpanContext, attributes: Vec<KeyValue>) {
534 self.0.add_link(span_context, attributes)
535 }
536
537 fn end_with_timestamp(&mut self, timestamp: SystemTime) {
539 self.0.end_with_timestamp(timestamp);
540 }
541}