use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};
use opentelemetry::global::BoxedSpan;
use opentelemetry::trace::{Span, Status, TraceContextExt};
use opentelemetry::{Context, KeyValue, global};
use super::metrics::Metrics;
use super::{attrs, baggage, langfuse, provider_name, spans};
use crate::llm::{ChatResponse, StopReason};
use crate::types::{TokenUsage, ToolTier};
/// Inputs for [`build_chat_span`].
#[derive(Clone, Copy)]
pub struct ChatSpanParams<'a> {
/// Raw provider identifier. Recorded verbatim as `SDK_PROVIDER_ID` and, after
/// `provider_name::normalize`, as `GEN_AI_PROVIDER_NAME`.
pub provider_id: &'static str,
/// Requested model; used in the span name (`chat {model}`) and as `GEN_AI_REQUEST_MODEL`.
pub model: &'a str,
/// Recorded as the `SDK_LLM_STREAMING` attribute.
pub streaming: bool,
/// Output-token cap; recorded as `GEN_AI_REQUEST_MAX_OUTPUT_TOKENS` only when `Some`.
pub max_tokens: Option<u32>,
}
#[must_use]
pub fn build_chat_span(params: ChatSpanParams<'_>) -> BoxedSpan {
let span_name = format!("chat {}", params.model);
let provider = provider_name::normalize(params.provider_id);
let mut init_attrs = vec![
KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "chat"),
KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
attrs::kv_bool(attrs::SDK_LLM_STREAMING, params.streaming),
KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
];
if let Some(max_tokens) = params.max_tokens {
init_attrs.push(attrs::kv_i64(
attrs::GEN_AI_REQUEST_MAX_OUTPUT_TOKENS,
i64::from(max_tokens),
));
}
let mut span = spans::start_client_span(span_name, init_attrs);
baggage::copy_baggage_to_active_span(&mut span);
langfuse::tag_observation(&mut span, langfuse::ObservationType::Generation);
span
}
/// Records a successful chat response on `span`, emits chat metrics, and ends the span.
///
/// Attributes are set in this order:
/// 1. response id (only when non-empty);
/// 2. response model;
/// 3. finish reason (only when the response has a stop reason);
/// 4. the four token-usage counters;
/// 5. the tool-call / text / thinking presence flags.
///
/// Token usage and operation duration are then recorded on the global [`Metrics`]. The
/// provider label used there is `provider_id` after `provider_name::normalize`.
pub fn finish_chat_span_success(
    span: &mut BoxedSpan,
    response: &ChatResponse,
    elapsed_secs: f64,
    provider_id: &'static str,
    request_model: &str,
) {
    let normalized = provider_name::normalize(provider_id);
    let usage = &response.usage;

    let id_attr = (!response.id.is_empty())
        .then(|| KeyValue::new(attrs::GEN_AI_RESPONSE_ID, response.id.clone()));
    let model_attr = KeyValue::new(attrs::GEN_AI_RESPONSE_MODEL, response.model.clone());
    let finish_attr = response.stop_reason.map(|reason| {
        KeyValue::new(
            attrs::GEN_AI_RESPONSE_FINISH_REASONS,
            finish_reason_str(reason),
        )
    });
    let usage_and_content = [
        attrs::kv_i64(
            attrs::GEN_AI_USAGE_INPUT_TOKENS,
            i64::from(usage.input_tokens),
        ),
        attrs::kv_i64(
            attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
            i64::from(usage.output_tokens),
        ),
        attrs::kv_i64(
            attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
            i64::from(usage.cached_input_tokens),
        ),
        attrs::kv_i64(
            attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
            i64::from(usage.cache_creation_input_tokens),
        ),
        attrs::kv_bool(attrs::SDK_LLM_HAD_TOOL_CALLS, response.has_tool_use()),
        attrs::kv_bool(
            attrs::SDK_LLM_TEXT_OUTPUT_PRESENT,
            response.first_text().is_some(),
        ),
        attrs::kv_bool(
            attrs::SDK_LLM_THINKING_PRESENT,
            response.first_thinking().is_some(),
        ),
    ];

    // Absent optional attributes simply contribute nothing to the sequence.
    let ordered = id_attr
        .into_iter()
        .chain(std::iter::once(model_attr))
        .chain(finish_attr)
        .chain(usage_and_content);
    for attribute in ordered {
        span.set_attribute(attribute);
    }

    let metrics = Metrics::global();
    metrics.record_chat_token_usage(usage, normalized, request_model, &response.model);
    metrics.record_chat_operation_duration_success(
        elapsed_secs,
        normalized,
        request_model,
        &response.model,
    );
    span.end();
}
/// Marks `span` as failed, records the failed-operation duration metric, and ends the span.
///
/// `error_type` goes both to `spans::set_span_error` (together with `message`) and to the
/// duration metric. The metric's provider label is `provider_id` after normalization.
pub fn finish_chat_span_error(
    span: &mut BoxedSpan,
    error_type: &'static str,
    message: &str,
    elapsed_secs: f64,
    provider_id: &'static str,
    request_model: &str,
) {
    let normalized_provider = provider_name::normalize(provider_id);
    spans::set_span_error(span, error_type, message);

    let metrics = Metrics::global();
    metrics.record_chat_operation_duration_error(
        elapsed_secs,
        normalized_provider,
        request_model,
        error_type,
    );
    span.end();
}
/// Buckets an LLM error message into a coarse error-type label.
///
/// Matching is case-sensitive substring search. Markers are tried in order and the first
/// hit wins:
/// - `"Rate limited"` → `rate_limited`
/// - `"Invalid request"` → `invalid_request`
/// - `"Server error"` → `server_error`
/// - `"Stream"` → `stream_error`
///
/// Anything else yields `"_OTHER"`.
#[must_use]
pub fn classify_llm_error(msg: &str) -> &'static str {
    const MARKERS: [(&str, &str); 4] = [
        ("Rate limited", "rate_limited"),
        ("Invalid request", "invalid_request"),
        ("Server error", "server_error"),
        ("Stream", "stream_error"),
    ];
    MARKERS
        .iter()
        .find(|(marker, _)| msg.contains(*marker))
        .map_or("_OTHER", |&(_, category)| category)
}
#[must_use]
pub const fn finish_reason_str(reason: StopReason) -> &'static str {
attrs::finish_reason_str(reason)
}
/// Inputs for [`build_tool_span`].
#[derive(Clone, Copy)]
pub struct ToolSpanParams<'a> {
/// Recorded as `GEN_AI_TOOL_NAME`.
pub tool_name: &'a str,
/// Recorded as `GEN_AI_TOOL_CALL_ID` (always, even when empty).
pub tool_call_id: &'a str,
/// Recorded as `SDK_TOOL_DISPLAY_NAME`; omitted when empty.
pub display_name: &'a str,
/// Recorded as `SDK_TOOL_TIER` (via `attrs::tool_tier_str`) only when `Some`.
pub tier: Option<ToolTier>,
/// Recorded as `SDK_TOOL_KIND`.
pub kind: &'static str,
}
/// Starts the internal `execute_tool` span for one tool invocation.
///
/// Attributes, in order:
/// 1. operation name, tool name and call id;
/// 2. display name (skipped when empty);
/// 3. tier (skipped when `None`);
/// 4. tool kind.
///
/// Active baggage is copied onto the span and it is tagged as a Langfuse `Tool` observation.
#[must_use]
pub fn build_tool_span(params: ToolSpanParams<'_>) -> BoxedSpan {
    let display_attr = (!params.display_name.is_empty()).then(|| {
        KeyValue::new(
            attrs::SDK_TOOL_DISPLAY_NAME,
            params.display_name.to_string(),
        )
    });
    let tier_attr = params
        .tier
        .map(|tier| KeyValue::new(attrs::SDK_TOOL_TIER, attrs::tool_tier_str(tier)));

    let mut span_attrs = Vec::from([
        KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "execute_tool"),
        KeyValue::new(attrs::GEN_AI_TOOL_NAME, params.tool_name.to_string()),
        KeyValue::new(attrs::GEN_AI_TOOL_CALL_ID, params.tool_call_id.to_string()),
    ]);
    span_attrs.extend(display_attr.into_iter().chain(tier_attr));
    span_attrs.push(KeyValue::new(attrs::SDK_TOOL_KIND, params.kind));

    let mut tool_span = spans::start_internal_span("execute_tool", span_attrs);
    baggage::copy_baggage_to_active_span(&mut tool_span);
    langfuse::tag_observation(&mut tool_span, langfuse::ObservationType::Tool);
    tool_span
}
/// How a tool invocation ended, as consumed by [`finish_tool_span`].
pub enum ToolSpanOutcome<'a> {
/// The tool ran and produced output.
Completed {
/// Tool output text. `finish_tool_span` inspects its prefix: `"Unknown tool:"` is
/// recorded as an error, `"Blocked:"` as blocked, `"Rejected:"` as rejected.
output: &'a str,
/// Decides `success` vs `error` when no recognised output prefix matched.
success: bool,
/// Recorded as `SDK_TOOL_DURATION_MS` and passed to the tool metric when `Some`.
duration_ms: Option<u64>,
},
/// The tool needs user confirmation before it can run.
AwaitingConfirmation,
/// Persisting the tool event failed; `message` becomes the span's error status text.
EventStoreError {
message: &'a str,
},
}
pub fn finish_tool_span(
span: &mut BoxedSpan,
tool_name: &str,
tool_kind: &'static str,
outcome: &ToolSpanOutcome<'_>,
) {
let (outcome_str, duration_ms) = match outcome {
ToolSpanOutcome::Completed {
output,
success,
duration_ms,
} => {
let outcome_str = if output.starts_with("Unknown tool:") {
span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "unknown_tool"));
span.set_status(Status::error((*output).to_string()));
"error"
} else if output.starts_with("Blocked:") {
"blocked"
} else if output.starts_with("Rejected:") {
"rejected"
} else if *success {
"success"
} else {
"error"
};
span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, outcome_str));
if let Some(ms) = duration_ms {
span.set_attribute(attrs::kv_i64(
attrs::SDK_TOOL_DURATION_MS,
i64::try_from(*ms).unwrap_or(i64::MAX),
));
}
(outcome_str, *duration_ms)
}
ToolSpanOutcome::AwaitingConfirmation => {
span.set_attribute(attrs::kv_bool(attrs::SDK_TOOL_CONFIRMATION_REQUIRED, true));
span.set_attribute(KeyValue::new(
attrs::SDK_TOOL_OUTCOME,
"awaiting_confirmation",
));
("awaiting_confirmation", None)
}
ToolSpanOutcome::EventStoreError { message } => {
span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "event_store"));
span.set_status(Status::error((*message).to_string()));
span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, "error"));
("error", None)
}
};
Metrics::global().record_tool_execution(tool_name, tool_kind, outcome_str, duration_ms);
span.end();
}
/// Inputs for [`start_root_turn_span`].
#[derive(Clone, Copy)]
pub struct RootTurnSpanParams<'a> {
/// Raw provider identifier. Recorded verbatim as `SDK_PROVIDER_ID` and, normalized, as
/// `GEN_AI_PROVIDER_NAME`.
pub provider_id: &'static str,
/// Recorded as `GEN_AI_REQUEST_MODEL`.
pub model: &'a str,
/// Recorded as `GEN_AI_CONVERSATION_ID`.
pub conversation_id: &'a str,
}
/// A freshly started root `invoke_agent` span together with its identifiers.
pub struct StartedRootTurnSpan {
/// The live span; the caller is responsible for finishing it.
pub span: BoxedSpan,
/// The span's trace id rendered with `to_string()` (32 hex chars, as the
/// `traceparent_from_ids` tests show for the same type).
pub trace_id_hex: String,
/// The span's span id rendered with `to_string()` (16 hex chars).
pub span_id_hex: String,
}
#[must_use]
pub fn start_root_turn_span(params: RootTurnSpanParams<'_>) -> StartedRootTurnSpan {
let provider = provider_name::normalize(params.provider_id);
let span_attrs = vec![
KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
KeyValue::new(
attrs::GEN_AI_CONVERSATION_ID,
params.conversation_id.to_string(),
),
KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
];
let mut span = spans::start_internal_span("invoke_agent", span_attrs);
baggage::copy_baggage_to_active_span(&mut span);
langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
let span_context = span.span_context().clone();
StartedRootTurnSpan {
trace_id_hex: span_context.trace_id().to_string(),
span_id_hex: span_context.span_id().to_string(),
span,
}
}
/// Builds a context whose parent is the (remote) span identified by the given hex ids.
///
/// The result is the current context with that remote span context attached. Returns
/// `None` when `spans::remote_span_context` rejects the ids.
#[must_use]
pub fn remote_parent_context(trace_id_hex: &str, span_id_hex: &str) -> Option<Context> {
    spans::remote_span_context(trace_id_hex, span_id_hex)
        .map(|remote| Context::current().with_remote_span_context(remote))
}
/// Records the run outcome, annotates the root `invoke_agent` span, and ends it.
///
/// Steps, in order:
/// 1. Increments the global `runs_outcome` counter labelled with `outcome`.
/// 2. Sets `SDK_TOTAL_TURNS`.
/// 3. When `total_usage` is given, sets the four aggregated token-usage attributes.
/// 4. Sets `SDK_OUTCOME`.
/// 5. When `outcome == "error"`, additionally marks the span as failed with type
///    `agent_error`.
pub fn finish_root_turn_span(
    span: &mut BoxedSpan,
    total_turns: usize,
    total_usage: Option<&TokenUsage>,
    outcome: &'static str,
) {
    Metrics::global()
        .runs_outcome
        .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
    // Saturate instead of collapsing to 0 on overflow, the same way the tool duration
    // attribute is clamped to `i64::MAX`.
    span.set_attribute(attrs::kv_i64(
        attrs::SDK_TOTAL_TURNS,
        i64::try_from(total_turns).unwrap_or(i64::MAX),
    ));
    if let Some(usage) = total_usage {
        span.set_attribute(attrs::kv_i64(
            attrs::GEN_AI_USAGE_INPUT_TOKENS,
            i64::from(usage.input_tokens),
        ));
        span.set_attribute(attrs::kv_i64(
            attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
            i64::from(usage.output_tokens),
        ));
        span.set_attribute(attrs::kv_i64(
            attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
            i64::from(usage.cached_input_tokens),
        ));
        span.set_attribute(attrs::kv_i64(
            attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
            i64::from(usage.cache_creation_input_tokens),
        ));
    }
    span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
    if outcome == "error" {
        spans::set_span_error(span, "agent_error", "agent invocation failed");
    }
    span.end();
}
const TRACEPARENT_KEY: &str = "traceparent";
#[must_use]
pub fn context_from_traceparent(traceparent: &str) -> Option<Context> {
if traceparent.is_empty() {
return None;
}
let mut carrier = HashMap::with_capacity(1);
carrier.insert(TRACEPARENT_KEY.to_string(), traceparent.to_string());
let cx = global::get_text_map_propagator(|propagator| propagator.extract(&carrier));
if cx.span().span_context().is_valid() {
Some(cx)
} else {
None
}
}
/// Renders a W3C `traceparent` header value from hex trace/span ids.
///
/// The header always uses version `00` and trace-flags `01` (sampled). Returns `None` when
/// `spans::remote_span_context` rejects the ids; the tests below show malformed and
/// all-zero ids are rejected.
#[must_use]
pub fn traceparent_from_ids(trace_id_hex: &str, span_id_hex: &str) -> Option<String> {
    spans::remote_span_context(trace_id_hex, span_id_hex).map(|remote| {
        let trace_id = remote.trace_id();
        let span_id = remote.span_id();
        format!("00-{trace_id}-{span_id}-01")
    })
}
/// Root `invoke_agent` spans that have been started but not yet finished, keyed by task id.
///
/// Lets a root span be parked by one call site and finished or discarded by another.
static LIVE_ROOT_SPANS: LazyLock<Mutex<HashMap<String, BoxedSpan>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

/// Locks [`LIVE_ROOT_SPANS`], recovering the guard if a previous holder panicked.
///
/// The map holds no cross-entry invariants, and every critical section below is a single
/// map operation, so its contents stay usable after a panic. Refusing the lock instead
/// would silently drop every future root span and strand the ones already stashed.
fn lock_live_root_spans() -> std::sync::MutexGuard<'static, HashMap<String, BoxedSpan>> {
    LIVE_ROOT_SPANS.lock().unwrap_or_else(|poisoned| {
        log::warn!("live root-span registry was poisoned; recovering");
        LIVE_ROOT_SPANS.clear_poison();
        poisoned.into_inner()
    })
}

/// Parks `span` under `task_id` so a later call can finish or discard it.
///
/// The first span stashed for a task wins: if `task_id` is already present, the new `span`
/// is not stored and is dropped instead.
pub fn stash_root_turn_span(task_id: &str, span: BoxedSpan) {
    let rejected = {
        let mut live = lock_live_root_spans();
        if live.contains_key(task_id) {
            Some(span)
        } else {
            live.insert(task_id.to_string(), span);
            None
        }
    };
    // Drop a rejected span only after the guard is released. Dropping a span may run SDK
    // end/export logic, which should not block every other registry user.
    drop(rejected);
}

/// Finishes the span stashed for `task_id` (see [`finish_root_turn_span`]) and records
/// the run outcome.
///
/// When nothing is stashed (never stashed, already finalized, or discarded), only the
/// `runs_outcome` counter is incremented. A repeated call is therefore harmless for the
/// span, but it counts the outcome again.
pub fn finalize_root_turn_span(
    task_id: &str,
    total_turns: usize,
    total_usage: Option<&TokenUsage>,
    outcome: &'static str,
) {
    // The guard is a temporary of this statement, so the lock is released before the span
    // is finished.
    let stashed = lock_live_root_spans().remove(task_id);
    if let Some(mut span) = stashed {
        finish_root_turn_span(&mut span, total_turns, total_usage, outcome);
    } else {
        Metrics::global()
            .runs_outcome
            .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
    }
}

/// Removes the span stashed for `task_id` without recording attributes, status, or
/// metrics for it. No-op when nothing is stashed.
///
/// NOTE(review): the span is dropped, not explicitly `end()`ed. With the OpenTelemetry
/// SDK a dropped span is typically still ended and exported; confirm that is intended.
pub fn discard_root_turn_span(task_id: &str) {
    let removed = lock_live_root_spans().remove(task_id);
    // Dropped after the guard is released, for the same reason as in `stash_root_turn_span`.
    drop(removed);
}
#[cfg(test)]
mod tests {
    use super::{
        LIVE_ROOT_SPANS, context_from_traceparent, discard_root_turn_span,
        finalize_root_turn_span, spans, stash_root_turn_span, traceparent_from_ids,
    };

    const TRACE_HEX: &str = "4bf92f3577b34da6a3ce929d0e0e4736";
    const SPAN_HEX: &str = "00f067aa0ba902b7";

    /// Whether the live root-span registry currently holds an entry for `task_id`.
    fn is_stashed(task_id: &str) -> bool {
        LIVE_ROOT_SPANS
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .contains_key(task_id)
    }

    #[test]
    fn traceparent_from_valid_ids_is_w3c_sampled() {
        let traceparent = traceparent_from_ids(TRACE_HEX, SPAN_HEX).expect("valid ids");
        assert_eq!(traceparent, format!("00-{TRACE_HEX}-{SPAN_HEX}-01"));
    }

    #[test]
    fn traceparent_from_malformed_or_zero_ids_is_none() {
        assert!(traceparent_from_ids("not-hex", SPAN_HEX).is_none());
        assert!(traceparent_from_ids(TRACE_HEX, "tooshort").is_none());
        assert!(traceparent_from_ids(&"0".repeat(32), &"0".repeat(16)).is_none());
    }

    #[test]
    fn empty_or_malformed_traceparent_yields_no_context() {
        assert!(context_from_traceparent("").is_none());
        assert!(context_from_traceparent("not-a-traceparent").is_none());
    }

    #[test]
    fn live_root_span_registry_finalize_is_idempotent() {
        let task_id = "registry-roundtrip-task";
        stash_root_turn_span(task_id, spans::start_internal_span("test", Vec::new()));
        assert!(is_stashed(task_id));
        finalize_root_turn_span(task_id, 1, None, "done");
        assert!(!is_stashed(task_id));
        // A second finalize finds nothing to finish and must not panic.
        finalize_root_turn_span(task_id, 1, None, "done");
        assert!(!is_stashed(task_id));
    }

    #[test]
    fn discard_removes_stashed_span_without_finalize() {
        let task_id = "registry-discard-task";
        stash_root_turn_span(task_id, spans::start_internal_span("test", Vec::new()));
        assert!(is_stashed(task_id));
        discard_root_turn_span(task_id);
        assert!(!is_stashed(task_id));
        finalize_root_turn_span(task_id, 0, None, "cancelled");
    }

    #[test]
    fn stashing_twice_keeps_a_single_entry() {
        let task_id = "registry-duplicate-task";
        stash_root_turn_span(task_id, spans::start_internal_span("first", Vec::new()));
        stash_root_turn_span(task_id, spans::start_internal_span("second", Vec::new()));
        assert!(is_stashed(task_id));
        discard_root_turn_span(task_id);
        assert!(!is_stashed(task_id));
    }
}