agent_sdk/observability/loop_instrument.rs
1//! Reusable `chat` / `execute_tool` span + metric helpers.
2//!
3//! These primitives are shared by every agent-loop implementation so
4//! the daemon-hosted worker emits byte-identical telemetry to the
5//! in-process loop.
6//!
7//! ## Why this module exists
8//!
9//! The SDK ships two agent-loop implementations:
10//!
11//! * the in-process `agent_loop` (used by `AgentLoop` /
12//! `BipAgent`), and
13//! * the daemon-hosted re-implementation in `agent-server`'s worker
14//! (`crates/agent-server/src/worker/root_turn.rs` +
15//! `agent-service-host`'s `registry_tool_executor.rs`).
16//!
17//! Both drive an LLM call and a tool dispatch, and both must emit the
18//! **same** `gen_ai.*` / `agent_sdk.*` telemetry so dashboards built
19//! against the in-process loop light up unchanged when a session runs
20//! on the daemon. Before this module the worker bypassed the SDK
21//! instrumentation entirely and emitted none of it.
22//!
23//! Rather than copy the span names, attribute keys, and metric label
24//! sets into `agent-server` (where they would silently drift), this
25//! module exposes the *exact* primitives the in-process loop uses:
26//!
27//! * [`build_chat_span`] / [`finish_chat_span_success`] /
28//! [`finish_chat_span_error`] mirror `agent_loop::turn`'s
29//! `build_llm_span` / `stamp_llm_success` / `stamp_llm_error`.
30//! * [`build_tool_span`] / [`finish_tool_span`] /
31//! [`ToolSpanOutcome`] mirror `agent_loop::tool_execution`'s
32//! `start_tool_span` / `finish_tool_span`.
33//!
34//! The metric recording underneath delegates to
35//! [`crate::observability::metrics::Metrics`] (the same global
36//! singleton), so there is a single source of truth for the label
37//! sets and no second meter scope.
38//!
39//! Compiled only with `feature = "otel"`.
40
41use std::collections::HashMap;
42use std::sync::{LazyLock, Mutex};
43use std::time::{Duration, Instant};
44
45use opentelemetry::global::BoxedSpan;
46use opentelemetry::trace::{Span, Status, TraceContextExt};
47use opentelemetry::{Context, KeyValue, global};
48
49use super::metrics::Metrics;
50use super::{attrs, baggage, langfuse, provider_name, spans};
51use crate::llm::{ChatResponse, StopReason};
52use crate::types::{TokenUsage, ToolTier};
53
54// ── Chat (LLM client) span ───────────────────────────────────────────
55
/// Inputs needed to open a `chat {model}` CLIENT span.
///
/// Bundled so call sites stay readable and so the field set can grow
/// (e.g. extra `gen_ai.request.*` attributes) without churning every
/// caller. Mirrors the attributes set by `agent_loop::turn::build_llm_span`.
///
/// Passed by value to [`build_chat_span`]; the struct is `Copy`, so the
/// same params can be reused after a span has been opened.
#[derive(Clone, Copy)]
pub struct ChatSpanParams<'a> {
    /// Raw SDK provider id (e.g. `anthropic`, `openai-responses`).
    /// Normalised to the `gen_ai.provider.name` semconv value
    /// internally via [`provider_name::normalize`]; the raw id is also
    /// stamped unchanged as the SDK provider-id attribute.
    pub provider_id: &'static str,
    /// Model the SDK is asking for (`gen_ai.request.model`). Also used to
    /// form the span name `chat {model}`.
    pub model: &'a str,
    /// Whether the call streams (`agent_sdk.llm.streaming`).
    pub streaming: bool,
    /// Configured output-token cap, if any
    /// (`gen_ai.request.max_output_tokens`). The attribute is omitted
    /// entirely when this is `None`.
    pub max_tokens: Option<u32>,
}
75
76/// Open a `chat {model}` CLIENT span with the `gen_ai` semconv
77/// attributes known before the call.
78///
79/// Byte-for-byte mirror of `agent_loop::turn::build_llm_span`: same
80/// span name, same initial attribute set, same baggage copy, same
81/// Langfuse `generation` observation tag. Pair every successful call
82/// with [`finish_chat_span_success`] and every failure with
83/// [`finish_chat_span_error`].
84#[must_use]
85pub fn build_chat_span(params: ChatSpanParams<'_>) -> BoxedSpan {
86 let span_name = format!("chat {}", params.model);
87 let provider = provider_name::normalize(params.provider_id);
88 let mut init_attrs = vec![
89 KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "chat"),
90 KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
91 KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
92 attrs::kv_bool(attrs::SDK_LLM_STREAMING, params.streaming),
93 KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
94 ];
95 if let Some(max_tokens) = params.max_tokens {
96 init_attrs.push(attrs::kv_i64(
97 attrs::GEN_AI_REQUEST_MAX_OUTPUT_TOKENS,
98 i64::from(max_tokens),
99 ));
100 }
101 let mut span = spans::start_client_span(span_name, init_attrs);
102 baggage::copy_baggage_to_active_span(&mut span);
103 langfuse::tag_observation(&mut span, langfuse::ObservationType::Generation);
104 span
105}
106
107/// Stamp a successful chat response onto `span`, record the
108/// `gen_ai.client.token.usage` + `gen_ai.client.operation.duration`
109/// metrics, and end the span.
110///
111/// Mirrors `agent_loop::turn::stamp_llm_success`: same response-model
112/// / id / finish-reason / usage / boolean attributes, same metric
113/// label sets (via [`Metrics::record_chat_token_usage`] +
114/// [`Metrics::record_chat_operation_duration_success`]).
115///
116/// `provider_id` is the raw SDK id; it is normalised internally so
117/// the metric `gen_ai.provider.name` label matches the span.
118pub fn finish_chat_span_success(
119 span: &mut BoxedSpan,
120 response: &ChatResponse,
121 elapsed_secs: f64,
122 provider_id: &'static str,
123 request_model: &str,
124) {
125 let provider = provider_name::normalize(provider_id);
126
127 if !response.id.is_empty() {
128 span.set_attribute(KeyValue::new(
129 attrs::GEN_AI_RESPONSE_ID,
130 response.id.clone(),
131 ));
132 }
133 span.set_attribute(KeyValue::new(
134 attrs::GEN_AI_RESPONSE_MODEL,
135 response.model.clone(),
136 ));
137 if let Some(reason) = response.stop_reason {
138 span.set_attribute(KeyValue::new(
139 attrs::GEN_AI_RESPONSE_FINISH_REASONS,
140 finish_reason_str(reason),
141 ));
142 }
143 span.set_attribute(attrs::kv_i64(
144 attrs::GEN_AI_USAGE_INPUT_TOKENS,
145 i64::from(response.usage.input_tokens),
146 ));
147 span.set_attribute(attrs::kv_i64(
148 attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
149 i64::from(response.usage.output_tokens),
150 ));
151 span.set_attribute(attrs::kv_i64(
152 attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
153 i64::from(response.usage.cached_input_tokens),
154 ));
155 span.set_attribute(attrs::kv_i64(
156 attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
157 i64::from(response.usage.cache_creation_input_tokens),
158 ));
159 span.set_attribute(attrs::kv_bool(
160 attrs::SDK_LLM_HAD_TOOL_CALLS,
161 response.has_tool_use(),
162 ));
163 span.set_attribute(attrs::kv_bool(
164 attrs::SDK_LLM_TEXT_OUTPUT_PRESENT,
165 response.first_text().is_some(),
166 ));
167 span.set_attribute(attrs::kv_bool(
168 attrs::SDK_LLM_THINKING_PRESENT,
169 response.first_thinking().is_some(),
170 ));
171
172 let metrics = Metrics::global();
173 metrics.record_chat_token_usage(&response.usage, provider, request_model, &response.model);
174 metrics.record_chat_operation_duration_success(
175 elapsed_secs,
176 provider,
177 request_model,
178 &response.model,
179 );
180
181 span.end();
182}
183
184/// Stamp a chat error onto `span`, record the
185/// `gen_ai.client.operation.duration` metric with the `error.type`
186/// label, and end the span.
187///
188/// Mirrors `agent_loop::turn::stamp_llm_error`. `error_type` must be a
189/// stable, low-cardinality string (e.g. `rate_limited`,
190/// `server_error`, `invalid_request`, `stream_error`); use
191/// [`classify_llm_error`] for free-form provider error messages so the
192/// vocabulary matches the in-process loop exactly.
193pub fn finish_chat_span_error(
194 span: &mut BoxedSpan,
195 error_type: &'static str,
196 message: &str,
197 elapsed_secs: f64,
198 provider_id: &'static str,
199 request_model: &str,
200) {
201 let provider = provider_name::normalize(provider_id);
202 spans::set_span_error(span, error_type, message);
203 Metrics::global().record_chat_operation_duration_error(
204 elapsed_secs,
205 provider,
206 request_model,
207 error_type,
208 );
209 span.end();
210}
211
/// Bucket a free-form LLM error message into the stable `error.type`
/// value used for the span attribute and the metric label.
///
/// Matching is a case-sensitive substring search, evaluated in priority
/// order: `Rate limited` → `rate_limited`, `Invalid request` →
/// `invalid_request`, `Server error` → `server_error`, `Stream` →
/// `stream_error`. The first marker found wins; a message containing none
/// of them (including the empty string) yields `_OTHER`.
///
/// Mirrors `agent_loop::turn::classify_llm_error` so the daemon worker and
/// the in-process loop bucket failures identically.
#[must_use]
pub fn classify_llm_error(msg: &str) -> &'static str {
    // (marker substring, error.type label), highest priority first.
    const BUCKETS: [(&str, &str); 4] = [
        ("Rate limited", "rate_limited"),
        ("Invalid request", "invalid_request"),
        ("Server error", "server_error"),
        ("Stream", "stream_error"),
    ];

    BUCKETS
        .iter()
        .find(|(marker, _)| msg.contains(*marker))
        .map_or("_OTHER", |&(_, label)| label)
}
232
/// Map an SDK [`StopReason`] to the semconv `finish_reason` string.
///
/// Thin `const` forwarder: it delegates straight to
/// `attrs::finish_reason_str`, so both entry points always return the same
/// vocabulary. Exposed here so callers that only import this module do not
/// need to reach into `attrs`.
#[must_use]
pub const fn finish_reason_str(reason: StopReason) -> &'static str {
    attrs::finish_reason_str(reason)
}
241
242// ── Tool execution span ──────────────────────────────────────────────
243
/// Inputs needed to open an `execute_tool` INTERNAL span.
///
/// Mirrors the attributes set by
/// `agent_loop::tool_execution::start_tool_span`.
///
/// Passed by value to [`build_tool_span`]; the struct is `Copy`.
#[derive(Clone, Copy)]
pub struct ToolSpanParams<'a> {
    /// `gen_ai.tool.name` — the protocol tool name.
    pub tool_name: &'a str,
    /// `gen_ai.tool.call.id` — the LLM-assigned call id.
    pub tool_call_id: &'a str,
    /// `agent_sdk.tool.display_name`; skipped when empty.
    pub display_name: &'a str,
    /// `agent_sdk.tool.tier`; `None` when the tool is unknown to the
    /// registry (matches the in-process loop's `unknown`-kind path,
    /// which omits the tier attribute).
    pub tier: Option<ToolTier>,
    /// `agent_sdk.tool.kind` — `sync` / `async` / `listen` /
    /// `unknown`. Always stamped, after the optional attributes.
    pub kind: &'static str,
}
264
265/// Open an `execute_tool` INTERNAL span.
266///
267/// Byte-for-byte mirror of
268/// `agent_loop::tool_execution::start_tool_span`: same span name, same
269/// attribute set (operation name, tool name, call id, optional display
270/// name, optional tier, kind), same baggage copy, same Langfuse `tool`
271/// observation tag. Finish with [`finish_tool_span`].
272#[must_use]
273pub fn build_tool_span(params: ToolSpanParams<'_>) -> BoxedSpan {
274 let mut span_attrs = vec![
275 KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "execute_tool"),
276 KeyValue::new(attrs::GEN_AI_TOOL_NAME, params.tool_name.to_string()),
277 KeyValue::new(attrs::GEN_AI_TOOL_CALL_ID, params.tool_call_id.to_string()),
278 ];
279 if !params.display_name.is_empty() {
280 span_attrs.push(KeyValue::new(
281 attrs::SDK_TOOL_DISPLAY_NAME,
282 params.display_name.to_string(),
283 ));
284 }
285 if let Some(tier) = params.tier {
286 span_attrs.push(KeyValue::new(
287 attrs::SDK_TOOL_TIER,
288 attrs::tool_tier_str(tier),
289 ));
290 }
291 span_attrs.push(KeyValue::new(attrs::SDK_TOOL_KIND, params.kind));
292
293 let mut span = spans::start_internal_span("execute_tool", span_attrs);
294 baggage::copy_baggage_to_active_span(&mut span);
295 langfuse::tag_observation(&mut span, langfuse::ObservationType::Tool);
296 span
297}
298
/// Terminal outcome of a tool execution, used to stamp the outcome
/// attributes + metric labels on an `execute_tool` span.
///
/// Mirrors the match arms of
/// `agent_loop::tool_execution::finish_tool_span` so the
/// `agent_sdk.tool.outcome` value and `error.type` value match across
/// loops.
///
/// Every payload is a borrowed or plain-old-data value, so the enum is
/// `Copy` (like the sibling `*Params` types) and `Debug` for logging and
/// test assertions.
#[derive(Debug, Clone, Copy)]
pub enum ToolSpanOutcome<'a> {
    /// Tool ran (or was short-circuited) and produced a result. The
    /// outcome string is derived from the result body / success flag
    /// exactly as the in-process loop does.
    Completed {
        /// Tool output body; its prefix is inspected for the
        /// `Unknown tool:` / `Blocked:` / `Rejected:` sentinels, which take
        /// precedence over `success`.
        output: &'a str,
        /// Whether the tool reported success. Only consulted when `output`
        /// carries none of the sentinel prefixes.
        success: bool,
        /// Wall-clock duration in milliseconds, if measured.
        duration_ms: Option<u64>,
    },
    /// Tool requires user confirmation before running.
    AwaitingConfirmation,
    /// The execution boundary itself errored (e.g. event-store commit
    /// failure). Recorded with `error.type = event_store`.
    EventStoreError {
        /// Error message for the span status.
        message: &'a str,
    },
}
328
329/// Stamp the terminal outcome on `span`, record the
330/// `agent_sdk.tools.execution.{count,duration}` metrics, and end the
331/// span.
332///
333/// Byte-for-byte mirror of
334/// `agent_loop::tool_execution::finish_tool_span`'s outcome handling +
335/// metric recording (via [`Metrics::record_tool_execution`]).
336pub fn finish_tool_span(
337 span: &mut BoxedSpan,
338 tool_name: &str,
339 tool_kind: &'static str,
340 outcome: &ToolSpanOutcome<'_>,
341) {
342 let (outcome_str, duration_ms) = match outcome {
343 ToolSpanOutcome::Completed {
344 output,
345 success,
346 duration_ms,
347 } => {
348 let outcome_str = if output.starts_with("Unknown tool:") {
349 span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "unknown_tool"));
350 span.set_status(Status::error((*output).to_string()));
351 "error"
352 } else if output.starts_with("Blocked:") {
353 "blocked"
354 } else if output.starts_with("Rejected:") {
355 "rejected"
356 } else if *success {
357 "success"
358 } else {
359 "error"
360 };
361 span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, outcome_str));
362 if let Some(ms) = duration_ms {
363 span.set_attribute(attrs::kv_i64(
364 attrs::SDK_TOOL_DURATION_MS,
365 i64::try_from(*ms).unwrap_or(i64::MAX),
366 ));
367 }
368 (outcome_str, *duration_ms)
369 }
370 ToolSpanOutcome::AwaitingConfirmation => {
371 span.set_attribute(attrs::kv_bool(attrs::SDK_TOOL_CONFIRMATION_REQUIRED, true));
372 span.set_attribute(KeyValue::new(
373 attrs::SDK_TOOL_OUTCOME,
374 "awaiting_confirmation",
375 ));
376 ("awaiting_confirmation", None)
377 }
378 ToolSpanOutcome::EventStoreError { message } => {
379 span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "event_store"));
380 span.set_status(Status::error((*message).to_string()));
381 span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, "error"));
382 ("error", None)
383 }
384 };
385
386 Metrics::global().record_tool_execution(tool_name, tool_kind, outcome_str, duration_ms);
387 span.end();
388}
389
390// ── Root turn span (daemon cross-task) ───────────────────────────────
391
/// Inputs needed to open the daemon's root `invoke_agent` span.
///
/// The daemon-hosted worker drives one turn across multiple tokio tasks
/// (execute → suspend at the tool boundary → resume). Unlike the
/// in-process loop's `agent_loop` root span — which keeps a single live
/// span wrapping the whole run future — the worker creates this span
/// live on the FRESH turn, captures its `(trace_id, span_id)` to persist
/// on the turn attempt, and re-parents every later span (resumed `chat`
/// calls, child-task `execute_tool` calls) under those ids via
/// [`remote_parent_context`].
///
/// Passed by value to [`start_root_turn_span`]; the struct is `Copy`.
#[derive(Clone, Copy)]
pub struct RootTurnSpanParams<'a> {
    /// Raw SDK provider id (e.g. `anthropic`); normalised to the
    /// `gen_ai.provider.name` semconv value internally, and also stamped
    /// unchanged as the SDK provider-id attribute.
    pub provider_id: &'static str,
    /// Model driving the turn (`gen_ai.request.model`).
    pub model: &'a str,
    /// Thread / conversation id (`gen_ai.conversation.id`).
    pub conversation_id: &'a str,
}
412
/// A started root-turn span plus the hex ids the worker persists so the
/// turn's later tasks can re-parent their spans under it.
///
/// Produced by [`start_root_turn_span`]; the ids and the sampled bit are
/// read from the span's own `SpanContext` at creation time.
pub struct StartedRootTurnSpan {
    /// The live `invoke_agent` span.
    ///
    /// Canonical lifecycle: hand this span straight to
    /// [`stash_root_turn_span`] (keyed by task id) and finalize it later —
    /// from whichever task reaches the terminal — via
    /// [`finalize_root_turn_span`], so the exported span carries the
    /// **full** turn duration across the suspend/resume hop. Only callers
    /// that deliberately opt out of the registry hold this span directly
    /// and finish it with [`finish_root_turn_span`] (or let it end on drop,
    /// which truncates the duration to the fresh segment only).
    pub span: BoxedSpan,
    /// Hex-encoded `TraceId` — persist to
    /// `agent_sdk_turn_attempts.otel_trace_id`.
    pub trace_id_hex: String,
    /// Hex-encoded root `SpanId` — persist to
    /// `agent_sdk_turn_attempts.otel_span_id`.
    pub span_id_hex: String,
    /// The root span's real sampled bit, captured from its `SpanContext`
    /// at creation.
    ///
    /// Persist this alongside the ids and pass it to
    /// [`remote_parent_context_with_sampling`] /
    /// [`traceparent_from_ids_with_sampling`] when re-parenting resumed
    /// `chat` calls and child `execute_tool` spans, so those children
    /// honour ratio sampling instead of being force-recorded under a
    /// sampled-out root.
    pub sampled: bool,
}
444
445/// Start the daemon's root `invoke_agent` INTERNAL span and capture the
446/// ids the worker persists so the turn's later tasks can nest under it.
447///
448/// Mirrors the structural / `gen_ai.*` attributes of the in-process
449/// loop's root span (operation name, provider, model, conversation id)
450/// with the minimal parameter set the daemon worker supplies cheaply,
451/// and tags the same Langfuse `agent` observation.
452///
453/// # Lifecycle
454///
455/// The `OTel` 0.32 `SpanBuilder` exposes no way to assign a span id, so a
456/// single span cannot be reopened on resume to cover the whole turn.
457/// The **canonical** pattern is therefore:
458///
459/// 1. Persist [`StartedRootTurnSpan::trace_id_hex`] /
460/// [`StartedRootTurnSpan::span_id_hex`] (and
461/// [`StartedRootTurnSpan::sampled`]) on the turn attempt so later tasks
462/// re-parent under the root via [`remote_parent_context_with_sampling`].
463/// 2. Hand [`StartedRootTurnSpan::span`] to [`stash_root_turn_span`]
464/// immediately, then [`finalize_root_turn_span`] it from whichever task
465/// reaches the terminal — even across tasks — so the span keeps the
466/// full turn duration (it legitimately stays open while tools run).
467///
468/// End-on-drop is reserved for callers that deliberately opt out of the
469/// registry; for them the span's duration covers only the fresh segment.
470/// A custom worker that skips [`stash_root_turn_span`] will export
471/// truncated root spans.
472#[must_use]
473pub fn start_root_turn_span(params: RootTurnSpanParams<'_>) -> StartedRootTurnSpan {
474 let provider = provider_name::normalize(params.provider_id);
475 let span_attrs = vec![
476 KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
477 KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
478 KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
479 KeyValue::new(
480 attrs::GEN_AI_CONVERSATION_ID,
481 params.conversation_id.to_string(),
482 ),
483 KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
484 ];
485 let mut span = spans::start_internal_span("invoke_agent", span_attrs);
486 baggage::copy_baggage_to_active_span(&mut span);
487 langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
488 let span_context = span.span_context().clone();
489 StartedRootTurnSpan {
490 trace_id_hex: span_context.trace_id().to_string(),
491 span_id_hex: span_context.span_id().to_string(),
492 sampled: span_context.is_sampled(),
493 span,
494 }
495}
496
497/// Build a [`Context`] whose active parent is the root-turn span
498/// identified by the hex ids persisted on the turn attempt.
499///
500/// Spans created while this context is attached become children of the
501/// root span. This is how the worker re-parents resumed `chat` calls and
502/// child-task `execute_tool` calls under the turn root after the original
503/// live span has gone (the worker suspended, or a fresh task / process
504/// picked the turn up). Returns `None` when the ids are absent or
505/// malformed — the caller then leaves the span un-parented (a new trace),
506/// exactly as before this wiring.
507#[must_use]
508pub fn remote_parent_context(trace_id_hex: &str, span_id_hex: &str) -> Option<Context> {
509 let span_context = spans::remote_span_context(trace_id_hex, span_id_hex)?;
510 Some(Context::current().with_remote_span_context(span_context))
511}
512
513/// Re-parent a child onto a remote span, preserving the root's real sampled bit.
514///
515/// Like [`remote_parent_context`] but propagates the root span's **real**
516/// sampled bit (see [`StartedRootTurnSpan::sampled`]) instead of forcing
517/// SAMPLED, so a `ParentBased` sampler keeps or drops the re-parented child
518/// to match the root's sampling decision. Use this on the resume / child
519/// tool paths to stop ratio sampling being silently defeated for daemon
520/// workloads. Returns `None` when the ids are absent or malformed.
521#[must_use]
522pub fn remote_parent_context_with_sampling(
523 trace_id_hex: &str,
524 span_id_hex: &str,
525 sampled: bool,
526) -> Option<Context> {
527 let span_context =
528 spans::remote_span_context_with_sampling(trace_id_hex, span_id_hex, sampled)?;
529 Some(Context::current().with_remote_span_context(span_context))
530}
531
532/// Finalize the root-turn span with run-outcome attributes + the
533/// `agent_sdk.runs.outcome` counter, then end it.
534///
535/// Mirrors `agent_loop`'s `end_root_span`: same total-turns / usage /
536/// outcome attribute set and the same outcome counter. Reached on the
537/// text-only terminal path where the worker still holds the live span;
538/// tool turns end the span on drop at the suspend boundary (see
539/// [`start_root_turn_span`]).
540pub fn finish_root_turn_span(
541 span: &mut BoxedSpan,
542 total_turns: usize,
543 total_usage: Option<&TokenUsage>,
544 outcome: &'static str,
545) {
546 Metrics::global()
547 .runs_outcome
548 .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
549 span.set_attribute(attrs::kv_i64(
550 attrs::SDK_TOTAL_TURNS,
551 i64::try_from(total_turns).unwrap_or(0),
552 ));
553 // Usage is optional: the daemon does not aggregate per-turn token
554 // usage across the suspend/resume hop, so it passes `None` and the
555 // per-call `chat` spans carry the authoritative usage. The in-process
556 // loop passes the cumulative `TokenUsage`.
557 if let Some(usage) = total_usage {
558 span.set_attribute(attrs::kv_i64(
559 attrs::GEN_AI_USAGE_INPUT_TOKENS,
560 i64::from(usage.input_tokens),
561 ));
562 span.set_attribute(attrs::kv_i64(
563 attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
564 i64::from(usage.output_tokens),
565 ));
566 span.set_attribute(attrs::kv_i64(
567 attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
568 i64::from(usage.cached_input_tokens),
569 ));
570 span.set_attribute(attrs::kv_i64(
571 attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
572 i64::from(usage.cache_creation_input_tokens),
573 ));
574 }
575 span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
576 if outcome == "error" {
577 spans::set_span_error(span, "agent_error", "agent invocation failed");
578 }
579 span.end();
580}
581
582// ── W3C traceparent ⇄ Context propagation ────────────────────────────
583
/// W3C `traceparent` header key. The only carrier entry the daemon
/// needs to persist a parent span context durably (on `AgentTask`) and
/// rebuild it in a later task / process.
///
/// Used as the single key of the carrier map that
/// [`context_from_traceparent`] hands to the global propagator.
const TRACEPARENT_KEY: &str = "traceparent";
588
589/// Rebuild an `OTel` [`Context`] from a persisted W3C `traceparent`.
590///
591/// The daemon decouples the gRPC call that submits work from the worker
592/// that runs it (a durable task queue, possibly a different process), so
593/// the inbound client trace context cannot ride an in-memory
594/// [`Context`]. It is persisted as a `traceparent` string on the task
595/// and rebuilt here via the globally-installed
596/// [`opentelemetry::propagation::TextMapPropagator`]. Spans started while
597/// the returned context is attached become children of the encoded span.
598///
599/// Returns `None` when `traceparent` is empty, malformed, or no
600/// propagator is installed (the extracted context then carries no valid
601/// span) — the caller leaves its span un-parented, exactly as before
602/// this wiring.
603#[must_use]
604pub fn context_from_traceparent(traceparent: &str) -> Option<Context> {
605 if traceparent.is_empty() {
606 return None;
607 }
608 let mut carrier = HashMap::with_capacity(1);
609 carrier.insert(TRACEPARENT_KEY.to_string(), traceparent.to_string());
610 let cx = global::get_text_map_propagator(|propagator| propagator.extract(&carrier));
611 if cx.span().span_context().is_valid() {
612 Some(cx)
613 } else {
614 None
615 }
616}
617
/// Encode hex trace + span ids into a W3C `traceparent` string.
///
/// Used to stamp a child tool task's parent span: the root
/// `invoke_agent` span's `(trace_id, span_id)` — persisted on the turn
/// attempt — is encoded here and stored on the child task's
/// `otel_traceparent` so the child's `execute_tool` span nests under the
/// turn root. Returns `None` for malformed / zero ids (validated via
/// [`spans::remote_span_context`]).
///
/// **Legacy entry point — always sets the `-01` (SAMPLED) flag.** That
/// forces the child task's `execute_tool` span to be recorded even when the
/// root turn was sampled out. Prefer [`traceparent_from_ids_with_sampling`]
/// and pass the root span's real sampled bit so ratio sampling is honoured.
///
/// Implemented as `traceparent_from_ids_with_sampling(.., true)`, so the
/// result has the shape `00-{trace_id}-{span_id}-01`.
#[must_use]
pub fn traceparent_from_ids(trace_id_hex: &str, span_id_hex: &str) -> Option<String> {
    traceparent_from_ids_with_sampling(trace_id_hex, span_id_hex, true)
}
635
636/// Encode hex trace + span ids into a W3C `traceparent`, stamping the
637/// root span's **real** sampled bit in the flag byte (`-01` when sampled,
638/// `-00` otherwise).
639///
640/// A downstream [`context_from_traceparent`] parses this flag through the
641/// global propagator, so encoding the true bit keeps the child's sampling
642/// decision aligned with the root instead of force-recording every child.
643/// Returns `None` for malformed / zero ids.
644#[must_use]
645pub fn traceparent_from_ids_with_sampling(
646 trace_id_hex: &str,
647 span_id_hex: &str,
648 sampled: bool,
649) -> Option<String> {
650 let span_context = spans::remote_span_context(trace_id_hex, span_id_hex)?;
651 let flag = if sampled { "01" } else { "00" };
652 Some(format!(
653 "00-{}-{}-{flag}",
654 span_context.trace_id(),
655 span_context.span_id()
656 ))
657}
658
659// ── Live root-span registry (correct cross-task duration) ────────────
660
/// Process-global home for the live root-turn span between the task that
/// opens it and the (possibly later) task that finalizes it.
///
/// The daemon drives one turn across several tokio tasks
/// (execute → suspend at the tool boundary → resume), and `OTel` 0.32
/// offers no way to assign a span id, so a root span cannot be re-minted
/// on resume to cover the whole turn. Instead the worker stashes the
/// *live* `invoke_agent` span here at the fresh turn and finalizes it at
/// the terminal — even though that runs in a different task — so the span
/// carries the **full** turn duration (it legitimately stays open while
/// tools run). Children never read this map; they nest via the ids
/// persisted on the turn attempt (see [`remote_parent_context`]), so the
/// tree is correct regardless of whether the live span survives.
///
/// Keyed by `AgentTask` id (stringified). An entry that is never finalized
/// in this process — the daemon crashed/restarted mid-turn, or (in a
/// horizontally-scaled deployment) the resume landed on a different replica
/// so a fresh process owns the terminal — would otherwise leak its span
/// object forever. To bound that leak the registry is swept on every
/// [`stash_root_turn_span`]: entries older than [`MAX_STASH_AGE`] (and the
/// oldest entries once the map reaches [`MAX_LIVE_ROOT_SPANS`]) are ended
/// with an `abandoned` outcome so they still export rather than vanish.
///
/// Guarded by a `std::sync::Mutex`. None of the accessors panic on a
/// poisoned lock: [`stash_root_turn_span`] logs a warning and drops the
/// incoming span, [`finalize_root_turn_span`] behaves as if nothing was
/// stashed, and [`discard_root_turn_span`] is a no-op.
static LIVE_ROOT_SPANS: LazyLock<Mutex<HashMap<String, StashedRootSpan>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));
685
/// A live root-turn span plus the instant it was stashed, so the registry
/// can evict entries that were never finalized (see `LIVE_ROOT_SPANS`).
struct StashedRootSpan {
    /// The still-open `invoke_agent` span awaiting finalization.
    span: BoxedSpan,
    /// When the span entered the registry. Drives both the age-based sweep
    /// and the oldest-first capacity eviction.
    stashed_at: Instant,
}
692
/// Hard cap on the number of live root-turn spans retained at once. Once
/// reached, the oldest entries are evicted (ended as `abandoned`) to make
/// room. Sized generously: real concurrency is bounded by worker leases, so
/// hitting this implies leaked (cross-replica / crashed) entries.
///
/// Passed as the `cap` argument of `take_over_capacity`, which evicts
/// while the map holds `cap` or more entries.
const MAX_LIVE_ROOT_SPANS: usize = 1024;
698
/// Maximum age a stashed root-turn span is kept before it is force-ended
/// with an `abandoned` outcome. An upper bound on a single turn's
/// wall-clock; anything older is a leaked entry whose terminal will never
/// arrive in this process.
///
/// The bound is inclusive: `take_stale` removes entries whose age is
/// greater than or equal to this value.
const MAX_STASH_AGE: Duration = Duration::from_hours(1);
704
705/// Remove and return every entry older than [`MAX_STASH_AGE`]. The caller
706/// finalizes the returned spans (as `abandoned`) *after* releasing the
707/// registry lock so span / metric work never runs under it.
708fn take_stale(spans: &mut HashMap<String, StashedRootSpan>, now: Instant) -> Vec<StashedRootSpan> {
709 spans
710 .extract_if(|_, stashed| now.saturating_duration_since(stashed.stashed_at) >= MAX_STASH_AGE)
711 .map(|(_, stashed)| stashed)
712 .collect()
713}
714
715/// Remove and return the oldest entries until the map has room for one more
716/// entry under `cap`. The caller finalizes the returned spans.
717fn take_over_capacity(
718 spans: &mut HashMap<String, StashedRootSpan>,
719 cap: usize,
720) -> Vec<StashedRootSpan> {
721 let mut evicted = Vec::new();
722 while spans.len() >= cap {
723 let Some(oldest) = spans
724 .iter()
725 .min_by_key(|(_, stashed)| stashed.stashed_at)
726 .map(|(key, _)| key.clone())
727 else {
728 break;
729 };
730 if let Some(stashed) = spans.remove(&oldest) {
731 evicted.push(stashed);
732 }
733 }
734 evicted
735}
736
737/// Stash the live root-turn `span` under `task_id` so a later task can
738/// finalize it with the correct full duration (see `LIVE_ROOT_SPANS`).
739///
740/// First write wins: a retry that re-opens the same turn keeps the
741/// original span (and its start time) rather than resetting the clock.
742///
743/// Sweeps the registry first (TTL + capacity) so leaked entries from
744/// crashed / cross-replica resumes cannot grow the map without bound; any
745/// evicted span is ended with an `abandoned` outcome (see `LIVE_ROOT_SPANS`).
746pub fn stash_root_turn_span(task_id: &str, span: BoxedSpan) {
747 let evicted = {
748 let Ok(mut spans) = LIVE_ROOT_SPANS.lock() else {
749 log::warn!("live root-span registry poisoned; dropping root span for task {task_id}");
750 return;
751 };
752 let now = Instant::now();
753 let mut evicted = take_stale(&mut spans, now);
754 evicted.extend(take_over_capacity(&mut spans, MAX_LIVE_ROOT_SPANS));
755 spans.entry(task_id.to_string()).or_insert(StashedRootSpan {
756 span,
757 stashed_at: now,
758 });
759 evicted
760 };
761 // Finalize evicted (leaked) spans outside the registry lock so the
762 // metric + export work never runs while holding it. Ending them with an
763 // `abandoned` outcome keeps them in the trace rather than vanishing.
764 for mut stashed in evicted {
765 finish_root_turn_span(&mut stashed.span, 0, None, "abandoned");
766 }
767}
768
769/// Finalize and end the stashed root-turn span for `task_id`, stamping
770/// run-outcome + usage attributes (see [`finish_root_turn_span`]).
771///
772/// No-op when no span is stashed — the expected path when the daemon
773/// restarted mid-turn and a fresh process owns the terminal, or when the
774/// entry was already evicted by the registry's TTL / capacity sweep. The
775/// outcome counter is still recorded in that case so dashboards see every
776/// run.
777pub fn finalize_root_turn_span(
778 task_id: &str,
779 total_turns: usize,
780 total_usage: Option<&TokenUsage>,
781 outcome: &'static str,
782) {
783 let stashed = LIVE_ROOT_SPANS
784 .lock()
785 .ok()
786 .and_then(|mut spans| spans.remove(task_id));
787 match stashed {
788 Some(mut stashed) => {
789 finish_root_turn_span(&mut stashed.span, total_turns, total_usage, outcome);
790 }
791 None => {
792 // Live span lost (cross-restart resume). Still record the
793 // run-outcome counter so the metric isn't undercounted.
794 Metrics::global()
795 .runs_outcome
796 .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
797 }
798 }
799}
800
801/// Drop the stashed root-turn span for `task_id` without finalizing.
802///
803/// For paths that abandon a turn without a meaningful outcome (e.g. an
804/// idempotent duplicate-suspension bail) so the registry doesn't leak.
805pub fn discard_root_turn_span(task_id: &str) {
806 if let Ok(mut spans) = LIVE_ROOT_SPANS.lock() {
807 drop(spans.remove(task_id));
808 }
809}
810
#[cfg(test)]
mod tests {
    //! Unit tests for the traceparent helpers, the `error.type`
    //! classification vocabulary, and the live root-span registry
    //! (TTL sweep, capacity trim, stash / finalize / discard).

    use super::{
        Duration, Instant, LIVE_ROOT_SPANS, MAX_STASH_AGE, StashedRootSpan, classify_llm_error,
        discard_root_turn_span, finalize_root_turn_span, spans, stash_root_turn_span,
        take_over_capacity, take_stale, traceparent_from_ids, traceparent_from_ids_with_sampling,
    };
    use anyhow::Context as _;
    use std::collections::HashMap;

    // W3C example ids (RFC trace-context), both non-zero / valid.
    const TRACE_HEX: &str = "4bf92f3577b34da6a3ce929d0e0e4736";
    const SPAN_HEX: &str = "00f067aa0ba902b7";

    /// Stash timestamp of the global-registry entry for `task_id`, or `None`
    /// when there is no such entry (or the registry lock is poisoned).
    ///
    /// Each registry test uses its own unique task id, so looking at a
    /// single key stays meaningful even though the registry is shared by
    /// every test in the process.
    fn registry_stashed_at(task_id: &str) -> Option<Instant> {
        LIVE_ROOT_SPANS
            .lock()
            .ok()
            .and_then(|registry| registry.get(task_id).map(|entry| entry.stashed_at))
    }

    #[test]
    fn traceparent_from_valid_ids_is_w3c_sampled() {
        let traceparent = traceparent_from_ids(TRACE_HEX, SPAN_HEX).expect("valid ids");
        assert_eq!(traceparent, format!("00-{TRACE_HEX}-{SPAN_HEX}-01"));
    }

    #[test]
    fn traceparent_encodes_real_sampled_bit() -> anyhow::Result<()> {
        let sampled =
            traceparent_from_ids_with_sampling(TRACE_HEX, SPAN_HEX, true).context("sampled")?;
        assert!(sampled.ends_with("-01"), "sampled traceparent: {sampled}");
        let unsampled =
            traceparent_from_ids_with_sampling(TRACE_HEX, SPAN_HEX, false).context("unsampled")?;
        assert!(
            unsampled.ends_with("-00"),
            "sampled-out root must not force -01: {unsampled}"
        );
        Ok(())
    }

    #[test]
    fn traceparent_from_malformed_or_zero_ids_is_none() {
        assert!(traceparent_from_ids("not-hex", SPAN_HEX).is_none());
        assert!(traceparent_from_ids(TRACE_HEX, "tooshort").is_none());
        // All-zero ids are rejected by `SpanContext::is_valid`.
        assert!(traceparent_from_ids(&"0".repeat(32), &"0".repeat(16)).is_none());
    }

    #[test]
    fn classify_llm_error_vocabulary_is_stable() {
        // Pins the daemon-side `error.type` vocabulary so any drift from the
        // in-process `agent_loop::turn::classify_llm_error` byte-for-byte
        // mirror is caught here (the two copies are not yet deduplicated).
        assert_eq!(
            classify_llm_error("Rate limited: slow down"),
            "rate_limited"
        );
        assert_eq!(
            classify_llm_error("Invalid request: bad"),
            "invalid_request"
        );
        assert_eq!(classify_llm_error("Server error 500"), "server_error");
        assert_eq!(classify_llm_error("Stream closed early"), "stream_error");
        assert_eq!(classify_llm_error("something else entirely"), "_OTHER");
    }

    #[test]
    fn take_stale_removes_entries_past_ttl() {
        let mut map: HashMap<String, StashedRootSpan> = HashMap::new();
        // Anchor everything to a single base instant and advance "now"
        // forward (rather than subtracting from `now`) so the test never
        // underflows the monotonic clock on a freshly-booted machine.
        let base = Instant::now();
        map.insert(
            "old".to_string(),
            StashedRootSpan {
                span: spans::start_internal_span("test", Vec::new()),
                stashed_at: base,
            },
        );
        map.insert(
            "fresh".to_string(),
            StashedRootSpan {
                span: spans::start_internal_span("test", Vec::new()),
                stashed_at: base + MAX_STASH_AGE,
            },
        );
        let eval_now = base + MAX_STASH_AGE + Duration::from_secs(1);
        let evicted = take_stale(&mut map, eval_now);
        assert_eq!(evicted.len(), 1, "exactly the stale entry is evicted");
        assert!(!map.contains_key("old"), "stale entry must be removed");
        assert!(map.contains_key("fresh"), "fresh entry must survive");
    }

    #[test]
    fn take_over_capacity_trims_oldest_to_make_room() {
        let mut map: HashMap<String, StashedRootSpan> = HashMap::new();
        // One shared base instant gives every entry a distinct, strictly
        // increasing timestamp, so "oldest" is unambiguous.
        let base = Instant::now();
        for i in 0u64..4 {
            map.insert(
                format!("t{i}"),
                StashedRootSpan {
                    span: spans::start_internal_span("test", Vec::new()),
                    stashed_at: base + Duration::from_millis(i),
                },
            );
        }
        let evicted = take_over_capacity(&mut map, 2);
        assert!(!evicted.is_empty(), "over-capacity entries must be evicted");
        // Leaves room for one more under the cap.
        assert!(
            map.len() < 2,
            "map should be trimmed below cap, got {}",
            map.len()
        );
        // Eviction must be oldest-first: nothing that was evicted may be
        // newer than anything that was kept.
        let newest_evicted = evicted.iter().map(|entry| entry.stashed_at).max();
        let oldest_kept = map.values().map(|entry| entry.stashed_at).min();
        if let (Some(evicted_at), Some(kept_at)) = (newest_evicted, oldest_kept) {
            assert!(
                evicted_at < kept_at,
                "evicted entries must be older than the survivors"
            );
        }
    }

    #[test]
    fn live_root_span_registry_finalize_is_idempotent() {
        // No provider installed → a no-op span, but the registry's map
        // management (stash → finalize removes the entry) is exercised
        // regardless, and a second finalize must not panic.
        let task_id = "registry-roundtrip-task";
        stash_root_turn_span(task_id, spans::start_internal_span("test", Vec::new()));
        assert!(
            registry_stashed_at(task_id).is_some(),
            "stash must register the span"
        );
        finalize_root_turn_span(task_id, 1, None, "done");
        assert!(
            registry_stashed_at(task_id).is_none(),
            "finalize must remove the entry"
        );
        finalize_root_turn_span(task_id, 1, None, "done");
        assert!(
            registry_stashed_at(task_id).is_none(),
            "second finalize must leave the registry untouched"
        );
    }

    #[test]
    fn discard_removes_stashed_span_without_finalize() {
        let task_id = "registry-discard-task";
        stash_root_turn_span(task_id, spans::start_internal_span("test", Vec::new()));
        assert!(
            registry_stashed_at(task_id).is_some(),
            "stash must register the span"
        );
        discard_root_turn_span(task_id);
        assert!(
            registry_stashed_at(task_id).is_none(),
            "discard must remove the entry"
        );
        // Finalize after discard is a no-op (entry already gone).
        finalize_root_turn_span(task_id, 0, None, "cancelled");
    }

    #[test]
    fn stash_keeps_first_span_for_same_task() {
        // A retry that re-stashes the same task must keep the original
        // entry, i.e. the recorded stash time does not move.
        let task_id = "registry-first-write-wins-task";
        stash_root_turn_span(task_id, spans::start_internal_span("test", Vec::new()));
        let first = registry_stashed_at(task_id);
        assert!(first.is_some(), "first stash must register the span");
        stash_root_turn_span(task_id, spans::start_internal_span("test", Vec::new()));
        assert_eq!(
            registry_stashed_at(task_id),
            first,
            "retry must not reset the stash clock"
        );
        // Clean up so the shared registry doesn't keep the entry around.
        discard_root_turn_span(task_id);
        assert!(registry_stashed_at(task_id).is_none());
    }
}