use crate::error::TraceError;
use crate::tracer::ActiveSpan;
use opentelemetry::global::ObjectSafeSpan;
use opentelemetry::trace::Status;
use opentelemetry::trace::{SpanContext, TraceState};
use opentelemetry::{trace, KeyValue, SpanId, TraceFlags, TraceId};
use opentelemetry_otlp::ExportConfig as OtlpExportConfig;
use pyo3::types::PyString;
use pyo3::types::{PyDict, PyModule, PyTuple};
use pyo3::{prelude::*, IntoPyObjectExt};
use scouter_events::queue::ScouterQueue;
use scouter_types::is_pydantic_basemodel;
use scouter_types::pydict_to_otel_keyvalue;
use scouter_types::CompressionType;
use scouter_types::{
FUNCTION_MODULE, FUNCTION_NAME, FUNCTION_QUALNAME, FUNCTION_STREAMING, FUNCTION_TYPE,
};
use serde::Serialize;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::sync::OnceLock;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::time::SystemTime;
use tracing::{debug, instrument};
// Process-wide span-context store; created on first use by `get_context_store`.
static CONTEXT_STORE: OnceLock<ContextStore> = OnceLock::new();
// Handle to the Python `ContextVar` named "_otel_current_span"; created on first
// use by `get_context_var`.
static OTEL_CONTEXT_VAR: OnceLock<Py<PyAny>> = OnceLock::new();
// Cached handles to the Python helper modules; filled in by `get_helper_imports`.
static PY_IMPORTS: OnceLock<HelperImports> = OnceLock::new();
// Names of the Python modules imported by the helpers in this file.
const ASYNCIO_MODULE: &str = "asyncio";
const INSPECT_MODULE: &str = "inspect";
const CONTEXTVARS_MODULE: &str = "contextvars";
/// Extension trait for constructing a [`SpanContext`] from a Python object.
pub trait SpanContextExt {
/// Converts the Python object `py_ctx` into a [`SpanContext`].
///
/// # Errors
/// Returns a [`TraceError`] when the conversion fails.
fn from_py_span_context(py_ctx: &Bound<'_, PyAny>) -> Result<SpanContext, TraceError>;
}
impl SpanContextExt for SpanContext {
    /// Builds a [`SpanContext`] from the attributes of a Python span-context object.
    ///
    /// The following attributes are read from `py_ctx`:
    /// - `trace_id` and `span_id`: strings, parsed as hex;
    /// - `trace_flags`: an integer that fits in a `u8`;
    /// - `trace_state`: a `dict` whose keys and values are strings.
    ///
    /// The resulting context is always created with `is_remote` set to `false`.
    ///
    /// # Errors
    /// Returns a [`TraceError`] if an attribute is missing, has the wrong type,
    /// the ids are not valid hex, or the trace state is rejected.
    fn from_py_span_context(py_ctx: &Bound<'_, PyAny>) -> Result<SpanContext, TraceError> {
        // Pull the raw Python values out first; parsing happens afterwards.
        let trace_id_hex: String = py_ctx
            .getattr("trace_id")?
            .extract::<String>()
            .map_err(|e| TraceError::DowncastError(e.to_string()))?;
        let span_id_hex: String = py_ctx
            .getattr("span_id")?
            .extract::<String>()
            .map_err(|e| TraceError::DowncastError(e.to_string()))?;
        let raw_flags: u8 = py_ctx
            .getattr("trace_flags")?
            .extract::<u8>()
            .map_err(|e| TraceError::DowncastError(e.to_string()))?;

        let state_obj = py_ctx.getattr("trace_state")?;
        let state_dict = state_obj.cast::<PyDict>()?;
        let state_pairs = state_dict
            .iter()
            .map(|(k, v)| Ok((k.extract::<String>()?, v.extract::<String>()?)))
            .collect::<Result<Vec<(String, String)>, PyErr>>()?;

        // Parse in the order: trace id, span id, trace state.
        let trace_id = TraceId::from_hex(&trace_id_hex)?;
        let span_id = SpanId::from_hex(&span_id_hex)?;
        let trace_state = TraceState::from_key_value(state_pairs)
            .map_err(|e| TraceError::TraceStateError(e.to_string()))?;

        Ok(SpanContext::new(
            trace_id,
            span_id,
            TraceFlags::new(raw_flags),
            false,
            trace_state,
        ))
    }
}
/// Classification of a Python callable, as produced by `get_function_type`.
#[pyclass(eq)]
#[derive(PartialEq, Clone, Debug)]
pub enum FunctionType {
/// Coroutine function.
Async,
/// Asynchronous generator function.
AsyncGenerator,
/// Synchronous generator function.
SyncGenerator,
/// Any callable that is none of the above.
Sync,
}
impl Display for FunctionType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FunctionType::Async => write!(f, "Async"),
FunctionType::AsyncGenerator => write!(f, "AsyncGenerator"),
FunctionType::SyncGenerator => write!(f, "SyncGenerator"),
FunctionType::Sync => write!(f, "Sync"),
}
}
}
impl FunctionType {
    /// Returns the variant name as a string slice.
    ///
    /// Every arm yields a string literal, so the result is `&'static str` and
    /// can outlive the `FunctionType` value it was obtained from.
    pub fn as_str(&self) -> &'static str {
        match self {
            FunctionType::Async => "Async",
            FunctionType::AsyncGenerator => "AsyncGenerator",
            FunctionType::SyncGenerator => "SyncGenerator",
            FunctionType::Sync => "Sync",
        }
    }
}
/// Parses the exact (case-sensitive) variant names `"Async"`, `"AsyncGenerator"`,
/// `"SyncGenerator"` and `"Sync"`.
impl std::str::FromStr for FunctionType {
    type Err = TraceError;

    /// Returns [`TraceError::InvalidFunctionType`] carrying the rejected input
    /// when `s` is not one of the known names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "Async" => FunctionType::Async,
            "AsyncGenerator" => FunctionType::AsyncGenerator,
            "SyncGenerator" => FunctionType::SyncGenerator,
            "Sync" => FunctionType::Sync,
            unknown => return Err(TraceError::InvalidFunctionType(unknown.to_string())),
        };
        Ok(parsed)
    }
}
/// Handles to the Python modules used by the helpers in this file.
pub struct HelperImports {
/// The Python `asyncio` module.
pub asyncio: Py<PyModule>,
/// The Python `inspect` module.
pub inspect: Py<PyModule>,
}
/// Returns the cached Python helper modules, importing them on first use.
///
/// # Panics
/// Panics if `asyncio` or `inspect` cannot be imported.
fn get_helper_imports(py: Python<'_>) -> &'static HelperImports {
    PY_IMPORTS.get_or_init(|| HelperImports {
        asyncio: py
            .import(ASYNCIO_MODULE)
            .expect("Failed to import asyncio")
            .unbind(),
        inspect: py
            .import(INSPECT_MODULE)
            .expect("Failed to import inspect")
            .unbind(),
    })
}
/// Returns the cached `asyncio` module bound to `py`.
fn py_asyncio(py: Python<'_>) -> &Bound<'_, PyModule> {
    get_helper_imports(py).asyncio.bind(py)
}
/// Returns the cached `inspect` module bound to `py`.
fn py_inspect(py: Python<'_>) -> &Bound<'_, PyModule> {
    get_helper_imports(py).inspect.bind(py)
}
/// Classifies the Python callable `func`.
///
/// The checks run in this order and the first match wins:
/// 1. `inspect.isasyncgenfunction` -> [`FunctionType::AsyncGenerator`]
/// 2. `inspect.isgeneratorfunction` -> [`FunctionType::SyncGenerator`]
/// 3. `asyncio.iscoroutinefunction` -> [`FunctionType::Async`]
/// 4. otherwise -> [`FunctionType::Sync`]
///
/// # Errors
/// Returns a [`TraceError`] if any of the Python calls fails or does not
/// return a `bool`.
#[pyfunction]
pub fn get_function_type(
    py: Python<'_>,
    func: &Bound<'_, PyAny>,
) -> Result<FunctionType, TraceError> {
    let inspect = py_inspect(py);

    // NOTE(review): newer Python releases deprecate `asyncio.iscoroutinefunction`
    // in favour of `inspect.iscoroutinefunction`; confirm before switching, as the
    // two may not treat every callable identically.
    let func_type = if inspect
        .call_method1("isasyncgenfunction", (func,))?
        .extract::<bool>()?
    {
        FunctionType::AsyncGenerator
    } else if inspect
        .call_method1("isgeneratorfunction", (func,))?
        .extract::<bool>()?
    {
        FunctionType::SyncGenerator
    } else if py_asyncio(py)
        .call_method1("iscoroutinefunction", (func,))?
        .extract::<bool>()?
    {
        FunctionType::Async
    } else {
        FunctionType::Sync
    };

    Ok(func_type)
}
/// Binds `args` / `kwargs` to the signature of `func` and returns the resulting
/// `arguments` mapping with defaults applied.
///
/// This is the Rust equivalent of:
///
/// ```python
/// bound = inspect.signature(func).bind(*args, **kwargs)
/// bound.apply_defaults()
/// return bound.arguments
/// ```
///
/// # Errors
/// Returns a [`TraceError`] if any of the Python calls raises.
pub(crate) fn capture_function_arguments<'py>(
    py: Python<'py>,
    func: &Bound<'py, PyAny>,
    args: &Bound<'py, PyTuple>,
    kwargs: Option<&Bound<'py, PyDict>>,
) -> Result<Bound<'py, PyAny>, TraceError> {
    let bound = py_inspect(py)
        .call_method1("signature", (func,))?
        .call_method("bind", args, kwargs)?;
    bound.call_method0("apply_defaults")?;

    let arguments = bound.getattr("arguments")?;
    Ok(arguments)
}
/// Records the name, module and qualified name of `func` as span attributes.
///
/// `__name__`, `__module__` and `__qualname__` are read from `func`. An attribute
/// that cannot be looked up is recorded as `"<unknown>"`; an attribute that exists
/// but is not a string is reported as an error.
///
/// # Errors
/// Returns a [`TraceError`] if an attribute value cannot be extracted as a string
/// or if setting a span attribute fails.
#[instrument(skip_all)]
pub fn set_function_attributes(
    func: &Bound<'_, PyAny>,
    span: &mut ActiveSpan,
) -> Result<(), TraceError> {
    debug!("Setting function attributes on span");

    // Lookup failure falls back to a placeholder; extraction failure propagates.
    let read_attr = |attr: &str| -> Result<String, TraceError> {
        match func.getattr(attr) {
            Ok(value) => Ok(value.extract::<String>()?),
            Err(_) => Ok("<unknown>".to_string()),
        }
    };

    let name = read_attr("__name__")?;
    let module = read_attr("__module__")?;
    let qualname = read_attr("__qualname__")?;

    span.set_attribute_static(FUNCTION_NAME, name)?;
    span.set_attribute_static(FUNCTION_MODULE, module)?;
    span.set_attribute_static(FUNCTION_QUALNAME, qualname)?;
    Ok(())
}
/// Records the function type on `span`.
///
/// Sets `FUNCTION_STREAMING` to `"true"` for the two generator variants and to
/// `"false"` otherwise, then sets `FUNCTION_TYPE` to the variant name.
///
/// # Errors
/// Returns a [`TraceError`] if setting a span attribute fails.
#[instrument(skip_all)]
pub(crate) fn set_function_type_attribute(
    func_type: &FunctionType,
    span: &mut ActiveSpan,
) -> Result<(), TraceError> {
    debug!("Setting function type attribute on span");

    let is_streaming = matches!(
        func_type,
        FunctionType::AsyncGenerator | FunctionType::SyncGenerator
    );
    // `bool::to_string` yields exactly "true" / "false".
    span.set_attribute_static(FUNCTION_STREAMING, is_streaming.to_string())?;
    span.set_attribute_static(FUNCTION_TYPE, func_type.as_str().to_owned())?;
    Ok(())
}
/// Shared map from a string key to a [`SpanContext`].
///
/// The map sits behind `Arc<RwLock<..>>`, so clones of a `ContextStore` refer to
/// the same underlying map.
#[derive(Clone)]
pub(crate) struct ContextStore {
inner: Arc<RwLock<HashMap<String, SpanContext>>>,
}
impl ContextStore {
    /// Creates an empty store.
    fn new() -> Self {
        let inner = Arc::new(RwLock::new(HashMap::new()));
        Self { inner }
    }

    /// Stores `ctx` under `key`, replacing any previous entry.
    ///
    /// # Errors
    /// Returns [`TraceError::PoisonError`] if the lock is poisoned.
    pub(crate) fn set(&self, key: String, ctx: SpanContext) -> Result<(), TraceError> {
        let mut entries = self
            .inner
            .write()
            .map_err(|e| TraceError::PoisonError(e.to_string()))?;
        entries.insert(key, ctx);
        Ok(())
    }

    /// Returns a clone of the context stored under `key`, if any.
    ///
    /// # Errors
    /// Returns [`TraceError::PoisonError`] if the lock is poisoned.
    pub(crate) fn get(&self, key: &str) -> Result<Option<SpanContext>, TraceError> {
        let entries = self
            .inner
            .read()
            .map_err(|e| TraceError::PoisonError(e.to_string()))?;
        Ok(entries.get(key).cloned())
    }

    /// Removes the entry stored under `key`; a missing key is not an error.
    ///
    /// # Errors
    /// Returns [`TraceError::PoisonError`] if the lock is poisoned.
    pub(crate) fn remove(&self, key: &str) -> Result<(), TraceError> {
        let mut entries = self
            .inner
            .write()
            .map_err(|e| TraceError::PoisonError(e.to_string()))?;
        entries.remove(key);
        Ok(())
    }
}
/// Returns the process-wide [`ContextStore`], creating an empty one on first use.
pub(crate) fn get_context_store() -> &'static ContextStore {
CONTEXT_STORE.get_or_init(ContextStore::new)
}
/// Creates a new Python `contextvars.ContextVar` named `"_otel_current_span"`.
///
/// The variable is created without a default value.
///
/// # Errors
/// Returns the Python error if the import or the constructor call fails.
fn init_context_var(py: Python<'_>) -> PyResult<Py<PyAny>> {
    py.import(CONTEXTVARS_MODULE)?
        .call_method1("ContextVar", ("_otel_current_span",))
        .map(|context_var| context_var.unbind())
}
/// Returns the process-wide `ContextVar` that holds the current span, creating it
/// on first use.
///
/// # Errors
/// Returns the Python error raised while creating the `ContextVar`. Earlier
/// versions panicked here even though the signature is fallible.
pub(crate) fn get_context_var(py: Python<'_>) -> PyResult<&Py<PyAny>> {
    if let Some(existing) = OTEL_CONTEXT_VAR.get() {
        return Ok(existing);
    }

    // Build the ContextVar *outside* the `OnceLock` initializer: a Python failure
    // becomes an `Err` for the caller, and no Python code runs while the lock's
    // one-time initialization is in progress.
    let created = init_context_var(py)?;

    // If another thread initialized the lock in the meantime, its value is kept
    // and `created` is simply dropped; every caller sees the same ContextVar.
    Ok(OTEL_CONTEXT_VAR.get_or_init(|| created))
}
/// Stores `obj` in the current-span context variable.
///
/// Returns whatever `ContextVar.set` returned, unbound from the interpreter
/// lifetime.
///
/// # Errors
/// Returns the Python error if the context variable is unavailable or `set` raises.
pub(crate) fn set_current_span(
    py: Python<'_>,
    obj: Bound<'_, ActiveSpan>,
) -> PyResult<Py<PyAny>> {
    get_context_var(py)?
        .bind(py)
        .call_method1("set", (obj,))
        .map(|token| token.unbind())
}
/// Returns the `context_id` of the object currently stored in the current-span
/// context variable.
///
/// Yields `Ok(None)` when the variable's `get()` call raises or when the stored
/// value is Python `None`.
///
/// # Errors
/// Returns the Python error if the context variable cannot be obtained, or if the
/// stored object has no string `context_id` attribute.
pub(crate) fn get_current_context_id(py: Python<'_>) -> PyResult<Option<String>> {
    let context_var = get_context_var(py)?;

    // Any failure of `get()` is treated as "no current span".
    let Ok(current) = context_var.bind(py).call_method0("get") else {
        return Ok(None);
    };
    if current.is_none() {
        return Ok(None);
    }

    let context_id = current.getattr("context_id")?.extract::<String>()?;
    Ok(Some(context_id))
}
/// Returns the object currently stored in the current-span context variable.
///
/// # Errors
/// Returns [`TraceError::NoActiveSpan`] when the variable's `get()` call raises or
/// the stored value is Python `None`. A failure to obtain the context variable
/// itself is propagated as a [`TraceError`].
#[pyfunction]
pub fn get_current_active_span(py: Python<'_>) -> Result<Bound<'_, PyAny>, TraceError> {
    match get_context_var(py)?.bind(py).call_method0("get") {
        Ok(current) if !current.is_none() => Ok(current),
        _ => Err(TraceError::NoActiveSpan),
    }
}
/// Span kind exposed to Python; each variant maps to the OpenTelemetry span kind
/// of the same name (see `to_otel_span_kind`).
#[pyclass(eq)]
#[derive(PartialEq, Clone, Debug)]
pub enum SpanKind {
Client,
Server,
Producer,
Consumer,
Internal,
}
/// Displays the span kind as its lowercase name (e.g. `"client"`).
impl Display for SpanKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            SpanKind::Client => "client",
            SpanKind::Server => "server",
            SpanKind::Producer => "producer",
            SpanKind::Consumer => "consumer",
            SpanKind::Internal => "internal",
        };
        f.write_str(label)
    }
}
impl SpanKind {
    /// Converts to the OpenTelemetry span kind with the same variant name.
    pub fn to_otel_span_kind(&self) -> opentelemetry::trace::SpanKind {
        use opentelemetry::trace::SpanKind as OtelSpanKind;

        match self {
            Self::Client => OtelSpanKind::Client,
            Self::Server => OtelSpanKind::Server,
            Self::Producer => OtelSpanKind::Producer,
            Self::Consumer => OtelSpanKind::Consumer,
            Self::Internal => OtelSpanKind::Internal,
        }
    }
}
/// Internal state associated with an active span.
pub(crate) struct ActiveSpanInner {
/// String identifier for this span's context.
/// NOTE(review): presumably the key used with `ContextStore` — confirm against callers.
pub context_id: String,
/// The wrapped, type-erased OpenTelemetry span.
pub span: BoxedSpan,
/// Optional Python object.
/// NOTE(review): presumably the token returned by `set_current_span` — confirm.
pub context_token: Option<Py<PyAny>>,
/// Optional handle to a `ScouterQueue`.
pub queue: Option<Py<ScouterQueue>>,
}
/// OTLP transport protocol selectable from Python; defaults to `HttpBinary`.
#[pyclass(eq)]
#[derive(PartialEq, Clone, Debug, Default, Serialize)]
pub enum OtelProtocol {
/// Maps to `opentelemetry_otlp::Protocol::HttpBinary`.
#[default]
HttpBinary,
/// Maps to `opentelemetry_otlp::Protocol::HttpJson`.
HttpJson,
}
impl OtelProtocol {
pub fn to_otel_protocol(&self) -> opentelemetry_otlp::Protocol {
match self {
OtelProtocol::HttpBinary => opentelemetry_otlp::Protocol::HttpBinary,
OtelProtocol::HttpJson => opentelemetry_otlp::Protocol::HttpJson,
}
}
}
/// Export settings exposed to Python; every field is readable from Python.
#[derive(Debug)]
#[pyclass]
pub struct OtelExportConfig {
/// Optional export endpoint.
#[pyo3(get)]
pub endpoint: Option<String>,
/// Transport protocol.
#[pyo3(get)]
pub protocol: OtelProtocol,
/// Optional timeout in seconds (`to_otel_config` converts it with `Duration::from_secs`).
#[pyo3(get)]
pub timeout: Option<u64>,
/// Optional compression setting; not consumed by `to_otel_config`.
#[pyo3(get)]
pub compression: Option<CompressionType>,
/// Optional string-to-string header map; not consumed by `to_otel_config`.
#[pyo3(get)]
pub headers: Option<HashMap<String, String>>,
}
#[pymethods]
impl OtelExportConfig {
#[new]
#[pyo3(signature = (protocol=OtelProtocol::HttpBinary, endpoint=None, timeout=None, compression=None, headers=None))]
pub fn new(
protocol: OtelProtocol,
endpoint: Option<String>,
timeout: Option<u64>,
compression: Option<CompressionType>,
headers: Option<HashMap<String, String>>,
) -> Self {
OtelExportConfig {
endpoint,
protocol,
timeout,
compression,
headers,
}
}
}
impl OtelExportConfig {
    /// Builds the `opentelemetry_otlp` export configuration from this value.
    ///
    /// Only `endpoint`, `protocol` and `timeout` (interpreted as seconds) are
    /// carried over; `compression` and `headers` are not part of the result.
    pub fn to_otel_config(&self) -> OtlpExportConfig {
        OtlpExportConfig {
            endpoint: self.endpoint.clone(),
            protocol: self.protocol.to_otel_protocol(),
            timeout: self.timeout.map(Duration::from_secs),
        }
    }
}
/// Renders a Python traceback object as a single string.
///
/// Equivalent to the Python expression `"".join(traceback.format_tb(exc_tb))`.
///
/// # Errors
/// Returns a [`TraceError`] if any of the Python calls fails or the result is not
/// a string.
pub fn format_traceback(py: Python, exc_tb: &Py<PyAny>) -> Result<String, TraceError> {
    let frames = py
        .import("traceback")?
        .call_method1("format_tb", (exc_tb.bind(py),))?;
    let joined = ""
        .into_bound_py_any(py)?
        .call_method1("join", (frames,))?;
    Ok(joined.extract::<String>()?)
}
/// Type-erased span: wraps any `ObjectSafeSpan + Send + Sync` value in a `Box`.
pub struct BoxedSpan(Box<dyn ObjectSafeSpan + Send + Sync>);
impl BoxedSpan {
    /// Boxes `span` behind the type-erased [`BoxedSpan`] wrapper.
    pub(crate) fn new<T: ObjectSafeSpan + Send + Sync + 'static>(span: T) -> Self {
        Self(Box::new(span))
    }
}
/// Debug output is the fixed text `BoxedSpan`; the wrapped span is not inspected.
impl fmt::Debug for BoxedSpan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "BoxedSpan")
    }
}
/// Implements the generic `trace::Span` trait by forwarding every call to the
/// boxed span. Generic `name` arguments are converted to `Cow<'static, str>`
/// before being forwarded.
impl trace::Span for BoxedSpan {
/// Forwards to the inner span after converting `name` with `Into`.
fn add_event_with_timestamp<T>(
&mut self,
name: T,
timestamp: SystemTime,
attributes: Vec<KeyValue>,
) where
T: Into<Cow<'static, str>>,
{
self.0
.add_event_with_timestamp(name.into(), timestamp, attributes)
}
/// Returns the inner span's context.
fn span_context(&self) -> &trace::SpanContext {
self.0.span_context()
}
/// Returns the inner span's recording flag.
fn is_recording(&self) -> bool {
self.0.is_recording()
}
/// Forwards the attribute to the inner span.
fn set_attribute(&mut self, attribute: KeyValue) {
self.0.set_attribute(attribute)
}
/// Forwards the status to the inner span.
fn set_status(&mut self, status: trace::Status) {
self.0.set_status(status)
}
/// Forwards to the inner span after converting `new_name` with `Into`.
fn update_name<T>(&mut self, new_name: T)
where
T: Into<Cow<'static, str>>,
{
self.0.update_name(new_name.into())
}
/// Forwards the link to the inner span.
fn add_link(&mut self, span_context: trace::SpanContext, attributes: Vec<KeyValue>) {
self.0.add_link(span_context, attributes)
}
/// Ends the inner span at `timestamp`.
fn end_with_timestamp(&mut self, timestamp: SystemTime) {
self.0.end_with_timestamp(timestamp);
}
}
/// Converts an optional Python attributes object into OpenTelemetry key/values.
///
/// - `None` yields an empty vector.
/// - A pydantic `BaseModel` is converted via its `model_dump()` result, which must
///   be a `dict`.
/// - A `dict` is converted directly.
///
/// # Errors
/// Returns [`TraceError::EventMustBeDict`] for any other object, and a
/// [`TraceError`] if a Python call or the dict conversion fails.
pub(crate) fn py_obj_to_otel_keyvalue(
    py: Python<'_>,
    attributes: Option<Bound<'_, PyAny>>,
) -> Result<Vec<KeyValue>, TraceError> {
    let Some(attrs) = attributes else {
        return Ok(Vec::new());
    };

    // The pydantic check comes first, exactly as before.
    if is_pydantic_basemodel(py, &attrs)? {
        let dumped = attrs.call_method0("model_dump")?;
        let dict = dumped
            .cast::<PyDict>()
            .map_err(|e| TraceError::DowncastError(e.to_string()))?;
        return Ok(pydict_to_otel_keyvalue(dict)?);
    }

    if !attrs.is_instance_of::<PyDict>() {
        return Err(TraceError::EventMustBeDict);
    }

    let dict = attrs
        .cast::<PyDict>()
        .map_err(|e| TraceError::DowncastError(e.to_string()))?;
    Ok(pydict_to_otel_keyvalue(dict)?)
}
pub(crate) fn parse_status(status: &Bound<'_, PyAny>, description: Option<String>) -> Status {
if status.is_instance_of::<PyString>() {
let status_str = status.extract::<String>().unwrap_or_default();
match status_str.to_lowercase().as_str() {
"ok" => Status::Ok,
"error" => Status::error(description.unwrap_or_default()),
_ => Status::Unset,
}
} else {
if let Ok(value) = status.getattr("value").and_then(|v| v.extract::<i32>()) {
match value {
0 => Status::Unset,
1 => Status::Ok,
2 => Status::error("Error status set"),
_ => Status::Unset,
}
} else {
Status::Unset
}
}
}
/// Converts an optional Python value into a [`SpanKind`].
///
/// Resolution order:
/// 1. `None` -> [`SpanKind::Internal`];
/// 2. an instance of this crate's `SpanKind` class is extracted as-is;
/// 3. an object whose `value` attribute is an integer is mapped by that integer;
/// 4. a plain integer is mapped directly.
///
/// Integer mapping: `0` Internal, `1` Server, `2` Client, `3` Producer,
/// `4` Consumer.
///
/// # Errors
/// Returns [`TraceError::InvalidSpanKind`] for an unknown integer or when no
/// integer can be obtained from the value.
pub(crate) fn parse_span_kind(kind: Option<&Bound<'_, PyAny>>) -> Result<SpanKind, TraceError> {
    /// Maps the integer code of a span kind to the matching variant.
    fn from_value(value: i32) -> Result<SpanKind, TraceError> {
        match value {
            0 => Ok(SpanKind::Internal),
            1 => Ok(SpanKind::Server),
            2 => Ok(SpanKind::Client),
            3 => Ok(SpanKind::Producer),
            4 => Ok(SpanKind::Consumer),
            _ => Err(TraceError::InvalidSpanKind(format!(
                "Unknown span kind value: {}",
                value
            ))),
        }
    }

    let Some(kind) = kind else {
        return Ok(SpanKind::Internal);
    };

    if kind.is_instance_of::<SpanKind>() {
        return Ok(kind.extract::<SpanKind>()?);
    }

    // Prefer an enum-like `.value`; fall back to treating the object as an int.
    let raw_value = kind
        .getattr("value")
        .and_then(|v| v.extract::<i32>())
        .or_else(|_| kind.extract::<i32>());

    match raw_value {
        Ok(value) => from_value(value),
        Err(_) => Err(TraceError::InvalidSpanKind(
            "Could not extract span kind value".to_string(),
        )),
    }
}