agent_sdk/observability/loop_instrument.rs
1//! Reusable `chat` / `execute_tool` span + metric helpers.
2//!
3//! These primitives are shared by every agent-loop implementation so
4//! the daemon-hosted worker emits byte-identical telemetry to the
5//! in-process loop.
6//!
7//! ## Why this module exists
8//!
9//! The SDK ships two agent-loop implementations:
10//!
11//! * the in-process `agent_loop` (used by `AgentLoop` /
12//! `BipAgent`), and
13//! * the daemon-hosted re-implementation in `agent-server`'s worker
14//! (`crates/agent-server/src/worker/root_turn.rs` +
15//! `agent-service-host`'s `registry_tool_executor.rs`).
16//!
17//! Both drive an LLM call and a tool dispatch, and both must emit the
18//! **same** `gen_ai.*` / `agent_sdk.*` telemetry so dashboards built
19//! against the in-process loop light up unchanged when a session runs
20//! on the daemon. Before this module the worker bypassed the SDK
21//! instrumentation entirely and emitted none of it.
22//!
23//! Rather than copy the span names, attribute keys, and metric label
24//! sets into `agent-server` (where they would silently drift), this
25//! module exposes the *exact* primitives the in-process loop uses:
26//!
27//! * [`build_chat_span`] / [`finish_chat_span_success`] /
28//! [`finish_chat_span_error`] mirror `agent_loop::turn`'s
29//! `build_llm_span` / `stamp_llm_success` / `stamp_llm_error`.
30//! * [`build_tool_span`] / [`finish_tool_span`] /
31//! [`ToolSpanOutcome`] mirror `agent_loop::tool_execution`'s
32//! `start_tool_span` / `finish_tool_span`.
33//!
34//! The metric recording underneath delegates to
35//! [`crate::observability::metrics::Metrics`] (the same global
36//! singleton), so there is a single source of truth for the label
37//! sets and no second meter scope.
38//!
39//! Compiled only with `feature = "otel"`.
40
41use std::collections::HashMap;
42use std::sync::{LazyLock, Mutex};
43
44use opentelemetry::global::BoxedSpan;
45use opentelemetry::trace::{Span, Status, TraceContextExt};
46use opentelemetry::{Context, KeyValue, global};
47
48use super::metrics::Metrics;
49use super::{attrs, baggage, langfuse, provider_name, spans};
50use crate::llm::{ChatResponse, StopReason};
51use crate::types::{TokenUsage, ToolTier};
52
53// ── Chat (LLM client) span ───────────────────────────────────────────
54
/// Parameters for opening a `chat {model}` CLIENT span.
///
/// Grouping the inputs keeps call sites short and lets the attribute
/// set grow (for example more `gen_ai.request.*` keys) without touching
/// every caller. The fields correspond to the attributes stamped by
/// [`build_chat_span`], which in turn mirrors
/// `agent_loop::turn::build_llm_span`.
///
/// Derives `Debug` so the parameters can be logged or shown in test
/// failures, in addition to being cheaply copyable.
#[derive(Debug, Clone, Copy)]
pub struct ChatSpanParams<'a> {
    /// Raw SDK provider id (e.g. `anthropic`, `openai-responses`).
    /// It is passed through [`provider_name::normalize`] to obtain the
    /// `gen_ai.provider.name` semconv value.
    pub provider_id: &'static str,
    /// Requested model; recorded as `gen_ai.request.model` and used in
    /// the span name.
    pub model: &'a str,
    /// `true` when the call streams (`agent_sdk.llm.streaming`).
    pub streaming: bool,
    /// Output-token cap, when one is configured
    /// (`gen_ai.request.max_output_tokens`); the attribute is omitted
    /// for `None`.
    pub max_tokens: Option<u32>,
}
74
75/// Open a `chat {model}` CLIENT span with the `gen_ai` semconv
76/// attributes known before the call.
77///
78/// Byte-for-byte mirror of `agent_loop::turn::build_llm_span`: same
79/// span name, same initial attribute set, same baggage copy, same
80/// Langfuse `generation` observation tag. Pair every successful call
81/// with [`finish_chat_span_success`] and every failure with
82/// [`finish_chat_span_error`].
83#[must_use]
84pub fn build_chat_span(params: ChatSpanParams<'_>) -> BoxedSpan {
85 let span_name = format!("chat {}", params.model);
86 let provider = provider_name::normalize(params.provider_id);
87 let mut init_attrs = vec![
88 KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "chat"),
89 KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
90 KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
91 attrs::kv_bool(attrs::SDK_LLM_STREAMING, params.streaming),
92 KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
93 ];
94 if let Some(max_tokens) = params.max_tokens {
95 init_attrs.push(attrs::kv_i64(
96 attrs::GEN_AI_REQUEST_MAX_OUTPUT_TOKENS,
97 i64::from(max_tokens),
98 ));
99 }
100 let mut span = spans::start_client_span(span_name, init_attrs);
101 baggage::copy_baggage_to_active_span(&mut span);
102 langfuse::tag_observation(&mut span, langfuse::ObservationType::Generation);
103 span
104}
105
106/// Stamp a successful chat response onto `span`, record the
107/// `gen_ai.client.token.usage` + `gen_ai.client.operation.duration`
108/// metrics, and end the span.
109///
110/// Mirrors `agent_loop::turn::stamp_llm_success`: same response-model
111/// / id / finish-reason / usage / boolean attributes, same metric
112/// label sets (via [`Metrics::record_chat_token_usage`] +
113/// [`Metrics::record_chat_operation_duration_success`]).
114///
115/// `provider_id` is the raw SDK id; it is normalised internally so
116/// the metric `gen_ai.provider.name` label matches the span.
117pub fn finish_chat_span_success(
118 span: &mut BoxedSpan,
119 response: &ChatResponse,
120 elapsed_secs: f64,
121 provider_id: &'static str,
122 request_model: &str,
123) {
124 let provider = provider_name::normalize(provider_id);
125
126 if !response.id.is_empty() {
127 span.set_attribute(KeyValue::new(
128 attrs::GEN_AI_RESPONSE_ID,
129 response.id.clone(),
130 ));
131 }
132 span.set_attribute(KeyValue::new(
133 attrs::GEN_AI_RESPONSE_MODEL,
134 response.model.clone(),
135 ));
136 if let Some(reason) = response.stop_reason {
137 span.set_attribute(KeyValue::new(
138 attrs::GEN_AI_RESPONSE_FINISH_REASONS,
139 finish_reason_str(reason),
140 ));
141 }
142 span.set_attribute(attrs::kv_i64(
143 attrs::GEN_AI_USAGE_INPUT_TOKENS,
144 i64::from(response.usage.input_tokens),
145 ));
146 span.set_attribute(attrs::kv_i64(
147 attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
148 i64::from(response.usage.output_tokens),
149 ));
150 span.set_attribute(attrs::kv_i64(
151 attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
152 i64::from(response.usage.cached_input_tokens),
153 ));
154 span.set_attribute(attrs::kv_i64(
155 attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
156 i64::from(response.usage.cache_creation_input_tokens),
157 ));
158 span.set_attribute(attrs::kv_bool(
159 attrs::SDK_LLM_HAD_TOOL_CALLS,
160 response.has_tool_use(),
161 ));
162 span.set_attribute(attrs::kv_bool(
163 attrs::SDK_LLM_TEXT_OUTPUT_PRESENT,
164 response.first_text().is_some(),
165 ));
166 span.set_attribute(attrs::kv_bool(
167 attrs::SDK_LLM_THINKING_PRESENT,
168 response.first_thinking().is_some(),
169 ));
170
171 let metrics = Metrics::global();
172 metrics.record_chat_token_usage(&response.usage, provider, request_model, &response.model);
173 metrics.record_chat_operation_duration_success(
174 elapsed_secs,
175 provider,
176 request_model,
177 &response.model,
178 );
179
180 span.end();
181}
182
183/// Stamp a chat error onto `span`, record the
184/// `gen_ai.client.operation.duration` metric with the `error.type`
185/// label, and end the span.
186///
187/// Mirrors `agent_loop::turn::stamp_llm_error`. `error_type` must be a
188/// stable, low-cardinality string (e.g. `rate_limited`,
189/// `server_error`, `invalid_request`, `stream_error`); use
190/// [`classify_llm_error`] for free-form provider error messages so the
191/// vocabulary matches the in-process loop exactly.
192pub fn finish_chat_span_error(
193 span: &mut BoxedSpan,
194 error_type: &'static str,
195 message: &str,
196 elapsed_secs: f64,
197 provider_id: &'static str,
198 request_model: &str,
199) {
200 let provider = provider_name::normalize(provider_id);
201 spans::set_span_error(span, error_type, message);
202 Metrics::global().record_chat_operation_duration_error(
203 elapsed_secs,
204 provider,
205 request_model,
206 error_type,
207 );
208 span.end();
209}
210
/// Bucket a free-form LLM error message into the stable `error.type`
/// attribute / metric label.
///
/// Kept as an exact mirror of `agent_loop::turn::classify_llm_error` so
/// both loops classify transient failures the same way. Matching is a
/// case-sensitive substring search; the markers are tried in the order
/// listed below and the first hit wins. Messages containing none of
/// them map to `_OTHER`.
#[must_use]
pub fn classify_llm_error(msg: &str) -> &'static str {
    // (marker substring, label) pairs in priority order.
    const BUCKETS: [(&str, &str); 4] = [
        ("Rate limited", "rate_limited"),
        ("Invalid request", "invalid_request"),
        ("Server error", "server_error"),
        ("Stream", "stream_error"),
    ];

    BUCKETS
        .iter()
        .find(|(marker, _)| msg.contains(*marker))
        .map_or("_OTHER", |&(_, label)| label)
}
231
232/// Map an SDK [`StopReason`] to the semconv `finish_reason` string.
233///
234/// Mirrors `attrs::finish_reason_str` (kept here too so callers that
235/// only import this module get a consistent vocabulary).
236#[must_use]
237pub const fn finish_reason_str(reason: StopReason) -> &'static str {
238 attrs::finish_reason_str(reason)
239}
240
241// ── Tool execution span ──────────────────────────────────────────────
242
243/// Inputs needed to open an `execute_tool` INTERNAL span.
244///
245/// Mirrors the attributes set by
246/// `agent_loop::tool_execution::start_tool_span`.
247#[derive(Clone, Copy)]
248pub struct ToolSpanParams<'a> {
249 /// `gen_ai.tool.name` — the protocol tool name.
250 pub tool_name: &'a str,
251 /// `gen_ai.tool.call.id` — the LLM-assigned call id.
252 pub tool_call_id: &'a str,
253 /// `agent_sdk.tool.display_name`; skipped when empty.
254 pub display_name: &'a str,
255 /// `agent_sdk.tool.tier`; `None` when the tool is unknown to the
256 /// registry (matches the in-process loop's `unknown`-kind path,
257 /// which omits the tier attribute).
258 pub tier: Option<ToolTier>,
259 /// `agent_sdk.tool.kind` — `sync` / `async` / `listen` /
260 /// `unknown`.
261 pub kind: &'static str,
262}
263
264/// Open an `execute_tool` INTERNAL span.
265///
266/// Byte-for-byte mirror of
267/// `agent_loop::tool_execution::start_tool_span`: same span name, same
268/// attribute set (operation name, tool name, call id, optional display
269/// name, optional tier, kind), same baggage copy, same Langfuse `tool`
270/// observation tag. Finish with [`finish_tool_span`].
271#[must_use]
272pub fn build_tool_span(params: ToolSpanParams<'_>) -> BoxedSpan {
273 let mut span_attrs = vec![
274 KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "execute_tool"),
275 KeyValue::new(attrs::GEN_AI_TOOL_NAME, params.tool_name.to_string()),
276 KeyValue::new(attrs::GEN_AI_TOOL_CALL_ID, params.tool_call_id.to_string()),
277 ];
278 if !params.display_name.is_empty() {
279 span_attrs.push(KeyValue::new(
280 attrs::SDK_TOOL_DISPLAY_NAME,
281 params.display_name.to_string(),
282 ));
283 }
284 if let Some(tier) = params.tier {
285 span_attrs.push(KeyValue::new(
286 attrs::SDK_TOOL_TIER,
287 attrs::tool_tier_str(tier),
288 ));
289 }
290 span_attrs.push(KeyValue::new(attrs::SDK_TOOL_KIND, params.kind));
291
292 let mut span = spans::start_internal_span("execute_tool", span_attrs);
293 baggage::copy_baggage_to_active_span(&mut span);
294 langfuse::tag_observation(&mut span, langfuse::ObservationType::Tool);
295 span
296}
297
/// How a tool execution ended; drives the outcome attributes and
/// metric labels stamped on an `execute_tool` span by
/// [`finish_tool_span`].
///
/// The variants correspond to the match arms of
/// `agent_loop::tool_execution::finish_tool_span`, so the
/// `agent_sdk.tool.outcome` and `error.type` values agree across both
/// loops.
///
/// Derives `Debug` so an outcome can be logged or shown in test
/// failures.
#[derive(Debug)]
pub enum ToolSpanOutcome<'a> {
    /// The tool ran (or was short-circuited) and produced a result.
    /// The outcome label is derived from the output prefix and the
    /// success flag.
    Completed {
        /// Tool output body. Checked for the `Unknown tool:`,
        /// `Blocked:` and `Rejected:` prefixes.
        output: &'a str,
        /// Whether the tool reported success.
        success: bool,
        /// Wall-clock duration in milliseconds, when it was measured.
        duration_ms: Option<u64>,
    },
    /// The tool needs user confirmation before it can run.
    AwaitingConfirmation,
    /// The execution boundary itself failed (e.g. an event-store commit
    /// failure). Recorded with `error.type = event_store`.
    EventStoreError {
        /// Message used for the span's error status.
        message: &'a str,
    },
}
327
328/// Stamp the terminal outcome on `span`, record the
329/// `agent_sdk.tools.execution.{count,duration}` metrics, and end the
330/// span.
331///
332/// Byte-for-byte mirror of
333/// `agent_loop::tool_execution::finish_tool_span`'s outcome handling +
334/// metric recording (via [`Metrics::record_tool_execution`]).
335pub fn finish_tool_span(
336 span: &mut BoxedSpan,
337 tool_name: &str,
338 tool_kind: &'static str,
339 outcome: &ToolSpanOutcome<'_>,
340) {
341 let (outcome_str, duration_ms) = match outcome {
342 ToolSpanOutcome::Completed {
343 output,
344 success,
345 duration_ms,
346 } => {
347 let outcome_str = if output.starts_with("Unknown tool:") {
348 span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "unknown_tool"));
349 span.set_status(Status::error((*output).to_string()));
350 "error"
351 } else if output.starts_with("Blocked:") {
352 "blocked"
353 } else if output.starts_with("Rejected:") {
354 "rejected"
355 } else if *success {
356 "success"
357 } else {
358 "error"
359 };
360 span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, outcome_str));
361 if let Some(ms) = duration_ms {
362 span.set_attribute(attrs::kv_i64(
363 attrs::SDK_TOOL_DURATION_MS,
364 i64::try_from(*ms).unwrap_or(i64::MAX),
365 ));
366 }
367 (outcome_str, *duration_ms)
368 }
369 ToolSpanOutcome::AwaitingConfirmation => {
370 span.set_attribute(attrs::kv_bool(attrs::SDK_TOOL_CONFIRMATION_REQUIRED, true));
371 span.set_attribute(KeyValue::new(
372 attrs::SDK_TOOL_OUTCOME,
373 "awaiting_confirmation",
374 ));
375 ("awaiting_confirmation", None)
376 }
377 ToolSpanOutcome::EventStoreError { message } => {
378 span.set_attribute(KeyValue::new(attrs::ERROR_TYPE, "event_store"));
379 span.set_status(Status::error((*message).to_string()));
380 span.set_attribute(KeyValue::new(attrs::SDK_TOOL_OUTCOME, "error"));
381 ("error", None)
382 }
383 };
384
385 Metrics::global().record_tool_execution(tool_name, tool_kind, outcome_str, duration_ms);
386 span.end();
387}
388
389// ── Root turn span (daemon cross-task) ───────────────────────────────
390
/// Parameters for opening the daemon's root `invoke_agent` span.
///
/// The daemon-hosted worker drives a single turn across several tokio
/// tasks (execute → suspend at the tool boundary → resume). The worker
/// opens this span on the FRESH turn, persists its
/// `(trace_id, span_id)` on the turn attempt, and parents every later
/// span (resumed `chat` calls, child-task `execute_tool` calls) under
/// those ids through [`remote_parent_context`].
///
/// Derives `Debug` so the parameters can be logged or shown in test
/// failures, in addition to being cheaply copyable.
#[derive(Debug, Clone, Copy)]
pub struct RootTurnSpanParams<'a> {
    /// Raw SDK provider id (e.g. `anthropic`); normalised to the
    /// `gen_ai.provider.name` semconv value when the span is built.
    pub provider_id: &'static str,
    /// Model driving the turn (`gen_ai.request.model`).
    pub model: &'a str,
    /// Thread / conversation id (`gen_ai.conversation.id`).
    pub conversation_id: &'a str,
}
411
412/// A started root-turn span plus the hex ids the worker persists so the
413/// turn's later tasks can re-parent their spans under it.
414pub struct StartedRootTurnSpan {
415 /// The live `invoke_agent` span. The worker holds it for the fresh
416 /// segment, finishing it with [`finish_root_turn_span`] on a
417 /// text-only terminal or letting it end on drop when the turn
418 /// suspends for tools.
419 pub span: BoxedSpan,
420 /// Hex-encoded `TraceId` — persist to
421 /// `agent_sdk_turn_attempts.otel_trace_id`.
422 pub trace_id_hex: String,
423 /// Hex-encoded root `SpanId` — persist to
424 /// `agent_sdk_turn_attempts.otel_span_id`.
425 pub span_id_hex: String,
426}
427
428/// Start the daemon's root `invoke_agent` INTERNAL span and capture the
429/// ids the worker persists so the turn's later tasks can nest under it.
430///
431/// Mirrors the structural / `gen_ai.*` attributes of the in-process
432/// loop's root span (operation name, provider, model, conversation id)
433/// with the minimal parameter set the daemon worker supplies cheaply,
434/// and tags the same Langfuse `agent` observation. Pair a text-only
435/// terminal with [`finish_root_turn_span`]; on the tool-suspend path the
436/// span ends when dropped — its duration then covers the fresh segment
437/// only, because the `OTel` 0.32 `SpanBuilder` exposes no way to assign a
438/// span id, so a single span cannot be reopened on resume to span the
439/// whole turn. The persisted ids still re-parent every later span, so
440/// the turn remains one coherent trace tree.
441#[must_use]
442pub fn start_root_turn_span(params: RootTurnSpanParams<'_>) -> StartedRootTurnSpan {
443 let provider = provider_name::normalize(params.provider_id);
444 let span_attrs = vec![
445 KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
446 KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
447 KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, params.model.to_string()),
448 KeyValue::new(
449 attrs::GEN_AI_CONVERSATION_ID,
450 params.conversation_id.to_string(),
451 ),
452 KeyValue::new(attrs::SDK_PROVIDER_ID, params.provider_id),
453 ];
454 let mut span = spans::start_internal_span("invoke_agent", span_attrs);
455 baggage::copy_baggage_to_active_span(&mut span);
456 langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
457 let span_context = span.span_context().clone();
458 StartedRootTurnSpan {
459 trace_id_hex: span_context.trace_id().to_string(),
460 span_id_hex: span_context.span_id().to_string(),
461 span,
462 }
463}
464
465/// Build a [`Context`] whose active parent is the root-turn span
466/// identified by the hex ids persisted on the turn attempt.
467///
468/// Spans created while this context is attached become children of the
469/// root span. This is how the worker re-parents resumed `chat` calls and
470/// child-task `execute_tool` calls under the turn root after the original
471/// live span has gone (the worker suspended, or a fresh task / process
472/// picked the turn up). Returns `None` when the ids are absent or
473/// malformed — the caller then leaves the span un-parented (a new trace),
474/// exactly as before this wiring.
475#[must_use]
476pub fn remote_parent_context(trace_id_hex: &str, span_id_hex: &str) -> Option<Context> {
477 let span_context = spans::remote_span_context(trace_id_hex, span_id_hex)?;
478 Some(Context::current().with_remote_span_context(span_context))
479}
480
481/// Finalize the root-turn span with run-outcome attributes + the
482/// `agent_sdk.runs.outcome` counter, then end it.
483///
484/// Mirrors `agent_loop`'s `end_root_span`: same total-turns / usage /
485/// outcome attribute set and the same outcome counter. Reached on the
486/// text-only terminal path where the worker still holds the live span;
487/// tool turns end the span on drop at the suspend boundary (see
488/// [`start_root_turn_span`]).
489pub fn finish_root_turn_span(
490 span: &mut BoxedSpan,
491 total_turns: usize,
492 total_usage: Option<&TokenUsage>,
493 outcome: &'static str,
494) {
495 Metrics::global()
496 .runs_outcome
497 .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
498 span.set_attribute(attrs::kv_i64(
499 attrs::SDK_TOTAL_TURNS,
500 i64::try_from(total_turns).unwrap_or(0),
501 ));
502 // Usage is optional: the daemon does not aggregate per-turn token
503 // usage across the suspend/resume hop, so it passes `None` and the
504 // per-call `chat` spans carry the authoritative usage. The in-process
505 // loop passes the cumulative `TokenUsage`.
506 if let Some(usage) = total_usage {
507 span.set_attribute(attrs::kv_i64(
508 attrs::GEN_AI_USAGE_INPUT_TOKENS,
509 i64::from(usage.input_tokens),
510 ));
511 span.set_attribute(attrs::kv_i64(
512 attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
513 i64::from(usage.output_tokens),
514 ));
515 span.set_attribute(attrs::kv_i64(
516 attrs::GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
517 i64::from(usage.cached_input_tokens),
518 ));
519 span.set_attribute(attrs::kv_i64(
520 attrs::GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
521 i64::from(usage.cache_creation_input_tokens),
522 ));
523 }
524 span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
525 if outcome == "error" {
526 spans::set_span_error(span, "agent_error", "agent invocation failed");
527 }
528 span.end();
529}
530
531// ── W3C traceparent ⇄ Context propagation ────────────────────────────
532
/// Carrier key of the W3C `traceparent` header.
///
/// It is the single carrier entry the daemon has to persist (on
/// `AgentTask`) in order to rebuild a parent span context in a later
/// task or process.
const TRACEPARENT_KEY: &str = "traceparent";
537
538/// Rebuild an `OTel` [`Context`] from a persisted W3C `traceparent`.
539///
540/// The daemon decouples the gRPC call that submits work from the worker
541/// that runs it (a durable task queue, possibly a different process), so
542/// the inbound client trace context cannot ride an in-memory
543/// [`Context`]. It is persisted as a `traceparent` string on the task
544/// and rebuilt here via the globally-installed
545/// [`opentelemetry::propagation::TextMapPropagator`]. Spans started while
546/// the returned context is attached become children of the encoded span.
547///
548/// Returns `None` when `traceparent` is empty, malformed, or no
549/// propagator is installed (the extracted context then carries no valid
550/// span) — the caller leaves its span un-parented, exactly as before
551/// this wiring.
552#[must_use]
553pub fn context_from_traceparent(traceparent: &str) -> Option<Context> {
554 if traceparent.is_empty() {
555 return None;
556 }
557 let mut carrier = HashMap::with_capacity(1);
558 carrier.insert(TRACEPARENT_KEY.to_string(), traceparent.to_string());
559 let cx = global::get_text_map_propagator(|propagator| propagator.extract(&carrier));
560 if cx.span().span_context().is_valid() {
561 Some(cx)
562 } else {
563 None
564 }
565}
566
567/// Encode hex trace + span ids into a W3C `traceparent` string.
568///
569/// Used to stamp a child tool task's parent span: the root
570/// `invoke_agent` span's `(trace_id, span_id)` — persisted on the turn
571/// attempt — is encoded here and stored on the child task's
572/// `otel_traceparent` so the child's `execute_tool` span nests under the
573/// turn root. Returns `None` for malformed / zero ids (validated via
574/// [`spans::remote_span_context`]). The `SAMPLED` flag is always set, to
575/// match the remote context the daemon reconstructs.
576#[must_use]
577pub fn traceparent_from_ids(trace_id_hex: &str, span_id_hex: &str) -> Option<String> {
578 let span_context = spans::remote_span_context(trace_id_hex, span_id_hex)?;
579 Some(format!(
580 "00-{}-{}-01",
581 span_context.trace_id(),
582 span_context.span_id()
583 ))
584}
585
586// ── Live root-span registry (correct cross-task duration) ────────────
587
588/// Process-global home for the live root-turn span between the task that
589/// opens it and the (possibly later) task that finalizes it.
590///
591/// The daemon drives one turn across several tokio tasks
592/// (execute → suspend at the tool boundary → resume), and `OTel` 0.32
593/// offers no way to assign a span id, so a root span cannot be re-minted
594/// on resume to cover the whole turn. Instead the worker stashes the
595/// *live* `invoke_agent` span here at the fresh turn and finalizes it at
596/// the terminal — even though that runs in a different task — so the span
597/// carries the **full** turn duration (it legitimately stays open while
598/// tools run). Children never read this map; they nest via the ids
599/// persisted on the turn attempt (see [`remote_parent_context`]), so the
600/// tree is correct regardless of whether the live span survives.
601///
602/// Keyed by `AgentTask` id (stringified). An entry that is never
603/// finalized — the daemon crashed/restarted mid-turn, so a fresh process
604/// owns the resume — simply leaks its (small) span object until process
605/// exit; that turn's root span is the only one missing its terminal
606/// finalize, and its children still nest correctly.
607static LIVE_ROOT_SPANS: LazyLock<Mutex<HashMap<String, BoxedSpan>>> =
608 LazyLock::new(|| Mutex::new(HashMap::new()));
609
610/// Stash the live root-turn `span` under `task_id` so a later task can
611/// finalize it with the correct full duration (see `LIVE_ROOT_SPANS`).
612///
613/// First write wins: a retry that re-opens the same turn keeps the
614/// original span (and its start time) rather than resetting the clock.
615pub fn stash_root_turn_span(task_id: &str, span: BoxedSpan) {
616 let Ok(mut spans) = LIVE_ROOT_SPANS.lock() else {
617 log::warn!("live root-span registry poisoned; dropping root span for task {task_id}");
618 return;
619 };
620 spans.entry(task_id.to_string()).or_insert(span);
621}
622
623/// Finalize and end the stashed root-turn span for `task_id`, stamping
624/// run-outcome + usage attributes (see [`finish_root_turn_span`]).
625///
626/// No-op when no span is stashed — the expected path when the daemon
627/// restarted mid-turn and a fresh process owns the terminal. The outcome
628/// counter is still recorded in that case so dashboards see every run.
629pub fn finalize_root_turn_span(
630 task_id: &str,
631 total_turns: usize,
632 total_usage: Option<&TokenUsage>,
633 outcome: &'static str,
634) {
635 let stashed = LIVE_ROOT_SPANS
636 .lock()
637 .ok()
638 .and_then(|mut spans| spans.remove(task_id));
639 match stashed {
640 Some(mut span) => finish_root_turn_span(&mut span, total_turns, total_usage, outcome),
641 None => {
642 // Live span lost (cross-restart resume). Still record the
643 // run-outcome counter so the metric isn't undercounted.
644 Metrics::global()
645 .runs_outcome
646 .add(1, &[KeyValue::new(attrs::SDK_OUTCOME, outcome)]);
647 }
648 }
649}
650
651/// Drop the stashed root-turn span for `task_id` without finalizing.
652///
653/// For paths that abandon a turn without a meaningful outcome (e.g. an
654/// idempotent duplicate-suspension bail) so the registry doesn't leak.
655pub fn discard_root_turn_span(task_id: &str) {
656 if let Ok(mut spans) = LIVE_ROOT_SPANS.lock() {
657 drop(spans.remove(task_id));
658 }
659}
660
#[cfg(test)]
mod tests {
    use super::{
        discard_root_turn_span, finalize_root_turn_span, spans, stash_root_turn_span,
        traceparent_from_ids,
    };

    // Example ids from the W3C trace-context spec; both are non-zero
    // and therefore valid.
    const TRACE_HEX: &str = "4bf92f3577b34da6a3ce929d0e0e4736";
    const SPAN_HEX: &str = "00f067aa0ba902b7";

    /// Valid ids encode to version `00`, the ids verbatim, flags `01`.
    #[test]
    fn traceparent_from_valid_ids_is_w3c_sampled() {
        let expected = format!("00-{TRACE_HEX}-{SPAN_HEX}-01");
        let actual = traceparent_from_ids(TRACE_HEX, SPAN_HEX).expect("valid ids");
        assert_eq!(actual, expected);
    }

    /// Non-hex, wrong-length and all-zero ids are all rejected.
    #[test]
    fn traceparent_from_malformed_or_zero_ids_is_none() {
        assert!(traceparent_from_ids("not-hex", SPAN_HEX).is_none());
        assert!(traceparent_from_ids(TRACE_HEX, "tooshort").is_none());

        // All-zero ids fail `SpanContext::is_valid`.
        let zero_trace = "0".repeat(32);
        let zero_span = "0".repeat(16);
        assert!(traceparent_from_ids(&zero_trace, &zero_span).is_none());
    }

    /// Stash → finalize removes the entry; a repeated finalize is safe.
    #[test]
    fn live_root_span_registry_finalize_is_idempotent() {
        // Without an installed provider the span is a no-op, but the
        // registry bookkeeping is exercised all the same. The second
        // finalize finds nothing and must not panic.
        let task_id = "registry-roundtrip-task";
        let span = spans::start_internal_span("test", Vec::new());
        stash_root_turn_span(task_id, span);
        for _ in 0..2 {
            finalize_root_turn_span(task_id, 1, None, "done");
        }
    }

    /// Discard removes the entry, so a later finalize finds nothing.
    #[test]
    fn discard_removes_stashed_span_without_finalize() {
        let task_id = "registry-discard-task";
        let span = spans::start_internal_span("test", Vec::new());
        stash_root_turn_span(task_id, span);
        discard_root_turn_span(task_id);
        // The entry is already gone; finalize takes the no-span path.
        finalize_root_turn_span(task_id, 0, None, "cancelled");
    }
}
705}