Skip to main content

nemo_flow/observability/
atif.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Agent Trajectory Interchange Format (ATIF) exporter.
5//!
6//! This module provides types and an exporter that collects lifecycle events
7//! from the NeMo Flow runtime and converts them into ATIF trajectories conforming
8//! to the ATIF v1.6 schema.
9//!
10//! # Overview
11//!
12//! The [`AtifExporter`] registers as an event subscriber, collects all events,
13//! and can export them as an [`AtifTrajectory`] via [`AtifExporter::export`].
14//!
15//! # Event-to-Step Mapping
16//!
17//! The core conversion from NeMo Flow events to ATIF steps follows these rules:
18//!
19//! | NeMo Flow Event     | ATIF Step               | Notes                                |
20//! |-----------------|-------------------------|--------------------------------------|
21//! | LLM Start       | `user` step             | Messages extracted from LlmRequest   |
22//! | LLM End         | `agent` step            | Response content, tool_calls promoted|
23//! | Tool Start      | *(skipped)*             | tool_calls come from LLM End instead |
24//! | Tool End        | `system` observation     | Consecutive tool ends merged         |
25//! | Mark (with data)| `system` step           | Custom event data preserved          |
26//! | Scope Start/End | *(skipped)*             | Structural events, not trajectory    |
27//!
28//! The exporter serializes the full collected event stream into a single ATIF
29//! trajectory.
30
31use std::sync::{Arc, Mutex};
32
33use chrono::{DateTime, Utc};
34use serde::{Deserialize, Serialize};
35use uuid::Uuid;
36
37use crate::api::event::Event;
38use crate::api::runtime::EventSubscriberFn;
39use crate::json::Json;
40
/// The ATIF schema version string embedded in all exported trajectories.
///
/// Currently `"ATIF-v1.6"`. [`AtifExporter::export`] copies this value into
/// the `schema_version` field of every [`AtifTrajectory`] it builds.
pub const ATIF_SCHEMA_VERSION: &str = "ATIF-v1.6";
46
47// ---------------------------------------------------------------------------
48// ATIF types
49// ---------------------------------------------------------------------------
50
/// Information about the agent that produced the trajectory.
///
/// `name` and `version` are always serialized; each `Option` field is left out
/// of the serialized output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifAgentInfo {
    /// Human-readable agent name.
    pub name: String,
    /// Agent version string.
    pub version: String,
    /// Default LLM model name used by the agent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Tool definitions available to the agent, kept as opaque JSON values.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_definitions: Option<Vec<Json>>,
    /// Extra metadata as free-form JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
68
/// A single step in an ATIF trajectory.
///
/// Only `step_id`, `source`, and `message` are always serialized; every
/// `Option` field is omitted from the output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifStep {
    /// 1-based ordinal step ID.
    pub step_id: usize,
    /// Source of the step: `"system"`, `"user"`, or `"agent"`.
    pub source: String,
    /// The message content (string or array of content parts).
    pub message: Json,
    /// ISO 8601 timestamp.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<String>,
    /// LLM model name, if applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Qualitative or quantitative measure of reasoning effort.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<Json>,
    /// The agent's explicit internal reasoning.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_content: Option<String>,
    /// Tool calls made by the agent in this step.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_calls: Option<Vec<AtifToolCall>>,
    /// Observation (tool results) for this step.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observation: Option<AtifObservation>,
    /// Token usage and cost metrics.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metrics: Option<AtifMetrics>,
    /// Whether this step was copied from a previous trajectory for context.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_copied_context: Option<bool>,
    /// Extra metadata. Within this module it carries a serialized
    /// [`AtifStepExtra`] lineage payload when one is attached.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
106
/// Token usage and cost metrics for a single step.
///
/// Every field is optional and omitted from the serialized output when `None`;
/// `Default` yields a value with all fields unset.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AtifMetrics {
    /// Number of prompt tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u64>,
    /// Number of completion tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u64>,
    /// Number of cached tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cached_tokens: Option<u64>,
    /// Cost in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost_usd: Option<f64>,
    /// Token IDs for prompt (input) tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_token_ids: Option<Vec<u64>>,
    /// Token IDs for completion (response) tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_token_ids: Option<Vec<u64>>,
    /// Log probability assigned to each generated token.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub logprobs: Option<Vec<f64>>,
    /// Other metrics (e.g. reasoning_tokens, cache_creation_input_tokens),
    /// stored as a JSON object of whatever usage keys had no dedicated field.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
135
/// Aggregate statistics for the entire trajectory (ATIF v1.6 final_metrics).
///
/// Every field is optional and omitted from the serialized output when `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AtifFinalMetrics {
    /// Sum of all prompt tokens across all steps, including cached tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_prompt_tokens: Option<u64>,
    /// Sum of all completion tokens across all steps.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_completion_tokens: Option<u64>,
    /// Sum of all cached tokens across all steps.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_cached_tokens: Option<u64>,
    /// Total real monetary cost for the entire trajectory.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_cost_usd: Option<f64>,
    /// Total number of steps. If not equivalent to steps.len(), document in notes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_steps: Option<u64>,
    /// Custom aggregate metrics.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
158
/// A tool call made by the agent.
///
/// All three fields are always serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifToolCall {
    /// Correlation ID linking this call to its observation result.
    pub tool_call_id: String,
    /// Name of the tool/function called.
    pub function_name: String,
    /// Arguments passed to the tool, as a JSON value.
    pub arguments: Json,
}
169
/// Observation results from tool execution.
///
/// Thin wrapper around the list of [`AtifObservationResult`] entries attached
/// to a step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifObservation {
    /// List of observation results (one per tool call).
    pub results: Vec<AtifObservationResult>,
}
176
/// A single observation result from a tool call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifObservationResult {
    /// Correlation ID linking to the originating tool call; omitted from the
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_call_id: Option<String>,
    /// The tool's output content, as a JSON value.
    pub content: Json,
}
186
/// Lineage node identifying a callable within an ATIF step.
///
/// The parent fields are omitted from the serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifAncestry {
    /// Unique identifier for the callable node (scope UUID).
    pub function_id: String,
    /// Human-readable name of the callable node.
    pub function_name: String,
    /// Optional parent callable identifier.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Optional parent callable name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_name: Option<String>,
}
201
/// Invocation timing and correlation metadata for one execution occurrence.
///
/// `start_timestamp` and `end_timestamp` are always emitted together or not
/// at all. Every field is optional and omitted from the serialized output
/// when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifInvocationInfo {
    /// Invocation start timestamp in Unix epoch seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_timestamp: Option<f64>,
    /// Invocation end timestamp in Unix epoch seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub end_timestamp: Option<f64>,
    /// Stable invocation identifier for correlation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub invocation_id: Option<String>,
    /// Terminal status of the invocation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
    /// Runtime or framework label.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub framework: Option<String>,
}
224
/// Lineage payload serialized into ATIF `Step.extra`.
///
/// `tool_ancestry[i]` and `tool_invocations[i]` align by index with
/// `Step.tool_calls[i]`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifStepExtra {
    /// Step-level callable lineage. Always serialized.
    pub ancestry: AtifAncestry,
    /// Step-level invocation timing.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub invocation: Option<AtifInvocationInfo>,
    /// Full unwrapped LLM request payload for request-level fidelity.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub llm_request: Option<Json>,
    /// Per-tool callable lineage, aligned with `tool_calls`.
    ///
    /// Omitted from the output when empty, and read back as an empty vector
    /// when the key is missing on deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tool_ancestry: Vec<AtifAncestry>,
    /// Per-tool invocation timing, aligned with `tool_calls`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_invocations: Option<Vec<AtifInvocationInfo>>,
}
246
/// A complete ATIF trajectory.
///
/// `schema_version`, `session_id`, `agent`, and `steps` are always serialized;
/// the remaining `Option` fields are omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifTrajectory {
    /// Schema version (e.g., `"ATIF-v1.6"`).
    pub schema_version: String,
    /// Unique session identifier.
    pub session_id: String,
    /// Information about the agent.
    pub agent: AtifAgentInfo,
    /// Ordered list of trajectory steps.
    pub steps: Vec<AtifStep>,
    /// Custom information, design notes, or explanations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub notes: Option<String>,
    /// Aggregate metrics for the entire trajectory.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub final_metrics: Option<AtifFinalMetrics>,
    /// Reference to the continuation trajectory file if continued elsewhere.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub continued_trajectory_ref: Option<String>,
    /// Extra metadata.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<Json>,
}
271
272// ---------------------------------------------------------------------------
273// AtifExporter
274// ---------------------------------------------------------------------------
275
/// Mutable state shared between an [`AtifExporter`] and the subscriber
/// callbacks it hands out.
struct AtifExporterState {
    /// Session identifier copied into every exported trajectory.
    session_id: String,
    /// Agent metadata cloned into every exported trajectory.
    agent_info: AtifAgentInfo,
    /// Events recorded so far, in arrival order. Appended to by the subscriber
    /// callback, read by `export`, and emptied by `clear`.
    events: Vec<Event>,
}
281
/// Collects lifecycle events and exports them as ATIF trajectories.
///
/// Register this exporter as an event subscriber via [`AtifExporter::subscriber`],
/// then call [`AtifExporter::export`] to produce an [`AtifTrajectory`].
pub struct AtifExporter {
    /// Shared, mutex-guarded state; each subscriber callback holds its own
    /// `Arc` clone of it.
    state: Arc<Mutex<AtifExporterState>>,
}
289
290impl AtifExporter {
291    /// Create a new exporter with the given session metadata.
292    ///
293    /// # Parameters
294    /// - `session_id`: Stable identifier for the trajectory being collected.
295    /// - `agent_info`: Metadata describing the emitting agent.
296    ///
297    /// # Returns
298    /// A new [`AtifExporter`] with an empty in-memory event buffer.
299    pub fn new(session_id: String, agent_info: AtifAgentInfo) -> Self {
300        Self {
301            state: Arc::new(Mutex::new(AtifExporterState {
302                session_id,
303                agent_info,
304                events: Vec::new(),
305            })),
306        }
307    }
308
309    /// Return an event subscriber function that records NeMo Flow events.
310    ///
311    /// The returned callback can be registered with
312    /// [`register_subscriber`](crate::api::subscriber::register_subscriber).
313    ///
314    /// # Returns
315    /// An [`EventSubscriberFn`] that appends each observed event to this
316    /// exporter's internal buffer.
317    pub fn subscriber(&self) -> EventSubscriberFn {
318        let state = self.state.clone();
319        Arc::new(move |event: &Event| {
320            if let Ok(mut s) = state.lock() {
321                s.events.push(event.clone());
322            }
323        })
324    }
325
326    /// Export the collected event history as an [`AtifTrajectory`].
327    ///
328    /// # Returns
329    /// An [`AtifTrajectory`] synthesized from the events observed so far.
330    ///
331    /// # Notes
332    /// Exporting does not clear the buffered events. Call [`AtifExporter::clear`]
333    /// when you need to reset the exporter between trajectories.
334    pub fn export(&self) -> AtifTrajectory {
335        let state = self.state.lock().unwrap();
336        let collected_events: Vec<&Event> = state.events.iter().collect();
337        let steps = events_to_steps(&collected_events);
338        let final_metrics = compute_final_metrics(&steps);
339
340        AtifTrajectory {
341            schema_version: ATIF_SCHEMA_VERSION.to_string(),
342            session_id: state.session_id.clone(),
343            agent: state.agent_info.clone(),
344            steps,
345            notes: None,
346            final_metrics,
347            continued_trajectory_ref: None,
348            extra: None,
349        }
350    }
351
352    /// Clear all collected events from the internal buffer.
353    ///
354    /// # Returns
355    /// `()`.
356    pub fn clear(&self) {
357        let mut state = self.state.lock().unwrap();
358        state.events.clear();
359    }
360}
361
362// ---------------------------------------------------------------------------
363// Safe JSON extraction helpers
364// ---------------------------------------------------------------------------
365
366/// If `input` looks like an `LlmRequest` envelope (`{"content": ..., "headers": ...}`),
367/// return the inner `content` value. Otherwise return the input unchanged.
368///
369/// This avoids leaking the NeMo Flow transport wrapper into the trajectory.
370fn unwrap_llm_request(input: &Json) -> Json {
371    if let Some(obj) = input.as_object()
372        && obj.contains_key("content")
373        && obj.contains_key("headers")
374    {
375        return obj.get("content").cloned().unwrap_or_else(|| input.clone());
376    }
377    input.clone()
378}
379
380/// Extract the user-facing message content from a raw LLM response.
381///
382/// Looks for a `"content"` field (string or structured) on the response object.
383/// Falls back to the full response if the field is absent or not an object.
384fn extract_llm_response_message(output: &Json) -> Json {
385    if let Some(obj) = output.as_object() {
386        if let Some(content) = non_null_object_field(obj, "content") {
387            return content;
388        }
389        if let Some(summary) = llm_response_summary(obj) {
390            return summary;
391        }
392    }
393    // Not a recognized object structure — return as-is.
394    output.clone()
395}
396
397fn non_null_object_field(obj: &serde_json::Map<String, Json>, key: &str) -> Option<Json> {
398    obj.get(key).filter(|value| !value.is_null()).cloned()
399}
400
401fn llm_response_summary(obj: &serde_json::Map<String, Json>) -> Option<Json> {
402    if !obj.contains_key("tool_calls") && !obj.contains_key("role") {
403        return None;
404    }
405
406    let mut summary = serde_json::Map::new();
407    if let Some(role) = obj.get("role") {
408        summary.insert("role".to_string(), role.clone());
409    }
410    if let Some(tool_calls) = obj.get("tool_calls") {
411        summary.insert("tool_calls".to_string(), tool_calls.clone());
412    }
413    if let Some(reasoning) = non_null_object_field(obj, "reasoning") {
414        summary.insert("reasoning".to_string(), reasoning);
415    }
416
417    (!summary.is_empty()).then_some(Json::Object(summary))
418}
419
/// Known keys in token_usage that we extract to dedicated fields.
///
/// `extract_metrics` copies every usage key that is NOT in this list into
/// `AtifMetrics::extra`. Provider-native aliases such as `input_tokens` are
/// not listed here, so they still show up in `extra` even when they feed a
/// dedicated field.
const TOKEN_USAGE_KNOWN_KEYS: &[&str] = &[
    "prompt_tokens",
    "completion_tokens",
    "cached_tokens",
    "cost_usd",
    "prompt_token_ids",
    "completion_token_ids",
    "logprobs",
];
430
431/// Try to extract `AtifMetrics` from a `token_usage` object in the LLM response.
432///
433/// Supports NeMo Flow `token_usage` and provider-native `usage` payloads.
434/// Populates `extra` with any unknown usage keys (e.g. reasoning_tokens or total_tokens).
435/// Returns `None` if the response has no recognized token counts.
436fn extract_metrics(output: &Json) -> Option<AtifMetrics> {
437    let usage = token_usage_object(output)?;
438    let prompt = usage_u64(usage, &["prompt_tokens", "input_tokens"]);
439    let completion = usage_u64(usage, &["completion_tokens", "output_tokens"]);
440    let cached = usage_u64(usage, &["cached_tokens"])
441        .or_else(|| prompt_tokens_detail_u64(usage, "cached_tokens"))
442        .or_else(|| {
443            sum_usage_u64(
444                usage,
445                &["cache_read_input_tokens", "cache_creation_input_tokens"],
446            )
447        });
448    let cost = usage.get("cost_usd").and_then(Json::as_f64);
449    let prompt_ids = usage
450        .get("prompt_token_ids")
451        .and_then(Json::as_array)
452        .map(|a| a.iter().filter_map(Json::as_u64).collect());
453    let completion_ids = usage
454        .get("completion_token_ids")
455        .and_then(Json::as_array)
456        .map(|a| a.iter().filter_map(Json::as_u64).collect());
457    let logprobs = usage
458        .get("logprobs")
459        .and_then(Json::as_array)
460        .map(|a| a.iter().filter_map(Json::as_f64).collect());
461    let known: std::collections::HashSet<&str> = TOKEN_USAGE_KNOWN_KEYS.iter().copied().collect();
462    let extra_map: serde_json::Map<String, Json> = usage
463        .iter()
464        .filter(|(k, _)| !known.contains(k.as_str()))
465        .map(|(k, v)| (k.clone(), v.clone()))
466        .collect();
467    let extra = if extra_map.is_empty() {
468        None
469    } else {
470        Some(Json::Object(extra_map))
471    };
472    if prompt.is_none() && completion.is_none() && cached.is_none() {
473        return None;
474    }
475    Some(AtifMetrics {
476        prompt_tokens: prompt,
477        completion_tokens: completion,
478        cached_tokens: cached,
479        cost_usd: cost,
480        prompt_token_ids: prompt_ids,
481        completion_token_ids: completion_ids,
482        logprobs,
483        extra,
484    })
485}
486
487fn token_usage_object(output: &Json) -> Option<&serde_json::Map<String, Json>> {
488    let output = output.as_object()?;
489    output
490        .get("token_usage")
491        .or_else(|| output.get("usage"))
492        .and_then(Json::as_object)
493}
494
495fn usage_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
496    keys.iter()
497        .find_map(|key| usage.get(*key).and_then(Json::as_u64))
498}
499
500fn sum_usage_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
501    let mut total = 0;
502    let mut found = false;
503    for key in keys {
504        if let Some(value) = usage.get(*key).and_then(Json::as_u64) {
505            total += value;
506            found = true;
507        }
508    }
509    found.then_some(total)
510}
511
512fn prompt_tokens_detail_u64(usage: &serde_json::Map<String, Json>, key: &str) -> Option<u64> {
513    usage
514        .get("prompt_tokens_details")
515        .and_then(Json::as_object)
516        .and_then(|details| details.get(key))
517        .and_then(Json::as_u64)
518}
519
520/// Extract `reasoning_effort` from an LLM request (string or number).
521///
522/// The request content may have `reasoning_effort` (e.g. `"high"`, `"medium"`,
523/// or a numeric value). Returns the value as Json for flexibility.
524fn extract_reasoning_effort(input: &Json) -> Option<Json> {
525    if let Some(obj) = input.as_object()
526        && let Some(v) = obj.get("reasoning_effort")
527        && !v.is_null()
528    {
529        return Some(v.clone());
530    }
531    None
532}
533
534/// Extract `reasoning` (reasoning_content) from an LLM response output.
535///
536/// The agent's explicit internal reasoning may appear in the response under the
537/// `"reasoning"` key. Returns `None` if absent or not a string.
538fn extract_reasoning_content(output: &Json) -> Option<String> {
539    if let Some(obj) = output.as_object()
540        && let Some(r) = obj.get("reasoning")
541    {
542        return r.as_str().map(String::from);
543    }
544    None
545}
546
547/// Extract just the `messages` array from an LLM request payload.
548///
549/// LLM start inputs typically contain `{ "messages": [...], "model": "...",
550/// "max_tokens": ..., "tools": [...], "stream": ... }`. For the user step we
551/// only want the `messages` array — the rest is LLM configuration noise.
552///
553/// Returns the `messages` value if present, otherwise the full input.
554fn extract_user_messages(input: &Json) -> Json {
555    if let Some(obj) = input.as_object()
556        && let Some(messages) = obj.get("messages")
557    {
558        return messages.clone();
559    }
560    input.clone()
561}
562
563/// Try to promote `tool_calls` from the raw LLM response into `AtifToolCall` entries.
564///
565/// Expected shape per OpenAI convention:
566/// ```json
567/// "tool_calls": [{ "id": "...", "type": "function", "function": { "name": "...", "arguments": "..." } }]
568/// ```
569///
570/// String `arguments` are parsed into JSON for consistency with NeMo Flow tool events
571/// which always provide parsed arguments.
572///
573/// Returns `None` if there are no tool calls or the structure is unrecognized.
574fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
575    let arr = output.as_object()?.get("tool_calls")?.as_array()?;
576    if arr.is_empty() {
577        return None;
578    }
579    let mut calls = Vec::with_capacity(arr.len());
580    for tc in arr {
581        let tc_obj = tc.as_object()?;
582        let id = tc_obj
583            .get("id")
584            .and_then(Json::as_str)
585            .unwrap_or("")
586            .to_string();
587        // The function details live under "function".
588        let func = tc_obj.get("function").and_then(Json::as_object);
589        let name = func
590            .and_then(|f| f.get("name"))
591            .and_then(Json::as_str)
592            .unwrap_or("")
593            .to_string();
594        let raw_arguments = func
595            .and_then(|f| f.get("arguments"))
596            .cloned()
597            .unwrap_or(Json::Null);
598        // Parse string arguments as JSON for consistency.
599        let arguments = if let Some(s) = raw_arguments.as_str() {
600            serde_json::from_str(s).unwrap_or(raw_arguments)
601        } else {
602            raw_arguments
603        };
604        // Skip entries with no id and no name — they are not meaningful.
605        if id.is_empty() && name.is_empty() {
606            continue;
607        }
608        calls.push(AtifToolCall {
609            tool_call_id: id,
610            function_name: name,
611            arguments,
612        });
613    }
614    if calls.is_empty() { None } else { Some(calls) }
615}
616
617/// Compute aggregate `final_metrics` by summing token counts across all steps.
618///
619/// Always returns `Some(AtifFinalMetrics)` with `total_steps` set. Token/cost
620/// fields are populated when at least one step carries metrics.
621fn compute_final_metrics(steps: &[AtifStep]) -> Option<AtifFinalMetrics> {
622    let mut total_prompt: u64 = 0;
623    let mut total_completion: u64 = 0;
624    let mut total_cached: u64 = 0;
625    let mut total_cost: f64 = 0.0;
626    let mut has_any = false;
627
628    for step in steps {
629        if let Some(m) = &step.metrics {
630            has_any = true;
631            total_prompt += m.prompt_tokens.unwrap_or(0);
632            total_completion += m.completion_tokens.unwrap_or(0);
633            total_cached += m.cached_tokens.unwrap_or(0);
634            total_cost += m.cost_usd.unwrap_or(0.0);
635        }
636    }
637
638    Some(AtifFinalMetrics {
639        total_prompt_tokens: if has_any { Some(total_prompt) } else { None },
640        total_completion_tokens: if has_any {
641            Some(total_completion)
642        } else {
643            None
644        },
645        total_cached_tokens: if has_any && total_cached > 0 {
646            Some(total_cached)
647        } else {
648            None
649        },
650        total_cost_usd: if has_any && total_cost > 0.0 {
651            Some(total_cost)
652        } else {
653            None
654        },
655        total_steps: Some(steps.len() as u64),
656        extra: None,
657    })
658}
659
660// ---------------------------------------------------------------------------
661// AtifStepExtra helpers
662// ---------------------------------------------------------------------------
663
664/// Build an [`AtifAncestry`] from a NeMo Flow [`Event`].
665///
666/// `name_map` is a pre-pass uuid → name lookup used to resolve `parent_name`.
667fn build_ancestry(
668    event: &Event,
669    name_map: &std::collections::HashMap<Uuid, String>,
670) -> AtifAncestry {
671    AtifAncestry {
672        function_id: event.uuid().to_string(),
673        function_name: event.name().to_string(),
674        parent_id: event.parent_uuid().map(|u| u.to_string()),
675        parent_name: event.parent_uuid().and_then(|u| name_map.get(&u)).cloned(),
676    }
677}
678
679/// Build an [`AtifInvocationInfo`] from start/end timestamps.
680///
681/// If `start_ts` is `None`, both timestamps are omitted to preserve the
682/// requirement that they are always emitted together or not at all.
683fn build_invocation_info(
684    start_ts: Option<DateTime<Utc>>,
685    end_ts: DateTime<Utc>,
686    invocation_id: Option<String>,
687    framework: &str,
688) -> AtifInvocationInfo {
689    AtifInvocationInfo {
690        start_timestamp: start_ts.map(|s| s.timestamp_millis() as f64 / 1000.0),
691        end_timestamp: start_ts.map(|_| end_ts.timestamp_millis() as f64 / 1000.0),
692        invocation_id,
693        status: Some("completed".to_string()),
694        framework: Some(framework.to_string()),
695    }
696}
697
/// Lookup tables built in one pass over the event stream, keyed by the UUID
/// of each start event.
struct EventLookupMaps {
    /// UUID → name of the start event carrying that UUID.
    name_map: std::collections::HashMap<Uuid, String>,
    /// UUID → timestamp of the start event carrying that UUID.
    start_ts_map: std::collections::HashMap<Uuid, DateTime<Utc>>,
}
702
703impl EventLookupMaps {
704    fn from_events(events: &[&Event]) -> Self {
705        let mut name_map = std::collections::HashMap::new();
706        let mut start_ts_map = std::collections::HashMap::new();
707        for event in events {
708            if is_start_event(event) {
709                name_map.insert(event.uuid(), event.name().to_string());
710                start_ts_map.insert(event.uuid(), *event.timestamp());
711            }
712        }
713        Self {
714            name_map,
715            start_ts_map,
716        }
717    }
718}
719
/// Lineage metadata accumulated for the most recent agent step until it is
/// written into that step's `extra` by `finalize_into`.
#[derive(Default)]
struct PendingAgentStep {
    /// Index into the step list of the step that will receive the metadata;
    /// `None` when no agent step is pending.
    step_idx: Option<usize>,
    /// Step-level lineage for the pending step.
    ancestry: Option<AtifAncestry>,
    /// Step-level invocation timing for the pending step.
    invocation: Option<AtifInvocationInfo>,
    /// Per-tool lineage; pushed in lockstep with `tool_invocations`.
    tool_ancestry: Vec<AtifAncestry>,
    /// Per-tool invocation timing; pushed in lockstep with `tool_ancestry`.
    tool_invocations: Vec<AtifInvocationInfo>,
    /// Invocation IDs in the order the tool metadata should be sorted into
    /// before it is written out.
    tool_call_order: Vec<String>,
}
729
730impl PendingAgentStep {
731    fn finalize_into(&mut self, steps: &mut [AtifStep]) {
732        let (Some(step_idx), Some(ancestry)) = (self.step_idx.take(), self.ancestry.take()) else {
733            return;
734        };
735        let Some(step) = steps.get_mut(step_idx) else {
736            return;
737        };
738
739        self.sort_tool_metadata();
740        let extra = AtifStepExtra {
741            ancestry,
742            invocation: self.invocation.take(),
743            llm_request: None,
744            tool_ancestry: std::mem::take(&mut self.tool_ancestry),
745            tool_invocations: if self.tool_invocations.is_empty() {
746                None
747            } else {
748                Some(std::mem::take(&mut self.tool_invocations))
749            },
750        };
751        step.extra = serde_json::to_value(&extra).ok();
752    }
753
754    fn set_current_agent(
755        &mut self,
756        step_idx: usize,
757        ancestry: AtifAncestry,
758        invocation: AtifInvocationInfo,
759        tool_call_order: Vec<String>,
760    ) {
761        self.step_idx = Some(step_idx);
762        self.ancestry = Some(ancestry);
763        self.invocation = Some(invocation);
764        self.tool_ancestry.clear();
765        self.tool_invocations.clear();
766        self.tool_call_order = tool_call_order;
767    }
768
769    fn push_tool_metadata(&mut self, ancestry: AtifAncestry, invocation: AtifInvocationInfo) {
770        self.tool_ancestry.push(ancestry);
771        self.tool_invocations.push(invocation);
772    }
773
774    fn has_active_step(&self) -> bool {
775        self.step_idx.is_some()
776    }
777
778    fn sort_tool_metadata(&mut self) {
779        if self.tool_call_order.is_empty() || self.tool_ancestry.is_empty() {
780            return;
781        }
782
783        let mut pairs: Vec<(AtifAncestry, AtifInvocationInfo)> =
784            std::mem::take(&mut self.tool_ancestry)
785                .into_iter()
786                .zip(std::mem::take(&mut self.tool_invocations))
787                .collect();
788        pairs.sort_by_key(|(_, invocation)| {
789            invocation
790                .invocation_id
791                .as_deref()
792                .and_then(|id| self.tool_call_order.iter().position(|entry| entry == id))
793                .unwrap_or(usize::MAX)
794        });
795        let (sorted_ancestry, sorted_invocations): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
796        self.tool_ancestry = sorted_ancestry;
797        self.tool_invocations = sorted_invocations;
798    }
799}
800
/// Working state threaded through the event → step conversion.
#[derive(Default)]
struct StepConversionState {
    /// Steps produced so far, in output order.
    steps: Vec<AtifStep>,
    /// String-to-string lookup for tool calls.
    /// NOTE(review): not read or written in the visible part of this file;
    /// presumably correlates tool calls of the latest agent step — confirm.
    last_tool_call_map: std::collections::HashMap<String, String>,
    /// Tool results buffered until `flush_observations` merges them into a
    /// single `system` step.
    pending_observations: Vec<AtifObservationResult>,
    /// Timestamp given to the step created by the next flush.
    pending_obs_timestamp: Option<String>,
    /// `reasoning_effort` taken from the most recent LLM start request.
    current_reasoning_effort: Option<Json>,
    /// Lineage metadata for the agent step that is still open.
    current_agent: PendingAgentStep,
}
810
811impl StepConversionState {
812    fn flush_observations(&mut self) {
813        if self.pending_observations.is_empty() {
814            return;
815        }
816
817        self.steps.push(AtifStep {
818            step_id: 0,
819            source: "system".to_string(),
820            message: Json::Null,
821            timestamp: self.pending_obs_timestamp.take(),
822            model_name: None,
823            reasoning_effort: None,
824            reasoning_content: None,
825            tool_calls: None,
826            observation: Some(AtifObservation {
827                results: std::mem::take(&mut self.pending_observations),
828            }),
829            metrics: None,
830            is_copied_context: None,
831            extra: None,
832        });
833    }
834
835    fn finalize_agent_extra(&mut self) {
836        self.current_agent.finalize_into(&mut self.steps);
837    }
838
839    fn handle_llm_start(&mut self, event: &Event, lookups: &EventLookupMaps) {
840        self.flush_observations();
841        self.finalize_agent_extra();
842
843        let Some(input) = event.data() else {
844            return;
845        };
846        let content = unwrap_llm_request(input);
847        self.current_reasoning_effort = extract_reasoning_effort(&content);
848        let extra = AtifStepExtra {
849            ancestry: build_ancestry(event, &lookups.name_map),
850            invocation: None,
851            llm_request: Some(content.clone()),
852            tool_ancestry: Vec::new(),
853            tool_invocations: None,
854        };
855        self.steps.push(AtifStep {
856            step_id: 0,
857            source: "user".to_string(),
858            message: extract_user_messages(&content),
859            timestamp: Some(event.timestamp().to_rfc3339()),
860            model_name: None,
861            reasoning_effort: None,
862            reasoning_content: None,
863            tool_calls: None,
864            observation: None,
865            metrics: None,
866            is_copied_context: None,
867            extra: serde_json::to_value(&extra).ok(),
868        });
869    }
870
871    fn handle_llm_end(&mut self, event: &Event, lookups: &EventLookupMaps) {
872        self.flush_observations();
873
874        let Some(output) = event.data() else {
875            return;
876        };
877        let tool_calls = extract_tool_calls(output);
878        let tool_call_order = refresh_tool_call_lookup(&mut self.last_tool_call_map, &tool_calls);
879        let reasoning_effort = self.current_reasoning_effort.take();
880        let reasoning_content = extract_reasoning_content(output);
881        let start_ts = lookups.start_ts_map.get(&event.uuid()).cloned();
882        let ancestry = build_ancestry(event, &lookups.name_map);
883        let invocation = build_invocation_info(
884            start_ts,
885            *event.timestamp(),
886            Some(event.uuid().to_string()),
887            "nemo_flow",
888        );
889
890        self.steps.push(AtifStep {
891            step_id: 0,
892            source: "agent".to_string(),
893            message: extract_llm_response_message(output),
894            timestamp: Some(event.timestamp().to_rfc3339()),
895            model_name: event.model_name().map(ToOwned::to_owned),
896            reasoning_effort,
897            reasoning_content,
898            tool_calls,
899            observation: None,
900            metrics: extract_metrics(output),
901            is_copied_context: None,
902            extra: None,
903        });
904        self.current_agent.set_current_agent(
905            self.steps.len() - 1,
906            ancestry,
907            invocation,
908            tool_call_order,
909        );
910    }
911
912    fn handle_tool_end(&mut self, event: &Event, lookups: &EventLookupMaps) {
913        if let Some(output) = event.data() {
914            if self.pending_obs_timestamp.is_none() {
915                self.pending_obs_timestamp = Some(event.timestamp().to_rfc3339());
916            }
917            self.pending_observations.push(AtifObservationResult {
918                source_call_id: event
919                    .tool_call_id()
920                    .map(ToOwned::to_owned)
921                    .or_else(|| self.last_tool_call_map.get(event.name()).cloned()),
922                content: output.clone(),
923            });
924        }
925
926        if !self.current_agent.has_active_step() {
927            return;
928        }
929        let start_ts = lookups.start_ts_map.get(&event.uuid()).cloned();
930        let invocation = build_invocation_info(
931            start_ts,
932            *event.timestamp(),
933            event
934                .tool_call_id()
935                .map(ToOwned::to_owned)
936                .or_else(|| Some(event.uuid().to_string())),
937            "nemo_flow",
938        );
939        self.current_agent
940            .push_tool_metadata(build_ancestry(event, &lookups.name_map), invocation);
941    }
942
943    fn handle_mark(&mut self, mark: &Event, lookups: &EventLookupMaps) {
944        self.flush_observations();
945        let Some(data) = mark.data() else {
946            return;
947        };
948        if is_empty_mark_payload(data) {
949            return;
950        }
951        let extra = AtifStepExtra {
952            ancestry: build_ancestry(mark, &lookups.name_map),
953            invocation: Some(AtifInvocationInfo {
954                start_timestamp: None,
955                end_timestamp: None,
956                invocation_id: Some(mark.uuid().to_string()),
957                status: Some("completed".to_string()),
958                framework: Some("nemo_flow".to_string()),
959            }),
960            llm_request: None,
961            tool_ancestry: Vec::new(),
962            tool_invocations: None,
963        };
964        self.steps.push(AtifStep {
965            step_id: 0,
966            source: "system".to_string(),
967            message: mark_message(mark, data),
968            timestamp: Some(mark.timestamp().to_rfc3339()),
969            model_name: None,
970            reasoning_effort: None,
971            reasoning_content: None,
972            tool_calls: None,
973            observation: None,
974            metrics: None,
975            is_copied_context: None,
976            extra: serde_json::to_value(&extra).ok(),
977        });
978    }
979
980    fn finish(mut self) -> Vec<AtifStep> {
981        self.finalize_agent_extra();
982        self.flush_observations();
983        for (index, step) in self.steps.iter_mut().enumerate() {
984            step.step_id = index + 1;
985        }
986        self.steps
987    }
988}
989
990fn refresh_tool_call_lookup(
991    last_tool_call_map: &mut std::collections::HashMap<String, String>,
992    tool_calls: &Option<Vec<AtifToolCall>>,
993) -> Vec<String> {
994    last_tool_call_map.clear();
995    let mut tool_call_order = Vec::new();
996    if let Some(tool_calls) = tool_calls {
997        for tool_call in tool_calls {
998            if !tool_call.function_name.is_empty() {
999                last_tool_call_map.insert(
1000                    tool_call.function_name.clone(),
1001                    tool_call.tool_call_id.clone(),
1002                );
1003            }
1004            tool_call_order.push(tool_call.tool_call_id.clone());
1005        }
1006    }
1007    tool_call_order
1008}
1009
1010// ---------------------------------------------------------------------------
1011// Event-to-step mapping
1012// ---------------------------------------------------------------------------
1013
1014/// Converts a slice of events into ATIF steps.
1015///
1016/// Mapping logic:
1017/// 1. Sort events by timestamp.
1018/// 2. For each LLM pair:
1019///    - Start event → user step (message = extracted `messages` array from
1020///      unwrapped LlmRequest content, stripping `max_tokens`/`model`/etc.)
1021///    - End event → agent step (message = extracted content, metrics from
1022///      token_usage, tool_calls promoted to AtifToolCall entries with parsed
1023///      JSON arguments)
1024/// 3. For Tool events:
1025///    - Start events are **skipped** (tool_calls come from LLM End promotion)
1026///    - Consecutive End events are **merged** into a single system observation
1027///      step with multiple results
1028/// 4. Tool End observation results are correlated with the preceding LLM End's
1029///    promoted tool_calls by function name → `source_call_id`.
1030/// 5. Mark events → system steps if they carry data.
1031/// 6. Scope Start/End → skipped.
1032fn events_to_steps(events: &[&Event]) -> Vec<AtifStep> {
1033    let mut sorted: Vec<&Event> = events.to_vec();
1034    sorted.sort_by_key(|e| *e.timestamp());
1035    let lookups = EventLookupMaps::from_events(&sorted);
1036    let mut state = StepConversionState::default();
1037
1038    for event in &sorted {
1039        match (
1040            event.kind(),
1041            event.scope_category(),
1042            event.category().map(|category| category.as_str()),
1043        ) {
1044            ("scope", Some(crate::api::event::ScopeCategory::Start), Some("llm")) => {
1045                state.handle_llm_start(event, &lookups)
1046            }
1047            ("scope", Some(crate::api::event::ScopeCategory::End), Some("llm")) => {
1048                state.handle_llm_end(event, &lookups)
1049            }
1050            ("scope", Some(crate::api::event::ScopeCategory::End), Some("tool")) => {
1051                state.handle_tool_end(event, &lookups)
1052            }
1053            ("mark", _, _) => state.handle_mark(event, &lookups),
1054            _ => {}
1055        }
1056    }
1057
1058    state.finish()
1059}
1060
1061fn is_empty_mark_payload(data: &Json) -> bool {
1062    data.is_null() || data.as_object().is_some_and(|object| object.is_empty())
1063}
1064
1065// A runtime mark is point-in-time telemetry rather than a scoped call with start/end events. Agent
1066// hook adapters use marks for lifecycle notifications that do not map to first-class ATIF step
1067// types, for example hook-only status updates or synthetic fallback events. Preserve the original
1068// mark payload, but surface the hook name in a stable `hook_event_name` field so trajectory readers
1069// can label system steps without knowing adapter-specific metadata conventions.
1070fn mark_message(mark: &Event, data: &Json) -> Json {
1071    let Some(object) = data.as_object() else {
1072        return data.clone();
1073    };
1074    let mut message = object.clone();
1075    if !message.contains_key("hook_event_name")
1076        && let Some(hook_event_name) = mark_hook_event_name(mark)
1077    {
1078        message.insert("hook_event_name".to_string(), Json::String(hook_event_name));
1079    }
1080    Json::Object(message)
1081}
1082
1083// Prefer the adapter-provided hook name because the runtime mark name may be a generic bucket such
1084// as `hook_mark` or a synthetic fallback like `subagent_end_without_start`. Falling back to the mark
1085// name keeps non-hook marks readable without making this exporter depend on any one agent adapter.
1086fn mark_hook_event_name(mark: &Event) -> Option<String> {
1087    mark.metadata()
1088        .and_then(Json::as_object)
1089        .and_then(|metadata| metadata.get("hook_event_name"))
1090        .and_then(Json::as_str)
1091        .filter(|name| !name.is_empty())
1092        .map(ToOwned::to_owned)
1093        .or_else(|| Some(mark.name().to_string()).filter(|name| !name.is_empty()))
1094}
1095
1096fn is_start_event(event: &Event) -> bool {
1097    event.scope_category() == Some(crate::api::event::ScopeCategory::Start)
1098}
1099
1100// ---------------------------------------------------------------------------
1101// Tests
1102// ---------------------------------------------------------------------------
1103
1104#[cfg(test)]
1105#[path = "../../tests/unit/atif_tests.rs"]
1106mod tests;