Skip to main content

agent_sdk/observability/
loop_instrument.rs

1//! Reusable `chat` / `execute_tool` span + metric helpers.
2//!
3//! These primitives are shared by every agent-loop implementation so
4//! the daemon-hosted worker emits byte-identical telemetry to the
5//! in-process loop.
6//!
7//! ## Why this module exists
8//!
9//! The SDK ships two agent-loop implementations:
10//!
11//! * the in-process `agent_loop` (used by `AgentLoop` /
12//!   `BipAgent`), and
13//! * the daemon-hosted re-implementation in `agent-server`'s worker
14//!   (`crates/agent-server/src/worker/root_turn.rs` +
15//!   `agent-service-host`'s `registry_tool_executor.rs`).
16//!
17//! Both drive an LLM call and a tool dispatch, and both must emit the
18//! **same** `gen_ai.*` / `agent_sdk.*` telemetry so dashboards built
19//! against the in-process loop light up unchanged when a session runs
20//! on the daemon. Before this module the worker bypassed the SDK
21//! instrumentation entirely and emitted none of it.
22//!
23//! Rather than copy the span names, attribute keys, and metric label
24//! sets into `agent-server` (where they would silently drift), this
25//! module exposes the *exact* primitives the in-process loop uses:
26//!
27//! * [`build_chat_span`] / [`finish_chat_span_success`] /
28//!   [`finish_chat_span_error`] mirror `agent_loop::turn`'s
29//!   `build_llm_span` / `stamp_llm_success` / `stamp_llm_error`.
30//! * [`build_tool_span`] / [`finish_tool_span`] /
31//!   [`ToolSpanOutcome`] mirror `agent_loop::tool_execution`'s
32//!   `start_tool_span` / `finish_tool_span`.
33//!
34//! The metric recording underneath delegates to
35//! [`crate::observability::metrics::Metrics`] (the same global
36//! singleton), so there is a single source of truth for the label
37//! sets and no second meter scope.
38//!
39//! Compiled only with `feature = "otel"`.
40
41use std::collections::HashMap;
42use std::sync::{LazyLock, Mutex};
43use std::time::{Duration, Instant};
44
45use opentelemetry::global::BoxedSpan;
46use opentelemetry::trace::{Span, Status, TraceContextExt};
47use opentelemetry::{Context, KeyValue, global};
48
49use super::metrics::Metrics;
50use super::{attrs, baggage, langfuse, provider_name, spans};
51use crate::llm::{ChatResponse, StopReason};
52use crate::types::{TokenUsage, ToolTier};
53
54// ── Chat (LLM client) span ───────────────────────────────────────────
55
56/// Inputs needed to open a `chat {model}` CLIENT span.
57///
58/// Bundled so call sites stay readable and so the field set can grow
59/// (e.g. extra `gen_ai.request.*` attributes) without churning every
60/// caller. Mirrors the attributes set by `agent_loop::turn::build_llm_span`.
61#[derive(Clone, Copy)]
62pub struct ChatSpanParams<'a> {
63    /// Raw SDK provider id (e.g. `anthropic`, `openai-responses`).
64    /// Normalised to the `gen_ai.provider.name` semconv value
65    /// internally via [`provider_name::normalize`].
66    pub provider_id: &'static str,
67    /// Model the SDK is asking for (`gen_ai.request.model`).
68    pub model: &'a str,
69    /// Whether the call streams (`agent_sdk.llm.streaming`).
70    pub streaming: bool,
71    /// Configured output-token cap, if any
72    /// (`gen_ai.request.max_output_tokens`).
73    pub max_tokens: Option<u32>,
74}
75
76/// Open a `chat {model}` CLIENT span with the `gen_ai` semconv
77/// attributes known before the call.
78///
79/// Byte-for-byte mirror of `agent_loop::turn::build_llm_span`: same
80/// span name, same initial attribute set, same baggage copy, same
81/// Langfuse `generation` observation tag. Pair every successful call
82/// with [`finish_chat_span_success`] and every failure with
83/// [`finish_chat_span_error`].
84#[must_use]
85pub fn build_chat_span(params: ChatSpanParams<'_>) -> BoxedSpan {
86    let span_name = format!("chat {}", params.model);
87    let provider = provider_name::normalize(params.provider_id);
88    let mut init_attrs = vec![
89        KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "chat"),
90        KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
91        KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
92        attrs::kv_bool(attrs::SDK_LLM_STREAMING, params.streaming),
93        KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
94    ];
95    if let Some(max_tokens) = params.max_tokens {
96        init_attrs.push(attrs::kv_i64(
97            attrs::GEN_AI_REQUEST_MAX_OUTPUT_TOKENS,
98            i64::from(max_tokens),
99        ));
100    }
101    let mut span = spans::start_client_span(span_name, init_attrs);
102    baggage::copy_baggage_to_active_span(&mut span);
103    langfuse::tag_observation(&mut span, langfuse::ObservationType::Generation);
104    span
105}
106
107/// Stamp a successful chat response onto `span`, record the
108/// `gen_ai.client.token.usage` + `gen_ai.client.operation.duration`
109/// metrics, and end the span.
110///
111/// Mirrors `agent_loop::turn::stamp_llm_success`: same response-model
112/// / id / finish-reason / usage / boolean attributes, same metric
113/// label sets (via [`Metrics::record_chat_token_usage`] +
114/// [`Metrics::record_chat_operation_duration_success`]).
115///
116/// `provider_id` is the raw SDK id; it is normalised internally so
117/// the metric `gen_ai.provider.name` label matches the span.
118pub fn finish_chat_span_success(
119    span: &mut BoxedSpan,
120    response: &ChatResponse,
121    elapsed_secs: f64,
122    provider_id: &'static str,
123    request_model: &str,
124) {
125    let provider = provider_name::normalize(provider_id);
126
127    if !response.id.is_empty() {
128        span.set_attribute(KeyValue::new(
129            attrs::GEN_AI_RESPONSE_ID,
130            response.id.clone(),
131        ));
132    }
133    span.set_attribute(KeyValue::new(
134        attrs::GEN_AI_RESPONSE_MODEL,
135        response.model.clone(),
136    ));
137    if let Some(reason) = response.stop_reason {
138        span.set_attribute(KeyValue::new(
139            attrs::GEN_AI_RESPONSE_FINISH_REASONS,
140            finish_reason_str(reason),
141        ));
142    }
143    span.set_attribute(attrs::kv_i64(
144        attrs::GEN_AI_USAGE_INPUT_TOKENS,
145        i64::from(response.usage.input_tokens),
146    ));
147    span.set_attribute(attrs::kv_i64(
148        attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
149        i64::from(response.usage.output_tokens),
150    ));
151    span.set_attribute(attrs::kv_i64(
152        attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
153        i64::from(response.usage.cached_input_tokens),
154    ));
155    span.set_attribute(attrs::kv_i64(
156        attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
157        i64::from(response.usage.cache_creation_input_tokens),
158    ));
159    span.set_attribute(attrs::kv_bool(
160        attrs::SDK_LLM_HAD_TOOL_CALLS,
161        response.has_tool_use(),
162    ));
163    span.set_attribute(attrs::kv_bool(
164        attrs::SDK_LLM_TEXT_OUTPUT_PRESENT,
165        response.first_text().is_some(),
166    ));
167    span.set_attribute(attrs::kv_bool(
168        attrs::SDK_LLM_THINKING_PRESENT,
169        response.first_thinking().is_some(),
170    ));
171
172    let metrics = Metrics::global();
173    metrics.record_chat_token_usage(&response.usage, provider, request_model, &response.model);
174    metrics.record_chat_operation_duration_success(
175        elapsed_secs,
176        provider,
177        request_model,
178        &response.model,
179    );
180
181    span.end();
182}
183
184/// Stamp a chat error onto `span`, record the
185/// `gen_ai.client.operation.duration` metric with the `error.type`
186/// label, and end the span.
187///
188/// Mirrors `agent_loop::turn::stamp_llm_error`. `error_type` must be a
189/// stable, low-cardinality string (e.g. `rate_limited`,
190/// `server_error`, `invalid_request`, `stream_error`); use
191/// [`classify_llm_error`] for free-form provider error messages so the
192/// vocabulary matches the in-process loop exactly.
193pub fn finish_chat_span_error(
194    span: &mut BoxedSpan,
195    error_type: &'static str,
196    message: &str,
197    elapsed_secs: f64,
198    provider_id: &'static str,
199    request_model: &str,
200) {
201    let provider = provider_name::normalize(provider_id);
202    spans::set_span_error(span, error_type, message);
203    Metrics::global().record_chat_operation_duration_error(
204        elapsed_secs,
205        provider,
206        request_model,
207        error_type,
208    );
209    span.end();
210}
211
212/// Map a free-form LLM error message to the stable `error.type`
213/// attribute / metric label.
214///
215/// Byte-for-byte mirror of `agent_loop::turn::classify_llm_error` so
216/// the daemon worker and the in-process loop bucket transient failures
217/// identically.
218#[must_use]
219pub fn classify_llm_error(msg: &str) -> &'static str {
220    if msg.contains("Rate limited") {
221        "rate_limited"
222    } else if msg.contains("Invalid request") {
223        "invalid_request"
224    } else if msg.contains("Server error") {
225        "server_error"
226    } else if msg.contains("Stream") {
227        "stream_error"
228    } else {
229        "_OTHER"
230    }
231}
232
233/// Map an SDK [`StopReason`] to the semconv `finish_reason` string.
234///
235/// Mirrors `attrs::finish_reason_str` (kept here too so callers that
236/// only import this module get a consistent vocabulary).
237#[must_use]
238pub const fn finish_reason_str(reason: StopReason) -> &'static str {
239    attrs::finish_reason_str(reason)
240}
241
242// ── Tool execution span ──────────────────────────────────────────────
243
244/// Inputs needed to open an `execute_tool` INTERNAL span.
245///
246/// Mirrors the attributes set by
247/// `agent_loop::tool_execution::start_tool_span`.
248#[derive(Clone, Copy)]
249pub struct ToolSpanParams<'a> {
250    /// `gen_ai.tool.name` — the protocol tool name.
251    pub tool_name: &'a str,
252    /// `gen_ai.tool.call.id` — the LLM-assigned call id.
253    pub tool_call_id: &'a str,
254    /// `agent_sdk.tool.display_name`; skipped when empty.
255    pub display_name: &'a str,
256    /// `agent_sdk.tool.tier`; `None` when the tool is unknown to the
257    /// registry (matches the in-process loop's `unknown`-kind path,
258    /// which omits the tier attribute).
259    pub tier: Option<ToolTier>,
260    /// `agent_sdk.tool.kind` — `sync` / `async` / `listen` /
261    /// `unknown`.
262    pub kind: &'static str,
263}
264
265/// Open an `execute_tool` INTERNAL span.
266///
267/// Byte-for-byte mirror of
268/// `agent_loop::tool_execution::start_tool_span`: same span name, same
269/// attribute set (operation name, tool name, call id, optional display
270/// name, optional tier, kind), same baggage copy, same Langfuse `tool`
271/// observation tag. Finish with [`finish_tool_span`].
272#[must_use]
273pub fn build_tool_span(params: ToolSpanParams<'_>) -> BoxedSpan {
274    let mut span_attrs = vec![
275        KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "execute_tool"),
276        KeyValue::new(attrs::GEN_AI_TOOL_NAME, params.tool_name.to_string()),
277        KeyValue::new(attrs::GEN_AI_TOOL_CALL_ID, params.tool_call_id.to_string()),
278    ];
279    if !params.display_name.is_empty() {
280        span_attrs.push(KeyValue::new(
281            attrs::SDK_TOOL_DISPLAY_NAME,
282            params.display_name.to_string(),
283        ));
284    }
285    if let Some(tier) = params.tier {
286        span_attrs.push(KeyValue::new(
287            attrs::SDK_TOOL_TIER,
288            attrs::tool_tier_str(tier),
289        ));
290    }
291    span_attrs.push(KeyValue::new(attrs::SDK_TOOL_KIND, params.kind));
292
293    let mut span = spans::start_internal_span("execute_tool", span_attrs);
294    baggage::copy_baggage_to_active_span(&mut span);
295    langfuse::tag_observation(&mut span, langfuse::ObservationType::Tool);
296    span
297}
298
299/// Terminal outcome of a tool execution, used to stamp the outcome
300/// attributes + metric labels on an `execute_tool` span.
301///
302/// Mirrors the match arms of
303/// `agent_loop::tool_execution::finish_tool_span` so the
304/// `agent_sdk.tool.outcome` value and `error.type` value match across
305/// loops.
306pub enum ToolSpanOutcome<'a> {
307    /// Tool ran (or was short-circuited) and produced a result. The
308    /// outcome string is derived from the result body / success flag
309    /// exactly as the in-process loop does.
310    Completed {
311        /// Tool output body; inspected for the `Unknown tool:` /
312        /// `Blocked:` / `Rejected:` sentinels.
313        output: &'a str,
314        /// Whether the tool reported success.
315        success: bool,
316        /// Wall-clock duration in milliseconds, if measured.
317        duration_ms: Option<u64>,
318    },
319    /// Tool requires user confirmation before running.
320    AwaitingConfirmation,
321    /// The execution boundary itself errored (e.g. event-store commit
322    /// failure). Recorded with `error.type = event_store`.
323    EventStoreError {
324        /// Error message for the span status.
325        message: &'a str,
326    },
327}
328
329/// Stamp the terminal outcome on `span`, record the
330/// `agent_sdk.tools.execution.{count,duration}` metrics, and end the
331/// span.
332///
333/// Byte-for-byte mirror of
334/// `agent_loop::tool_execution::finish_tool_span`'s outcome handling +
335/// metric recording (via [`Metrics::record_tool_execution`]).
336pub fn finish_tool_span(
337    span: &mut BoxedSpan,
338    tool_name: &str,
339    tool_kind: &'static str,
340    outcome: &ToolSpanOutcome<'_>,
341) {
342    let (outcome_str, duration_ms) = match outcome {
343        ToolSpanOutcome::Completed {
344            output,
345            success,
346            duration_ms,
347        } => {
348            let outcome_str = if output.starts_with("Unknown tool:") {
349                span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "unknown_tool"));
350                span.set_status(Status::error((*output).to_string()));
351                "error"
352            } else if output.starts_with("Blocked:") {
353                "blocked"
354            } else if output.starts_with("Rejected:") {
355                "rejected"
356            } else if *success {
357                "success"
358            } else {
359                "error"
360            };
361            span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, outcome_str));
362            if let Some(ms) = duration_ms {
363                span.set_attribute(attrs::kv_i64(
364                    attrs::SDK_TOOL_DURATION_MS,
365                    i64::try_from(*ms).unwrap_or(i64::MAX),
366                ));
367            }
368            (outcome_str, *duration_ms)
369        }
370        ToolSpanOutcome::AwaitingConfirmation => {
371            span.set_attribute(attrs::kv_bool(attrs::SDK_TOOL_CONFIRMATION_REQUIRED, true));
372            span.set_attribute(KeyValue::new(
373                attrs::SDK_TOOL_OUTCOME,
374                "awaiting_confirmation",
375            ));
376            ("awaiting_confirmation", None)
377        }
378        ToolSpanOutcome::EventStoreError { message } => {
379            span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "event_store"));
380            span.set_status(Status::error((*message).to_string()));
381            span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, "error"));
382            ("error", None)
383        }
384    };
385
386    Metrics::global().record_tool_execution(tool_name, tool_kind, outcome_str, duration_ms);
387    span.end();
388}
389
390// ── Root turn span (daemon cross-task) ───────────────────────────────
391
392/// Inputs needed to open the daemon's root `invoke_agent` span.
393///
394/// The daemon-hosted worker drives one turn across multiple tokio tasks
395/// (execute → suspend at the tool boundary → resume). Unlike the
396/// in-process loop's `agent_loop` root span — which keeps a single live
397/// span wrapping the whole run future — the worker creates this span
398/// live on the FRESH turn, captures its `(trace_id, span_id)` to persist
399/// on the turn attempt, and re-parents every later span (resumed `chat`
400/// calls, child-task `execute_tool` calls) under those ids via
401/// [`remote_parent_context`].
402#[derive(Clone, Copy)]
403pub struct RootTurnSpanParams<'a> {
404    /// Raw SDK provider id (e.g. `anthropic`); normalised to the
405    /// `gen_ai.provider.name` semconv value internally.
406    pub provider_id: &'static str,
407    /// Model driving the turn (`gen_ai.request.model`).
408    pub model: &'a str,
409    /// Thread / conversation id (`gen_ai.conversation.id`).
410    pub conversation_id: &'a str,
411}
412
413/// A started root-turn span plus the hex ids the worker persists so the
414/// turn's later tasks can re-parent their spans under it.
415pub struct StartedRootTurnSpan {
416    /// The live `invoke_agent` span.
417    ///
418    /// Canonical lifecycle: hand this span straight to
419    /// [`stash_root_turn_span`] (keyed by task id) and finalize it later —
420    /// from whichever task reaches the terminal — via
421    /// [`finalize_root_turn_span`], so the exported span carries the
422    /// **full** turn duration across the suspend/resume hop. Only callers
423    /// that deliberately opt out of the registry hold this span directly
424    /// and finish it with [`finish_root_turn_span`] (or let it end on drop,
425    /// which truncates the duration to the fresh segment only).
426    pub span: BoxedSpan,
427    /// Hex-encoded `TraceId` — persist to
428    /// `agent_sdk_turn_attempts.otel_trace_id`.
429    pub trace_id_hex: String,
430    /// Hex-encoded root `SpanId` — persist to
431    /// `agent_sdk_turn_attempts.otel_span_id`.
432    pub span_id_hex: String,
433    /// The root span's real sampled bit, captured from its `SpanContext`
434    /// at creation.
435    ///
436    /// Persist this alongside the ids and pass it to
437    /// [`remote_parent_context_with_sampling`] /
438    /// [`traceparent_from_ids_with_sampling`] when re-parenting resumed
439    /// `chat` calls and child `execute_tool` spans, so those children
440    /// honour ratio sampling instead of being force-recorded under a
441    /// sampled-out root.
442    pub sampled: bool,
443}
444
445/// Start the daemon's root `invoke_agent` INTERNAL span and capture the
446/// ids the worker persists so the turn's later tasks can nest under it.
447///
448/// Mirrors the structural / `gen_ai.*` attributes of the in-process
449/// loop's root span (operation name, provider, model, conversation id)
450/// with the minimal parameter set the daemon worker supplies cheaply,
451/// and tags the same Langfuse `agent` observation.
452///
453/// # Lifecycle
454///
455/// The `OTel` 0.32 `SpanBuilder` exposes no way to assign a span id, so a
456/// single span cannot be reopened on resume to cover the whole turn.
457/// The **canonical** pattern is therefore:
458///
459/// 1. Persist [`StartedRootTurnSpan::trace_id_hex`] /
460///    [`StartedRootTurnSpan::span_id_hex`] (and
461///    [`StartedRootTurnSpan::sampled`]) on the turn attempt so later tasks
462///    re-parent under the root via [`remote_parent_context_with_sampling`].
463/// 2. Hand [`StartedRootTurnSpan::span`] to [`stash_root_turn_span`]
464///    immediately, then [`finalize_root_turn_span`] it from whichever task
465///    reaches the terminal — even across tasks — so the span keeps the
466///    full turn duration (it legitimately stays open while tools run).
467///
468/// End-on-drop is reserved for callers that deliberately opt out of the
469/// registry; for them the span's duration covers only the fresh segment.
470/// A custom worker that skips [`stash_root_turn_span`] will export
471/// truncated root spans.
472#[must_use]
473pub fn start_root_turn_span(params: RootTurnSpanParams<'_>) -> StartedRootTurnSpan {
474    let provider = provider_name::normalize(params.provider_id);
475    let span_attrs = vec![
476        KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
477        KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
478        KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
479        KeyValue::new(
480            attrs::GEN_AI_CONVERSATION_ID,
481            params.conversation_id.to_string(),
482        ),
483        KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
484    ];
485    let mut span = spans::start_internal_span("invoke_agent", span_attrs);
486    baggage::copy_baggage_to_active_span(&mut span);
487    langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
488    let span_context = span.span_context().clone();
489    StartedRootTurnSpan {
490        trace_id_hex: span_context.trace_id().to_string(),
491        span_id_hex: span_context.span_id().to_string(),
492        sampled: span_context.is_sampled(),
493        span,
494    }
495}
496
497/// Build a [`Context`] whose active parent is the root-turn span
498/// identified by the hex ids persisted on the turn attempt.
499///
500/// Spans created while this context is attached become children of the
501/// root span. This is how the worker re-parents resumed `chat` calls and
502/// child-task `execute_tool` calls under the turn root after the original
503/// live span has gone (the worker suspended, or a fresh task / process
504/// picked the turn up). Returns `None` when the ids are absent or
505/// malformed — the caller then leaves the span un-parented (a new trace),
506/// exactly as before this wiring.
507#[must_use]
508pub fn remote_parent_context(trace_id_hex: &str, span_id_hex: &str) -> Option<Context> {
509    let span_context = spans::remote_span_context(trace_id_hex, span_id_hex)?;
510    Some(Context::current().with_remote_span_context(span_context))
511}
512
513/// Re-parent a child onto a remote span, preserving the root's real sampled bit.
514///
515/// Like [`remote_parent_context`] but propagates the root span's **real**
516/// sampled bit (see [`StartedRootTurnSpan::sampled`]) instead of forcing
517/// SAMPLED, so a `ParentBased` sampler keeps or drops the re-parented child
518/// to match the root's sampling decision. Use this on the resume / child
519/// tool paths to stop ratio sampling being silently defeated for daemon
520/// workloads. Returns `None` when the ids are absent or malformed.
521#[must_use]
522pub fn remote_parent_context_with_sampling(
523    trace_id_hex: &str,
524    span_id_hex: &str,
525    sampled: bool,
526) -> Option<Context> {
527    let span_context =
528        spans::remote_span_context_with_sampling(trace_id_hex, span_id_hex, sampled)?;
529    Some(Context::current().with_remote_span_context(span_context))
530}
531
532/// Finalize the root-turn span with run-outcome attributes + the
533/// `agent_sdk.runs.outcome` counter, then end it.
534///
535/// Mirrors `agent_loop`'s `end_root_span`: same total-turns / usage /
536/// outcome attribute set and the same outcome counter. Reached on the
537/// text-only terminal path where the worker still holds the live span;
538/// tool turns end the span on drop at the suspend boundary (see
539/// [`start_root_turn_span`]).
540pub fn finish_root_turn_span(
541    span: &mut BoxedSpan,
542    total_turns: usize,
543    total_usage: Option<&TokenUsage>,
544    outcome: &'static str,
545) {
546    Metrics::global()
547        .runs_outcome
548        .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
549    span.set_attribute(attrs::kv_i64(
550        attrs::SDK_TOTAL_TURNS,
551        i64::try_from(total_turns).unwrap_or(0),
552    ));
553    // Usage is optional: the daemon does not aggregate per-turn token
554    // usage across the suspend/resume hop, so it passes `None` and the
555    // per-call `chat` spans carry the authoritative usage. The in-process
556    // loop passes the cumulative `TokenUsage`.
557    if let Some(usage) = total_usage {
558        span.set_attribute(attrs::kv_i64(
559            attrs::GEN_AI_USAGE_INPUT_TOKENS,
560            i64::from(usage.input_tokens),
561        ));
562        span.set_attribute(attrs::kv_i64(
563            attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
564            i64::from(usage.output_tokens),
565        ));
566        span.set_attribute(attrs::kv_i64(
567            attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
568            i64::from(usage.cached_input_tokens),
569        ));
570        span.set_attribute(attrs::kv_i64(
571            attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
572            i64::from(usage.cache_creation_input_tokens),
573        ));
574    }
575    span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
576    if outcome == "error" {
577        spans::set_span_error(span, "agent_error", "agent invocation failed");
578    }
579    span.end();
580}
581
582// ── W3C traceparent ⇄ Context propagation ────────────────────────────
583
584/// W3C `traceparent` header key. The only carrier entry the daemon
585/// needs to persist a parent span context durably (on `AgentTask`) and
586/// rebuild it in a later task / process.
587const TRACEPARENT_KEY: &str = "traceparent";
588
589/// Rebuild an `OTel` [`Context`] from a persisted W3C `traceparent`.
590///
591/// The daemon decouples the gRPC call that submits work from the worker
592/// that runs it (a durable task queue, possibly a different process), so
593/// the inbound client trace context cannot ride an in-memory
594/// [`Context`]. It is persisted as a `traceparent` string on the task
595/// and rebuilt here via the globally-installed
596/// [`opentelemetry::propagation::TextMapPropagator`]. Spans started while
597/// the returned context is attached become children of the encoded span.
598///
599/// Returns `None` when `traceparent` is empty, malformed, or no
600/// propagator is installed (the extracted context then carries no valid
601/// span) — the caller leaves its span un-parented, exactly as before
602/// this wiring.
603#[must_use]
604pub fn context_from_traceparent(traceparent: &str) -> Option<Context> {
605    if traceparent.is_empty() {
606        return None;
607    }
608    let mut carrier = HashMap::with_capacity(1);
609    carrier.insert(TRACEPARENT_KEY.to_string(), traceparent.to_string());
610    let cx = global::get_text_map_propagator(|propagator| propagator.extract(&carrier));
611    if cx.span().span_context().is_valid() {
612        Some(cx)
613    } else {
614        None
615    }
616}
617
618/// Encode hex trace + span ids into a W3C `traceparent` string.
619///
620/// Used to stamp a child tool task's parent span: the root
621/// `invoke_agent` span's `(trace_id, span_id)` — persisted on the turn
622/// attempt — is encoded here and stored on the child task's
623/// `otel_traceparent` so the child's `execute_tool` span nests under the
624/// turn root. Returns `None` for malformed / zero ids (validated via
625/// [`spans::remote_span_context`]).
626///
627/// **Legacy entry point — always sets the `-01` (SAMPLED) flag.** That
628/// forces the child task's `execute_tool` span to be recorded even when the
629/// root turn was sampled out. Prefer [`traceparent_from_ids_with_sampling`]
630/// and pass the root span's real sampled bit so ratio sampling is honoured.
631#[must_use]
632pub fn traceparent_from_ids(trace_id_hex: &str, span_id_hex: &str) -> Option<String> {
633    traceparent_from_ids_with_sampling(trace_id_hex, span_id_hex, true)
634}
635
636/// Encode hex trace + span ids into a W3C `traceparent`, stamping the
637/// root span's **real** sampled bit in the flag byte (`-01` when sampled,
638/// `-00` otherwise).
639///
640/// A downstream [`context_from_traceparent`] parses this flag through the
641/// global propagator, so encoding the true bit keeps the child's sampling
642/// decision aligned with the root instead of force-recording every child.
643/// Returns `None` for malformed / zero ids.
644#[must_use]
645pub fn traceparent_from_ids_with_sampling(
646    trace_id_hex: &str,
647    span_id_hex: &str,
648    sampled: bool,
649) -> Option<String> {
650    let span_context = spans::remote_span_context(trace_id_hex, span_id_hex)?;
651    let flag = if sampled { "01" } else { "00" };
652    Some(format!(
653        "00-{}-{}-{flag}",
654        span_context.trace_id(),
655        span_context.span_id()
656    ))
657}
658
659// ── Live root-span registry (correct cross-task duration) ────────────
660
661/// Process-global home for the live root-turn span between the task that
662/// opens it and the (possibly later) task that finalizes it.
663///
664/// The daemon drives one turn across several tokio tasks
665/// (execute → suspend at the tool boundary → resume), and `OTel` 0.32
666/// offers no way to assign a span id, so a root span cannot be re-minted
667/// on resume to cover the whole turn. Instead the worker stashes the
668/// *live* `invoke_agent` span here at the fresh turn and finalizes it at
669/// the terminal — even though that runs in a different task — so the span
670/// carries the **full** turn duration (it legitimately stays open while
671/// tools run). Children never read this map; they nest via the ids
672/// persisted on the turn attempt (see [`remote_parent_context`]), so the
673/// tree is correct regardless of whether the live span survives.
674///
675/// Keyed by `AgentTask` id (stringified). An entry that is never finalized
676/// in this process — the daemon crashed/restarted mid-turn, or (in a
677/// horizontally-scaled deployment) the resume landed on a different replica
678/// so a fresh process owns the terminal — would otherwise leak its span
679/// object forever. To bound that leak the registry is swept on every
680/// [`stash_root_turn_span`]: entries older than [`MAX_STASH_AGE`] (and the
681/// oldest entries once the map reaches [`MAX_LIVE_ROOT_SPANS`]) are ended
682/// with an `abandoned` outcome so they still export rather than vanish.
683static LIVE_ROOT_SPANS: LazyLock<Mutex<HashMap<String, StashedRootSpan>>> =
684    LazyLock::new(|| Mutex::new(HashMap::new()));
685
686/// A live root-turn span plus the instant it was stashed, so the registry
687/// can evict entries that were never finalized (see `LIVE_ROOT_SPANS`).
688struct StashedRootSpan {
689    span: BoxedSpan,
690    stashed_at: Instant,
691}
692
693/// Hard cap on the number of live root-turn spans retained at once. Once
694/// reached, the oldest entries are evicted (ended as `abandoned`) to make
695/// room. Sized generously: real concurrency is bounded by worker leases, so
696/// hitting this implies leaked (cross-replica / crashed) entries.
697const MAX_LIVE_ROOT_SPANS: usize = 1024;
698
699/// Maximum age a stashed root-turn span is kept before it is force-ended
700/// with an `abandoned` outcome. An upper bound on a single turn's
701/// wall-clock; anything older is a leaked entry whose terminal will never
702/// arrive in this process.
703const MAX_STASH_AGE: Duration = Duration::from_hours(1);
704
705/// Remove and return every entry older than [`MAX_STASH_AGE`]. The caller
706/// finalizes the returned spans (as `abandoned`) *after* releasing the
707/// registry lock so span / metric work never runs under it.
708fn take_stale(spans: &mut HashMap<String, StashedRootSpan>, now: Instant) -> Vec<StashedRootSpan> {
709    spans
710        .extract_if(|_, stashed| now.saturating_duration_since(stashed.stashed_at) >= MAX_STASH_AGE)
711        .map(|(_, stashed)| stashed)
712        .collect()
713}
714
715/// Remove and return the oldest entries until the map has room for one more
716/// entry under `cap`. The caller finalizes the returned spans.
717fn take_over_capacity(
718    spans: &mut HashMap<String, StashedRootSpan>,
719    cap: usize,
720) -> Vec<StashedRootSpan> {
721    let mut evicted = Vec::new();
722    while spans.len() >= cap {
723        let Some(oldest) = spans
724            .iter()
725            .min_by_key(|(_, stashed)| stashed.stashed_at)
726            .map(|(key, _)| key.clone())
727        else {
728            break;
729        };
730        if let Some(stashed) = spans.remove(&oldest) {
731            evicted.push(stashed);
732        }
733    }
734    evicted
735}
736
737/// Stash the live root-turn `span` under `task_id` so a later task can
738/// finalize it with the correct full duration (see `LIVE_ROOT_SPANS`).
739///
740/// First write wins: a retry that re-opens the same turn keeps the
741/// original span (and its start time) rather than resetting the clock.
742///
743/// Sweeps the registry first (TTL + capacity) so leaked entries from
744/// crashed / cross-replica resumes cannot grow the map without bound; any
745/// evicted span is ended with an `abandoned` outcome (see `LIVE_ROOT_SPANS`).
746pub fn stash_root_turn_span(task_id: &str, span: BoxedSpan) {
747    let evicted = {
748        let Ok(mut spans) = LIVE_ROOT_SPANS.lock() else {
749            log::warn!("live root-span registry poisoned; dropping root span for task {task_id}");
750            return;
751        };
752        let now = Instant::now();
753        let mut evicted = take_stale(&mut spans, now);
754        evicted.extend(take_over_capacity(&mut spans, MAX_LIVE_ROOT_SPANS));
755        spans.entry(task_id.to_string()).or_insert(StashedRootSpan {
756            span,
757            stashed_at: now,
758        });
759        evicted
760    };
761    // Finalize evicted (leaked) spans outside the registry lock so the
762    // metric + export work never runs while holding it. Ending them with an
763    // `abandoned` outcome keeps them in the trace rather than vanishing.
764    for mut stashed in evicted {
765        finish_root_turn_span(&mut stashed.span, 0, None, "abandoned");
766    }
767}
768
769/// Finalize and end the stashed root-turn span for `task_id`, stamping
770/// run-outcome + usage attributes (see [`finish_root_turn_span`]).
771///
772/// No-op when no span is stashed — the expected path when the daemon
773/// restarted mid-turn and a fresh process owns the terminal, or when the
774/// entry was already evicted by the registry's TTL / capacity sweep. The
775/// outcome counter is still recorded in that case so dashboards see every
776/// run.
777pub fn finalize_root_turn_span(
778    task_id: &str,
779    total_turns: usize,
780    total_usage: Option<&TokenUsage>,
781    outcome: &'static str,
782) {
783    let stashed = LIVE_ROOT_SPANS
784        .lock()
785        .ok()
786        .and_then(|mut spans| spans.remove(task_id));
787    match stashed {
788        Some(mut stashed) => {
789            finish_root_turn_span(&mut stashed.span, total_turns, total_usage, outcome);
790        }
791        None => {
792            // Live span lost (cross-restart resume). Still record the
793            // run-outcome counter so the metric isn't undercounted.
794            Metrics::global()
795                .runs_outcome
796                .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
797        }
798    }
799}
800
801/// Drop the stashed root-turn span for `task_id` without finalizing.
802///
803/// For paths that abandon a turn without a meaningful outcome (e.g. an
804/// idempotent duplicate-suspension bail) so the registry doesn't leak.
805pub fn discard_root_turn_span(task_id: &str) {
806    if let Ok(mut spans) = LIVE_ROOT_SPANS.lock() {
807        drop(spans.remove(task_id));
808    }
809}
810
811#[cfg(test)]
812mod tests {
813    use super::{
814        Duration, Instant, MAX_STASH_AGE, StashedRootSpan, classify_llm_error,
815        discard_root_turn_span, finalize_root_turn_span, spans, stash_root_turn_span,
816        take_over_capacity, take_stale, traceparent_from_ids, traceparent_from_ids_with_sampling,
817    };
818    use anyhow::Context as _;
819    use std::collections::HashMap;
820
821    // W3C example ids (RFC trace-context), both non-zero / valid.
822    const TRACE_HEX: &str = "4bf92f3577b34da6a3ce929d0e0e4736";
823    const SPAN_HEX: &str = "00f067aa0ba902b7";
824
825    #[test]
826    fn traceparent_from_valid_ids_is_w3c_sampled() {
827        let traceparent = traceparent_from_ids(TRACE_HEX, SPAN_HEX).expect("valid ids");
828        assert_eq!(traceparent, format!("00-{TRACE_HEX}-{SPAN_HEX}-01"));
829    }
830
831    #[test]
832    fn traceparent_encodes_real_sampled_bit() -> anyhow::Result<()> {
833        let sampled =
834            traceparent_from_ids_with_sampling(TRACE_HEX, SPAN_HEX, true).context("sampled")?;
835        assert!(sampled.ends_with("-01"), "sampled traceparent: {sampled}");
836        let unsampled =
837            traceparent_from_ids_with_sampling(TRACE_HEX, SPAN_HEX, false).context("unsampled")?;
838        assert!(
839            unsampled.ends_with("-00"),
840            "sampled-out root must not force -01: {unsampled}"
841        );
842        Ok(())
843    }
844
845    #[test]
846    fn traceparent_from_malformed_or_zero_ids_is_none() {
847        assert!(traceparent_from_ids("not-hex", SPAN_HEX).is_none());
848        assert!(traceparent_from_ids(TRACE_HEX, "tooshort").is_none());
849        // All-zero ids are rejected by `SpanContext::is_valid`.
850        assert!(traceparent_from_ids(&"0".repeat(32), &"0".repeat(16)).is_none());
851    }
852
853    #[test]
854    fn classify_llm_error_vocabulary_is_stable() {
855        // Pins the daemon-side `error.type` vocabulary so any drift from the
856        // in-process `agent_loop::turn::classify_llm_error` byte-for-byte
857        // mirror is caught here (the two copies are not yet deduplicated).
858        assert_eq!(
859            classify_llm_error("Rate limited: slow down"),
860            "rate_limited"
861        );
862        assert_eq!(
863            classify_llm_error("Invalid request: bad"),
864            "invalid_request"
865        );
866        assert_eq!(classify_llm_error("Server error 500"), "server_error");
867        assert_eq!(classify_llm_error("Stream closed early"), "stream_error");
868        assert_eq!(classify_llm_error("something else entirely"), "_OTHER");
869    }
870
871    #[test]
872    fn take_stale_removes_entries_past_ttl() {
873        let mut map: HashMap<String, StashedRootSpan> = HashMap::new();
874        // Anchor everything to a single base instant and advance "now"
875        // forward (rather than subtracting from `now`) so the test never
876        // underflows the monotonic clock on a freshly-booted machine.
877        let base = Instant::now();
878        map.insert(
879            "old".to_string(),
880            StashedRootSpan {
881                span: spans::start_internal_span("test", Vec::new()),
882                stashed_at: base,
883            },
884        );
885        map.insert(
886            "fresh".to_string(),
887            StashedRootSpan {
888                span: spans::start_internal_span("test", Vec::new()),
889                stashed_at: base + MAX_STASH_AGE,
890            },
891        );
892        let eval_now = base + MAX_STASH_AGE + Duration::from_secs(1);
893        let evicted = take_stale(&mut map, eval_now);
894        assert_eq!(evicted.len(), 1, "exactly the stale entry is evicted");
895        assert!(!map.contains_key("old"), "stale entry must be removed");
896        assert!(map.contains_key("fresh"), "fresh entry must survive");
897    }
898
899    #[test]
900    fn take_over_capacity_trims_oldest_to_make_room() {
901        let mut map: HashMap<String, StashedRootSpan> = HashMap::new();
902        for i in 0u64..4 {
903            map.insert(
904                format!("t{i}"),
905                StashedRootSpan {
906                    span: spans::start_internal_span("test", Vec::new()),
907                    stashed_at: Instant::now() + Duration::from_millis(i),
908                },
909            );
910        }
911        let evicted = take_over_capacity(&mut map, 2);
912        assert!(!evicted.is_empty(), "over-capacity entries must be evicted");
913        // Leaves room for one more under the cap.
914        assert!(
915            map.len() < 2,
916            "map should be trimmed below cap, got {}",
917            map.len()
918        );
919    }
920
921    #[test]
922    fn live_root_span_registry_finalize_is_idempotent() {
923        // No provider installed → a no-op span, but the registry's map
924        // management (stash → finalize removes the entry) is exercised
925        // regardless, and a second finalize must not panic.
926        let task_id = "registry-roundtrip-task";
927        stash_root_turn_span(task_id, spans::start_internal_span("test", Vec::new()));
928        finalize_root_turn_span(task_id, 1, None, "done");
929        finalize_root_turn_span(task_id, 1, None, "done");
930    }
931
932    #[test]
933    fn discard_removes_stashed_span_without_finalize() {
934        let task_id = "registry-discard-task";
935        stash_root_turn_span(task_id, spans::start_internal_span("test", Vec::new()));
936        discard_root_turn_span(task_id);
937        // Finalize after discard is a no-op (entry already gone).
938        finalize_root_turn_span(task_id, 0, None, "cancelled");
939    }
940}