Skip to main content

defect_obs/langfuse/
projector.rs

1//! Translation of `AgentEvent` into Langfuse ingestion events.
2//!
3//! [`TraceProjector`] is a **stateful, per-session** projector (isomorphic to
4//! `defect-storage`'s `RecordProjector`). The main loop calls [`TraceProjector::project`]
5//! for each incoming [`AgentEvent`] and returns 0..N [`IngestionEvent`]s to the reporter.
6//!
7//! ## Hierarchy (one trace per turn)
8//!
9//! ```text
10//! trace (turn)
//! └── step (span)                 One per LLM call: one llm_call + the tools it triggers
12//!     ├── llm_call (generation)
13//!     └── tool (span)             Sibling of llm_call under the same step
14//!         └── (spawn_agent) → subagent (span)
15//!             └── step (span)     Recursive isomorphic structure inside subagent
16//!                 ├── llm_call
17//!                 └── tool / spawn_agent → subagent → ...
18//! ```
19//!
//! Key point: the **step span** is the container for one LLM round inside a turn (a
//! turn may contain several steps). Both `llm_call` (generation) and the tools triggered
//! by that call hang under it as siblings. This makes the generation duration reflect
//! **pure LLM call** time (it ends at `LlmCallFinished`, rather than being delayed until
//! the next LLM call or wrapping tool execution time); tool duration lives inside the
//! step.
25//!
26//! ## Recursive subagent (flattened ancestor_path)
27//!
28//! [`AgentEvent::Subagent`] carries an `ancestor_path` (a chain of `ToolCallId`s from the
29//! top-level `spawn_agent` tool call to the current layer), and `inner` is always a leaf
30//! event. The projector **deterministically** derives all span ids from this chain, so no
31//! per-layer anchoring is needed; arbitrary depth is handled uniformly. Each subagent
32//! layer is an independent **scope** (sharing the same step/gen/tool projection logic as
33//! the top-level turn).
34//!
35//! ## ID strategy
36//!
37//! - **traceId**: Generated once with `Uuid::new_v4()` at `TurnStarted`, reused within
38//!   the turn. **Must not** use an auto-incrementing `{session}-turn-{seq}` — resuming
39//!   would cause id collisions.
40//! - **scope prefix**: Top-level = `{trace}`; subagent path `[A,B]` =
41//!   `{trace}-sub-A-sub-B`. The subagent span's id **is** its scope prefix.
42//! - **step / generation / tool span id**: Derived from the scope prefix + sequence
43//!   number / `ToolCallId`, globally unique and deterministic — the parent of a subagent
44//!   span (the tool span that spawned it) is computed directly from the path.
45//! - **anchor**: The only state that needs to be stored is the **top-level `spawn_agent`
46//!   tool call id → trace_id** (trace_id is random and not derivable). All subagents in
47//!   the same turn share that trace_id, so only the top-level anchor is needed.
48//! - **envelope id**: One `Uuid::new_v4()` per ingestion event, for Langfuse
49//!   deduplication.
50//!
51//! ## Timestamps
52//!
53//! `AgentEvent` carries no timestamps; the caller passes `now` (an RFC3339 string). The
54//! projector does not read the clock itself, making it easier to test and deterministic.
55
56use std::collections::HashMap;
57
58use agent_client_protocol_schema::{
59    ContentBlock, StopReason, ToolCallStatus, ToolCallUpdateFields,
60};
61use defect_agent::event::{AgentEvent, LlmRequestSnapshot};
62use defect_agent::llm::{Message, MessageContent, Role, Usage};
63
64use super::model::{EventKind, IngestionEvent, ObservationBody, ObservationLevel, TraceBody};
65
/// Deployment environment label written into the `environment` field of the traces and
/// observations created by this module.
const DEFAULT_ENVIRONMENT: &str = "production";
/// The Langfuse trace name used for every agent turn (one trace per turn).
const TRACE_NAME: &str = "turn";
/// The name of the container span opened for each LLM call within a turn (one `llm_call`
/// plus the tools it triggers).
const STEP_NAME: &str = "step";
/// The name of the Langfuse generation that corresponds to an LLM call.
const GENERATION_NAME: &str = "llm_call";
/// The wire-level name of the `spawn_agent` tool. The canonical source is
/// `defect_agent::tool::spawn_agent::SPAWN_AGENT_TOOL_NAME`, which is `pub(crate)` and
/// thus inaccessible across crates, so this is a copy of the wire name. The projector
/// uses it to anchor **top-level** `spawn_agent` tool calls to a `trace_id`.
const SPAWN_AGENT_TOOL_NAME: &str = "spawn_agent";
/// The name prefix for a subagent's own span (a layer separate from the tool span that
/// spawned it); the full span name is `subagent:{agent_type}`.
const SUBAGENT_SPAN_NAME: &str = "subagent";
84
/// Per-session projection state.
///
/// Tracks the current top-level turn, the user prompt waiting for that turn, the
/// `spawn_agent` anchors that map subagent events back to a trace, and the projection
/// state of every active scope (top-level turn and subagent layers).
pub struct TraceProjector {
    /// Session id, written into the `session_id` field of every trace body.
    session_id: String,
    /// Metadata for the current **top-level** turn; `None` when not inside a turn (before
    /// `TurnStarted` / after `TurnEnded`). Note that subagent events may still arrive (in
    /// the background) after the top-level turn ends, at which point `turn` is `None`,
    /// but the subagent scope remains alive and the trace_id is retrieved via
    /// [`Self::anchors`].
    turn: Option<TurnMeta>,
    /// Temporarily stores the user prompt text. The main loop emits `UserPromptCommitted`
    /// **before** `TurnStarted`, so when the prompt arrives the turn has not been created
    /// yet — it is stashed here and consumed when `TurnStarted` builds the turn.
    pending_input: Option<String>,
    /// Top-level `spawn_agent` tool call id → its owning trace_id. Subagent events look
    /// up the trace_id via `ancestor_path[0]` (trace_id is random and not derivable; all
    /// nested subagents in the same turn share this trace_id, so only the top-level hop
    /// is anchored). An entry is removed when the subagent at path length 1 reports
    /// `TurnEnded`.
    anchors: HashMap<String, String>,
    /// All active scopes: `scope prefix` → state. **Session-level** — top-level turn
    /// scopes (prefix = `trace_id`) coexist with subagent scopes (prefix =
    /// `{trace}-sub-...`). Subagent scopes may survive across turn boundaries
    /// (background), so they are not cleared with the turn; each scope is removed when
    /// the `TurnEnded` for its own layer is processed.
    scopes: HashMap<String, ScopeState>,
}
110
/// Metadata for the current top-level turn (trace-level; does not include step/gen/tool
/// projection state — those live in `scopes[trace_id]`, which is isomorphic to a subagent
/// scope).
struct TurnMeta {
    /// Id of the trace that represents this turn; also the key of the top-level scope.
    trace_id: String,
    /// The user prompt text, written into the trace input.
    input: Option<String>,
    /// Assistant text accumulated over the entire turn (every top-level `AssistantText`
    /// chunk is appended); written into the trace output when the turn ends.
    final_output: String,
}
121
/// Projection state for a scope (top-level turn or a subagent layer) of steps,
/// generations, and tools.
///
/// Top-level and subagent scopes share this structure — this is the observability-side
/// manifestation of "a subagent is just an agent with a parent": the same step container,
/// generation, and tool span logic, differing only in the mount point (`step_parent`) and
/// id prefix (`prefix`).
struct ScopeState {
    /// ID prefix: top-level = `{trace}`; subagent = `{trace}-sub-A-sub-B`.
    /// For a subagent scope, `prefix` is also the subagent span's id.
    prefix: String,
    /// The parent observation for step spans in this scope: top-level = `None` (attached
    /// directly to the trace); subagent = `Some(subagent span id)` = `Some(prefix)`.
    step_parent: Option<String>,
    /// The currently active step span id. `None` before the first `llm_call` of the
    /// scope and again after the current step has been closed.
    current_step_id: Option<String>,
    /// Sequence number of the most recently opened step (0 before the first one). It is
    /// incremented for every new step and used to derive the step id
    /// `{prefix}-step-{seq}`.
    step_seq: u32,
    /// The current in-progress generation within this step; taken (reset to `None`) when
    /// the generation is flushed.
    current_gen: Option<PendingGeneration>,
    /// Tool call ID → assigned span ID (pairing Started/Finished events).
    tool_spans: HashMap<String, String>,
}
145
/// Accumulated state for an in-progress generation. Flushed into a single
/// generation-update when finalized by `LlmCallFinished` (or defensively when the next
/// LLM call starts or the scope's turn ends).
struct PendingGeneration {
    /// Generation observation id (`{step_id}-gen`).
    id: String,
    /// Id of the step span this generation hangs under.
    parent_step_id: String,
    /// Model name reported by `LlmCallStarted`.
    model: String,
    /// Accumulated assistant reply text.
    output: String,
    /// Accumulated thinking text (stored in generation's `metadata.reasoning`, not in
    /// `output`).
    thinking: String,
    /// Token usage for this call (from `LlmCallFinished.usage`); stays at
    /// `Usage::default()` until that event is recorded.
    usage: Usage,
    /// Error message (from `LlmCallFinished.error`). When set, the flushed generation is
    /// reported at error level with this text as its status message.
    error: Option<String>,
}
162
163impl ScopeState {
164    fn new(prefix: String, step_parent: Option<String>) -> Self {
165        Self {
166            prefix,
167            step_parent,
168            current_step_id: None,
169            step_seq: 0,
170            current_gen: None,
171            tool_spans: HashMap::new(),
172        }
173    }
174}
175
176impl TraceProjector {
177    /// Creates a new per-session projector.
178    pub fn new(session_id: impl Into<String>) -> Self {
179        Self {
180            session_id: session_id.into(),
181            turn: None,
182            pending_input: None,
183            anchors: HashMap::new(),
184            scopes: HashMap::new(),
185        }
186    }
187
188    /// Translates an event into 0..N ingestion events. `now` is an RFC3339 timestamp.
189    /// `new_id` supplies a unique id (envelope id / trace id) — injected for
190    /// deterministic testing.
191    pub fn project(
192        &mut self,
193        event: AgentEvent,
194        now: &str,
195        new_id: &mut dyn FnMut() -> String,
196    ) -> Vec<IngestionEvent> {
197        match event {
198            AgentEvent::TurnStarted => self.on_turn_started(now, new_id),
199            AgentEvent::UserPromptCommitted { content } => {
200                self.on_user_prompt(&content);
201                Vec::new()
202            }
203            AgentEvent::LlmCallStarted {
204                model,
205                attempt,
206                request,
207            } => self.on_top_llm_started(model, attempt, request.as_ref(), now, new_id),
208            AgentEvent::AssistantText { content } => {
209                self.accumulate_top_text(&content);
210                Vec::new()
211            }
212            AgentEvent::AssistantThought { content } => {
213                self.accumulate_top_thinking(&content);
214                Vec::new()
215            }
216            AgentEvent::LlmCallFinished { usage, error, .. } => {
217                self.on_top_llm_finished(usage, error, now, new_id)
218            }
219            AgentEvent::ToolCallStarted { id, name, fields } => {
220                self.on_top_tool_started(id.to_string(), name, fields.raw_input, now, new_id)
221            }
222            AgentEvent::ToolCallFinished { id, fields } => {
223                self.on_top_tool_finished(&id.to_string(), &fields, now, new_id)
224            }
225            AgentEvent::ContextCompressed {
226                tokens_before,
227                tokens_after,
228            } => self.on_context_compressed(tokens_before, tokens_after, None, now, new_id),
229            AgentEvent::ContextMicrocompacted {
230                tokens_before,
231                tokens_after,
232                cleared,
233            } => {
234                self.on_context_compressed(tokens_before, tokens_after, Some(cleared), now, new_id)
235            }
236            AgentEvent::TurnEnded { reason, usage } => {
237                self.on_turn_ended(reason, usage, now, new_id)
238            }
239            AgentEvent::Subagent {
240                ancestor_path,
241                agent_type,
242                inner,
243            } => {
244                let path: Vec<String> = ancestor_path.iter().map(ToString::to_string).collect();
245                self.on_subagent(&path, agent_type, *inner, now, new_id)
246            }
247            // Do not report: progress increments and permission audits (not included in
248            // langfuse for this release).
249            AgentEvent::ToolCallProgress { .. }
250            | AgentEvent::PolicyDecision { .. }
251            | AgentEvent::PermissionResolved { .. } => Vec::new(),
252            _ => Vec::new(),
253        }
254    }
255
256    // ---- Top-level turn events ----
257
258    fn on_turn_started(
259        &mut self,
260        now: &str,
261        new_id: &mut dyn FnMut() -> String,
262    ) -> Vec<IngestionEvent> {
263        let trace_id = new_id();
264        let input = self.pending_input.take();
265        let body = TraceBody {
266            id: trace_id.clone(),
267            name: Some(TRACE_NAME.into()),
268            session_id: Some(self.session_id.clone()),
269            // Include input at trace creation so the UI can immediately display user
270            // input without waiting for TurnEnded.
271            input: input.clone().map(serde_json::Value::String),
272            environment: Some(DEFAULT_ENVIRONMENT.into()),
273            timestamp: Some(now.to_string()),
274            ..Default::default()
275        };
276        // Top-level scope: prefix is `trace_id`, steps are attached directly to the trace
277        // (`step_parent` is `None`).
278        self.scopes
279            .insert(trace_id.clone(), ScopeState::new(trace_id.clone(), None));
280        self.turn = Some(TurnMeta {
281            trace_id: trace_id.clone(),
282            input,
283            final_output: String::new(),
284        });
285        vec![IngestionEvent::trace(
286            new_id(),
287            now.to_string(),
288            EventKind::TraceCreate,
289            &body,
290        )]
291    }
292
293    fn on_user_prompt(&mut self, content: &[ContentBlock]) {
294        let text = content_text(content);
295        if !text.is_empty() {
296            self.pending_input = Some(text);
297        }
298    }
299
300    fn on_top_llm_started(
301        &mut self,
302        model: String,
303        attempt: u32,
304        request: &LlmRequestSnapshot,
305        now: &str,
306        new_id: &mut dyn FnMut() -> String,
307    ) -> Vec<IngestionEvent> {
308        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
309            return Vec::new();
310        };
311        let Some(scope) = self.scopes.get_mut(&trace_id) else {
312            return Vec::new();
313        };
314        scope_llm_started(scope, &trace_id, model, attempt, request, now, new_id)
315    }
316
317    fn accumulate_top_text(&mut self, content: &ContentBlock) {
318        if let ContentBlock::Text(text) = content
319            && let Some(turn) = self.turn.as_mut()
320        {
321            turn.final_output.push_str(&text.text);
322            let trace_id = turn.trace_id.clone();
323            if let Some(scope) = self.scopes.get_mut(&trace_id)
324                && let Some(pg) = scope.current_gen.as_mut()
325            {
326                pg.output.push_str(&text.text);
327            }
328        }
329    }
330
331    fn accumulate_top_thinking(&mut self, content: &ContentBlock) {
332        if let ContentBlock::Text(text) = content
333            && let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone())
334            && let Some(scope) = self.scopes.get_mut(&trace_id)
335            && let Some(pg) = scope.current_gen.as_mut()
336        {
337            pg.thinking.push_str(&text.text);
338        }
339    }
340
341    fn on_top_llm_finished(
342        &mut self,
343        usage: Usage,
344        error: Option<String>,
345        now: &str,
346        new_id: &mut dyn FnMut() -> String,
347    ) -> Vec<IngestionEvent> {
348        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
349            return Vec::new();
350        };
351        let Some(scope) = self.scopes.get_mut(&trace_id) else {
352            return Vec::new();
353        };
354        note_llm_finished(scope, usage, error);
355        flush_generation(scope, &trace_id, now, new_id)
356    }
357
358    fn on_top_tool_started(
359        &mut self,
360        tool_call_id: String,
361        name: String,
362        raw_input: Option<serde_json::Value>,
363        now: &str,
364        new_id: &mut dyn FnMut() -> String,
365    ) -> Vec<IngestionEvent> {
366        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
367            return Vec::new();
368        };
369        // Top-level `spawn_agent` tool call: anchor the `trace_id` so that later subagent
370        // events (including background and cross-turn) can retrieve it via
371        // `ancestor_path[0]`.
372        if name == SPAWN_AGENT_TOOL_NAME {
373            self.anchors.insert(tool_call_id.clone(), trace_id.clone());
374        }
375        let Some(scope) = self.scopes.get_mut(&trace_id) else {
376            return Vec::new();
377        };
378        scope_tool_started(
379            scope,
380            &trace_id,
381            &tool_call_id,
382            name,
383            raw_input,
384            now,
385            new_id,
386        )
387    }
388
389    fn on_top_tool_finished(
390        &mut self,
391        tool_call_id: &str,
392        fields: &ToolCallUpdateFields,
393        now: &str,
394        new_id: &mut dyn FnMut() -> String,
395    ) -> Vec<IngestionEvent> {
396        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
397            return Vec::new();
398        };
399        let Some(scope) = self.scopes.get_mut(&trace_id) else {
400            return Vec::new();
401        };
402        scope_tool_finished(scope, &trace_id, tool_call_id, fields, now, new_id)
403    }
404
405    /// When `cleared` is `Some`, a micro-compression (clearing `tool_result` without LLM
406    /// involvement) is performed; when `None`, a full summary compression is done. Both
407    /// produce structurally identical observations, differing only in `name`/`metadata`.
408    /// Compression is a turn-level, cross-step operation (not part of any single
409    /// `llm_call`), so it is attached directly to the trace (no parent).
410    fn on_context_compressed(
411        &mut self,
412        tokens_before: u64,
413        tokens_after: u64,
414        cleared: Option<usize>,
415        now: &str,
416        new_id: &mut dyn FnMut() -> String,
417    ) -> Vec<IngestionEvent> {
418        let Some(trace_id) = self.turn.as_ref().map(|t| t.trace_id.clone()) else {
419            return Vec::new();
420        };
421        let mut meta = serde_json::Map::new();
422        meta.insert("tokens_before".into(), tokens_before.into());
423        meta.insert("tokens_after".into(), tokens_after.into());
424        if let Some(cleared) = cleared {
425            meta.insert("cleared_tool_results".into(), cleared.into());
426        }
427        let name = if cleared.is_some() {
428            "context_microcompaction"
429        } else {
430            "context_compaction"
431        };
432        let body = ObservationBody {
433            id: new_id(),
434            trace_id,
435            name: Some(name.into()),
436            start_time: Some(now.to_string()),
437            metadata: Some(serde_json::Value::Object(meta)),
438            environment: Some(DEFAULT_ENVIRONMENT.into()),
439            ..Default::default()
440        };
441        vec![IngestionEvent::observation(
442            new_id(),
443            now.to_string(),
444            EventKind::EventCreate,
445            &body,
446        )]
447    }
448
449    fn on_turn_ended(
450        &mut self,
451        reason: StopReason,
452        usage: Usage,
453        now: &str,
454        new_id: &mut dyn FnMut() -> String,
455    ) -> Vec<IngestionEvent> {
456        let Some(turn) = self.turn.take() else {
457            return Vec::new();
458        };
459        let trace_id = turn.trace_id.clone();
460        let mut events = Vec::new();
461        // Finalize the top-level scope: flush any in-flight generation, close the current
462        // step, then remove the scope.
463        if let Some(mut scope) = self.scopes.remove(&trace_id) {
464            events.extend(flush_generation(&mut scope, &trace_id, now, new_id));
465            events.extend(close_current_step(&mut scope, &trace_id, now, new_id));
466        }
467
468        let mut meta = serde_json::Map::new();
469        meta.insert(
470            "stop_reason".into(),
471            serde_json::to_value(reason).unwrap_or(serde_json::Value::Null),
472        );
473        if let Some(details) = usage_to_details(&usage) {
474            meta.insert("usage".into(), serde_json::Value::Object(details));
475        }
476        let body = TraceBody {
477            id: trace_id,
478            name: Some(TRACE_NAME.into()),
479            session_id: Some(self.session_id.clone()),
480            input: turn.input.map(serde_json::Value::String),
481            output: (!turn.final_output.is_empty())
482                .then_some(serde_json::Value::String(turn.final_output)),
483            metadata: Some(serde_json::Value::Object(meta)),
484            timestamp: Some(now.to_string()),
485            ..Default::default()
486        };
487        events.push(IngestionEvent::trace(
488            new_id(),
489            now.to_string(),
490            // A second send with the same trace_id acts as an update (merging
491            // endTime/output/metadata).
492            EventKind::TraceCreate,
493            &body,
494        ));
495        events
496    }
497
498    // ---- subagent events (arbitrary depth, homogeneous) ----
499
500    /// Processes a **leaf** event of a subagent child turn (the `inner` after unwrapping
501    /// `AgentEvent::Subagent`).
502    ///
503    /// `path` is the chain of `ToolCallId`s from the top-level `spawn_agent` tool call
504    /// down to the current layer. The `trace_id` is retrieved via `anchors[path[0]]`; the
505    /// scope prefix and the parent of the subagent span (the tool span that spawned it)
506    /// are deterministically derived from `path`. On first encounter with a given `path`,
507    /// lazily create its subagent span and scope; subsequent `inner` events are
508    /// dispatched into that scope (reusing the same step/gen/tool logic as the top
509    /// level).
510    ///
511    /// If no top-level anchor is found (the `spawn_agent` tool call was never seen), the
512    /// event is dropped — no orphans are created.
513    fn on_subagent(
514        &mut self,
515        path: &[String],
516        agent_type: String,
517        inner: AgentEvent,
518        now: &str,
519        new_id: &mut dyn FnMut() -> String,
520    ) -> Vec<IngestionEvent> {
521        let Some(first) = path.first() else {
522            return Vec::new();
523        };
524        let Some(trace_id) = self.anchors.get(first).cloned() else {
525            // No top-level anchor: this `spawn_agent` tool call has never been seen
526            // before — drop it, don't create an orphan.
527            return Vec::new();
528        };
529        let prefix = scope_prefix(&trace_id, path);
530
531        let mut events = Vec::new();
532        // First time seeing this path: lazily create a dedicated subagent span (parent =
533        // the tool span that initiated it, derived from path).
534        if !self.scopes.contains_key(&prefix) {
535            let parent_tool = parent_tool_span_id(&trace_id, path);
536            let mut meta = serde_json::Map::new();
537            meta.insert("agent_type".into(), agent_type.clone().into());
538            let body = ObservationBody {
539                id: prefix.clone(),
540                trace_id: trace_id.clone(),
541                parent_observation_id: Some(parent_tool),
542                name: Some(format!("{SUBAGENT_SPAN_NAME}:{agent_type}")),
543                start_time: Some(now.to_string()),
544                metadata: Some(serde_json::Value::Object(meta)),
545                environment: Some(DEFAULT_ENVIRONMENT.into()),
546                ..Default::default()
547            };
548            events.push(IngestionEvent::observation(
549                new_id(),
550                now.to_string(),
551                EventKind::SpanCreate,
552                &body,
553            ));
554            // Subagent scope: the step is attached under this subagent span (= prefix).
555            self.scopes.insert(
556                prefix.clone(),
557                ScopeState::new(prefix.clone(), Some(prefix.clone())),
558            );
559        }
560        let scope = self
561            .scopes
562            .get_mut(&prefix)
563            .expect("subagent scope just ensured");
564
565        match inner {
566            AgentEvent::LlmCallStarted {
567                model,
568                attempt,
569                request,
570            } => {
571                events.extend(scope_llm_started(
572                    scope,
573                    &trace_id,
574                    model,
575                    attempt,
576                    request.as_ref(),
577                    now,
578                    new_id,
579                ));
580            }
581            AgentEvent::AssistantText { content } => {
582                if let (ContentBlock::Text(text), Some(pg)) = (&content, scope.current_gen.as_mut())
583                {
584                    pg.output.push_str(&text.text);
585                }
586            }
587            AgentEvent::AssistantThought { content } => {
588                if let (ContentBlock::Text(text), Some(pg)) = (&content, scope.current_gen.as_mut())
589                {
590                    pg.thinking.push_str(&text.text);
591                }
592            }
593            AgentEvent::LlmCallFinished { usage, error, .. } => {
594                note_llm_finished(scope, usage, error);
595                events.extend(flush_generation(scope, &trace_id, now, new_id));
596            }
597            AgentEvent::ToolCallStarted { id, name, fields } => {
598                events.extend(scope_tool_started(
599                    scope,
600                    &trace_id,
601                    &id.to_string(),
602                    name,
603                    fields.raw_input,
604                    now,
605                    new_id,
606                ));
607            }
608            AgentEvent::ToolCallFinished { id, fields } => {
609                events.extend(scope_tool_finished(
610                    scope,
611                    &trace_id,
612                    &id.to_string(),
613                    &fields,
614                    now,
615                    new_id,
616                ));
617            }
618            // Sub-turn ended: finalize the in-progress generation, close the current
619            // step, close the subagent span, and clear the session-level scope; also
620            // clear the anchor for the top-level hop (path length 1).
621            AgentEvent::TurnEnded { .. } => {
622                events.extend(flush_generation(scope, &trace_id, now, new_id));
623                events.extend(close_current_step(scope, &trace_id, now, new_id));
624                let subagent_span_id = scope.prefix.clone();
625                let body = ObservationBody {
626                    id: subagent_span_id,
627                    trace_id: trace_id.clone(),
628                    end_time: Some(now.to_string()),
629                    ..Default::default()
630                };
631                events.push(IngestionEvent::observation(
632                    new_id(),
633                    now.to_string(),
634                    EventKind::SpanUpdate,
635                    &body,
636                ));
637                self.scopes.remove(&prefix);
638                if path.len() == 1 {
639                    self.anchors.remove(first);
640                }
641            }
642            // Remaining sub-turn events (TurnStarted, UserPromptCommitted, progress,
643            // audit) are not reported individually.
644            _ => {}
645        }
646        events
647    }
648}
649
650// ---- scope generic projection (shared by top-level turn and subagent) ----
651
652/// LLM call started: finalize the previous step (if any) → open a new step → create a
653/// generation under the new step.
654fn scope_llm_started(
655    scope: &mut ScopeState,
656    trace_id: &str,
657    model: String,
658    attempt: u32,
659    request: &LlmRequestSnapshot,
660    now: &str,
661    new_id: &mut dyn FnMut() -> String,
662) -> Vec<IngestionEvent> {
663    // Defensive: the previous generation should already have been finalized by its
664    // `LlmCallFinished` (generation duration = pure LLM time); if it is still active,
665    // flush it first to ensure `create` precedes `update`.
666    let mut events = flush_generation(scope, trace_id, now, new_id);
667    // Close the previous step (which includes the last llm_call and the tools triggered
668    // in that round).
669    events.extend(close_current_step(scope, trace_id, now, new_id));
670
671    // Start a new step.
672    scope.step_seq += 1;
673    let step_id = format!("{}-step-{}", scope.prefix, scope.step_seq);
674    scope.current_step_id = Some(step_id.clone());
675    let step_body = ObservationBody {
676        id: step_id.clone(),
677        trace_id: trace_id.to_string(),
678        parent_observation_id: scope.step_parent.clone(),
679        name: Some(STEP_NAME.into()),
680        start_time: Some(now.to_string()),
681        environment: Some(DEFAULT_ENVIRONMENT.into()),
682        ..Default::default()
683    };
684    events.push(IngestionEvent::observation(
685        new_id(),
686        now.to_string(),
687        EventKind::SpanCreate,
688        &step_body,
689    ));
690
691    // Attach the generation under the new step.
692    let gen_id = format!("{step_id}-gen");
693    scope.current_gen = Some(PendingGeneration {
694        id: gen_id.clone(),
695        parent_step_id: step_id.clone(),
696        model: model.clone(),
697        output: String::new(),
698        thinking: String::new(),
699        usage: Usage::default(),
700        error: None,
701    });
702    let mut meta = serde_json::Map::new();
703    meta.insert("attempt".into(), attempt.into());
704    let gen_body = ObservationBody {
705        id: gen_id,
706        trace_id: trace_id.to_string(),
707        parent_observation_id: Some(step_id),
708        name: Some(GENERATION_NAME.into()),
709        model: Some(model),
710        start_time: Some(now.to_string()),
711        // input is the standard chat messages array, with system as the first entry
712        // {role:"system"}.
713        input: Some(request_to_input(request)),
714        metadata: Some(serde_json::Value::Object(meta)),
715        environment: Some(DEFAULT_ENVIRONMENT.into()),
716        ..Default::default()
717    };
718    events.push(IngestionEvent::observation(
719        new_id(),
720        now.to_string(),
721        EventKind::GenerationCreate,
722        &gen_body,
723    ));
724    events
725}
726
727/// Record usage/error from `LlmCallFinished` into the current generation (written out at
728/// finalization).
729fn note_llm_finished(scope: &mut ScopeState, usage: Usage, error: Option<String>) {
730    if let Some(pg) = scope.current_gen.as_mut() {
731        pg.usage = usage;
732        if error.is_some() {
733            pg.error = error;
734        }
735    }
736}
737
738/// Finalize the current generation: output, thinking, usage, and endTime are written into
739/// a generation-update event. No-op if there is no ongoing generation. Called from
740/// `LlmCallFinished` (on the success path, after the stream has been drained and
741/// output/thinking are fully collected) — the generation duration covers only the pure
742/// LLM call, excluding tool execution.
743fn flush_generation(
744    scope: &mut ScopeState,
745    trace_id: &str,
746    now: &str,
747    new_id: &mut dyn FnMut() -> String,
748) -> Vec<IngestionEvent> {
749    let Some(pg) = scope.current_gen.take() else {
750        return Vec::new();
751    };
752    let mut meta = serde_json::Map::new();
753    if !pg.thinking.is_empty() {
754        // There is no dedicated ingestion field for thinking/reasoning; store it in
755        // metadata to avoid polluting output.
756        meta.insert("reasoning".into(), serde_json::Value::String(pg.thinking));
757    }
758    let body = ObservationBody {
759        id: pg.id,
760        trace_id: trace_id.to_string(),
761        parent_observation_id: Some(pg.parent_step_id),
762        name: Some(GENERATION_NAME.into()),
763        model: Some(pg.model),
764        end_time: Some(now.to_string()),
765        output: (!pg.output.is_empty()).then_some(serde_json::Value::String(pg.output)),
766        usage_details: usage_to_details(&pg.usage),
767        metadata: (!meta.is_empty()).then_some(serde_json::Value::Object(meta)),
768        level: pg.error.as_ref().map(|_| ObservationLevel::Error),
769        status_message: pg.error,
770        ..Default::default()
771    };
772    vec![IngestionEvent::observation(
773        new_id(),
774        now.to_string(),
775        EventKind::GenerationUpdate,
776        &body,
777    )]
778}
779
780/// Close the current step span (write `end_time`). No-op if no step is in progress.
781fn close_current_step(
782    scope: &mut ScopeState,
783    trace_id: &str,
784    now: &str,
785    new_id: &mut dyn FnMut() -> String,
786) -> Vec<IngestionEvent> {
787    let Some(step_id) = scope.current_step_id.take() else {
788        return Vec::new();
789    };
790    let body = ObservationBody {
791        id: step_id,
792        trace_id: trace_id.to_string(),
793        end_time: Some(now.to_string()),
794        ..Default::default()
795    };
796    vec![IngestionEvent::observation(
797        new_id(),
798        now.to_string(),
799        EventKind::SpanUpdate,
800        &body,
801    )]
802}
803
804/// Tool call start → span-create, attached under the current step (sibling to llm_call).
805fn scope_tool_started(
806    scope: &mut ScopeState,
807    trace_id: &str,
808    tool_call_id: &str,
809    name: String,
810    raw_input: Option<serde_json::Value>,
811    now: &str,
812    new_id: &mut dyn FnMut() -> String,
813) -> Vec<IngestionEvent> {
814    let span_id = format!("{}-tool-{}", scope.prefix, tool_call_id);
815    scope
816        .tool_spans
817        .insert(tool_call_id.to_string(), span_id.clone());
818    let body = ObservationBody {
819        id: span_id,
820        trace_id: trace_id.to_string(),
821        // Tool calls are always attached to the current step; in theory, a tool call
822        // always follows an `llm_call`, so the step must exist. Defensively allow `None`
823        // (out-of-order / no step) — fall back to attaching directly to the trace.
824        parent_observation_id: scope.current_step_id.clone(),
825        name: Some(name),
826        start_time: Some(now.to_string()),
827        input: raw_input,
828        environment: Some(DEFAULT_ENVIRONMENT.into()),
829        ..Default::default()
830    };
831    vec![IngestionEvent::observation(
832        new_id(),
833        now.to_string(),
834        EventKind::SpanCreate,
835        &body,
836    )]
837}
838
839/// Tool call finished → span-update (endTime + output + level).
840fn scope_tool_finished(
841    scope: &mut ScopeState,
842    trace_id: &str,
843    tool_call_id: &str,
844    fields: &ToolCallUpdateFields,
845    now: &str,
846    new_id: &mut dyn FnMut() -> String,
847) -> Vec<IngestionEvent> {
848    // Retrieve the span id assigned at Started; if missing (out of order), derive a new
849    // one.
850    let span_id = scope
851        .tool_spans
852        .remove(tool_call_id)
853        .unwrap_or_else(|| format!("{}-tool-{}", scope.prefix, tool_call_id));
854    let failed = matches!(fields.status, Some(ToolCallStatus::Failed));
855    let body = ObservationBody {
856        id: span_id,
857        trace_id: trace_id.to_string(),
858        end_time: Some(now.to_string()),
859        output: fields.raw_output.clone(),
860        level: failed.then_some(ObservationLevel::Error),
861        ..Default::default()
862    };
863    vec![IngestionEvent::observation(
864        new_id(),
865        now.to_string(),
866        EventKind::SpanUpdate,
867        &body,
868    )]
869}
870
871// ---- id derivation ----
872
/// Builds the id prefix of a scope from the trace id and the subagent path.
///
/// An empty path (top-level scope) yields `{trace}` unchanged; every path element appends
/// `-sub-{id}`, so `[A, B]` yields `{trace}-sub-A-sub-B`. For a subagent scope this prefix
/// doubles as the id of the subagent span itself.
fn scope_prefix(trace_id: &str, path: &[String]) -> String {
    path.iter()
        .fold(String::from(trace_id), |mut prefix, call_id| {
            prefix.push_str("-sub-");
            prefix.push_str(call_id);
            prefix
        })
}
884
885/// The parent observation id for a subagent span (the `spawn_agent` tool span that
886/// initiated it).
887/// Format: `{parent scope prefix}-tool-{subagent's initiating tool_call_id}`. `path` is
888/// non-empty.
889fn parent_tool_span_id(trace_id: &str, path: &[String]) -> String {
890    let (last, parent_path) = path.split_last().expect("path is non-empty");
891    format!("{}-tool-{}", scope_prefix(trace_id, parent_path), last)
892}
893
894// ---- data conversion helpers ----
895
896/// Converts a [`Usage`] into a langfuse `usageDetails` map. Returns `None` (no report)
897/// when all fields are `None`.
898fn usage_to_details(usage: &Usage) -> Option<serde_json::Map<String, serde_json::Value>> {
899    let mut map = serde_json::Map::new();
900    if let Some(v) = usage.input_tokens {
901        map.insert("input".into(), v.into());
902    }
903    if let Some(v) = usage.output_tokens {
904        map.insert("output".into(), v.into());
905    }
906    if let Some(v) = usage.cache_read_input_tokens {
907        map.insert("cache_read_input_tokens".into(), v.into());
908    }
909    if let Some(v) = usage.cache_creation_input_tokens {
910        map.insert("cache_creation_input_tokens".into(), v.into());
911    }
912    (!map.is_empty()).then_some(map)
913}
914
915/// Concatenates text from `ContentBlock` items in a list, ignoring non-text blocks.
916fn content_text(content: &[ContentBlock]) -> String {
917    let mut out = String::new();
918    for block in content {
919        if let ContentBlock::Text(text) = block {
920            out.push_str(&text.text);
921        }
922    }
923    out
924}
925
926/// Reconstruct the request snapshot into the standard `input` for a Langfuse generation:
927/// an array of chat messages.
928///
929/// The system prompt becomes the first entry `{role:"system"}`, followed by the full
930/// message history.
931/// This matches the Langfuse SDK's standard format (see observation-types docs) — the UI
932/// renders it as conversation bubbles and supports playground replay.
933fn request_to_input(request: &LlmRequestSnapshot) -> serde_json::Value {
934    let mut messages: Vec<serde_json::Value> = Vec::new();
935    if let Some(system) = &request.system {
936        messages.push(serde_json::json!({ "role": "system", "content": system }));
937    }
938    for msg in &request.messages {
939        messages.push(message_to_value(msg));
940    }
941    serde_json::Value::Array(messages)
942}
943
944/// Converts a single [`Message`] to a langfuse `{role, content}` object. The content
945/// field collapses multimodal blocks into text or structured fragments (langfuse input
946/// accepts arbitrary JSON, and the UI renders it as best it can).
947fn message_to_value(msg: &Message) -> serde_json::Value {
948    let role = match msg.role {
949        Role::User => "user",
950        Role::Assistant => "assistant",
951    };
952    let parts: Vec<serde_json::Value> = msg.content.iter().map(content_to_value).collect();
953    // Use a plain string for single text content (most common and readable); otherwise
954    // use an array.
955    let content = match parts.as_slice() {
956        [serde_json::Value::String(s)] => serde_json::Value::String(s.clone()),
957        _ => serde_json::Value::Array(parts),
958    };
959    serde_json::json!({ "role": role, "content": content })
960}
961
962/// Converts [`MessageContent`] to a Langfuse content fragment.
963fn content_to_value(content: &MessageContent) -> serde_json::Value {
964    match content {
965        MessageContent::Text { text } => serde_json::Value::String(text.clone()),
966        MessageContent::Thinking { text, .. } => {
967            serde_json::json!({ "type": "thinking", "text": text })
968        }
969        MessageContent::ToolUse { id, name, args } => {
970            serde_json::json!({ "type": "tool_use", "id": id, "name": name, "input": args })
971        }
972        MessageContent::ToolResult {
973            tool_use_id,
974            is_error,
975            ..
976        } => serde_json::json!({
977            "type": "tool_result",
978            "tool_use_id": tool_use_id,
979            "is_error": is_error,
980        }),
981        MessageContent::Image { mime, .. } => {
982            serde_json::json!({ "type": "image", "mime": mime })
983        }
984        MessageContent::ProviderActivity {
985            provider_id, kind, ..
986        } => serde_json::json!({
987            "type": "provider_activity",
988            "provider_id": provider_id,
989            "kind": format!("{kind:?}"),
990        }),
991    }
992}