use crate::error::TraceError;
use crate::exporter::processor::BatchConfig;
use crate::exporter::scouter::ScouterSpanExporter;
use crate::exporter::SpanExporterNum;
use crate::utils::py_obj_to_otel_keyvalue;
use crate::utils::BoxedSpan;
use crate::utils::{
capture_function_arguments, format_traceback, get_context_store, get_context_var,
get_current_active_span, get_current_context_id, parse_span_kind, parse_status,
set_current_span, set_function_attributes, set_function_type_attribute, ActiveSpanInner,
FunctionType, SpanContextExt,
};
use chrono::{DateTime, Utc};
use opentelemetry::baggage::BaggageExt;
use opentelemetry::trace::Tracer as OTelTracer;
use opentelemetry::trace::TracerProvider;
use opentelemetry::InstrumentationScope;
use opentelemetry::{
trace::{Span, SpanContext, Status, TraceContextExt, TraceState},
Context as OtelContext, KeyValue,
};
use opentelemetry::{SpanId, TraceFlags, TraceId};
use opentelemetry_sdk::trace::SdkTracer;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::Resource;
use potato_head::create_uuid7;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyTuple};
use pyo3::IntoPyObjectExt;
use scouter_events::queue::types::TransportConfig;
use scouter_events::queue::ScouterQueue;
use scouter_settings::grpc::GrpcConfig;
use scouter_types::SCOUTER_QUEUE_RECORD;
use scouter_types::{
pyobject_to_otel_value, pyobject_to_tracing_json, EntityType, BAGGAGE_PREFIX,
EXCEPTION_TRACEBACK, SCOUTER_ENTITY, SCOUTER_QUEUE_EVENT, SCOUTER_SCOPE, SCOUTER_SCOPE_DEFAULT,
SCOUTER_TAG_PREFIX, SCOUTER_TRACING_INPUT, SCOUTER_TRACING_LABEL, SCOUTER_TRACING_OUTPUT,
SPAN_ERROR, TRACE_START_TIME_KEY,
};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock, RwLock};
use std::time::SystemTime;
use tracing::{debug, info, instrument, warn};
static TRACER_PROVIDER_STORE: RwLock<Option<Arc<SdkTracerProvider>>> = RwLock::new(None);
static TRACE_METADATA_STORE: OnceLock<TraceMetadataStore> = OnceLock::new();
static SCOUTER_QUEUE_STORE: RwLock<Option<Py<ScouterQueue>>> = RwLock::new(None);
fn get_tracer_provider() -> Result<Option<Arc<SdkTracerProvider>>, TraceError> {
TRACER_PROVIDER_STORE
.read()
.map(|guard| guard.clone())
.map_err(|e| TraceError::PoisonError(e.to_string()))
}
#[derive(Clone)]
struct TraceMetadata {
start_time: DateTime<Utc>,
span_count: u32,
}
#[derive(Clone)]
struct TraceMetadataStore {
inner: Arc<RwLock<HashMap<String, TraceMetadata>>>,
}
impl TraceMetadataStore {
fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(HashMap::new())),
}
}
fn set_trace_start(
&self,
trace_id: String,
start_time: DateTime<Utc>,
) -> Result<(), TraceError> {
self.inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?
.insert(
trace_id.clone(),
TraceMetadata {
start_time,
span_count: 0,
},
);
Ok(())
}
fn get_trace_metadata(&self, trace_id: &str) -> Result<Option<TraceMetadata>, TraceError> {
Ok(self
.inner
.read()
.map_err(|e| TraceError::PoisonError(e.to_string()))?
.get(trace_id)
.cloned())
}
fn remove_trace(&self, trace_id: &str) -> Result<(), TraceError> {
self.inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?
.remove(trace_id);
Ok(())
}
fn increment_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
if let Some(mut metadata) = self.get_trace_metadata(trace_id)? {
metadata.span_count += 1;
self.inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?
.insert(trace_id.to_string(), metadata);
}
Ok(())
}
fn decrement_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
if let Some(mut metadata) = self.get_trace_metadata(trace_id)? {
if metadata.span_count > 0 {
metadata.span_count -= 1;
}
if metadata.span_count == 0 {
self.remove_trace(trace_id)?;
} else {
self.inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?
.insert(trace_id.to_string(), metadata);
}
}
Ok(())
}
fn clear_all(&self) -> Result<(), TraceError> {
self.inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?
.clear();
Ok(())
}
}
fn get_trace_metadata_store() -> &'static TraceMetadataStore {
TRACE_METADATA_STORE.get_or_init(TraceMetadataStore::new)
}
#[pyfunction]
#[pyo3(signature = (
service_name="scouter_service".to_string(),
scope=SCOUTER_SCOPE_DEFAULT.to_string(),
transport_config=None,
exporter=None,
batch_config=None,
sample_ratio=None,
scouter_queue=None,
schema_url=None,
scope_attributes=None,
default_attributes=None,
))]
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub fn init_tracer(
py: Python,
service_name: Option<String>,
scope: Option<String>,
transport_config: Option<&Bound<'_, PyAny>>,
exporter: Option<&Bound<'_, PyAny>>,
batch_config: Option<Py<BatchConfig>>,
sample_ratio: Option<f64>,
scouter_queue: Option<Py<ScouterQueue>>,
schema_url: Option<String>,
scope_attributes: Option<Bound<'_, PyAny>>,
default_attributes: Option<Bound<'_, PyAny>>,
) -> Result<BaseTracer, TraceError> {
debug!("Initializing tracer");
let service_name = service_name.unwrap_or_else(|| "scouter_service".to_string());
let scope = scope.unwrap_or_else(|| SCOUTER_SCOPE_DEFAULT.to_string());
let provider_exists = {
let store_guard = TRACER_PROVIDER_STORE
.read()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
store_guard.is_some()
};
if !provider_exists {
debug!("Setting up tracer provider store");
let transport_config = match transport_config {
Some(config) => TransportConfig::from_py_config(config)?,
None => {
let config = GrpcConfig::default();
TransportConfig::Grpc(config)
}
};
let clamped_sample_ratio = match sample_ratio {
Some(ratio) if (0.0..=1.0).contains(&ratio) => Some(ratio),
Some(ratio) => {
info!(
"Sample ratio {} is out of bounds [0.0, 1.0]. Clamping to valid range.",
ratio
);
Some(ratio.clamp(0.0, 1.0))
}
None => None,
};
let batch_config = if let Some(bc) = batch_config {
Some(bc.extract::<BatchConfig>(py)?)
} else {
None
};
let resource = Resource::builder()
.with_service_name(service_name.clone())
.with_attributes([KeyValue::new(SCOUTER_SCOPE, scope.clone())])
.build();
let scouter_export = ScouterSpanExporter::new(transport_config, &resource)?;
let mut span_exporter = if let Some(exporter) = exporter {
SpanExporterNum::from_pyobject(exporter).expect("failed to convert exporter")
} else {
SpanExporterNum::default()
};
span_exporter.set_sample_ratio(clamped_sample_ratio);
let provider = span_exporter
.build_provider(resource, scouter_export, batch_config)
.expect("failed to build tracer provider");
let mut store_guard = TRACER_PROVIDER_STORE
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
if store_guard.is_none() {
*store_guard = Some(Arc::new(provider));
}
} else {
debug!("Tracer provider already initialized, skipping setup");
}
BaseTracer::new(
py,
service_name,
schema_url,
default_attributes,
scope_attributes,
scouter_queue,
)
}
fn reset_current_context(py: Python, token: &Py<PyAny>) -> PyResult<()> {
let context_var = get_context_var(py)?;
context_var.bind(py).call_method1("reset", (token,))?;
Ok(())
}
fn add_entity_event_to_span(
queue_item: &Bound<'_, PyAny>,
queue_bus: &Bound<'_, PyAny>,
inner: &mut ActiveSpanInner,
) -> Result<(), TraceError> {
let mut attributes = vec![];
if let Ok(record_uid) = queue_item.getattr("uid") {
let entity_type_py = queue_item.getattr("entity_type")?;
let entity_uid = queue_bus.getattr("entity_uid")?.str()?.to_string();
let entity_type = entity_type_py.extract::<EntityType>()?;
let record_uid_str = record_uid.str()?.to_string();
attributes.push(KeyValue::new(SCOUTER_QUEUE_RECORD, record_uid_str.clone()));
attributes.push(KeyValue::new("entity", entity_type));
attributes.push(KeyValue::new(SCOUTER_ENTITY, entity_uid.clone()));
inner.span.add_event(SCOUTER_QUEUE_EVENT, attributes);
};
Ok(())
}
#[pyclass]
pub struct ActiveSpan {
inner: Arc<RwLock<ActiveSpanInner>>,
}
#[pymethods]
impl ActiveSpan {
#[getter]
fn trace_id(&self) -> Result<String, TraceError> {
self.with_inner(|inner| inner.span.span_context().trace_id().to_string())
}
#[getter]
fn span_id(&self) -> Result<String, TraceError> {
self.with_inner(|inner| inner.span.span_context().span_id().to_string())
}
#[getter]
fn context_id(&self) -> Result<String, TraceError> {
self.with_inner(|inner| inner.context_id.clone())
}
#[pyo3(signature = (input, max_length=1000))]
#[instrument(skip_all)]
fn set_input(&self, input: &Bound<'_, PyAny>, max_length: usize) -> Result<(), TraceError> {
let value = pyobject_to_tracing_json(input, &max_length)?;
self.with_inner_mut(|inner| {
inner.span.set_attribute(KeyValue::new(
SCOUTER_TRACING_INPUT,
serde_json::to_string(&value).unwrap(),
))
})
}
#[pyo3(signature = (output, max_length=1000))]
#[instrument(skip_all)]
fn set_output(&self, output: &Bound<'_, PyAny>, max_length: usize) -> Result<(), TraceError> {
let value = pyobject_to_tracing_json(output, &max_length)?;
self.with_inner_mut(|inner| {
inner.span.set_attribute(KeyValue::new(
SCOUTER_TRACING_OUTPUT,
serde_json::to_string(&value).unwrap(),
))
})
}
pub fn set_attribute(&self, key: String, value: Bound<'_, PyAny>) -> Result<(), TraceError> {
let value = pyobject_to_otel_value(&value)?;
self.with_inner_mut(|inner| inner.span.set_attribute(KeyValue::new(key, value)))
}
pub fn set_entity(&self, entity_id: String) -> Result<(), TraceError> {
self.with_inner_mut(|inner| {
inner
.span
.set_attribute(KeyValue::new(SCOUTER_ENTITY, entity_id))
})
}
pub fn set_tag(&self, key: String, value: Bound<'_, PyAny>) -> Result<(), TraceError> {
let tag_key = format!("{}.{}", SCOUTER_TAG_PREFIX, key);
self.set_attribute(tag_key, value)
}
#[pyo3(signature = (name, attributes=None, timestamp=None))]
fn add_event(
&self,
py: Python,
name: String,
attributes: Option<Bound<'_, PyAny>>,
timestamp: Option<i64>,
) -> Result<(), TraceError> {
let pairs: Vec<KeyValue> = py_obj_to_otel_keyvalue(py, attributes)?;
self.with_inner_mut(|inner| {
if let Some(ts) = timestamp {
let system_time =
SystemTime::UNIX_EPOCH + std::time::Duration::from_nanos(ts as u64);
inner
.span
.add_event_with_timestamp(name, system_time, pairs);
} else {
inner.span.add_event(name, pairs);
}
})
}
fn add_queue_item(
&self,
py: Python<'_>,
alias: String,
item: &Bound<'_, PyAny>,
) -> Result<(), TraceError> {
debug!(
"Attempting to add item to queue '{}' for span {}",
alias,
self.context_id()?
);
self.with_inner_mut(
|inner| match &inner.span.span_context().trace_flags().is_sampled() {
true => {
if let Some(queue) = &inner.queue {
let bound_queue = queue.bind(py).get_item(&alias)?;
bound_queue.call_method1("insert", (item,))?;
add_entity_event_to_span(item, &bound_queue, inner)?;
Ok(())
} else {
warn!(
"Queue not initialized for span {}. Skipping",
inner.context_id
);
Ok(())
}
}
false => {
debug!("Span is not sampled, skipping insert into queue");
Ok(())
}
},
)?
}
#[pyo3(signature = (status, description=None))]
fn set_status(
&self,
status: &Bound<'_, PyAny>,
description: Option<String>,
) -> Result<(), TraceError> {
let otel_status = parse_status(status, description);
self.with_inner_mut(|inner| inner.span.set_status(otel_status))
}
#[instrument(skip_all)]
fn __enter__<'py>(slf: PyRef<'py, Self>) -> PyResult<PyRef<'py, Self>> {
debug!("Entering span context: {}", slf.context_id()?);
Ok(slf)
}
#[pyo3(signature = (exc_type=None, exc_val=None, exc_tb=None))]
#[instrument(skip_all)]
fn __exit__(
&mut self,
py: Python<'_>,
exc_type: Option<Py<PyAny>>,
exc_val: Option<Py<PyAny>>,
exc_tb: Option<Py<PyAny>>,
) -> Result<bool, TraceError> {
debug!("Exiting span context: {}", self.context_id()?);
let (context_id, trace_id, context_token) = {
let mut inner = self
.inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
if let Some(exc_type) = exc_type {
inner.span.set_status(Status::error("Exception occurred"));
let mut error_attributes = vec![];
error_attributes.push(KeyValue::new("exception.type", exc_type.to_string()));
if let Some(exc_val) = exc_val {
error_attributes.push(KeyValue::new("exception.value", exc_val.to_string()));
}
if let Some(exc_tb) = exc_tb {
let tb = format_traceback(py, &exc_tb)?;
error_attributes.push(KeyValue::new(EXCEPTION_TRACEBACK, tb));
}
inner.span.add_event(SPAN_ERROR, error_attributes);
}
else {
inner.span.set_status(Status::Ok);
}
inner.span.end();
let context_id = inner.context_id.clone();
let trace_id = inner.span.span_context().trace_id().to_string();
let context_token = inner.context_token.take();
inner.queue.take();
(context_id, trace_id, context_token)
};
let store = get_trace_metadata_store();
store.decrement_span_count(&trace_id)?;
if let Some(token) = context_token {
reset_current_context(py, &token)?;
}
get_context_store().remove(&context_id)?;
Ok(false)
}
fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let slf_py: Py<PyAny> = slf.into_py_any(py)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(slf_py) })
}
#[pyo3(signature = (exc_type=None, exc_val=None, exc_tb=None))]
fn __aexit__<'py>(
&mut self,
py: Python<'py>,
exc_type: Option<Py<PyAny>>,
exc_val: Option<Py<PyAny>>,
exc_tb: Option<Py<PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
let result = self.__exit__(py, exc_type, exc_val, exc_tb)?;
let py_result = result.into_py_any(py)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(py_result) })
}
fn end(&self, end_time: Option<i64>) -> Result<(), TraceError> {
self.with_inner_mut(|inner| {
if let Some(ts) = end_time {
let system_time =
SystemTime::UNIX_EPOCH + std::time::Duration::from_nanos(ts as u64);
inner.span.end_with_timestamp(system_time);
} else {
inner.span.end();
}
})
}
fn get_span_context<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TraceError> {
let span_ctx = self.with_inner(|inner| inner.span.span_context().clone())?;
let trace_id_int = u128::from_be_bytes(span_ctx.trace_id().to_bytes());
let span_id_int = u64::from_be_bytes(span_ctx.span_id().to_bytes());
let is_remote = span_ctx.is_remote();
let trace_flags_u8 = span_ctx.trace_flags().to_u8();
let otel_trace = py.import("opentelemetry.trace")?;
let trace_flags_cls = otel_trace.getattr("TraceFlags")?;
let trace_state_cls = otel_trace.getattr("TraceState")?;
let span_ctx_cls = otel_trace.getattr("SpanContext")?;
let py_trace_flags = trace_flags_cls.call1((trace_flags_u8,))?;
let py_trace_state = trace_state_cls.call0()?;
let ctx = span_ctx_cls.call1((
trace_id_int,
span_id_int,
is_remote,
py_trace_flags,
py_trace_state,
))?;
Ok(ctx)
}
fn set_attributes(&self, attributes: &Bound<'_, PyAny>) -> Result<(), TraceError> {
if let Ok(dict) = attributes.cast::<pyo3::types::PyDict>() {
for (key, value) in dict.iter() {
let key_str = key.extract::<String>()?;
self.set_attribute(key_str, value)?
}
}
Ok(())
}
fn update_name(&self, name: String) -> Result<(), TraceError> {
self.with_inner_mut(|inner| inner.span.update_name(Cow::Owned(name)))
}
fn is_recording(&self) -> Result<bool, TraceError> {
self.with_inner(|inner| inner.span.is_recording())
}
#[pyo3(signature = (exception, attributes=None, timestamp=None, escaped=false))]
fn record_exception(
&self,
py: Python<'_>,
exception: &Bound<'_, PyAny>,
attributes: Option<Bound<'_, PyAny>>,
timestamp: Option<i64>,
escaped: bool,
) -> Result<(), TraceError> {
let exc_type = exception.get_type();
let module = exc_type
.getattr("__module__")
.ok()
.and_then(|m| m.extract::<String>().ok());
let qualname = exc_type
.getattr("__qualname__")
.ok()
.and_then(|q| q.extract::<String>().ok());
let type_name = match (module, qualname) {
(Some(m), Some(q)) if m != "builtins" => format!("{}.{}", m, q),
(_, Some(q)) => q,
_ => "UnknownException".to_string(),
};
let message = exception.str()?.to_string();
let mut event_attrs = vec![
KeyValue::new("exception.type", type_name),
KeyValue::new("exception.message", message),
KeyValue::new("exception.escaped", escaped.to_string()),
];
if let Ok(tb_py) = exception.getattr("__traceback__") {
if !tb_py.is_none() {
let tb_unbound: Py<PyAny> = tb_py.unbind();
if let Ok(stacktrace) = format_traceback(py, &tb_unbound) {
event_attrs.push(KeyValue::new("exception.stacktrace", stacktrace));
}
}
}
let extra_attrs = py_obj_to_otel_keyvalue(py, attributes)?;
event_attrs.extend(extra_attrs);
self.with_inner_mut(|inner| {
if let Some(ts) = timestamp {
let system_time =
SystemTime::UNIX_EPOCH + std::time::Duration::from_nanos(ts as u64);
inner
.span
.add_event_with_timestamp("exception", system_time, event_attrs);
} else {
inner.span.add_event("exception", event_attrs);
}
})
}
#[pyo3(signature = (context, attributes=None))]
fn add_link(
&self,
py: Python<'_>,
context: &Bound<'_, PyAny>,
attributes: Option<Bound<'_, PyAny>>,
) -> Result<(), TraceError> {
let span_context = SpanContext::from_py_span_context(context)?;
let attributes = py_obj_to_otel_keyvalue(py, attributes)?;
self.with_inner_mut(|inner| {
inner.span.add_link(span_context, attributes);
})
}
}
impl ActiveSpan {
pub fn set_attribute_static(
&mut self,
key: &'static str,
value: String,
) -> Result<(), TraceError> {
self.inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?
.span
.set_attribute(KeyValue::new(key, value));
Ok(())
}
fn with_inner_mut<F, R>(&self, f: F) -> Result<R, TraceError>
where
F: FnOnce(&mut ActiveSpanInner) -> R,
{
let mut inner = self
.inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
Ok(f(&mut inner))
}
fn with_inner<F, R>(&self, f: F) -> Result<R, TraceError>
where
F: FnOnce(&ActiveSpanInner) -> R,
{
let inner = self
.inner
.read()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
Ok(f(&inner))
}
}
#[pyclass(subclass)]
pub struct BaseTracer {
tracer: SdkTracer,
queue: Option<Py<ScouterQueue>>,
default_attributes: Vec<KeyValue>,
}
impl BaseTracer {
fn set_start_time(&self, span: &mut BoxedSpan) {
let trace_id = span.span_context().trace_id().to_string();
let trace_metadata_store = get_trace_metadata_store();
let start_time = match trace_metadata_store.get_trace_metadata(&trace_id) {
Ok(Some(metadata)) => {
metadata.start_time
}
Ok(None) => {
let current_time = Utc::now();
if let Err(e) = trace_metadata_store.set_trace_start(trace_id, current_time) {
tracing::warn!("Failed to set trace start time: {}", e);
}
current_time
}
Err(e) => {
tracing::warn!("Failed to get trace metadata: {}", e);
Utc::now()
}
};
span.set_attribute(KeyValue::new(TRACE_START_TIME_KEY, start_time.to_rfc3339()));
}
fn increment_span_count(&self, trace_id: &str) -> Result<(), TraceError> {
let trace_metadata_store = get_trace_metadata_store();
trace_metadata_store.increment_span_count(trace_id)
}
fn setup_trace_metadata(&self, span: &mut BoxedSpan) -> Result<(), TraceError> {
let trace_id = span.span_context().trace_id().to_string();
self.set_start_time(span);
self.increment_span_count(&trace_id)?;
Ok(())
}
fn create_baggage_items(
baggage: &[HashMap<String, String>],
tags: &[HashMap<String, String>],
) -> Vec<KeyValue> {
let mut keyval_baggage: Vec<KeyValue> = baggage
.iter()
.flat_map(|baggage_map| {
baggage_map
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<KeyValue>>()
})
.collect();
tags.iter().for_each(|tag_map| {
tag_map.iter().for_each(|(k, v)| {
keyval_baggage.push(KeyValue::new(
format!("{}.{}.{}", BAGGAGE_PREFIX, SCOUTER_TAG_PREFIX, k),
v.clone(),
));
});
});
keyval_baggage
}
}
#[pymethods]
impl BaseTracer {
#[new]
#[pyo3(signature = (
name,
schema_url=None,
// default span attributes that are applied to every span created by this tracer
default_attributes=None,
// scope attributes that are applied to the tracer's instrumentation scope
scope_attributes=None,
queue=None,
))]
#[instrument(skip_all)]
fn new(
py: Python<'_>,
name: String,
schema_url: Option<String>,
default_attributes: Option<Bound<'_, PyAny>>,
scope_attributes: Option<Bound<'_, PyAny>>,
queue: Option<Py<ScouterQueue>>,
) -> Result<Self, TraceError> {
debug!("Creating new BaseTracer instance");
let final_queue = queue.or_else(|| {
SCOUTER_QUEUE_STORE
.read()
.ok()
.and_then(|guard| guard.as_ref().map(|q| q.clone_ref(py)))
});
let scope_attributes = py_obj_to_otel_keyvalue(py, scope_attributes)?;
let default_attributes = py_obj_to_otel_keyvalue(py, default_attributes)?;
let mut scope_builder =
InstrumentationScope::builder(name).with_version(SCOUTER_SCOPE_DEFAULT);
if let Some(url) = schema_url {
scope_builder = scope_builder.with_schema_url(url);
}
if !scope_attributes.is_empty() {
scope_builder = scope_builder.with_attributes(scope_attributes);
}
let scope = scope_builder.build();
let tracer = get_tracer(scope)?;
Ok(BaseTracer {
tracer,
queue: final_queue,
default_attributes,
})
}
pub fn set_scouter_queue(
&mut self,
py: Python<'_>,
queue: Py<ScouterQueue>,
) -> Result<(), TraceError> {
let bound_queue = queue.bind(py);
bound_queue.call_method1("_set_sample_ratio", (1.0,))?;
let mut store_guard = SCOUTER_QUEUE_STORE
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
*store_guard = Some(queue.clone_ref(py));
self.queue = Some(queue);
Ok(())
}
#[pyo3(signature = (name, context=None, kind=None, attributes=vec![], baggage=vec![], tags=vec![], label=None, parent_context_id=None, trace_id=None, span_id=None, remote_sampled=None))]
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
#[allow(unused_variables)]
fn start_as_current_span(
&self,
py: Python<'_>,
name: String,
context: Option<&Bound<'_, PyAny>>, kind: Option<&Bound<'_, PyAny>>, attributes: Vec<HashMap<String, String>>,
baggage: Vec<HashMap<String, String>>,
tags: Vec<HashMap<String, String>>,
label: Option<String>,
parent_context_id: Option<String>,
trace_id: Option<String>,
span_id: Option<String>,
remote_sampled: Option<bool>, ) -> Result<ActiveSpan, TraceError> {
let kind = parse_span_kind(kind)?;
let parent_id = parent_context_id.or_else(|| get_current_context_id(py).ok().flatten());
let base_ctx = if let (Some(tid), Some(sid)) = (&trace_id, &span_id) {
let parsed_trace_id = TraceId::from_hex(tid)?;
let parsed_span_id = SpanId::from_hex(sid)?;
let remote_span_context = SpanContext::new(
parsed_trace_id, parsed_span_id, remote_sampled.map_or(TraceFlags::default(), |sampled| {
if sampled {
TraceFlags::SAMPLED
} else {
TraceFlags::NOT_SAMPLED
}
}),
true, TraceState::default(),
);
OtelContext::current().with_remote_span_context(remote_span_context)
} else if let Some(parent_id) = parent_id {
get_context_store()
.get(&parent_id)?
.map(|parent_ctx| OtelContext::current().with_remote_span_context(parent_ctx))
.unwrap_or_else(OtelContext::current)
} else {
OtelContext::current()
};
let baggage_items = Self::create_baggage_items(&baggage, &tags);
let final_ctx = if !baggage_items.is_empty() {
base_ctx.with_baggage(baggage_items)
} else {
base_ctx
};
let span_builder = self
.tracer
.span_builder(name.clone())
.with_kind(kind.to_otel_span_kind());
let mut span = BoxedSpan::new(span_builder.start_with_context(&self.tracer, &final_ctx));
attributes.iter().for_each(|attr_map| {
attr_map.iter().for_each(|(k, v)| {
span.set_attribute(KeyValue::new(k.clone(), v.clone()));
});
});
self.default_attributes.iter().for_each(|kv| {
span.set_attribute(kv.clone());
});
if let Some(label) = label {
span.set_attribute(KeyValue::new(SCOUTER_TRACING_LABEL, label));
}
let context_id = Self::set_context_id(self, &mut span)?;
let inner = Arc::new(RwLock::new(ActiveSpanInner {
context_id,
span,
context_token: None,
queue: self.queue.as_ref().map(|q| q.clone_ref(py)),
}));
self.set_current_span(py, &inner)?;
Ok(ActiveSpan { inner })
}
#[pyo3(signature = (
name,
kind=None,
attributes=vec![],
baggage=vec![],
tags=vec![],
label=None,
parent_context_id=None,
trace_id=None,
span_id=None,
remote_sampled=None,
))]
#[allow(clippy::too_many_arguments)]
fn start_span(
&self,
py: Python<'_>,
name: String,
kind: Option<&Bound<'_, PyAny>>,
attributes: Vec<HashMap<String, String>>,
baggage: Vec<HashMap<String, String>>,
tags: Vec<HashMap<String, String>>,
label: Option<String>,
parent_context_id: Option<String>,
trace_id: Option<String>,
span_id: Option<String>,
remote_sampled: Option<bool>,
) -> Result<ActiveSpan, TraceError> {
let kind = parse_span_kind(kind)?;
let parent_id = parent_context_id.or_else(|| get_current_context_id(py).ok().flatten());
let base_ctx = if let (Some(tid), Some(sid)) = (&trace_id, &span_id) {
let parsed_trace_id = TraceId::from_hex(tid)?;
let parsed_span_id = SpanId::from_hex(sid)?;
let remote_span_context = SpanContext::new(
parsed_trace_id,
parsed_span_id,
remote_sampled.map_or(TraceFlags::default(), |sampled| {
if sampled {
TraceFlags::SAMPLED
} else {
TraceFlags::NOT_SAMPLED
}
}),
true,
TraceState::default(),
);
OtelContext::current().with_remote_span_context(remote_span_context)
} else if let Some(parent_id) = parent_id {
get_context_store()
.get(&parent_id)?
.map(|parent_ctx| OtelContext::current().with_remote_span_context(parent_ctx))
.unwrap_or_else(OtelContext::current)
} else {
OtelContext::current()
};
let baggage_items = Self::create_baggage_items(&baggage, &tags);
let final_ctx = if !baggage_items.is_empty() {
base_ctx.with_baggage(baggage_items)
} else {
base_ctx
};
let span_builder = self
.tracer
.span_builder(name)
.with_kind(kind.to_otel_span_kind());
let mut span = BoxedSpan::new(span_builder.start_with_context(&self.tracer, &final_ctx));
attributes.iter().for_each(|attr_map| {
attr_map.iter().for_each(|(k, v)| {
span.set_attribute(KeyValue::new(k.clone(), v.clone()));
});
});
self.default_attributes.iter().for_each(|kv| {
span.set_attribute(kv.clone());
});
if let Some(label) = label {
span.set_attribute(KeyValue::new(SCOUTER_TRACING_LABEL, label));
}
let context_id = Self::set_context_id(self, &mut span)?;
let inner = Arc::new(RwLock::new(ActiveSpanInner {
context_id,
span,
context_token: None, queue: self.queue.as_ref().map(|q| q.clone_ref(py)),
}));
Ok(ActiveSpan { inner })
}
#[pyo3(name="_start_decorated_as_current_span", signature = (
name,
func,
func_args,
kind=None,
attributes=vec![],
baggage=vec![],
tags=vec![],
label=None,
parent_context_id=None,
trace_id=None,
span_id=None,
remote_sampled=None,
max_length=1000,
func_type=FunctionType::Sync,
func_kwargs=None
))]
#[allow(clippy::too_many_arguments)]
fn start_decorated_as_current_span<'py>(
&self,
py: Python<'py>,
name: String,
func: &Bound<'py, PyAny>,
func_args: &Bound<'_, PyTuple>,
kind: Option<&Bound<'_, PyAny>>,
attributes: Vec<HashMap<String, String>>,
baggage: Vec<HashMap<String, String>>,
tags: Vec<HashMap<String, String>>,
label: Option<String>,
parent_context_id: Option<String>,
trace_id: Option<String>,
span_id: Option<String>,
remote_sampled: Option<bool>,
max_length: usize,
func_type: FunctionType,
func_kwargs: Option<&Bound<'_, PyDict>>,
) -> Result<ActiveSpan, TraceError> {
let mut span = self.start_as_current_span(
py,
name,
None,
kind,
attributes,
baggage,
tags,
label,
parent_context_id,
trace_id,
span_id,
remote_sampled,
)?;
set_function_attributes(func, &mut span)?;
set_function_type_attribute(&func_type, &mut span)?;
span.set_input(
&capture_function_arguments(py, func, func_args, func_kwargs)?,
max_length,
)?;
Ok(span)
}
#[getter]
pub fn current_span<'py>(&self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TraceError> {
let span = get_current_active_span(py)?;
Ok(span)
}
pub fn shutdown(&self) -> Result<(), TraceError> {
shutdown_tracer()
}
}
impl BaseTracer {
fn set_current_span(
&self,
py: Python<'_>,
inner: &Arc<RwLock<ActiveSpanInner>>,
) -> Result<(), TraceError> {
let py_span = Py::new(
py,
ActiveSpan {
inner: inner.clone(),
},
)?;
let token = set_current_span(py, py_span.bind(py).clone())?;
inner
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?
.context_token = Some(token);
Ok(())
}
fn set_context_id(&self, span: &mut BoxedSpan) -> Result<String, TraceError> {
let context_id = format!("span_{}", create_uuid7());
Self::setup_trace_metadata(self, span)?;
get_context_store().set(context_id.clone(), span.span_context().clone())?;
Ok(context_id)
}
}
#[pyfunction]
pub fn flush_tracer() -> Result<(), TraceError> {
let provider_arc = get_tracer_provider()?.ok_or_else(|| {
TraceError::InitializationError(
"Tracer provider not initialized or already shut down".to_string(),
)
})?;
provider_arc.force_flush()?;
Ok(())
}
#[pyfunction]
pub fn shutdown_tracer() -> Result<(), TraceError> {
info!("Shutting down tracer");
let provider_arc = {
let mut store_guard = TRACER_PROVIDER_STORE
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
store_guard.take()
};
if let Some(provider) = provider_arc {
match Arc::try_unwrap(provider) {
Ok(provider) => match provider.shutdown() {
Ok(_) => (),
Err(e) => {
tracing::warn!("Failed to shut down tracer provider: {}", e);
}
},
Err(arc) => match arc.shutdown() {
Ok(_) => (),
Err(e) => {
tracing::warn!("Failed to shut down tracer provider: {}", e);
}
},
}
} else {
tracing::warn!("Tracer provider was already shut down or never initialized.");
}
get_trace_metadata_store().clear_all()?;
let mut queue_store_guard = SCOUTER_QUEUE_STORE
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
*queue_store_guard = None;
Ok(())
}
fn get_tracer(scope: InstrumentationScope) -> Result<SdkTracer, TraceError> {
let provider_arc = get_tracer_provider()?.ok_or_else(|| {
TraceError::InitializationError(
"Tracer provider not initialized or already shut down".to_string(),
)
})?;
Ok(provider_arc.tracer_with_scope(scope))
}
#[pyfunction]
pub fn get_tracing_headers_from_current_span(
py: Python<'_>,
) -> Result<HashMap<String, String>, TraceError> {
let current_span_py = get_current_active_span(py)?;
let active_span_ref = current_span_py
.extract::<PyRef<ActiveSpan>>()
.map_err(|e| TraceError::DowncastError(format!("Failed to extract ActiveSpan: {}", e)))?;
let context_to_propagate = {
let inner_guard = active_span_ref
.inner
.read()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
inner_guard.span.span_context().clone()
};
let mut headers: HashMap<String, String> = HashMap::new();
headers.insert(
"trace_id".to_string(),
context_to_propagate.trace_id().to_string(),
);
headers.insert(
"span_id".to_string(),
context_to_propagate.span_id().to_string(),
);
let is_sampled = &context_to_propagate.trace_flags().is_sampled().to_string();
headers.insert("is_sampled".to_string(), is_sampled.to_string());
Ok(headers)
}
fn is_tracer_initialized() -> bool {
TRACER_PROVIDER_STORE
.read()
.map(|guard| guard.is_some())
.unwrap_or(false)
}
pub fn try_set_span_attribute(py: Python<'_>, key: &str, value: &str) -> Result<bool, TraceError> {
if !is_tracer_initialized() {
return Ok(false);
}
let span = match get_current_active_span(py) {
Ok(s) => s,
Err(_) => return Ok(false),
};
span.call_method1("set_attribute", (key, value))?;
Ok(true)
}