Skip to main content

agent_sdk/observability/
loop_instrument.rs

1//! Reusable `chat` / `execute_tool` span + metric helpers.
2//!
3//! These primitives are shared by every agent-loop implementation so
4//! the daemon-hosted worker emits byte-identical telemetry to the
5//! in-process loop.
6//!
7//! ## Why this module exists
8//!
9//! The SDK ships two agent-loop implementations:
10//!
11//! * the in-process `agent_loop` (used by `AgentLoop` /
12//!   `BipAgent`), and
13//! * the daemon-hosted re-implementation in `agent-server`'s worker
14//!   (`crates/agent-server/src/worker/root_turn.rs` +
15//!   `agent-service-host`'s `registry_tool_executor.rs`).
16//!
17//! Both drive an LLM call and a tool dispatch, and both must emit the
18//! **same** `gen_ai.*` / `agent_sdk.*` telemetry so dashboards built
19//! against the in-process loop light up unchanged when a session runs
20//! on the daemon. Before this module the worker bypassed the SDK
21//! instrumentation entirely and emitted none of it.
22//!
23//! Rather than copy the span names, attribute keys, and metric label
24//! sets into `agent-server` (where they would silently drift), this
25//! module exposes the *exact* primitives the in-process loop uses:
26//!
27//! * [`build_chat_span`] / [`finish_chat_span_success`] /
28//!   [`finish_chat_span_error`] mirror `agent_loop::turn`'s
29//!   `build_llm_span` / `stamp_llm_success` / `stamp_llm_error`.
30//! * [`build_tool_span`] / [`finish_tool_span`] /
31//!   [`ToolSpanOutcome`] mirror `agent_loop::tool_execution`'s
32//!   `start_tool_span` / `finish_tool_span`.
33//!
34//! The metric recording underneath delegates to
35//! [`crate::observability::metrics::Metrics`] (the same global
36//! singleton), so there is a single source of truth for the label
37//! sets and no second meter scope.
38//!
39//! Compiled only with `feature = "otel"`.
40
41use std::collections::HashMap;
42use std::sync::{LazyLock, Mutex};
43
44use opentelemetry::global::BoxedSpan;
45use opentelemetry::trace::{Span, Status, TraceContextExt};
46use opentelemetry::{Context, KeyValue, global};
47
48use super::metrics::Metrics;
49use super::{attrs, baggage, langfuse, provider_name, spans};
50use crate::llm::{ChatResponse, StopReason};
51use crate::types::{TokenUsage, ToolTier};
52
53// ── Chat (LLM client) span ───────────────────────────────────────────
54
/// Parameter bundle for opening a `chat {model}` CLIENT span.
///
/// Grouping the inputs keeps call sites readable and lets the field set
/// grow (e.g. further `gen_ai.request.*` attributes) without touching
/// every caller. The fields correspond to the attributes stamped by
/// [`build_chat_span`], which in turn mirrors
/// `agent_loop::turn::build_llm_span`.
///
/// The type is cheap to copy (it only holds references and scalars) and
/// implements `Debug` so it can be logged or used in assertions.
#[derive(Debug, Clone, Copy)]
pub struct ChatSpanParams<'a> {
    /// Raw SDK provider id (e.g. `anthropic`, `openai-responses`).
    /// Normalised to the `gen_ai.provider.name` semconv value
    /// internally via [`provider_name::normalize`].
    pub provider_id: &'static str,
    /// Model the SDK is asking for (`gen_ai.request.model`).
    pub model: &'a str,
    /// Whether the call streams (`agent_sdk.llm.streaming`).
    pub streaming: bool,
    /// Configured output-token cap, if any
    /// (`gen_ai.request.max_output_tokens`). `None` omits the attribute.
    pub max_tokens: Option<u32>,
}
74
75/// Open a `chat {model}` CLIENT span with the `gen_ai` semconv
76/// attributes known before the call.
77///
78/// Byte-for-byte mirror of `agent_loop::turn::build_llm_span`: same
79/// span name, same initial attribute set, same baggage copy, same
80/// Langfuse `generation` observation tag. Pair every successful call
81/// with [`finish_chat_span_success`] and every failure with
82/// [`finish_chat_span_error`].
83#[must_use]
84pub fn build_chat_span(params: ChatSpanParams<'_>) -> BoxedSpan {
85    let span_name = format!("chat {}", params.model);
86    let provider = provider_name::normalize(params.provider_id);
87    let mut init_attrs = vec![
88        KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "chat"),
89        KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
90        KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
91        attrs::kv_bool(attrs::SDK_LLM_STREAMING, params.streaming),
92        KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
93    ];
94    if let Some(max_tokens) = params.max_tokens {
95        init_attrs.push(attrs::kv_i64(
96            attrs::GEN_AI_REQUEST_MAX_OUTPUT_TOKENS,
97            i64::from(max_tokens),
98        ));
99    }
100    let mut span = spans::start_client_span(span_name, init_attrs);
101    baggage::copy_baggage_to_active_span(&mut span);
102    langfuse::tag_observation(&mut span, langfuse::ObservationType::Generation);
103    span
104}
105
106/// Stamp a successful chat response onto `span`, record the
107/// `gen_ai.client.token.usage` + `gen_ai.client.operation.duration`
108/// metrics, and end the span.
109///
110/// Mirrors `agent_loop::turn::stamp_llm_success`: same response-model
111/// / id / finish-reason / usage / boolean attributes, same metric
112/// label sets (via [`Metrics::record_chat_token_usage`] +
113/// [`Metrics::record_chat_operation_duration_success`]).
114///
115/// `provider_id` is the raw SDK id; it is normalised internally so
116/// the metric `gen_ai.provider.name` label matches the span.
117pub fn finish_chat_span_success(
118    span: &mut BoxedSpan,
119    response: &ChatResponse,
120    elapsed_secs: f64,
121    provider_id: &'static str,
122    request_model: &str,
123) {
124    let provider = provider_name::normalize(provider_id);
125
126    if !response.id.is_empty() {
127        span.set_attribute(KeyValue::new(
128            attrs::GEN_AI_RESPONSE_ID,
129            response.id.clone(),
130        ));
131    }
132    span.set_attribute(KeyValue::new(
133        attrs::GEN_AI_RESPONSE_MODEL,
134        response.model.clone(),
135    ));
136    if let Some(reason) = response.stop_reason {
137        span.set_attribute(KeyValue::new(
138            attrs::GEN_AI_RESPONSE_FINISH_REASONS,
139            finish_reason_str(reason),
140        ));
141    }
142    span.set_attribute(attrs::kv_i64(
143        attrs::GEN_AI_USAGE_INPUT_TOKENS,
144        i64::from(response.usage.input_tokens),
145    ));
146    span.set_attribute(attrs::kv_i64(
147        attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
148        i64::from(response.usage.output_tokens),
149    ));
150    span.set_attribute(attrs::kv_i64(
151        attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
152        i64::from(response.usage.cached_input_tokens),
153    ));
154    span.set_attribute(attrs::kv_i64(
155        attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
156        i64::from(response.usage.cache_creation_input_tokens),
157    ));
158    span.set_attribute(attrs::kv_bool(
159        attrs::SDK_LLM_HAD_TOOL_CALLS,
160        response.has_tool_use(),
161    ));
162    span.set_attribute(attrs::kv_bool(
163        attrs::SDK_LLM_TEXT_OUTPUT_PRESENT,
164        response.first_text().is_some(),
165    ));
166    span.set_attribute(attrs::kv_bool(
167        attrs::SDK_LLM_THINKING_PRESENT,
168        response.first_thinking().is_some(),
169    ));
170
171    let metrics = Metrics::global();
172    metrics.record_chat_token_usage(&response.usage, provider, request_model, &response.model);
173    metrics.record_chat_operation_duration_success(
174        elapsed_secs,
175        provider,
176        request_model,
177        &response.model,
178    );
179
180    span.end();
181}
182
183/// Stamp a chat error onto `span`, record the
184/// `gen_ai.client.operation.duration` metric with the `error.type`
185/// label, and end the span.
186///
187/// Mirrors `agent_loop::turn::stamp_llm_error`. `error_type` must be a
188/// stable, low-cardinality string (e.g. `rate_limited`,
189/// `server_error`, `invalid_request`, `stream_error`); use
190/// [`classify_llm_error`] for free-form provider error messages so the
191/// vocabulary matches the in-process loop exactly.
192pub fn finish_chat_span_error(
193    span: &mut BoxedSpan,
194    error_type: &'static str,
195    message: &str,
196    elapsed_secs: f64,
197    provider_id: &'static str,
198    request_model: &str,
199) {
200    let provider = provider_name::normalize(provider_id);
201    spans::set_span_error(span, error_type, message);
202    Metrics::global().record_chat_operation_duration_error(
203        elapsed_secs,
204        provider,
205        request_model,
206        error_type,
207    );
208    span.end();
209}
210
/// Bucket a free-form LLM error message into the stable `error.type`
/// value used as a span attribute and metric label.
///
/// The message is tested for a fixed list of case-sensitive substrings,
/// in priority order; the first hit decides the label and anything that
/// matches none of them becomes `_OTHER`. The table is a byte-for-byte
/// mirror of `agent_loop::turn::classify_llm_error`, so the daemon
/// worker and the in-process loop bucket failures identically.
#[must_use]
pub fn classify_llm_error(msg: &str) -> &'static str {
    // (substring to look for, label to emit) — order is significant.
    const MARKERS: [(&str, &str); 4] = [
        ("Rate limited", "rate_limited"),
        ("Invalid request", "invalid_request"),
        ("Server error", "server_error"),
        ("Stream", "stream_error"),
    ];

    MARKERS
        .iter()
        .find(|(needle, _)| msg.contains(*needle))
        .map_or("_OTHER", |&(_, label)| label)
}
231
232/// Map an SDK [`StopReason`] to the semconv `finish_reason` string.
233///
234/// Mirrors `attrs::finish_reason_str` (kept here too so callers that
235/// only import this module get a consistent vocabulary).
236#[must_use]
237pub const fn finish_reason_str(reason: StopReason) -> &'static str {
238    attrs::finish_reason_str(reason)
239}
240
241// ── Tool execution span ──────────────────────────────────────────────
242
243/// Inputs needed to open an `execute_tool` INTERNAL span.
244///
245/// Mirrors the attributes set by
246/// `agent_loop::tool_execution::start_tool_span`.
247#[derive(Clone, Copy)]
248pub struct ToolSpanParams<'a> {
249    /// `gen_ai.tool.name` — the protocol tool name.
250    pub tool_name: &'a str,
251    /// `gen_ai.tool.call.id` — the LLM-assigned call id.
252    pub tool_call_id: &'a str,
253    /// `agent_sdk.tool.display_name`; skipped when empty.
254    pub display_name: &'a str,
255    /// `agent_sdk.tool.tier`; `None` when the tool is unknown to the
256    /// registry (matches the in-process loop's `unknown`-kind path,
257    /// which omits the tier attribute).
258    pub tier: Option<ToolTier>,
259    /// `agent_sdk.tool.kind` — `sync` / `async` / `listen` /
260    /// `unknown`.
261    pub kind: &'static str,
262}
263
264/// Open an `execute_tool` INTERNAL span.
265///
266/// Byte-for-byte mirror of
267/// `agent_loop::tool_execution::start_tool_span`: same span name, same
268/// attribute set (operation name, tool name, call id, optional display
269/// name, optional tier, kind), same baggage copy, same Langfuse `tool`
270/// observation tag. Finish with [`finish_tool_span`].
271#[must_use]
272pub fn build_tool_span(params: ToolSpanParams<'_>) -> BoxedSpan {
273    let mut span_attrs = vec![
274        KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "execute_tool"),
275        KeyValue::new(attrs::GEN_AI_TOOL_NAME, params.tool_name.to_string()),
276        KeyValue::new(attrs::GEN_AI_TOOL_CALL_ID, params.tool_call_id.to_string()),
277    ];
278    if !params.display_name.is_empty() {
279        span_attrs.push(KeyValue::new(
280            attrs::SDK_TOOL_DISPLAY_NAME,
281            params.display_name.to_string(),
282        ));
283    }
284    if let Some(tier) = params.tier {
285        span_attrs.push(KeyValue::new(
286            attrs::SDK_TOOL_TIER,
287            attrs::tool_tier_str(tier),
288        ));
289    }
290    span_attrs.push(KeyValue::new(attrs::SDK_TOOL_KIND, params.kind));
291
292    let mut span = spans::start_internal_span("execute_tool", span_attrs);
293    baggage::copy_baggage_to_active_span(&mut span);
294    langfuse::tag_observation(&mut span, langfuse::ObservationType::Tool);
295    span
296}
297
/// Terminal outcome of a tool execution, used to stamp the outcome
/// attributes + metric labels on an `execute_tool` span.
///
/// Mirrors the match arms of
/// `agent_loop::tool_execution::finish_tool_span` so the
/// `agent_sdk.tool.outcome` value and `error.type` value match across
/// loops.
///
/// Every variant only holds borrowed strings and scalars, so the type is
/// `Copy`; `Debug` is derived so outcomes can be logged and asserted on.
#[derive(Debug, Clone, Copy)]
pub enum ToolSpanOutcome<'a> {
    /// Tool ran (or was short-circuited) and produced a result. The
    /// outcome string is derived from the result body / success flag
    /// exactly as the in-process loop does.
    Completed {
        /// Tool output body; inspected for the `Unknown tool:` /
        /// `Blocked:` / `Rejected:` prefixes.
        output: &'a str,
        /// Whether the tool reported success.
        success: bool,
        /// Wall-clock duration in milliseconds, if measured.
        duration_ms: Option<u64>,
    },
    /// Tool requires user confirmation before running.
    AwaitingConfirmation,
    /// The execution boundary itself errored (e.g. event-store commit
    /// failure). Recorded with `error.type = event_store`.
    EventStoreError {
        /// Error message for the span status.
        message: &'a str,
    },
}
327
328/// Stamp the terminal outcome on `span`, record the
329/// `agent_sdk.tools.execution.{count,duration}` metrics, and end the
330/// span.
331///
332/// Byte-for-byte mirror of
333/// `agent_loop::tool_execution::finish_tool_span`'s outcome handling +
334/// metric recording (via [`Metrics::record_tool_execution`]).
335pub fn finish_tool_span(
336    span: &mut BoxedSpan,
337    tool_name: &str,
338    tool_kind: &'static str,
339    outcome: &ToolSpanOutcome<'_>,
340) {
341    let (outcome_str, duration_ms) = match outcome {
342        ToolSpanOutcome::Completed {
343            output,
344            success,
345            duration_ms,
346        } => {
347            let outcome_str = if output.starts_with("Unknown tool:") {
348                span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "unknown_tool"));
349                span.set_status(Status::error((*output).to_string()));
350                "error"
351            } else if output.starts_with("Blocked:") {
352                "blocked"
353            } else if output.starts_with("Rejected:") {
354                "rejected"
355            } else if *success {
356                "success"
357            } else {
358                "error"
359            };
360            span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, outcome_str));
361            if let Some(ms) = duration_ms {
362                span.set_attribute(attrs::kv_i64(
363                    attrs::SDK_TOOL_DURATION_MS,
364                    i64::try_from(*ms).unwrap_or(i64::MAX),
365                ));
366            }
367            (outcome_str, *duration_ms)
368        }
369        ToolSpanOutcome::AwaitingConfirmation => {
370            span.set_attribute(attrs::kv_bool(attrs::SDK_TOOL_CONFIRMATION_REQUIRED, true));
371            span.set_attribute(KeyValue::new(
372                attrs::SDK_TOOL_OUTCOME,
373                "awaiting_confirmation",
374            ));
375            ("awaiting_confirmation", None)
376        }
377        ToolSpanOutcome::EventStoreError { message } => {
378            span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "event_store"));
379            span.set_status(Status::error((*message).to_string()));
380            span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, "error"));
381            ("error", None)
382        }
383    };
384
385    Metrics::global().record_tool_execution(tool_name, tool_kind, outcome_str, duration_ms);
386    span.end();
387}
388
389// ── Root turn span (daemon cross-task) ───────────────────────────────
390
/// Inputs needed to open the daemon's root `invoke_agent` span.
///
/// The daemon-hosted worker drives one turn across multiple tokio tasks
/// (execute → suspend at the tool boundary → resume). Unlike the
/// in-process loop's `agent_loop` root span — which keeps a single live
/// span wrapping the whole run future — the worker creates this span
/// live on the FRESH turn, captures its `(trace_id, span_id)` to persist
/// on the turn attempt, and re-parents every later span (resumed `chat`
/// calls, child-task `execute_tool` calls) under those ids via
/// [`remote_parent_context`].
///
/// The type only holds string references, so it is cheap to copy; it
/// implements `Debug` so it can be logged or used in assertions.
#[derive(Debug, Clone, Copy)]
pub struct RootTurnSpanParams<'a> {
    /// Raw SDK provider id (e.g. `anthropic`); normalised to the
    /// `gen_ai.provider.name` semconv value internally.
    pub provider_id: &'static str,
    /// Model driving the turn (`gen_ai.request.model`).
    pub model: &'a str,
    /// Thread / conversation id (`gen_ai.conversation.id`).
    pub conversation_id: &'a str,
}
411
412/// A started root-turn span plus the hex ids the worker persists so the
413/// turn's later tasks can re-parent their spans under it.
414pub struct StartedRootTurnSpan {
415    /// The live `invoke_agent` span. The worker holds it for the fresh
416    /// segment, finishing it with [`finish_root_turn_span`] on a
417    /// text-only terminal or letting it end on drop when the turn
418    /// suspends for tools.
419    pub span: BoxedSpan,
420    /// Hex-encoded `TraceId` — persist to
421    /// `agent_sdk_turn_attempts.otel_trace_id`.
422    pub trace_id_hex: String,
423    /// Hex-encoded root `SpanId` — persist to
424    /// `agent_sdk_turn_attempts.otel_span_id`.
425    pub span_id_hex: String,
426}
427
428/// Start the daemon's root `invoke_agent` INTERNAL span and capture the
429/// ids the worker persists so the turn's later tasks can nest under it.
430///
431/// Mirrors the structural / `gen_ai.*` attributes of the in-process
432/// loop's root span (operation name, provider, model, conversation id)
433/// with the minimal parameter set the daemon worker supplies cheaply,
434/// and tags the same Langfuse `agent` observation. Pair a text-only
435/// terminal with [`finish_root_turn_span`]; on the tool-suspend path the
436/// span ends when dropped — its duration then covers the fresh segment
437/// only, because the `OTel` 0.32 `SpanBuilder` exposes no way to assign a
438/// span id, so a single span cannot be reopened on resume to span the
439/// whole turn. The persisted ids still re-parent every later span, so
440/// the turn remains one coherent trace tree.
441#[must_use]
442pub fn start_root_turn_span(params: RootTurnSpanParams<'_>) -> StartedRootTurnSpan {
443    let provider = provider_name::normalize(params.provider_id);
444    let span_attrs = vec![
445        KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
446        KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
447        KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
448        KeyValue::new(
449            attrs::GEN_AI_CONVERSATION_ID,
450            params.conversation_id.to_string(),
451        ),
452        KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
453    ];
454    let mut span = spans::start_internal_span("invoke_agent", span_attrs);
455    baggage::copy_baggage_to_active_span(&mut span);
456    langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
457    let span_context = span.span_context().clone();
458    StartedRootTurnSpan {
459        trace_id_hex: span_context.trace_id().to_string(),
460        span_id_hex: span_context.span_id().to_string(),
461        span,
462    }
463}
464
465/// Build a [`Context`] whose active parent is the root-turn span
466/// identified by the hex ids persisted on the turn attempt.
467///
468/// Spans created while this context is attached become children of the
469/// root span. This is how the worker re-parents resumed `chat` calls and
470/// child-task `execute_tool` calls under the turn root after the original
471/// live span has gone (the worker suspended, or a fresh task / process
472/// picked the turn up). Returns `None` when the ids are absent or
473/// malformed — the caller then leaves the span un-parented (a new trace),
474/// exactly as before this wiring.
475#[must_use]
476pub fn remote_parent_context(trace_id_hex: &str, span_id_hex: &str) -> Option<Context> {
477    let span_context = spans::remote_span_context(trace_id_hex, span_id_hex)?;
478    Some(Context::current().with_remote_span_context(span_context))
479}
480
481/// Finalize the root-turn span with run-outcome attributes + the
482/// `agent_sdk.runs.outcome` counter, then end it.
483///
484/// Mirrors `agent_loop`'s `end_root_span`: same total-turns / usage /
485/// outcome attribute set and the same outcome counter. Reached on the
486/// text-only terminal path where the worker still holds the live span;
487/// tool turns end the span on drop at the suspend boundary (see
488/// [`start_root_turn_span`]).
489pub fn finish_root_turn_span(
490    span: &mut BoxedSpan,
491    total_turns: usize,
492    total_usage: Option<&TokenUsage>,
493    outcome: &'static str,
494) {
495    Metrics::global()
496        .runs_outcome
497        .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
498    span.set_attribute(attrs::kv_i64(
499        attrs::SDK_TOTAL_TURNS,
500        i64::try_from(total_turns).unwrap_or(0),
501    ));
502    // Usage is optional: the daemon does not aggregate per-turn token
503    // usage across the suspend/resume hop, so it passes `None` and the
504    // per-call `chat` spans carry the authoritative usage. The in-process
505    // loop passes the cumulative `TokenUsage`.
506    if let Some(usage) = total_usage {
507        span.set_attribute(attrs::kv_i64(
508            attrs::GEN_AI_USAGE_INPUT_TOKENS,
509            i64::from(usage.input_tokens),
510        ));
511        span.set_attribute(attrs::kv_i64(
512            attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
513            i64::from(usage.output_tokens),
514        ));
515        span.set_attribute(attrs::kv_i64(
516            attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
517            i64::from(usage.cached_input_tokens),
518        ));
519        span.set_attribute(attrs::kv_i64(
520            attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
521            i64::from(usage.cache_creation_input_tokens),
522        ));
523    }
524    span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
525    if outcome == "error" {
526        spans::set_span_error(span, "agent_error", "agent invocation failed");
527    }
528    span.end();
529}
530
531// ── W3C traceparent ⇄ Context propagation ────────────────────────────
532
/// Carrier key of the W3C `traceparent` header.
///
/// This is the single carrier entry the daemon needs in order to store a
/// parent span context durably (on `AgentTask`) and reconstruct it in a
/// later task / process.
const TRACEPARENT_KEY: &str = "traceparent";
537
538/// Rebuild an `OTel` [`Context`] from a persisted W3C `traceparent`.
539///
540/// The daemon decouples the gRPC call that submits work from the worker
541/// that runs it (a durable task queue, possibly a different process), so
542/// the inbound client trace context cannot ride an in-memory
543/// [`Context`]. It is persisted as a `traceparent` string on the task
544/// and rebuilt here via the globally-installed
545/// [`opentelemetry::propagation::TextMapPropagator`]. Spans started while
546/// the returned context is attached become children of the encoded span.
547///
548/// Returns `None` when `traceparent` is empty, malformed, or no
549/// propagator is installed (the extracted context then carries no valid
550/// span) — the caller leaves its span un-parented, exactly as before
551/// this wiring.
552#[must_use]
553pub fn context_from_traceparent(traceparent: &str) -> Option<Context> {
554    if traceparent.is_empty() {
555        return None;
556    }
557    let mut carrier = HashMap::with_capacity(1);
558    carrier.insert(TRACEPARENT_KEY.to_string(), traceparent.to_string());
559    let cx = global::get_text_map_propagator(|propagator| propagator.extract(&carrier));
560    if cx.span().span_context().is_valid() {
561        Some(cx)
562    } else {
563        None
564    }
565}
566
567/// Encode hex trace + span ids into a W3C `traceparent` string.
568///
569/// Used to stamp a child tool task's parent span: the root
570/// `invoke_agent` span's `(trace_id, span_id)` — persisted on the turn
571/// attempt — is encoded here and stored on the child task's
572/// `otel_traceparent` so the child's `execute_tool` span nests under the
573/// turn root. Returns `None` for malformed / zero ids (validated via
574/// [`spans::remote_span_context`]). The `SAMPLED` flag is always set, to
575/// match the remote context the daemon reconstructs.
576#[must_use]
577pub fn traceparent_from_ids(trace_id_hex: &str, span_id_hex: &str) -> Option<String> {
578    let span_context = spans::remote_span_context(trace_id_hex, span_id_hex)?;
579    Some(format!(
580        "00-{}-{}-01",
581        span_context.trace_id(),
582        span_context.span_id()
583    ))
584}
585
586// ── Live root-span registry (correct cross-task duration) ────────────
587
588/// Process-global home for the live root-turn span between the task that
589/// opens it and the (possibly later) task that finalizes it.
590///
591/// The daemon drives one turn across several tokio tasks
592/// (execute → suspend at the tool boundary → resume), and `OTel` 0.32
593/// offers no way to assign a span id, so a root span cannot be re-minted
594/// on resume to cover the whole turn. Instead the worker stashes the
595/// *live* `invoke_agent` span here at the fresh turn and finalizes it at
596/// the terminal — even though that runs in a different task — so the span
597/// carries the **full** turn duration (it legitimately stays open while
598/// tools run). Children never read this map; they nest via the ids
599/// persisted on the turn attempt (see [`remote_parent_context`]), so the
600/// tree is correct regardless of whether the live span survives.
601///
602/// Keyed by `AgentTask` id (stringified). An entry that is never
603/// finalized — the daemon crashed/restarted mid-turn, so a fresh process
604/// owns the resume — simply leaks its (small) span object until process
605/// exit; that turn's root span is the only one missing its terminal
606/// finalize, and its children still nest correctly.
607static LIVE_ROOT_SPANS: LazyLock<Mutex<HashMap<String, BoxedSpan>>> =
608    LazyLock::new(|| Mutex::new(HashMap::new()));
609
610/// Stash the live root-turn `span` under `task_id` so a later task can
611/// finalize it with the correct full duration (see `LIVE_ROOT_SPANS`).
612///
613/// First write wins: a retry that re-opens the same turn keeps the
614/// original span (and its start time) rather than resetting the clock.
615pub fn stash_root_turn_span(task_id: &str, span: BoxedSpan) {
616    let Ok(mut spans) = LIVE_ROOT_SPANS.lock() else {
617        log::warn!("live root-span registry poisoned; dropping root span for task {task_id}");
618        return;
619    };
620    spans.entry(task_id.to_string()).or_insert(span);
621}
622
623/// Finalize and end the stashed root-turn span for `task_id`, stamping
624/// run-outcome + usage attributes (see [`finish_root_turn_span`]).
625///
626/// No-op when no span is stashed — the expected path when the daemon
627/// restarted mid-turn and a fresh process owns the terminal. The outcome
628/// counter is still recorded in that case so dashboards see every run.
629pub fn finalize_root_turn_span(
630    task_id: &str,
631    total_turns: usize,
632    total_usage: Option<&TokenUsage>,
633    outcome: &'static str,
634) {
635    let stashed = LIVE_ROOT_SPANS
636        .lock()
637        .ok()
638        .and_then(|mut spans| spans.remove(task_id));
639    match stashed {
640        Some(mut span) => finish_root_turn_span(&mut span, total_turns, total_usage, outcome),
641        None => {
642            // Live span lost (cross-restart resume). Still record the
643            // run-outcome counter so the metric isn't undercounted.
644            Metrics::global()
645                .runs_outcome
646                .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
647        }
648    }
649}
650
651/// Drop the stashed root-turn span for `task_id` without finalizing.
652///
653/// For paths that abandon a turn without a meaningful outcome (e.g. an
654/// idempotent duplicate-suspension bail) so the registry doesn't leak.
655pub fn discard_root_turn_span(task_id: &str) {
656    if let Ok(mut spans) = LIVE_ROOT_SPANS.lock() {
657        drop(spans.remove(task_id));
658    }
659}
660
#[cfg(test)]
mod tests {
    use super::{
        discard_root_turn_span, finalize_root_turn_span, spans, stash_root_turn_span,
        traceparent_from_ids,
    };

    // Example ids from the W3C trace-context specification; both are
    // non-zero and therefore valid.
    const TRACE_ID: &str = "4bf92f3577b34da6a3ce929d0e0e4736";
    const SPAN_ID: &str = "00f067aa0ba902b7";

    /// Valid ids encode to a version-00 header with the sampled flag.
    #[test]
    fn traceparent_from_valid_ids_is_w3c_sampled() {
        let expected = format!("00-{TRACE_ID}-{SPAN_ID}-01");
        let encoded = traceparent_from_ids(TRACE_ID, SPAN_ID).expect("valid ids");
        assert_eq!(encoded, expected);
    }

    /// Non-hex, wrong-length and all-zero ids are all rejected.
    #[test]
    fn traceparent_from_malformed_or_zero_ids_is_none() {
        let non_hex_trace = traceparent_from_ids("not-hex", SPAN_ID);
        assert!(non_hex_trace.is_none());

        let short_span = traceparent_from_ids(TRACE_ID, "tooshort");
        assert!(short_span.is_none());

        // All-zero ids are rejected by `SpanContext::is_valid`.
        let zero_trace = "0".repeat(32);
        let zero_span = "0".repeat(16);
        assert!(traceparent_from_ids(&zero_trace, &zero_span).is_none());
    }

    /// Finalizing twice is safe: the first call removes the entry and
    /// the second finds nothing to finalize.
    #[test]
    fn live_root_span_registry_finalize_is_idempotent() {
        // With no provider installed the span is a no-op, but the map
        // bookkeeping (stash, then finalize removes the entry) still
        // runs, and the repeated finalize must not panic.
        let task_id = "registry-roundtrip-task";
        let root = spans::start_internal_span("test", Vec::new());
        stash_root_turn_span(task_id, root);
        for _ in 0..2 {
            finalize_root_turn_span(task_id, 1, None, "done");
        }
    }

    /// Discard removes the entry without finalizing it.
    #[test]
    fn discard_removes_stashed_span_without_finalize() {
        let task_id = "registry-discard-task";
        let root = spans::start_internal_span("test", Vec::new());
        stash_root_turn_span(task_id, root);
        discard_root_turn_span(task_id);
        // The entry is already gone, so this finalize finds no span.
        finalize_root_turn_span(task_id, 0, None, "cancelled");
    }
}
705}