use agent_sdk_foundation::privacy::{RedactionPolicy, redact_string};
use opentelemetry::global::BoxedSpan;
use opentelemetry::trace::{Span, TraceContextExt};
use opentelemetry::{Context, KeyValue};
use super::attrs;
use super::baggage;
use super::langfuse;
use super::provider_name;
use super::spans;
use super::trace_io;
use crate::llm::LlmProvider;
use crate::tools::ToolRegistry;
use crate::types::{AgentConfig, AgentInput, RunOptions, ThreadId, TokenUsage};
/// Borrowed inputs for [`start_root_span`].
///
/// Every field is either a shared reference or a `&'static str`, so the
/// bundle is cheap to copy no matter what `Ctx` and `P` are.
pub(crate) struct StartRootSpanParams<'a, Ctx, P>
where
    Ctx: Send + Sync + 'static,
    P: LlmProvider,
{
    /// Provider whose provider id and model are recorded on the root span.
    pub(crate) provider: &'a P,
    /// Tool registry; this module only records its length.
    pub(crate) tools: &'a ToolRegistry<Ctx>,
    /// Agent configuration; `streaming` and `max_turns` are recorded.
    pub(crate) config: &'a AgentConfig,
    /// Thread identifier, recorded as the conversation id.
    pub(crate) thread_id: &'a ThreadId,
    /// Run input; its kind and a redacted trace input are recorded.
    pub(crate) input: &'a AgentInput,
    /// Run-mode label recorded verbatim on the span.
    pub(crate) run_mode: &'static str,
    /// Per-run options that feed baggage entries and trace-level attributes.
    pub(crate) run_options: &'a RunOptions,
}

// `Clone`/`Copy` are implemented by hand on purpose: `#[derive(Clone, Copy)]`
// would add `Ctx: Copy` and `P: Copy` bounds, even though the struct only
// stores references to those types. With the derive, the bundle would not be
// copyable for ordinary (non-`Copy`) providers and contexts.
impl<Ctx, P> Clone for StartRootSpanParams<'_, Ctx, P>
where
    Ctx: Send + Sync + 'static,
    P: LlmProvider,
{
    fn clone(&self) -> Self {
        *self
    }
}

impl<Ctx, P> Copy for StartRootSpanParams<'_, Ctx, P>
where
    Ctx: Send + Sync + 'static,
    P: LlmProvider,
{
}
/// Output of [`start_root_span`]: the started root span together with values
/// read from it before it was moved into the sink.
pub(crate) struct StartedRootSpan {
/// Shared handle that owns the root span.
pub(crate) sink: RootSpanEventSink,
/// Clone of the root span's `SpanContext`.
pub(crate) span_context: opentelemetry::trace::SpanContext,
/// Result of `is_recording()` on the root span, captured at start time.
pub(crate) is_recording: bool,
}
/// Starts the root `invoke_agent` span for a run.
///
/// The span is created with the attributes from `build_root_attrs`, gets the
/// run-options baggage copied onto it, is tagged as a Langfuse `Agent`
/// observation, and receives the trace-level attributes derived from the run
/// options. The span is returned wrapped in a [`RootSpanEventSink`], together
/// with its span context and recording flag.
pub(crate) fn start_root_span<Ctx, P>(params: &StartRootSpanParams<'_, Ctx, P>) -> StartedRootSpan
where
    Ctx: Send + Sync + 'static,
    P: LlmProvider,
{
    let initial_attrs = build_root_attrs(
        params.provider,
        params.tools,
        params.config,
        params.thread_id,
        params.input,
        params.run_mode,
    );

    // The guard keeps the baggage-carrying context attached until this
    // function returns, i.e. while the span is created and decorated.
    let _baggage_guard = run_options_baggage(&Context::current(), params.run_options).attach();

    let mut root = spans::start_internal_span("invoke_agent", initial_attrs);
    super::baggage::copy_baggage_to_active_span(&mut root);
    super::langfuse::tag_observation(&mut root, super::langfuse::ObservationType::Agent);
    apply_run_options_attrs(&mut root, params.input, params.run_options);

    // Read these before the span is moved into the sink.
    let is_recording = root.is_recording();
    let span_context = root.span_context().clone();

    StartedRootSpan {
        sink: RootSpanEventSink::new(root),
        span_context,
        is_recording,
    }
}
/// Builds the attributes the root span is created with.
///
/// Always emits the operation name, normalized provider name, request model,
/// conversation id, raw provider id, run mode, input kind, streaming flag and
/// tool count. `SDK_CONFIG_MAX_TURNS` is appended only when
/// `config.max_turns` is set. Counts that do not fit in an `i64` are
/// reported as `0`.
fn build_root_attrs<Ctx, P>(
    provider: &P,
    tools: &ToolRegistry<Ctx>,
    config: &AgentConfig,
    thread_id: &ThreadId,
    input: &AgentInput,
    run_mode: &'static str,
) -> Vec<KeyValue>
where
    Ctx: Send + Sync + 'static,
    P: LlmProvider,
{
    let normalized_provider = provider_name::normalize(provider.provider());
    let tool_count = i64::try_from(tools.len()).unwrap_or(0);

    let mut out = vec![
        KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
        KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, normalized_provider),
        KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, provider.model().to_string()),
        KeyValue::new(attrs::GEN_AI_CONVERSATION_ID, thread_id.to_string()),
        KeyValue::new(attrs::SDK_PROVIDER_ID, provider.provider()),
        KeyValue::new(attrs::SDK_RUN_MODE, run_mode),
        KeyValue::new(attrs::SDK_INPUT_KIND, attrs::input_kind_str(input)),
        attrs::kv_bool(attrs::SDK_CONFIG_STREAMING, config.streaming),
        attrs::kv_i64(attrs::SDK_TOOLS_COUNT, tool_count),
    ];

    // An `Option` iterates over zero or one item, so this appends the
    // max-turns attribute only when a limit is configured.
    out.extend(config.max_turns.map(|limit| {
        attrs::kv_i64(
            attrs::SDK_CONFIG_MAX_TURNS,
            i64::try_from(limit).unwrap_or(0),
        )
    }));

    out
}
/// Derives a context from the current one that carries the run-options
/// baggage and has `span_context` installed via `with_remote_span_context`.
#[must_use]
pub(crate) fn build_root_context(
    span_context: opentelemetry::trace::SpanContext,
    run_options: &RunOptions,
) -> Context {
    run_options_baggage(&Context::current(), run_options).with_remote_span_context(span_context)
}
#[must_use]
pub(crate) fn attach_root_event_sink(cx: &Context, sink: RootSpanEventSink) -> Context {
cx.with_value(sink)
}
/// Adds an event to the root span reachable from the current `Context`.
///
/// Does nothing when the current context holds no [`RootSpanEventSink`].
pub(crate) fn record_root_event(name: &'static str, attrs: Vec<KeyValue>) {
    let current = Context::current();
    let Some(sink) = current.get::<RootSpanEventSink>() else {
        return;
    };
    sink.add_event(name, attrs);
}
/// Cloneable handle to the root span, shared behind `Arc<Mutex<_>>`.
///
/// [`attach_root_event_sink`] stores a handle in a `Context` and
/// [`record_root_event`] looks it up there to add events to the root span.
#[derive(Clone)]
pub(crate) struct RootSpanEventSink(
std::sync::Arc<std::sync::Mutex<opentelemetry::global::BoxedSpan>>,
);
impl RootSpanEventSink {
    /// Wraps `span` in a new shared handle.
    pub(crate) fn new(span: opentelemetry::global::BoxedSpan) -> Self {
        Self(std::sync::Arc::new(std::sync::Mutex::new(span)))
    }

    /// Recovers the wrapped span.
    ///
    /// Returns `None` when other clones of this handle are still alive, or
    /// when the mutex guarding the span is poisoned.
    pub(crate) fn into_inner(self) -> Option<opentelemetry::global::BoxedSpan> {
        let Self(arc) = self;
        std::sync::Arc::try_unwrap(arc)
            .ok()
            .and_then(|mu| mu.into_inner().ok())
    }

    /// Adds an event to the span.
    ///
    /// The event is dropped (with a warning) when the mutex is poisoned, and
    /// silently skipped when the span is not recording.
    fn add_event(&self, name: &'static str, attrs: Vec<KeyValue>) {
        let Ok(mut span) = self.0.lock() else {
            log::warn!("root span sink mutex poisoned; dropping event {name}");
            return;
        };
        if !span.is_recording() {
            return;
        }
        span.add_event(name, attrs);
    }

    /// Runs `op` with exclusive access to the span and returns its result.
    ///
    /// Returns `None` without calling `op` when the mutex is poisoned. That
    /// case is logged, matching `add_event`, so that a dropped span update
    /// is visible instead of disappearing silently.
    pub(crate) fn with_span_mut<R>(
        &self,
        op: impl FnOnce(&mut opentelemetry::global::BoxedSpan) -> R,
    ) -> Option<R> {
        let Ok(mut span) = self.0.lock() else {
            log::warn!("root span sink mutex poisoned; skipping span update");
            return None;
        };
        Some(op(&mut span))
    }
}
/// Returns a context derived from `cx` that carries baggage entries for the
/// session id, user id and deployment environment found in `opts`.
///
/// Session and user ids are each written under two keys (the generic one and
/// the Langfuse-specific one). When none of the three options is set, `cx` is
/// returned as a plain clone.
fn run_options_baggage(cx: &Context, opts: &RunOptions) -> Context {
    let mut pairs: Vec<KeyValue> = Vec::new();

    if let Some(session) = opts.session_id.as_deref() {
        pairs.push(KeyValue::new(
            baggage::BAGGAGE_SESSION_ID,
            session.to_owned(),
        ));
        pairs.push(KeyValue::new(
            baggage::BAGGAGE_LANGFUSE_SESSION_ID,
            session.to_owned(),
        ));
    }

    if let Some(user) = opts.user_id.as_deref() {
        pairs.push(KeyValue::new(baggage::BAGGAGE_USER_ID, user.to_owned()));
        pairs.push(KeyValue::new(
            baggage::BAGGAGE_LANGFUSE_USER_ID,
            user.to_owned(),
        ));
    }

    if let Some(env) = opts.environment.as_deref() {
        pairs.push(KeyValue::new(
            baggage::BAGGAGE_DEPLOYMENT_ENVIRONMENT,
            env.to_owned(),
        ));
    }

    // Nothing to add: hand back the input context unchanged.
    if pairs.is_empty() {
        return cx.clone();
    }
    baggage::with_attributes(cx, pairs)
}
/// Copies trace-level settings from `opts` onto `span` through the Langfuse
/// helpers: trace name, tags, metadata, release, environment and the trace
/// input.
///
/// Metadata values that are JSON strings are written as their raw text; all
/// other JSON values are written in their serialized form. The trace input
/// is passed through `redact_string` with the baseline policy before it is
/// set on the span.
fn apply_run_options_attrs(span: &mut BoxedSpan, input: &AgentInput, opts: &RunOptions) {
    if let Some(trace_name) = opts.trace_name.as_deref() {
        langfuse::set_trace_name(span, trace_name);
    }

    if !opts.trace_tags.is_empty() {
        langfuse::set_trace_tags(span, &opts.trace_tags);
    }

    for (key, raw) in &opts.trace_metadata {
        // Avoid the surrounding quotes `to_string` would add to JSON strings.
        let text = if let serde_json::Value::String(s) = raw {
            s.clone()
        } else {
            raw.to_string()
        };
        langfuse::set_trace_metadata(span, key, text);
    }

    if let Some(release) = opts.release.as_deref() {
        langfuse::set_release(span, release);
    }

    if let Some(environment) = opts.environment.as_deref() {
        langfuse::set_environment(span, environment);
    }

    let char_limit = opts
        .trace_text_max_chars
        .unwrap_or(langfuse::DEFAULT_TRACE_TEXT_MAX_CHARS);
    if let Some(raw_input) = trace_io::langfuse_trace_input(input, char_limit) {
        let redacted = redact_string(&raw_input, &RedactionPolicy::baseline());
        langfuse::set_trace_input(span, redacted);
    }
}
/// Creates the shared `RootTraceState` for a run, or `None` when the root
/// span is not recording.
///
/// The state is constructed with `run_options.trace_text_max_chars`, falling
/// back to `langfuse::DEFAULT_TRACE_TEXT_MAX_CHARS` when that is unset.
#[must_use]
pub(crate) fn build_root_trace_state(
    is_recording: bool,
    run_options: &RunOptions,
) -> Option<std::sync::Arc<trace_io::RootTraceState>> {
    is_recording.then(|| {
        let char_limit = run_options
            .trace_text_max_chars
            .unwrap_or(langfuse::DEFAULT_TRACE_TEXT_MAX_CHARS);
        std::sync::Arc::new(trace_io::RootTraceState::new(char_limit))
    })
}
/// Records the run outcome and ends the root span.
///
/// The `runs_outcome` counter is always incremented. The span itself is only
/// finalized when `sink` is the last handle to it; otherwise a warning is
/// logged and the function returns without setting the outcome attributes.
/// On the normal path the total turn count, token usage and outcome are set
/// as attributes, the span is marked as errored when `outcome` is `"error"`,
/// and the span is ended.
pub(crate) fn end_root_span(
    sink: RootSpanEventSink,
    total_turns: usize,
    total_usage: &TokenUsage,
    outcome: &'static str,
) {
    super::metrics::Metrics::global()
        .runs_outcome
        .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);

    let mut span = match sink.into_inner() {
        Some(span) => span,
        None => {
            log::warn!(
                "root span sink still has outstanding clones at end_root_span; \
                 dropping outcome attributes",
            );
            return;
        }
    };

    // A turn count that does not fit in an `i64` is reported as 0.
    let final_attrs = [
        KeyValue::new(
            attrs::SDK_TOTAL_TURNS,
            i64::try_from(total_turns).unwrap_or(0),
        ),
        KeyValue::new(
            attrs::GEN_AI_USAGE_INPUT_TOKENS,
            i64::from(total_usage.input_tokens),
        ),
        KeyValue::new(
            attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
            i64::from(total_usage.output_tokens),
        ),
        KeyValue::new(
            attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
            i64::from(total_usage.cached_input_tokens),
        ),
        KeyValue::new(
            attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
            i64::from(total_usage.cache_creation_input_tokens),
        ),
        KeyValue::new(attrs::SDK_OUTCOME, outcome),
    ];
    for attr in final_attrs {
        span.set_attribute(attr);
    }

    if outcome == "error" {
        spans::set_span_error(&mut span, "agent_error", "agent invocation failed");
    }

    span.end();
}
/// Calls `state.flush` on the root span held by `sink`.
///
/// The result of `with_span_mut` is deliberately ignored, so nothing is
/// reported to the caller when the span could not be accessed.
pub(crate) fn flush_root_trace_state(sink: &RootSpanEventSink, state: &trace_io::RootTraceState) {
    let _ = sink.with_span_mut(|root| state.flush(root));
}
#[must_use]
pub(crate) const fn run_state_outcome(state: &crate::types::AgentRunState) -> &'static str {
match state {
crate::types::AgentRunState::Done { .. } => "done",
crate::types::AgentRunState::Refusal { .. } => "refusal",
crate::types::AgentRunState::AwaitingConfirmation { .. } => "awaiting_confirmation",
crate::types::AgentRunState::Cancelled { .. } => "cancelled",
crate::types::AgentRunState::Error(_) => "error",
_ => "unknown",
}
}
#[must_use]
pub(crate) const fn turn_outcome_str(outcome: &crate::types::TurnOutcome) -> &'static str {
match outcome {
crate::types::TurnOutcome::Done { .. } => "done",
crate::types::TurnOutcome::Refusal { .. } => "refusal",
crate::types::TurnOutcome::NeedsMoreTurns { .. } => "needs_more_turns",
crate::types::TurnOutcome::AwaitingConfirmation { .. } => "awaiting_confirmation",
crate::types::TurnOutcome::PendingToolCalls { .. } => "pending_tool_calls",
crate::types::TurnOutcome::Cancelled { .. } => "cancelled",
crate::types::TurnOutcome::Error(_) => "error",
}
}