Skip to main content

nemo_flow/observability/
atif.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! ATIF (Agent Trajectory Interchange Format) exporter.
5//!
6//! This module provides types and an exporter that collects lifecycle events
7//! from the NeMo Flow runtime and converts them into ATIF trajectories conforming
8//! to the ATIF v1.6 schema.
9//!
10//! # Overview
11//!
12//! The [`AtifExporter`] registers as an event subscriber, collects all events,
13//! and can export them as an [`AtifTrajectory`] via [`AtifExporter::export`].
14//!
15//! # Event-to-Step Mapping
16//!
17//! The core conversion from NeMo Flow events to ATIF steps follows these rules:
18//!
19//! | NeMo Flow Event     | ATIF Step               | Notes                                |
20//! |-----------------|-------------------------|--------------------------------------|
21//! | LLM Start       | `user` step             | Messages extracted from LlmRequest   |
22//! | LLM End         | `agent` step            | Response content, tool_calls promoted|
23//! | Tool Start      | *(skipped)*             | tool_calls come from LLM End instead |
24//! | Tool End        | `system` observation     | Consecutive tool ends merged         |
25//! | Mark (with data)| `system` step           | Custom event data preserved          |
26//! | Scope Start/End | *(skipped)*             | Structural events, not trajectory    |
27//!
28//! The exporter serializes the full collected event stream into a single ATIF
29//! trajectory.
30
31use std::sync::{Arc, Mutex};
32
33use chrono::{DateTime, Utc};
34use serde::{Deserialize, Serialize};
35use uuid::Uuid;
36
37use crate::api::event::Event;
38use crate::api::runtime::EventSubscriberFn;
39use crate::json::Json;
40
41/// The ATIF schema version string embedded in all exported trajectories.
42///
43/// Currently `"ATIF-v1.6"`. This constant is used by [`AtifTrajectory`]
44/// serialization and verified by downstream consumers to ensure compatibility.
45pub const ATIF_SCHEMA_VERSION: &str = "ATIF-v1.6";
46
47// ---------------------------------------------------------------------------
48// ATIF types
49// ---------------------------------------------------------------------------
50
/// Information about the agent that produced the trajectory.
///
/// Stored as the `agent` field of [`AtifTrajectory`]. `name` and `version` are
/// always serialized; each `Option` field is left out of the serialized output
/// when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifAgentInfo {
    /// Human-readable agent name.
    pub name: String,
    /// Agent version string.
    pub version: String,
    /// Default LLM model name used by the agent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Tool definitions available to the agent (free-form JSON values).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_definitions: Option<Vec<Json>>,
    /// Extra metadata (free-form JSON).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
68
/// A single step in an ATIF trajectory.
///
/// `step_id`, `source` and `message` are always serialized. Every `Option`
/// field is omitted from the serialized output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifStep {
    /// 1-based ordinal step ID.
    pub step_id: usize,
    /// Source of the step: `"system"`, `"user"`, or `"agent"`.
    pub source: String,
    /// The message content (string or array of content parts).
    pub message: Json,
    /// ISO 8601 timestamp.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<String>,
    /// LLM model name, if applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Qualitative or quantitative measure of reasoning effort.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<Json>,
    /// The agent's explicit internal reasoning.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_content: Option<String>,
    /// Tool calls made by the agent in this step.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_calls: Option<Vec<AtifToolCall>>,
    /// Observation (tool results) for this step.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observation: Option<AtifObservation>,
    /// Token usage and cost metrics.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metrics: Option<AtifMetrics>,
    /// Whether this step was copied from a previous trajectory for context.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_copied_context: Option<bool>,
    /// Extra metadata. The exporter stores a serialized [`AtifStepExtra`]
    /// lineage payload here for the steps it annotates.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
106
/// Token usage and cost metrics for a single step.
///
/// Every field is optional and is omitted from the serialized output when it
/// is `None`. `Default` yields a value with all fields unset.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AtifMetrics {
    /// Number of prompt tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u64>,
    /// Number of completion tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u64>,
    /// Number of cached tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cached_tokens: Option<u64>,
    /// Cost in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost_usd: Option<f64>,
    /// Token IDs for prompt (input) tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_token_ids: Option<Vec<u64>>,
    /// Token IDs for completion (response) tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_token_ids: Option<Vec<u64>>,
    /// Log probability assigned to each generated token.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub logprobs: Option<Vec<f64>>,
    /// Other metrics (e.g. reasoning_tokens, cache_creation_input_tokens).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
135
/// Aggregate statistics for the entire trajectory (ATIF v1.6 final_metrics).
///
/// Every field is optional and is omitted from the serialized output when it
/// is `None`. `Default` yields a value with all fields unset.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AtifFinalMetrics {
    /// Sum of all prompt tokens across all steps, including cached tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_prompt_tokens: Option<u64>,
    /// Sum of all completion tokens across all steps.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_completion_tokens: Option<u64>,
    /// Sum of all cached tokens across all steps.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_cached_tokens: Option<u64>,
    /// Total real monetary cost for the entire trajectory.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_cost_usd: Option<f64>,
    /// Total number of steps. If not equivalent to steps.len(), document in notes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_steps: Option<u64>,
    /// Custom aggregate metrics.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
158
/// A tool call made by the agent.
///
/// All three fields are always serialized. Entries appear in
/// [`AtifStep::tool_calls`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifToolCall {
    /// Correlation ID linking this call to its observation result.
    pub tool_call_id: String,
    /// Name of the tool/function called.
    pub function_name: String,
    /// Arguments passed to the tool (arbitrary JSON).
    pub arguments: Json,
}
169
/// Observation results from tool execution.
///
/// Wraps the list of individual results carried by [`AtifStep::observation`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifObservation {
    /// List of observation results (one per tool call).
    pub results: Vec<AtifObservationResult>,
}
176
/// A single observation result from a tool call.
///
/// `content` is always serialized; `source_call_id` is omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifObservationResult {
    /// Correlation ID linking to the originating tool call.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_call_id: Option<String>,
    /// The tool's output content (arbitrary JSON).
    pub content: Json,
}
186
/// Lineage node identifying a callable within an ATIF step.
///
/// `function_id` and `function_name` are always serialized; the parent fields
/// are omitted from the serialized output when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifAncestry {
    /// Unique identifier for the callable node (scope UUID).
    pub function_id: String,
    /// Human-readable name of the callable node.
    pub function_name: String,
    /// Optional parent callable identifier.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Optional parent callable name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_name: Option<String>,
}
201
/// Invocation timing and correlation metadata for one execution occurrence.
///
/// `start_timestamp` and `end_timestamp` are always emitted together or not
/// at all. The struct itself does not enforce that pairing (both are
/// independent `Option`s); code constructing values is responsible for it.
/// Every field is omitted from the serialized output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifInvocationInfo {
    /// Invocation start timestamp in Unix epoch seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_timestamp: Option<f64>,
    /// Invocation end timestamp in Unix epoch seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub end_timestamp: Option<f64>,
    /// Stable invocation identifier for correlation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub invocation_id: Option<String>,
    /// Terminal status of the invocation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
    /// Runtime or framework label.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub framework: Option<String>,
}
224
/// Lineage payload serialized into ATIF `Step.extra`.
///
/// `tool_ancestry[i]` and `tool_invocations[i]` align by index with
/// `Step.tool_calls[i]`.
///
/// The two per-tool collections are encoded differently: `tool_ancestry` is a
/// plain `Vec` that is skipped when empty and defaults to empty when missing
/// on deserialization, whereas `tool_invocations` is an `Option` that is
/// skipped when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifStepExtra {
    /// Step-level callable lineage.
    pub ancestry: AtifAncestry,
    /// Step-level invocation timing.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub invocation: Option<AtifInvocationInfo>,
    /// Per-tool callable lineage, aligned with `tool_calls`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tool_ancestry: Vec<AtifAncestry>,
    /// Per-tool invocation timing, aligned with `tool_calls`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_invocations: Option<Vec<AtifInvocationInfo>>,
}
243
/// A complete ATIF trajectory.
///
/// This is the top-level document produced by [`AtifExporter::export`].
/// `schema_version`, `session_id`, `agent` and `steps` are always serialized;
/// the remaining `Option` fields are omitted when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifTrajectory {
    /// Schema version (e.g., `"ATIF-v1.6"`).
    pub schema_version: String,
    /// Unique session identifier.
    pub session_id: String,
    /// Information about the agent.
    pub agent: AtifAgentInfo,
    /// Ordered list of trajectory steps.
    pub steps: Vec<AtifStep>,
    /// Custom information, design notes, or explanations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub notes: Option<String>,
    /// Aggregate metrics for the entire trajectory.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub final_metrics: Option<AtifFinalMetrics>,
    /// Reference to the continuation trajectory file if continued elsewhere.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub continued_trajectory_ref: Option<String>,
    /// Extra metadata.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
268
269// ---------------------------------------------------------------------------
270// AtifExporter
271// ---------------------------------------------------------------------------
272
/// Mutable state shared between an [`AtifExporter`] and the subscriber
/// callbacks it hands out.
struct AtifExporterState {
    /// Session identifier copied into every exported trajectory.
    session_id: String,
    /// Agent metadata copied into every exported trajectory.
    agent_info: AtifAgentInfo,
    /// Events recorded so far, in arrival order.
    events: Vec<Event>,
}
278
/// Collects lifecycle events and exports them as ATIF trajectories.
///
/// Register this exporter as an event subscriber via [`AtifExporter::subscriber`],
/// then call [`AtifExporter::export`] to produce an [`AtifTrajectory`].
///
/// The collected events live behind an `Arc<Mutex<..>>` so that the subscriber
/// callback and the exporter operate on the same buffer.
pub struct AtifExporter {
    /// Shared, lock-protected session metadata and event buffer.
    state: Arc<Mutex<AtifExporterState>>,
}
286
287impl AtifExporter {
288    /// Create a new exporter with the given session metadata.
289    ///
290    /// # Parameters
291    /// - `session_id`: Stable identifier for the trajectory being collected.
292    /// - `agent_info`: Metadata describing the emitting agent.
293    ///
294    /// # Returns
295    /// A new [`AtifExporter`] with an empty in-memory event buffer.
296    pub fn new(session_id: String, agent_info: AtifAgentInfo) -> Self {
297        Self {
298            state: Arc::new(Mutex::new(AtifExporterState {
299                session_id,
300                agent_info,
301                events: Vec::new(),
302            })),
303        }
304    }
305
306    /// Return an event subscriber function that records NeMo Flow events.
307    ///
308    /// The returned callback can be registered with
309    /// [`register_subscriber`](crate::api::subscriber::register_subscriber).
310    ///
311    /// # Returns
312    /// An [`EventSubscriberFn`] that appends each observed event to this
313    /// exporter's internal buffer.
314    pub fn subscriber(&self) -> EventSubscriberFn {
315        let state = self.state.clone();
316        Arc::new(move |event: &Event| {
317            if let Ok(mut s) = state.lock() {
318                s.events.push(event.clone());
319            }
320        })
321    }
322
323    /// Export the collected event history as an [`AtifTrajectory`].
324    ///
325    /// # Returns
326    /// An [`AtifTrajectory`] synthesized from the events observed so far.
327    ///
328    /// # Notes
329    /// Exporting does not clear the buffered events. Call [`AtifExporter::clear`]
330    /// when you need to reset the exporter between trajectories.
331    pub fn export(&self) -> AtifTrajectory {
332        let state = self.state.lock().unwrap();
333        let collected_events: Vec<&Event> = state.events.iter().collect();
334        let steps = events_to_steps(&collected_events);
335        let final_metrics = compute_final_metrics(&steps);
336
337        AtifTrajectory {
338            schema_version: ATIF_SCHEMA_VERSION.to_string(),
339            session_id: state.session_id.clone(),
340            agent: state.agent_info.clone(),
341            steps,
342            notes: None,
343            final_metrics,
344            continued_trajectory_ref: None,
345            extra: None,
346        }
347    }
348
349    /// Clear all collected events from the internal buffer.
350    ///
351    /// # Returns
352    /// `()`.
353    pub fn clear(&self) {
354        let mut state = self.state.lock().unwrap();
355        state.events.clear();
356    }
357}
358
359// ---------------------------------------------------------------------------
360// Safe JSON extraction helpers
361// ---------------------------------------------------------------------------
362
363/// If `input` looks like an `LlmRequest` envelope (`{"content": ..., "headers": ...}`),
364/// return the inner `content` value. Otherwise return the input unchanged.
365///
366/// This avoids leaking the NeMo Flow transport wrapper into the trajectory.
367fn unwrap_llm_request(input: &Json) -> Json {
368    if let Some(obj) = input.as_object()
369        && obj.contains_key("content")
370        && obj.contains_key("headers")
371    {
372        return obj.get("content").cloned().unwrap_or_else(|| input.clone());
373    }
374    input.clone()
375}
376
377/// Extract the user-facing message content from a raw LLM response.
378///
379/// Looks for a `"content"` field (string or structured) on the response object.
380/// Falls back to the full response if the field is absent or not an object.
381fn extract_llm_response_message(output: &Json) -> Json {
382    if let Some(obj) = output.as_object() {
383        if let Some(content) = non_null_object_field(obj, "content") {
384            return content;
385        }
386        if let Some(summary) = llm_response_summary(obj) {
387            return summary;
388        }
389    }
390    // Not a recognized object structure — return as-is.
391    output.clone()
392}
393
394fn non_null_object_field(obj: &serde_json::Map<String, Json>, key: &str) -> Option<Json> {
395    obj.get(key).filter(|value| !value.is_null()).cloned()
396}
397
398fn llm_response_summary(obj: &serde_json::Map<String, Json>) -> Option<Json> {
399    if !obj.contains_key("tool_calls") && !obj.contains_key("role") {
400        return None;
401    }
402
403    let mut summary = serde_json::Map::new();
404    if let Some(role) = obj.get("role") {
405        summary.insert("role".to_string(), role.clone());
406    }
407    if let Some(tool_calls) = obj.get("tool_calls") {
408        summary.insert("tool_calls".to_string(), tool_calls.clone());
409    }
410    if let Some(reasoning) = non_null_object_field(obj, "reasoning") {
411        summary.insert("reasoning".to_string(), reasoning);
412    }
413
414    (!summary.is_empty()).then_some(Json::Object(summary))
415}
416
/// Known keys in token_usage that we extract to dedicated fields.
///
/// Each entry corresponds to a dedicated field of [`AtifMetrics`]. Any
/// `token_usage` key that is not listed here is treated as "unknown" by
/// `extract_metrics` and routed into `AtifMetrics::extra` instead.
const TOKEN_USAGE_KNOWN_KEYS: &[&str] = &[
    "prompt_tokens",
    "completion_tokens",
    "cached_tokens",
    "cost_usd",
    "prompt_token_ids",
    "completion_token_ids",
    "logprobs",
];
427
428/// Try to extract `AtifMetrics` from a `token_usage` object in the LLM response.
429///
430/// Populates `extra` with any unknown token_usage keys (e.g. reasoning_tokens).
431/// Returns `None` if the response has no `token_usage` or it is not an object.
432fn extract_metrics(output: &Json) -> Option<AtifMetrics> {
433    let usage = output.as_object()?.get("token_usage")?.as_object()?;
434    let prompt = usage.get("prompt_tokens").and_then(Json::as_u64);
435    let completion = usage.get("completion_tokens").and_then(Json::as_u64);
436    let cached = usage.get("cached_tokens").and_then(Json::as_u64);
437    let cost = usage.get("cost_usd").and_then(Json::as_f64);
438    let prompt_ids = usage
439        .get("prompt_token_ids")
440        .and_then(Json::as_array)
441        .map(|a| a.iter().filter_map(Json::as_u64).collect());
442    let completion_ids = usage
443        .get("completion_token_ids")
444        .and_then(Json::as_array)
445        .map(|a| a.iter().filter_map(Json::as_u64).collect());
446    let logprobs = usage
447        .get("logprobs")
448        .and_then(Json::as_array)
449        .map(|a| a.iter().filter_map(Json::as_f64).collect());
450    let known: std::collections::HashSet<&str> = TOKEN_USAGE_KNOWN_KEYS.iter().copied().collect();
451    let extra_map: serde_json::Map<String, Json> = usage
452        .iter()
453        .filter(|(k, _)| !known.contains(k.as_str()))
454        .map(|(k, v)| (k.clone(), v.clone()))
455        .collect();
456    let extra = if extra_map.is_empty() {
457        None
458    } else {
459        Some(Json::Object(extra_map))
460    };
461    if prompt.is_none() && completion.is_none() && cached.is_none() {
462        return None;
463    }
464    Some(AtifMetrics {
465        prompt_tokens: prompt,
466        completion_tokens: completion,
467        cached_tokens: cached,
468        cost_usd: cost,
469        prompt_token_ids: prompt_ids,
470        completion_token_ids: completion_ids,
471        logprobs,
472        extra,
473    })
474}
475
476/// Extract `reasoning_effort` from an LLM request (string or number).
477///
478/// The request content may have `reasoning_effort` (e.g. `"high"`, `"medium"`,
479/// or a numeric value). Returns the value as Json for flexibility.
480fn extract_reasoning_effort(input: &Json) -> Option<Json> {
481    if let Some(obj) = input.as_object()
482        && let Some(v) = obj.get("reasoning_effort")
483        && !v.is_null()
484    {
485        return Some(v.clone());
486    }
487    None
488}
489
490/// Extract `reasoning` (reasoning_content) from an LLM response output.
491///
492/// The agent's explicit internal reasoning may appear in the response under the
493/// `"reasoning"` key. Returns `None` if absent or not a string.
494fn extract_reasoning_content(output: &Json) -> Option<String> {
495    if let Some(obj) = output.as_object()
496        && let Some(r) = obj.get("reasoning")
497    {
498        return r.as_str().map(String::from);
499    }
500    None
501}
502
503/// Extract just the `messages` array from an LLM request payload.
504///
505/// LLM start inputs typically contain `{ "messages": [...], "model": "...",
506/// "max_tokens": ..., "tools": [...], "stream": ... }`. For the user step we
507/// only want the `messages` array — the rest is LLM configuration noise.
508///
509/// Returns the `messages` value if present, otherwise the full input.
510fn extract_user_messages(input: &Json) -> Json {
511    if let Some(obj) = input.as_object()
512        && let Some(messages) = obj.get("messages")
513    {
514        return messages.clone();
515    }
516    input.clone()
517}
518
519/// Try to promote `tool_calls` from the raw LLM response into `AtifToolCall` entries.
520///
521/// Expected shape per OpenAI convention:
522/// ```json
523/// "tool_calls": [{ "id": "...", "type": "function", "function": { "name": "...", "arguments": "..." } }]
524/// ```
525///
526/// String `arguments` are parsed into JSON for consistency with NeMo Flow tool events
527/// which always provide parsed arguments.
528///
529/// Returns `None` if there are no tool calls or the structure is unrecognized.
530fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
531    let arr = output.as_object()?.get("tool_calls")?.as_array()?;
532    if arr.is_empty() {
533        return None;
534    }
535    let mut calls = Vec::with_capacity(arr.len());
536    for tc in arr {
537        let tc_obj = tc.as_object()?;
538        let id = tc_obj
539            .get("id")
540            .and_then(Json::as_str)
541            .unwrap_or("")
542            .to_string();
543        // The function details live under "function".
544        let func = tc_obj.get("function").and_then(Json::as_object);
545        let name = func
546            .and_then(|f| f.get("name"))
547            .and_then(Json::as_str)
548            .unwrap_or("")
549            .to_string();
550        let raw_arguments = func
551            .and_then(|f| f.get("arguments"))
552            .cloned()
553            .unwrap_or(Json::Null);
554        // Parse string arguments as JSON for consistency.
555        let arguments = if let Some(s) = raw_arguments.as_str() {
556            serde_json::from_str(s).unwrap_or(raw_arguments)
557        } else {
558            raw_arguments
559        };
560        // Skip entries with no id and no name — they are not meaningful.
561        if id.is_empty() && name.is_empty() {
562            continue;
563        }
564        calls.push(AtifToolCall {
565            tool_call_id: id,
566            function_name: name,
567            arguments,
568        });
569    }
570    if calls.is_empty() { None } else { Some(calls) }
571}
572
573/// Compute aggregate `final_metrics` by summing token counts across all steps.
574///
575/// Always returns `Some(AtifFinalMetrics)` with `total_steps` set. Token/cost
576/// fields are populated when at least one step carries metrics.
577fn compute_final_metrics(steps: &[AtifStep]) -> Option<AtifFinalMetrics> {
578    let mut total_prompt: u64 = 0;
579    let mut total_completion: u64 = 0;
580    let mut total_cached: u64 = 0;
581    let mut total_cost: f64 = 0.0;
582    let mut has_any = false;
583
584    for step in steps {
585        if let Some(m) = &step.metrics {
586            has_any = true;
587            total_prompt += m.prompt_tokens.unwrap_or(0);
588            total_completion += m.completion_tokens.unwrap_or(0);
589            total_cached += m.cached_tokens.unwrap_or(0);
590            total_cost += m.cost_usd.unwrap_or(0.0);
591        }
592    }
593
594    Some(AtifFinalMetrics {
595        total_prompt_tokens: if has_any { Some(total_prompt) } else { None },
596        total_completion_tokens: if has_any {
597            Some(total_completion)
598        } else {
599            None
600        },
601        total_cached_tokens: if has_any && total_cached > 0 {
602            Some(total_cached)
603        } else {
604            None
605        },
606        total_cost_usd: if has_any && total_cost > 0.0 {
607            Some(total_cost)
608        } else {
609            None
610        },
611        total_steps: Some(steps.len() as u64),
612        extra: None,
613    })
614}
615
616// ---------------------------------------------------------------------------
617// AtifStepExtra helpers
618// ---------------------------------------------------------------------------
619
620/// Build an [`AtifAncestry`] from a NeMo Flow [`Event`].
621///
622/// `name_map` is a pre-pass uuid → name lookup used to resolve `parent_name`.
623fn build_ancestry(
624    event: &Event,
625    name_map: &std::collections::HashMap<Uuid, String>,
626) -> AtifAncestry {
627    AtifAncestry {
628        function_id: event.uuid().to_string(),
629        function_name: event.name().to_string(),
630        parent_id: event.parent_uuid().map(|u| u.to_string()),
631        parent_name: event.parent_uuid().and_then(|u| name_map.get(&u)).cloned(),
632    }
633}
634
635/// Build an [`AtifInvocationInfo`] from start/end timestamps.
636///
637/// If `start_ts` is `None`, both timestamps are omitted to preserve the
638/// requirement that they are always emitted together or not at all.
639fn build_invocation_info(
640    start_ts: Option<DateTime<Utc>>,
641    end_ts: DateTime<Utc>,
642    invocation_id: Option<String>,
643    framework: &str,
644) -> AtifInvocationInfo {
645    AtifInvocationInfo {
646        start_timestamp: start_ts.map(|s| s.timestamp_millis() as f64 / 1000.0),
647        end_timestamp: start_ts.map(|_| end_ts.timestamp_millis() as f64 / 1000.0),
648        invocation_id,
649        status: Some("completed".to_string()),
650        framework: Some(framework.to_string()),
651    }
652}
653
/// Lookup tables built in a pre-pass over the collected events, keyed by
/// event UUID. Only start events contribute entries.
struct EventLookupMaps {
    /// UUID → name of the start event with that UUID.
    name_map: std::collections::HashMap<Uuid, String>,
    /// UUID → timestamp of the start event with that UUID.
    start_ts_map: std::collections::HashMap<Uuid, DateTime<Utc>>,
}
658
659impl EventLookupMaps {
660    fn from_events(events: &[&Event]) -> Self {
661        let mut name_map = std::collections::HashMap::new();
662        let mut start_ts_map = std::collections::HashMap::new();
663        for event in events {
664            if is_start_event(event) {
665                name_map.insert(event.uuid(), event.name().to_string());
666                start_ts_map.insert(event.uuid(), *event.timestamp());
667            }
668        }
669        Self {
670            name_map,
671            start_ts_map,
672        }
673    }
674}
675
/// Lineage metadata accumulated for the most recent agent step until it is
/// written into that step's `extra` by `finalize_into`.
#[derive(Default)]
struct PendingAgentStep {
    /// Index (into the step list) of the agent step awaiting its `extra`;
    /// `None` when no agent step is pending.
    step_idx: Option<usize>,
    /// Step-level lineage for the pending agent step.
    ancestry: Option<AtifAncestry>,
    /// Step-level invocation timing for the pending agent step.
    invocation: Option<AtifInvocationInfo>,
    /// Per-tool lineage collected so far; pushed in lockstep with
    /// `tool_invocations`.
    tool_ancestry: Vec<AtifAncestry>,
    /// Per-tool invocation info collected so far; pushed in lockstep with
    /// `tool_ancestry`.
    tool_invocations: Vec<AtifInvocationInfo>,
    /// Identifiers in the order used to sort the per-tool metadata: entries
    /// are matched against each tool invocation's `invocation_id`.
    tool_call_order: Vec<String>,
}
685
686impl PendingAgentStep {
687    fn finalize_into(&mut self, steps: &mut [AtifStep]) {
688        let (Some(step_idx), Some(ancestry)) = (self.step_idx.take(), self.ancestry.take()) else {
689            return;
690        };
691        let Some(step) = steps.get_mut(step_idx) else {
692            return;
693        };
694
695        self.sort_tool_metadata();
696        let extra = AtifStepExtra {
697            ancestry,
698            invocation: self.invocation.take(),
699            tool_ancestry: std::mem::take(&mut self.tool_ancestry),
700            tool_invocations: if self.tool_invocations.is_empty() {
701                None
702            } else {
703                Some(std::mem::take(&mut self.tool_invocations))
704            },
705        };
706        step.extra = serde_json::to_value(&extra).ok();
707    }
708
709    fn set_current_agent(
710        &mut self,
711        step_idx: usize,
712        ancestry: AtifAncestry,
713        invocation: AtifInvocationInfo,
714        tool_call_order: Vec<String>,
715    ) {
716        self.step_idx = Some(step_idx);
717        self.ancestry = Some(ancestry);
718        self.invocation = Some(invocation);
719        self.tool_ancestry.clear();
720        self.tool_invocations.clear();
721        self.tool_call_order = tool_call_order;
722    }
723
724    fn push_tool_metadata(&mut self, ancestry: AtifAncestry, invocation: AtifInvocationInfo) {
725        self.tool_ancestry.push(ancestry);
726        self.tool_invocations.push(invocation);
727    }
728
729    fn has_active_step(&self) -> bool {
730        self.step_idx.is_some()
731    }
732
733    fn sort_tool_metadata(&mut self) {
734        if self.tool_call_order.is_empty() || self.tool_ancestry.is_empty() {
735            return;
736        }
737
738        let mut pairs: Vec<(AtifAncestry, AtifInvocationInfo)> =
739            std::mem::take(&mut self.tool_ancestry)
740                .into_iter()
741                .zip(std::mem::take(&mut self.tool_invocations))
742                .collect();
743        pairs.sort_by_key(|(_, invocation)| {
744            invocation
745                .invocation_id
746                .as_deref()
747                .and_then(|id| self.tool_call_order.iter().position(|entry| entry == id))
748                .unwrap_or(usize::MAX)
749        });
750        let (sorted_ancestry, sorted_invocations): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
751        self.tool_ancestry = sorted_ancestry;
752        self.tool_invocations = sorted_invocations;
753    }
754}
755
/// Working state used while converting the event stream into ATIF steps.
#[derive(Default)]
struct StepConversionState {
    /// Steps emitted so far, in emission order.
    steps: Vec<AtifStep>,
    /// Lookup refreshed by `refresh_tool_call_lookup` on every LLM end event.
    /// NOTE(review): the key/value meaning is defined by that helper, which
    /// is not visible here — confirm before relying on it.
    last_tool_call_map: std::collections::HashMap<String, String>,
    /// Tool observation results buffered until `flush_observations` turns
    /// them into a single `system` step.
    pending_observations: Vec<AtifObservationResult>,
    /// Timestamp given to the step produced by the next flush; reset to
    /// `None` when that flush happens.
    pending_obs_timestamp: Option<String>,
    /// Reasoning effort captured from the most recent LLM start request and
    /// consumed by the following LLM end event.
    current_reasoning_effort: Option<Json>,
    /// Lineage metadata for the most recent agent step, written into that
    /// step's `extra` by `finalize_agent_extra`.
    current_agent: PendingAgentStep,
}
765
766impl StepConversionState {
767    fn flush_observations(&mut self) {
768        if self.pending_observations.is_empty() {
769            return;
770        }
771
772        self.steps.push(AtifStep {
773            step_id: 0,
774            source: "system".to_string(),
775            message: Json::Null,
776            timestamp: self.pending_obs_timestamp.take(),
777            model_name: None,
778            reasoning_effort: None,
779            reasoning_content: None,
780            tool_calls: None,
781            observation: Some(AtifObservation {
782                results: std::mem::take(&mut self.pending_observations),
783            }),
784            metrics: None,
785            is_copied_context: None,
786            extra: None,
787        });
788    }
789
790    fn finalize_agent_extra(&mut self) {
791        self.current_agent.finalize_into(&mut self.steps);
792    }
793
794    fn handle_llm_start(&mut self, event: &Event, lookups: &EventLookupMaps) {
795        self.flush_observations();
796        self.finalize_agent_extra();
797
798        let Some(input) = event.data() else {
799            return;
800        };
801        let content = unwrap_llm_request(input);
802        self.current_reasoning_effort = extract_reasoning_effort(&content);
803        let extra = AtifStepExtra {
804            ancestry: build_ancestry(event, &lookups.name_map),
805            invocation: None,
806            tool_ancestry: Vec::new(),
807            tool_invocations: None,
808        };
809        self.steps.push(AtifStep {
810            step_id: 0,
811            source: "user".to_string(),
812            message: extract_user_messages(&content),
813            timestamp: Some(event.timestamp().to_rfc3339()),
814            model_name: None,
815            reasoning_effort: None,
816            reasoning_content: None,
817            tool_calls: None,
818            observation: None,
819            metrics: None,
820            is_copied_context: None,
821            extra: serde_json::to_value(&extra).ok(),
822        });
823    }
824
825    fn handle_llm_end(&mut self, event: &Event, lookups: &EventLookupMaps) {
826        self.flush_observations();
827
828        let Some(output) = event.data() else {
829            return;
830        };
831        let tool_calls = extract_tool_calls(output);
832        let tool_call_order = refresh_tool_call_lookup(&mut self.last_tool_call_map, &tool_calls);
833        let reasoning_effort = self.current_reasoning_effort.take();
834        let reasoning_content = extract_reasoning_content(output);
835        let start_ts = lookups.start_ts_map.get(&event.uuid()).cloned();
836        let ancestry = build_ancestry(event, &lookups.name_map);
837        let invocation = build_invocation_info(
838            start_ts,
839            *event.timestamp(),
840            Some(event.uuid().to_string()),
841            "nemo_flow",
842        );
843
844        self.steps.push(AtifStep {
845            step_id: 0,
846            source: "agent".to_string(),
847            message: extract_llm_response_message(output),
848            timestamp: Some(event.timestamp().to_rfc3339()),
849            model_name: event.model_name().map(ToOwned::to_owned),
850            reasoning_effort,
851            reasoning_content,
852            tool_calls,
853            observation: None,
854            metrics: extract_metrics(output),
855            is_copied_context: None,
856            extra: None,
857        });
858        self.current_agent.set_current_agent(
859            self.steps.len() - 1,
860            ancestry,
861            invocation,
862            tool_call_order,
863        );
864    }
865
866    fn handle_tool_end(&mut self, event: &Event, lookups: &EventLookupMaps) {
867        if let Some(output) = event.data() {
868            if self.pending_obs_timestamp.is_none() {
869                self.pending_obs_timestamp = Some(event.timestamp().to_rfc3339());
870            }
871            self.pending_observations.push(AtifObservationResult {
872                source_call_id: event
873                    .tool_call_id()
874                    .map(ToOwned::to_owned)
875                    .or_else(|| self.last_tool_call_map.get(event.name()).cloned()),
876                content: output.clone(),
877            });
878        }
879
880        if !self.current_agent.has_active_step() {
881            return;
882        }
883        let start_ts = lookups.start_ts_map.get(&event.uuid()).cloned();
884        let invocation = build_invocation_info(
885            start_ts,
886            *event.timestamp(),
887            event
888                .tool_call_id()
889                .map(ToOwned::to_owned)
890                .or_else(|| Some(event.uuid().to_string())),
891            "nemo_flow",
892        );
893        self.current_agent
894            .push_tool_metadata(build_ancestry(event, &lookups.name_map), invocation);
895    }
896
897    fn handle_mark(&mut self, mark: &Event) {
898        self.flush_observations();
899        let Some(data) = mark.data() else {
900            return;
901        };
902        self.steps.push(AtifStep {
903            step_id: 0,
904            source: "system".to_string(),
905            message: data.clone(),
906            timestamp: Some(mark.timestamp().to_rfc3339()),
907            model_name: None,
908            reasoning_effort: None,
909            reasoning_content: None,
910            tool_calls: None,
911            observation: None,
912            metrics: None,
913            is_copied_context: None,
914            extra: None,
915        });
916    }
917
918    fn finish(mut self) -> Vec<AtifStep> {
919        self.finalize_agent_extra();
920        self.flush_observations();
921        for (index, step) in self.steps.iter_mut().enumerate() {
922            step.step_id = index + 1;
923        }
924        self.steps
925    }
926}
927
928fn refresh_tool_call_lookup(
929    last_tool_call_map: &mut std::collections::HashMap<String, String>,
930    tool_calls: &Option<Vec<AtifToolCall>>,
931) -> Vec<String> {
932    last_tool_call_map.clear();
933    let mut tool_call_order = Vec::new();
934    if let Some(tool_calls) = tool_calls {
935        for tool_call in tool_calls {
936            if !tool_call.function_name.is_empty() {
937                last_tool_call_map.insert(
938                    tool_call.function_name.clone(),
939                    tool_call.tool_call_id.clone(),
940                );
941            }
942            tool_call_order.push(tool_call.tool_call_id.clone());
943        }
944    }
945    tool_call_order
946}
947
948// ---------------------------------------------------------------------------
949// Event-to-step mapping
950// ---------------------------------------------------------------------------
951
952/// Converts a slice of events into ATIF steps.
953///
954/// Mapping logic:
955/// 1. Sort events by timestamp.
956/// 2. For each LLM pair:
957///    - Start event → user step (message = extracted `messages` array from
958///      unwrapped LlmRequest content, stripping `max_tokens`/`model`/etc.)
959///    - End event → agent step (message = extracted content, metrics from
960///      token_usage, tool_calls promoted to AtifToolCall entries with parsed
961///      JSON arguments)
962/// 3. For Tool events:
963///    - Start events are **skipped** (tool_calls come from LLM End promotion)
964///    - Consecutive End events are **merged** into a single system observation
965///      step with multiple results
966/// 4. Tool End observation results are correlated with the preceding LLM End's
967///    promoted tool_calls by function name → `source_call_id`.
968/// 5. Mark events → system steps if they carry data.
969/// 6. Scope Start/End → skipped.
970fn events_to_steps(events: &[&Event]) -> Vec<AtifStep> {
971    let mut sorted: Vec<&Event> = events.to_vec();
972    sorted.sort_by_key(|e| *e.timestamp());
973    let lookups = EventLookupMaps::from_events(&sorted);
974    let mut state = StepConversionState::default();
975
976    for event in &sorted {
977        match (
978            event.kind(),
979            event.scope_category(),
980            event.category().map(|category| category.as_str()),
981        ) {
982            ("scope", Some(crate::api::event::ScopeCategory::Start), Some("llm")) => {
983                state.handle_llm_start(event, &lookups)
984            }
985            ("scope", Some(crate::api::event::ScopeCategory::End), Some("llm")) => {
986                state.handle_llm_end(event, &lookups)
987            }
988            ("scope", Some(crate::api::event::ScopeCategory::End), Some("tool")) => {
989                state.handle_tool_end(event, &lookups)
990            }
991            ("mark", _, _) => state.handle_mark(event),
992            _ => {}
993        }
994    }
995
996    state.finish()
997}
998
999fn is_start_event(event: &Event) -> bool {
1000    event.scope_category() == Some(crate::api::event::ScopeCategory::Start)
1001}
1002
1003// ---------------------------------------------------------------------------
1004// Tests
1005// ---------------------------------------------------------------------------
1006
1007#[cfg(test)]
1008#[path = "../../tests/unit/atif_tests.rs"]
1009mod tests;