1use std::sync::{Arc, Mutex};
32
33use chrono::{DateTime, Utc};
34use serde::{Deserialize, Serialize};
35use uuid::Uuid;
36
37use crate::api::event::Event;
38use crate::api::runtime::EventSubscriberFn;
39use crate::json::Json;
40
41pub const ATIF_SCHEMA_VERSION: &str = "ATIF-v1.6";
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct AtifAgentInfo {
54 pub name: String,
56 pub version: String,
58 #[serde(skip_serializing_if = "Option::is_none")]
60 pub model_name: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub tool_definitions: Option<Vec<Json>>,
64 #[serde(skip_serializing_if = "Option::is_none")]
66 pub extra: Option<Json>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct AtifStep {
72 pub step_id: usize,
74 pub source: String,
76 pub message: Json,
78 #[serde(skip_serializing_if = "Option::is_none")]
80 pub timestamp: Option<String>,
81 #[serde(skip_serializing_if = "Option::is_none")]
83 pub model_name: Option<String>,
84 #[serde(skip_serializing_if = "Option::is_none")]
86 pub reasoning_effort: Option<Json>,
87 #[serde(skip_serializing_if = "Option::is_none")]
89 pub reasoning_content: Option<String>,
90 #[serde(skip_serializing_if = "Option::is_none")]
92 pub tool_calls: Option<Vec<AtifToolCall>>,
93 #[serde(skip_serializing_if = "Option::is_none")]
95 pub observation: Option<AtifObservation>,
96 #[serde(skip_serializing_if = "Option::is_none")]
98 pub metrics: Option<AtifMetrics>,
99 #[serde(skip_serializing_if = "Option::is_none")]
101 pub is_copied_context: Option<bool>,
102 #[serde(skip_serializing_if = "Option::is_none")]
104 pub extra: Option<Json>,
105}
106
107#[derive(Debug, Clone, Default, Serialize, Deserialize)]
109pub struct AtifMetrics {
110 #[serde(skip_serializing_if = "Option::is_none")]
112 pub prompt_tokens: Option<u64>,
113 #[serde(skip_serializing_if = "Option::is_none")]
115 pub completion_tokens: Option<u64>,
116 #[serde(skip_serializing_if = "Option::is_none")]
118 pub cached_tokens: Option<u64>,
119 #[serde(skip_serializing_if = "Option::is_none")]
121 pub cost_usd: Option<f64>,
122 #[serde(skip_serializing_if = "Option::is_none")]
124 pub prompt_token_ids: Option<Vec<u64>>,
125 #[serde(skip_serializing_if = "Option::is_none")]
127 pub completion_token_ids: Option<Vec<u64>>,
128 #[serde(skip_serializing_if = "Option::is_none")]
130 pub logprobs: Option<Vec<f64>>,
131 #[serde(skip_serializing_if = "Option::is_none")]
133 pub extra: Option<Json>,
134}
135
136#[derive(Debug, Clone, Default, Serialize, Deserialize)]
138pub struct AtifFinalMetrics {
139 #[serde(skip_serializing_if = "Option::is_none")]
141 pub total_prompt_tokens: Option<u64>,
142 #[serde(skip_serializing_if = "Option::is_none")]
144 pub total_completion_tokens: Option<u64>,
145 #[serde(skip_serializing_if = "Option::is_none")]
147 pub total_cached_tokens: Option<u64>,
148 #[serde(skip_serializing_if = "Option::is_none")]
150 pub total_cost_usd: Option<f64>,
151 #[serde(skip_serializing_if = "Option::is_none")]
153 pub total_steps: Option<u64>,
154 #[serde(skip_serializing_if = "Option::is_none")]
156 pub extra: Option<Json>,
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct AtifToolCall {
162 pub tool_call_id: String,
164 pub function_name: String,
166 pub arguments: Json,
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
172pub struct AtifObservation {
173 pub results: Vec<AtifObservationResult>,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct AtifObservationResult {
180 #[serde(skip_serializing_if = "Option::is_none")]
182 pub source_call_id: Option<String>,
183 pub content: Json,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct AtifAncestry {
190 pub function_id: String,
192 pub function_name: String,
194 #[serde(skip_serializing_if = "Option::is_none")]
196 pub parent_id: Option<String>,
197 #[serde(skip_serializing_if = "Option::is_none")]
199 pub parent_name: Option<String>,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct AtifInvocationInfo {
208 #[serde(skip_serializing_if = "Option::is_none")]
210 pub start_timestamp: Option<f64>,
211 #[serde(skip_serializing_if = "Option::is_none")]
213 pub end_timestamp: Option<f64>,
214 #[serde(skip_serializing_if = "Option::is_none")]
216 pub invocation_id: Option<String>,
217 #[serde(skip_serializing_if = "Option::is_none")]
219 pub status: Option<String>,
220 #[serde(skip_serializing_if = "Option::is_none")]
222 pub framework: Option<String>,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct AtifStepExtra {
231 pub ancestry: AtifAncestry,
233 #[serde(skip_serializing_if = "Option::is_none")]
235 pub invocation: Option<AtifInvocationInfo>,
236 #[serde(skip_serializing_if = "Option::is_none")]
238 pub llm_request: Option<Json>,
239 #[serde(default, skip_serializing_if = "Vec::is_empty")]
241 pub tool_ancestry: Vec<AtifAncestry>,
242 #[serde(skip_serializing_if = "Option::is_none")]
244 pub tool_invocations: Option<Vec<AtifInvocationInfo>>,
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct AtifTrajectory {
250 pub schema_version: String,
252 pub session_id: String,
254 pub agent: AtifAgentInfo,
256 pub steps: Vec<AtifStep>,
258 #[serde(skip_serializing_if = "Option::is_none")]
260 pub notes: Option<String>,
261 #[serde(skip_serializing_if = "Option::is_none")]
263 pub final_metrics: Option<AtifFinalMetrics>,
264 #[serde(skip_serializing_if = "Option::is_none")]
266 pub continued_trajectory_ref: Option<String>,
267 #[serde(skip_serializing_if = "Option::is_none")]
269 pub extra: Option<Json>,
270}
271
272struct AtifExporterState {
277 session_id: String,
278 agent_info: AtifAgentInfo,
279 events: Vec<Event>,
280}
281
282pub struct AtifExporter {
287 state: Arc<Mutex<AtifExporterState>>,
288}
289
290impl AtifExporter {
291 pub fn new(session_id: String, agent_info: AtifAgentInfo) -> Self {
300 Self {
301 state: Arc::new(Mutex::new(AtifExporterState {
302 session_id,
303 agent_info,
304 events: Vec::new(),
305 })),
306 }
307 }
308
309 pub fn subscriber(&self) -> EventSubscriberFn {
318 let state = self.state.clone();
319 Arc::new(move |event: &Event| {
320 if let Ok(mut s) = state.lock() {
321 s.events.push(event.clone());
322 }
323 })
324 }
325
326 pub fn export(&self) -> AtifTrajectory {
335 let state = self.state.lock().unwrap();
336 let collected_events: Vec<&Event> = state.events.iter().collect();
337 let steps = events_to_steps(&collected_events);
338 let final_metrics = compute_final_metrics(&steps);
339
340 AtifTrajectory {
341 schema_version: ATIF_SCHEMA_VERSION.to_string(),
342 session_id: state.session_id.clone(),
343 agent: state.agent_info.clone(),
344 steps,
345 notes: None,
346 final_metrics,
347 continued_trajectory_ref: None,
348 extra: None,
349 }
350 }
351
352 pub fn clear(&self) {
357 let mut state = self.state.lock().unwrap();
358 state.events.clear();
359 }
360}
361
362fn unwrap_llm_request(input: &Json) -> Json {
371 if let Some(obj) = input.as_object()
372 && obj.contains_key("content")
373 && obj.contains_key("headers")
374 {
375 return obj.get("content").cloned().unwrap_or_else(|| input.clone());
376 }
377 input.clone()
378}
379
380fn extract_llm_response_message(output: &Json) -> Json {
385 if let Some(obj) = output.as_object() {
386 if let Some(content) = non_null_object_field(obj, "content") {
387 return content;
388 }
389 if let Some(summary) = llm_response_summary(obj) {
390 return summary;
391 }
392 }
393 output.clone()
395}
396
397fn non_null_object_field(obj: &serde_json::Map<String, Json>, key: &str) -> Option<Json> {
398 obj.get(key).filter(|value| !value.is_null()).cloned()
399}
400
401fn llm_response_summary(obj: &serde_json::Map<String, Json>) -> Option<Json> {
402 if !obj.contains_key("tool_calls") && !obj.contains_key("role") {
403 return None;
404 }
405
406 let mut summary = serde_json::Map::new();
407 if let Some(role) = obj.get("role") {
408 summary.insert("role".to_string(), role.clone());
409 }
410 if let Some(tool_calls) = obj.get("tool_calls") {
411 summary.insert("tool_calls".to_string(), tool_calls.clone());
412 }
413 if let Some(reasoning) = non_null_object_field(obj, "reasoning") {
414 summary.insert("reasoning".to_string(), reasoning);
415 }
416
417 (!summary.is_empty()).then_some(Json::Object(summary))
418}
419
420const TOKEN_USAGE_KNOWN_KEYS: &[&str] = &[
422 "prompt_tokens",
423 "completion_tokens",
424 "cached_tokens",
425 "cost_usd",
426 "prompt_token_ids",
427 "completion_token_ids",
428 "logprobs",
429];
430
431fn extract_metrics(output: &Json) -> Option<AtifMetrics> {
437 let usage = token_usage_object(output)?;
438 let prompt = usage_u64(usage, &["prompt_tokens", "input_tokens"]);
439 let completion = usage_u64(usage, &["completion_tokens", "output_tokens"]);
440 let cached = usage_u64(usage, &["cached_tokens"])
441 .or_else(|| prompt_tokens_detail_u64(usage, "cached_tokens"))
442 .or_else(|| {
443 sum_usage_u64(
444 usage,
445 &["cache_read_input_tokens", "cache_creation_input_tokens"],
446 )
447 });
448 let cost = usage.get("cost_usd").and_then(Json::as_f64);
449 let prompt_ids = usage
450 .get("prompt_token_ids")
451 .and_then(Json::as_array)
452 .map(|a| a.iter().filter_map(Json::as_u64).collect());
453 let completion_ids = usage
454 .get("completion_token_ids")
455 .and_then(Json::as_array)
456 .map(|a| a.iter().filter_map(Json::as_u64).collect());
457 let logprobs = usage
458 .get("logprobs")
459 .and_then(Json::as_array)
460 .map(|a| a.iter().filter_map(Json::as_f64).collect());
461 let known: std::collections::HashSet<&str> = TOKEN_USAGE_KNOWN_KEYS.iter().copied().collect();
462 let extra_map: serde_json::Map<String, Json> = usage
463 .iter()
464 .filter(|(k, _)| !known.contains(k.as_str()))
465 .map(|(k, v)| (k.clone(), v.clone()))
466 .collect();
467 let extra = if extra_map.is_empty() {
468 None
469 } else {
470 Some(Json::Object(extra_map))
471 };
472 if prompt.is_none() && completion.is_none() && cached.is_none() {
473 return None;
474 }
475 Some(AtifMetrics {
476 prompt_tokens: prompt,
477 completion_tokens: completion,
478 cached_tokens: cached,
479 cost_usd: cost,
480 prompt_token_ids: prompt_ids,
481 completion_token_ids: completion_ids,
482 logprobs,
483 extra,
484 })
485}
486
487fn token_usage_object(output: &Json) -> Option<&serde_json::Map<String, Json>> {
488 let output = output.as_object()?;
489 output
490 .get("token_usage")
491 .or_else(|| output.get("usage"))
492 .and_then(Json::as_object)
493}
494
495fn usage_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
496 keys.iter()
497 .find_map(|key| usage.get(*key).and_then(Json::as_u64))
498}
499
500fn sum_usage_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
501 let mut total = 0;
502 let mut found = false;
503 for key in keys {
504 if let Some(value) = usage.get(*key).and_then(Json::as_u64) {
505 total += value;
506 found = true;
507 }
508 }
509 found.then_some(total)
510}
511
512fn prompt_tokens_detail_u64(usage: &serde_json::Map<String, Json>, key: &str) -> Option<u64> {
513 usage
514 .get("prompt_tokens_details")
515 .and_then(Json::as_object)
516 .and_then(|details| details.get(key))
517 .and_then(Json::as_u64)
518}
519
520fn extract_reasoning_effort(input: &Json) -> Option<Json> {
525 if let Some(obj) = input.as_object()
526 && let Some(v) = obj.get("reasoning_effort")
527 && !v.is_null()
528 {
529 return Some(v.clone());
530 }
531 None
532}
533
534fn extract_reasoning_content(output: &Json) -> Option<String> {
539 if let Some(obj) = output.as_object()
540 && let Some(r) = obj.get("reasoning")
541 {
542 return r.as_str().map(String::from);
543 }
544 None
545}
546
547fn extract_user_messages(input: &Json) -> Json {
555 if let Some(obj) = input.as_object()
556 && let Some(messages) = obj.get("messages")
557 {
558 return messages.clone();
559 }
560 input.clone()
561}
562
563fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
575 let arr = output.as_object()?.get("tool_calls")?.as_array()?;
576 if arr.is_empty() {
577 return None;
578 }
579 let mut calls = Vec::with_capacity(arr.len());
580 for tc in arr {
581 let tc_obj = tc.as_object()?;
582 let id = tc_obj
583 .get("id")
584 .and_then(Json::as_str)
585 .unwrap_or("")
586 .to_string();
587 let func = tc_obj.get("function").and_then(Json::as_object);
589 let name = func
590 .and_then(|f| f.get("name"))
591 .and_then(Json::as_str)
592 .unwrap_or("")
593 .to_string();
594 let raw_arguments = func
595 .and_then(|f| f.get("arguments"))
596 .cloned()
597 .unwrap_or(Json::Null);
598 let arguments = if let Some(s) = raw_arguments.as_str() {
600 serde_json::from_str(s).unwrap_or(raw_arguments)
601 } else {
602 raw_arguments
603 };
604 if id.is_empty() && name.is_empty() {
606 continue;
607 }
608 calls.push(AtifToolCall {
609 tool_call_id: id,
610 function_name: name,
611 arguments,
612 });
613 }
614 if calls.is_empty() { None } else { Some(calls) }
615}
616
617fn compute_final_metrics(steps: &[AtifStep]) -> Option<AtifFinalMetrics> {
622 let mut total_prompt: u64 = 0;
623 let mut total_completion: u64 = 0;
624 let mut total_cached: u64 = 0;
625 let mut total_cost: f64 = 0.0;
626 let mut has_any = false;
627
628 for step in steps {
629 if let Some(m) = &step.metrics {
630 has_any = true;
631 total_prompt += m.prompt_tokens.unwrap_or(0);
632 total_completion += m.completion_tokens.unwrap_or(0);
633 total_cached += m.cached_tokens.unwrap_or(0);
634 total_cost += m.cost_usd.unwrap_or(0.0);
635 }
636 }
637
638 Some(AtifFinalMetrics {
639 total_prompt_tokens: if has_any { Some(total_prompt) } else { None },
640 total_completion_tokens: if has_any {
641 Some(total_completion)
642 } else {
643 None
644 },
645 total_cached_tokens: if has_any && total_cached > 0 {
646 Some(total_cached)
647 } else {
648 None
649 },
650 total_cost_usd: if has_any && total_cost > 0.0 {
651 Some(total_cost)
652 } else {
653 None
654 },
655 total_steps: Some(steps.len() as u64),
656 extra: None,
657 })
658}
659
660fn build_ancestry(
668 event: &Event,
669 name_map: &std::collections::HashMap<Uuid, String>,
670) -> AtifAncestry {
671 AtifAncestry {
672 function_id: event.uuid().to_string(),
673 function_name: event.name().to_string(),
674 parent_id: event.parent_uuid().map(|u| u.to_string()),
675 parent_name: event.parent_uuid().and_then(|u| name_map.get(&u)).cloned(),
676 }
677}
678
679fn build_invocation_info(
684 start_ts: Option<DateTime<Utc>>,
685 end_ts: DateTime<Utc>,
686 invocation_id: Option<String>,
687 framework: &str,
688) -> AtifInvocationInfo {
689 AtifInvocationInfo {
690 start_timestamp: start_ts.map(|s| s.timestamp_millis() as f64 / 1000.0),
691 end_timestamp: start_ts.map(|_| end_ts.timestamp_millis() as f64 / 1000.0),
692 invocation_id,
693 status: Some("completed".to_string()),
694 framework: Some(framework.to_string()),
695 }
696}
697
698struct EventLookupMaps {
699 name_map: std::collections::HashMap<Uuid, String>,
700 start_ts_map: std::collections::HashMap<Uuid, DateTime<Utc>>,
701}
702
703impl EventLookupMaps {
704 fn from_events(events: &[&Event]) -> Self {
705 let mut name_map = std::collections::HashMap::new();
706 let mut start_ts_map = std::collections::HashMap::new();
707 for event in events {
708 if is_start_event(event) {
709 name_map.insert(event.uuid(), event.name().to_string());
710 start_ts_map.insert(event.uuid(), *event.timestamp());
711 }
712 }
713 Self {
714 name_map,
715 start_ts_map,
716 }
717 }
718}
719
720#[derive(Default)]
721struct PendingAgentStep {
722 step_idx: Option<usize>,
723 ancestry: Option<AtifAncestry>,
724 invocation: Option<AtifInvocationInfo>,
725 tool_ancestry: Vec<AtifAncestry>,
726 tool_invocations: Vec<AtifInvocationInfo>,
727 tool_call_order: Vec<String>,
728}
729
730impl PendingAgentStep {
731 fn finalize_into(&mut self, steps: &mut [AtifStep]) {
732 let (Some(step_idx), Some(ancestry)) = (self.step_idx.take(), self.ancestry.take()) else {
733 return;
734 };
735 let Some(step) = steps.get_mut(step_idx) else {
736 return;
737 };
738
739 self.sort_tool_metadata();
740 let extra = AtifStepExtra {
741 ancestry,
742 invocation: self.invocation.take(),
743 llm_request: None,
744 tool_ancestry: std::mem::take(&mut self.tool_ancestry),
745 tool_invocations: if self.tool_invocations.is_empty() {
746 None
747 } else {
748 Some(std::mem::take(&mut self.tool_invocations))
749 },
750 };
751 step.extra = serde_json::to_value(&extra).ok();
752 }
753
754 fn set_current_agent(
755 &mut self,
756 step_idx: usize,
757 ancestry: AtifAncestry,
758 invocation: AtifInvocationInfo,
759 tool_call_order: Vec<String>,
760 ) {
761 self.step_idx = Some(step_idx);
762 self.ancestry = Some(ancestry);
763 self.invocation = Some(invocation);
764 self.tool_ancestry.clear();
765 self.tool_invocations.clear();
766 self.tool_call_order = tool_call_order;
767 }
768
769 fn push_tool_metadata(&mut self, ancestry: AtifAncestry, invocation: AtifInvocationInfo) {
770 self.tool_ancestry.push(ancestry);
771 self.tool_invocations.push(invocation);
772 }
773
774 fn has_active_step(&self) -> bool {
775 self.step_idx.is_some()
776 }
777
778 fn sort_tool_metadata(&mut self) {
779 if self.tool_call_order.is_empty() || self.tool_ancestry.is_empty() {
780 return;
781 }
782
783 let mut pairs: Vec<(AtifAncestry, AtifInvocationInfo)> =
784 std::mem::take(&mut self.tool_ancestry)
785 .into_iter()
786 .zip(std::mem::take(&mut self.tool_invocations))
787 .collect();
788 pairs.sort_by_key(|(_, invocation)| {
789 invocation
790 .invocation_id
791 .as_deref()
792 .and_then(|id| self.tool_call_order.iter().position(|entry| entry == id))
793 .unwrap_or(usize::MAX)
794 });
795 let (sorted_ancestry, sorted_invocations): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
796 self.tool_ancestry = sorted_ancestry;
797 self.tool_invocations = sorted_invocations;
798 }
799}
800
801#[derive(Default)]
802struct StepConversionState {
803 steps: Vec<AtifStep>,
804 last_tool_call_map: std::collections::HashMap<String, String>,
805 pending_observations: Vec<AtifObservationResult>,
806 pending_obs_timestamp: Option<String>,
807 current_reasoning_effort: Option<Json>,
808 current_agent: PendingAgentStep,
809}
810
811impl StepConversionState {
812 fn flush_observations(&mut self) {
813 if self.pending_observations.is_empty() {
814 return;
815 }
816
817 self.steps.push(AtifStep {
818 step_id: 0,
819 source: "system".to_string(),
820 message: Json::Null,
821 timestamp: self.pending_obs_timestamp.take(),
822 model_name: None,
823 reasoning_effort: None,
824 reasoning_content: None,
825 tool_calls: None,
826 observation: Some(AtifObservation {
827 results: std::mem::take(&mut self.pending_observations),
828 }),
829 metrics: None,
830 is_copied_context: None,
831 extra: None,
832 });
833 }
834
835 fn finalize_agent_extra(&mut self) {
836 self.current_agent.finalize_into(&mut self.steps);
837 }
838
839 fn handle_llm_start(&mut self, event: &Event, lookups: &EventLookupMaps) {
840 self.flush_observations();
841 self.finalize_agent_extra();
842
843 let Some(input) = event.data() else {
844 return;
845 };
846 let content = unwrap_llm_request(input);
847 self.current_reasoning_effort = extract_reasoning_effort(&content);
848 let extra = AtifStepExtra {
849 ancestry: build_ancestry(event, &lookups.name_map),
850 invocation: None,
851 llm_request: Some(content.clone()),
852 tool_ancestry: Vec::new(),
853 tool_invocations: None,
854 };
855 self.steps.push(AtifStep {
856 step_id: 0,
857 source: "user".to_string(),
858 message: extract_user_messages(&content),
859 timestamp: Some(event.timestamp().to_rfc3339()),
860 model_name: None,
861 reasoning_effort: None,
862 reasoning_content: None,
863 tool_calls: None,
864 observation: None,
865 metrics: None,
866 is_copied_context: None,
867 extra: serde_json::to_value(&extra).ok(),
868 });
869 }
870
871 fn handle_llm_end(&mut self, event: &Event, lookups: &EventLookupMaps) {
872 self.flush_observations();
873
874 let Some(output) = event.data() else {
875 return;
876 };
877 let tool_calls = extract_tool_calls(output);
878 let tool_call_order = refresh_tool_call_lookup(&mut self.last_tool_call_map, &tool_calls);
879 let reasoning_effort = self.current_reasoning_effort.take();
880 let reasoning_content = extract_reasoning_content(output);
881 let start_ts = lookups.start_ts_map.get(&event.uuid()).cloned();
882 let ancestry = build_ancestry(event, &lookups.name_map);
883 let invocation = build_invocation_info(
884 start_ts,
885 *event.timestamp(),
886 Some(event.uuid().to_string()),
887 "nemo_flow",
888 );
889
890 self.steps.push(AtifStep {
891 step_id: 0,
892 source: "agent".to_string(),
893 message: extract_llm_response_message(output),
894 timestamp: Some(event.timestamp().to_rfc3339()),
895 model_name: event.model_name().map(ToOwned::to_owned),
896 reasoning_effort,
897 reasoning_content,
898 tool_calls,
899 observation: None,
900 metrics: extract_metrics(output),
901 is_copied_context: None,
902 extra: None,
903 });
904 self.current_agent.set_current_agent(
905 self.steps.len() - 1,
906 ancestry,
907 invocation,
908 tool_call_order,
909 );
910 }
911
912 fn handle_tool_end(&mut self, event: &Event, lookups: &EventLookupMaps) {
913 if let Some(output) = event.data() {
914 if self.pending_obs_timestamp.is_none() {
915 self.pending_obs_timestamp = Some(event.timestamp().to_rfc3339());
916 }
917 self.pending_observations.push(AtifObservationResult {
918 source_call_id: event
919 .tool_call_id()
920 .map(ToOwned::to_owned)
921 .or_else(|| self.last_tool_call_map.get(event.name()).cloned()),
922 content: output.clone(),
923 });
924 }
925
926 if !self.current_agent.has_active_step() {
927 return;
928 }
929 let start_ts = lookups.start_ts_map.get(&event.uuid()).cloned();
930 let invocation = build_invocation_info(
931 start_ts,
932 *event.timestamp(),
933 event
934 .tool_call_id()
935 .map(ToOwned::to_owned)
936 .or_else(|| Some(event.uuid().to_string())),
937 "nemo_flow",
938 );
939 self.current_agent
940 .push_tool_metadata(build_ancestry(event, &lookups.name_map), invocation);
941 }
942
943 fn handle_mark(&mut self, mark: &Event, lookups: &EventLookupMaps) {
944 self.flush_observations();
945 let Some(data) = mark.data() else {
946 return;
947 };
948 if is_empty_mark_payload(data) {
949 return;
950 }
951 let extra = AtifStepExtra {
952 ancestry: build_ancestry(mark, &lookups.name_map),
953 invocation: Some(AtifInvocationInfo {
954 start_timestamp: None,
955 end_timestamp: None,
956 invocation_id: Some(mark.uuid().to_string()),
957 status: Some("completed".to_string()),
958 framework: Some("nemo_flow".to_string()),
959 }),
960 llm_request: None,
961 tool_ancestry: Vec::new(),
962 tool_invocations: None,
963 };
964 self.steps.push(AtifStep {
965 step_id: 0,
966 source: "system".to_string(),
967 message: mark_message(mark, data),
968 timestamp: Some(mark.timestamp().to_rfc3339()),
969 model_name: None,
970 reasoning_effort: None,
971 reasoning_content: None,
972 tool_calls: None,
973 observation: None,
974 metrics: None,
975 is_copied_context: None,
976 extra: serde_json::to_value(&extra).ok(),
977 });
978 }
979
980 fn finish(mut self) -> Vec<AtifStep> {
981 self.finalize_agent_extra();
982 self.flush_observations();
983 for (index, step) in self.steps.iter_mut().enumerate() {
984 step.step_id = index + 1;
985 }
986 self.steps
987 }
988}
989
990fn refresh_tool_call_lookup(
991 last_tool_call_map: &mut std::collections::HashMap<String, String>,
992 tool_calls: &Option<Vec<AtifToolCall>>,
993) -> Vec<String> {
994 last_tool_call_map.clear();
995 let mut tool_call_order = Vec::new();
996 if let Some(tool_calls) = tool_calls {
997 for tool_call in tool_calls {
998 if !tool_call.function_name.is_empty() {
999 last_tool_call_map.insert(
1000 tool_call.function_name.clone(),
1001 tool_call.tool_call_id.clone(),
1002 );
1003 }
1004 tool_call_order.push(tool_call.tool_call_id.clone());
1005 }
1006 }
1007 tool_call_order
1008}
1009
1010fn events_to_steps(events: &[&Event]) -> Vec<AtifStep> {
1033 let mut sorted: Vec<&Event> = events.to_vec();
1034 sorted.sort_by_key(|e| *e.timestamp());
1035 let lookups = EventLookupMaps::from_events(&sorted);
1036 let mut state = StepConversionState::default();
1037
1038 for event in &sorted {
1039 match (
1040 event.kind(),
1041 event.scope_category(),
1042 event.category().map(|category| category.as_str()),
1043 ) {
1044 ("scope", Some(crate::api::event::ScopeCategory::Start), Some("llm")) => {
1045 state.handle_llm_start(event, &lookups)
1046 }
1047 ("scope", Some(crate::api::event::ScopeCategory::End), Some("llm")) => {
1048 state.handle_llm_end(event, &lookups)
1049 }
1050 ("scope", Some(crate::api::event::ScopeCategory::End), Some("tool")) => {
1051 state.handle_tool_end(event, &lookups)
1052 }
1053 ("mark", _, _) => state.handle_mark(event, &lookups),
1054 _ => {}
1055 }
1056 }
1057
1058 state.finish()
1059}
1060
1061fn is_empty_mark_payload(data: &Json) -> bool {
1062 data.is_null() || data.as_object().is_some_and(|object| object.is_empty())
1063}
1064
1065fn mark_message(mark: &Event, data: &Json) -> Json {
1071 let Some(object) = data.as_object() else {
1072 return data.clone();
1073 };
1074 let mut message = object.clone();
1075 if !message.contains_key("hook_event_name")
1076 && let Some(hook_event_name) = mark_hook_event_name(mark)
1077 {
1078 message.insert("hook_event_name".to_string(), Json::String(hook_event_name));
1079 }
1080 Json::Object(message)
1081}
1082
1083fn mark_hook_event_name(mark: &Event) -> Option<String> {
1087 mark.metadata()
1088 .and_then(Json::as_object)
1089 .and_then(|metadata| metadata.get("hook_event_name"))
1090 .and_then(Json::as_str)
1091 .filter(|name| !name.is_empty())
1092 .map(ToOwned::to_owned)
1093 .or_else(|| Some(mark.name().to_string()).filter(|name| !name.is_empty()))
1094}
1095
1096fn is_start_event(event: &Event) -> bool {
1097 event.scope_category() == Some(crate::api::event::ScopeCategory::Start)
1098}
1099
1100#[cfg(test)]
1105#[path = "../../tests/unit/atif_tests.rs"]
1106mod tests;