use std::sync::{Arc, Mutex};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::api::event::Event;
use crate::api::runtime::EventSubscriberFn;
use crate::json::Json;
pub const ATIF_SCHEMA_VERSION: &str = "ATIF-v1.6";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifAgentInfo {
pub name: String,
pub version: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub model_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_definitions: Option<Vec<Json>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<Json>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifStep {
pub step_id: usize,
pub source: String,
pub message: Json,
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub model_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning_effort: Option<Json>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning_content: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_calls: Option<Vec<AtifToolCall>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub observation: Option<AtifObservation>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metrics: Option<AtifMetrics>,
#[serde(skip_serializing_if = "Option::is_none")]
pub is_copied_context: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<Json>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AtifMetrics {
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt_tokens: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub completion_tokens: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cached_tokens: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cost_usd: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt_token_ids: Option<Vec<u64>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub completion_token_ids: Option<Vec<u64>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub logprobs: Option<Vec<f64>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<Json>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AtifFinalMetrics {
#[serde(skip_serializing_if = "Option::is_none")]
pub total_prompt_tokens: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_completion_tokens: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_cached_tokens: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_cost_usd: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_steps: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<Json>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifToolCall {
pub tool_call_id: String,
pub function_name: String,
pub arguments: Json,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifObservation {
pub results: Vec<AtifObservationResult>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifObservationResult {
#[serde(skip_serializing_if = "Option::is_none")]
pub source_call_id: Option<String>,
pub content: Json,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifAncestry {
pub function_id: String,
pub function_name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_name: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifInvocationInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub start_timestamp: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub end_timestamp: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub invocation_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub framework: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifStepExtra {
pub ancestry: AtifAncestry,
#[serde(skip_serializing_if = "Option::is_none")]
pub invocation: Option<AtifInvocationInfo>,
#[serde(skip_serializing_if = "Option::is_none")]
pub llm_request: Option<Json>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub tool_ancestry: Vec<AtifAncestry>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_invocations: Option<Vec<AtifInvocationInfo>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtifTrajectory {
pub schema_version: String,
pub session_id: String,
pub agent: AtifAgentInfo,
pub steps: Vec<AtifStep>,
#[serde(skip_serializing_if = "Option::is_none")]
pub notes: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub final_metrics: Option<AtifFinalMetrics>,
#[serde(skip_serializing_if = "Option::is_none")]
pub continued_trajectory_ref: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<Json>,
}
struct AtifExporterState {
session_id: String,
agent_info: AtifAgentInfo,
events: Vec<Event>,
}
pub struct AtifExporter {
state: Arc<Mutex<AtifExporterState>>,
}
impl AtifExporter {
pub fn new(session_id: String, agent_info: AtifAgentInfo) -> Self {
Self {
state: Arc::new(Mutex::new(AtifExporterState {
session_id,
agent_info,
events: Vec::new(),
})),
}
}
pub fn subscriber(&self) -> EventSubscriberFn {
let state = self.state.clone();
Arc::new(move |event: &Event| {
if let Ok(mut s) = state.lock() {
s.events.push(event.clone());
}
})
}
pub fn export(&self) -> AtifTrajectory {
let state = self.state.lock().unwrap();
let collected_events: Vec<&Event> = state.events.iter().collect();
let steps = events_to_steps(&collected_events);
let final_metrics = compute_final_metrics(&steps);
AtifTrajectory {
schema_version: ATIF_SCHEMA_VERSION.to_string(),
session_id: state.session_id.clone(),
agent: state.agent_info.clone(),
steps,
notes: None,
final_metrics,
continued_trajectory_ref: None,
extra: None,
}
}
pub fn clear(&self) {
let mut state = self.state.lock().unwrap();
state.events.clear();
}
}
fn unwrap_llm_request(input: &Json) -> Json {
if let Some(obj) = input.as_object()
&& obj.contains_key("content")
&& obj.contains_key("headers")
{
return obj.get("content").cloned().unwrap_or_else(|| input.clone());
}
input.clone()
}
fn extract_llm_response_message(output: &Json) -> Json {
if let Some(obj) = output.as_object() {
if let Some(content) = non_null_object_field(obj, "content") {
return content;
}
if let Some(summary) = llm_response_summary(obj) {
return summary;
}
}
output.clone()
}
fn non_null_object_field(obj: &serde_json::Map<String, Json>, key: &str) -> Option<Json> {
obj.get(key).filter(|value| !value.is_null()).cloned()
}
fn llm_response_summary(obj: &serde_json::Map<String, Json>) -> Option<Json> {
if !obj.contains_key("tool_calls") && !obj.contains_key("role") {
return None;
}
let mut summary = serde_json::Map::new();
if let Some(role) = obj.get("role") {
summary.insert("role".to_string(), role.clone());
}
if let Some(tool_calls) = obj.get("tool_calls") {
summary.insert("tool_calls".to_string(), tool_calls.clone());
}
if let Some(reasoning) = non_null_object_field(obj, "reasoning") {
summary.insert("reasoning".to_string(), reasoning);
}
(!summary.is_empty()).then_some(Json::Object(summary))
}
const TOKEN_USAGE_KNOWN_KEYS: &[&str] = &[
"prompt_tokens",
"completion_tokens",
"cached_tokens",
"cost_usd",
"prompt_token_ids",
"completion_token_ids",
"logprobs",
];
fn extract_metrics(output: &Json) -> Option<AtifMetrics> {
let usage = token_usage_object(output)?;
let prompt = usage_u64(usage, &["prompt_tokens", "input_tokens"]);
let completion = usage_u64(usage, &["completion_tokens", "output_tokens"]);
let cached = usage_u64(usage, &["cached_tokens"])
.or_else(|| prompt_tokens_detail_u64(usage, "cached_tokens"))
.or_else(|| {
sum_usage_u64(
usage,
&["cache_read_input_tokens", "cache_creation_input_tokens"],
)
});
let cost = usage.get("cost_usd").and_then(Json::as_f64);
let prompt_ids = usage
.get("prompt_token_ids")
.and_then(Json::as_array)
.map(|a| a.iter().filter_map(Json::as_u64).collect());
let completion_ids = usage
.get("completion_token_ids")
.and_then(Json::as_array)
.map(|a| a.iter().filter_map(Json::as_u64).collect());
let logprobs = usage
.get("logprobs")
.and_then(Json::as_array)
.map(|a| a.iter().filter_map(Json::as_f64).collect());
let known: std::collections::HashSet<&str> = TOKEN_USAGE_KNOWN_KEYS.iter().copied().collect();
let extra_map: serde_json::Map<String, Json> = usage
.iter()
.filter(|(k, _)| !known.contains(k.as_str()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let extra = if extra_map.is_empty() {
None
} else {
Some(Json::Object(extra_map))
};
if prompt.is_none() && completion.is_none() && cached.is_none() {
return None;
}
Some(AtifMetrics {
prompt_tokens: prompt,
completion_tokens: completion,
cached_tokens: cached,
cost_usd: cost,
prompt_token_ids: prompt_ids,
completion_token_ids: completion_ids,
logprobs,
extra,
})
}
fn token_usage_object(output: &Json) -> Option<&serde_json::Map<String, Json>> {
let output = output.as_object()?;
output
.get("token_usage")
.or_else(|| output.get("usage"))
.and_then(Json::as_object)
}
fn usage_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
keys.iter()
.find_map(|key| usage.get(*key).and_then(Json::as_u64))
}
fn sum_usage_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
let mut total = 0;
let mut found = false;
for key in keys {
if let Some(value) = usage.get(*key).and_then(Json::as_u64) {
total += value;
found = true;
}
}
found.then_some(total)
}
fn prompt_tokens_detail_u64(usage: &serde_json::Map<String, Json>, key: &str) -> Option<u64> {
usage
.get("prompt_tokens_details")
.and_then(Json::as_object)
.and_then(|details| details.get(key))
.and_then(Json::as_u64)
}
fn extract_reasoning_effort(input: &Json) -> Option<Json> {
if let Some(obj) = input.as_object()
&& let Some(v) = obj.get("reasoning_effort")
&& !v.is_null()
{
return Some(v.clone());
}
None
}
fn extract_reasoning_content(output: &Json) -> Option<String> {
if let Some(obj) = output.as_object()
&& let Some(r) = obj.get("reasoning")
{
return r.as_str().map(String::from);
}
None
}
fn extract_user_messages(input: &Json) -> Json {
if let Some(obj) = input.as_object()
&& let Some(messages) = obj.get("messages")
{
return messages.clone();
}
input.clone()
}
fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
let arr = output.as_object()?.get("tool_calls")?.as_array()?;
if arr.is_empty() {
return None;
}
let mut calls = Vec::with_capacity(arr.len());
for tc in arr {
let tc_obj = tc.as_object()?;
let id = tc_obj
.get("id")
.and_then(Json::as_str)
.unwrap_or("")
.to_string();
let func = tc_obj.get("function").and_then(Json::as_object);
let name = func
.and_then(|f| f.get("name"))
.and_then(Json::as_str)
.unwrap_or("")
.to_string();
let raw_arguments = func
.and_then(|f| f.get("arguments"))
.cloned()
.unwrap_or(Json::Null);
let arguments = if let Some(s) = raw_arguments.as_str() {
serde_json::from_str(s).unwrap_or(raw_arguments)
} else {
raw_arguments
};
if id.is_empty() && name.is_empty() {
continue;
}
calls.push(AtifToolCall {
tool_call_id: id,
function_name: name,
arguments,
});
}
if calls.is_empty() { None } else { Some(calls) }
}
fn compute_final_metrics(steps: &[AtifStep]) -> Option<AtifFinalMetrics> {
let mut total_prompt: u64 = 0;
let mut total_completion: u64 = 0;
let mut total_cached: u64 = 0;
let mut total_cost: f64 = 0.0;
let mut has_any = false;
for step in steps {
if let Some(m) = &step.metrics {
has_any = true;
total_prompt += m.prompt_tokens.unwrap_or(0);
total_completion += m.completion_tokens.unwrap_or(0);
total_cached += m.cached_tokens.unwrap_or(0);
total_cost += m.cost_usd.unwrap_or(0.0);
}
}
Some(AtifFinalMetrics {
total_prompt_tokens: if has_any { Some(total_prompt) } else { None },
total_completion_tokens: if has_any {
Some(total_completion)
} else {
None
},
total_cached_tokens: if has_any && total_cached > 0 {
Some(total_cached)
} else {
None
},
total_cost_usd: if has_any && total_cost > 0.0 {
Some(total_cost)
} else {
None
},
total_steps: Some(steps.len() as u64),
extra: None,
})
}
fn build_ancestry(
event: &Event,
name_map: &std::collections::HashMap<Uuid, String>,
) -> AtifAncestry {
AtifAncestry {
function_id: event.uuid().to_string(),
function_name: event.name().to_string(),
parent_id: event.parent_uuid().map(|u| u.to_string()),
parent_name: event.parent_uuid().and_then(|u| name_map.get(&u)).cloned(),
}
}
fn build_invocation_info(
start_ts: Option<DateTime<Utc>>,
end_ts: DateTime<Utc>,
invocation_id: Option<String>,
framework: &str,
) -> AtifInvocationInfo {
AtifInvocationInfo {
start_timestamp: start_ts.map(|s| s.timestamp_millis() as f64 / 1000.0),
end_timestamp: start_ts.map(|_| end_ts.timestamp_millis() as f64 / 1000.0),
invocation_id,
status: Some("completed".to_string()),
framework: Some(framework.to_string()),
}
}
struct EventLookupMaps {
name_map: std::collections::HashMap<Uuid, String>,
start_ts_map: std::collections::HashMap<Uuid, DateTime<Utc>>,
}
impl EventLookupMaps {
fn from_events(events: &[&Event]) -> Self {
let mut name_map = std::collections::HashMap::new();
let mut start_ts_map = std::collections::HashMap::new();
for event in events {
if is_start_event(event) {
name_map.insert(event.uuid(), event.name().to_string());
start_ts_map.insert(event.uuid(), *event.timestamp());
}
}
Self {
name_map,
start_ts_map,
}
}
}
#[derive(Default)]
struct PendingAgentStep {
step_idx: Option<usize>,
ancestry: Option<AtifAncestry>,
invocation: Option<AtifInvocationInfo>,
tool_ancestry: Vec<AtifAncestry>,
tool_invocations: Vec<AtifInvocationInfo>,
tool_call_order: Vec<String>,
}
impl PendingAgentStep {
fn finalize_into(&mut self, steps: &mut [AtifStep]) {
let (Some(step_idx), Some(ancestry)) = (self.step_idx.take(), self.ancestry.take()) else {
return;
};
let Some(step) = steps.get_mut(step_idx) else {
return;
};
self.sort_tool_metadata();
let extra = AtifStepExtra {
ancestry,
invocation: self.invocation.take(),
llm_request: None,
tool_ancestry: std::mem::take(&mut self.tool_ancestry),
tool_invocations: if self.tool_invocations.is_empty() {
None
} else {
Some(std::mem::take(&mut self.tool_invocations))
},
};
step.extra = serde_json::to_value(&extra).ok();
}
fn set_current_agent(
&mut self,
step_idx: usize,
ancestry: AtifAncestry,
invocation: AtifInvocationInfo,
tool_call_order: Vec<String>,
) {
self.step_idx = Some(step_idx);
self.ancestry = Some(ancestry);
self.invocation = Some(invocation);
self.tool_ancestry.clear();
self.tool_invocations.clear();
self.tool_call_order = tool_call_order;
}
fn push_tool_metadata(&mut self, ancestry: AtifAncestry, invocation: AtifInvocationInfo) {
self.tool_ancestry.push(ancestry);
self.tool_invocations.push(invocation);
}
fn has_active_step(&self) -> bool {
self.step_idx.is_some()
}
fn sort_tool_metadata(&mut self) {
if self.tool_call_order.is_empty() || self.tool_ancestry.is_empty() {
return;
}
let mut pairs: Vec<(AtifAncestry, AtifInvocationInfo)> =
std::mem::take(&mut self.tool_ancestry)
.into_iter()
.zip(std::mem::take(&mut self.tool_invocations))
.collect();
pairs.sort_by_key(|(_, invocation)| {
invocation
.invocation_id
.as_deref()
.and_then(|id| self.tool_call_order.iter().position(|entry| entry == id))
.unwrap_or(usize::MAX)
});
let (sorted_ancestry, sorted_invocations): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
self.tool_ancestry = sorted_ancestry;
self.tool_invocations = sorted_invocations;
}
}
#[derive(Default)]
struct StepConversionState {
steps: Vec<AtifStep>,
last_tool_call_map: std::collections::HashMap<String, String>,
pending_observations: Vec<AtifObservationResult>,
pending_obs_timestamp: Option<String>,
current_reasoning_effort: Option<Json>,
current_agent: PendingAgentStep,
}
impl StepConversionState {
fn flush_observations(&mut self) {
if self.pending_observations.is_empty() {
return;
}
self.steps.push(AtifStep {
step_id: 0,
source: "system".to_string(),
message: Json::Null,
timestamp: self.pending_obs_timestamp.take(),
model_name: None,
reasoning_effort: None,
reasoning_content: None,
tool_calls: None,
observation: Some(AtifObservation {
results: std::mem::take(&mut self.pending_observations),
}),
metrics: None,
is_copied_context: None,
extra: None,
});
}
fn finalize_agent_extra(&mut self) {
self.current_agent.finalize_into(&mut self.steps);
}
fn handle_llm_start(&mut self, event: &Event, lookups: &EventLookupMaps) {
self.flush_observations();
self.finalize_agent_extra();
let Some(input) = event.data() else {
return;
};
let content = unwrap_llm_request(input);
self.current_reasoning_effort = extract_reasoning_effort(&content);
let extra = AtifStepExtra {
ancestry: build_ancestry(event, &lookups.name_map),
invocation: None,
llm_request: Some(content.clone()),
tool_ancestry: Vec::new(),
tool_invocations: None,
};
self.steps.push(AtifStep {
step_id: 0,
source: "user".to_string(),
message: extract_user_messages(&content),
timestamp: Some(event.timestamp().to_rfc3339()),
model_name: None,
reasoning_effort: None,
reasoning_content: None,
tool_calls: None,
observation: None,
metrics: None,
is_copied_context: None,
extra: serde_json::to_value(&extra).ok(),
});
}
fn handle_llm_end(&mut self, event: &Event, lookups: &EventLookupMaps) {
self.flush_observations();
let Some(output) = event.data() else {
return;
};
let tool_calls = extract_tool_calls(output);
let tool_call_order = refresh_tool_call_lookup(&mut self.last_tool_call_map, &tool_calls);
let reasoning_effort = self.current_reasoning_effort.take();
let reasoning_content = extract_reasoning_content(output);
let start_ts = lookups.start_ts_map.get(&event.uuid()).cloned();
let ancestry = build_ancestry(event, &lookups.name_map);
let invocation = build_invocation_info(
start_ts,
*event.timestamp(),
Some(event.uuid().to_string()),
"nemo_flow",
);
self.steps.push(AtifStep {
step_id: 0,
source: "agent".to_string(),
message: extract_llm_response_message(output),
timestamp: Some(event.timestamp().to_rfc3339()),
model_name: event.model_name().map(ToOwned::to_owned),
reasoning_effort,
reasoning_content,
tool_calls,
observation: None,
metrics: extract_metrics(output),
is_copied_context: None,
extra: None,
});
self.current_agent.set_current_agent(
self.steps.len() - 1,
ancestry,
invocation,
tool_call_order,
);
}
fn handle_tool_end(&mut self, event: &Event, lookups: &EventLookupMaps) {
if let Some(output) = event.data() {
if self.pending_obs_timestamp.is_none() {
self.pending_obs_timestamp = Some(event.timestamp().to_rfc3339());
}
self.pending_observations.push(AtifObservationResult {
source_call_id: event
.tool_call_id()
.map(ToOwned::to_owned)
.or_else(|| self.last_tool_call_map.get(event.name()).cloned()),
content: output.clone(),
});
}
if !self.current_agent.has_active_step() {
return;
}
let start_ts = lookups.start_ts_map.get(&event.uuid()).cloned();
let invocation = build_invocation_info(
start_ts,
*event.timestamp(),
event
.tool_call_id()
.map(ToOwned::to_owned)
.or_else(|| Some(event.uuid().to_string())),
"nemo_flow",
);
self.current_agent
.push_tool_metadata(build_ancestry(event, &lookups.name_map), invocation);
}
fn handle_mark(&mut self, mark: &Event, lookups: &EventLookupMaps) {
self.flush_observations();
let Some(data) = mark.data() else {
return;
};
if is_empty_mark_payload(data) {
return;
}
let extra = AtifStepExtra {
ancestry: build_ancestry(mark, &lookups.name_map),
invocation: Some(AtifInvocationInfo {
start_timestamp: None,
end_timestamp: None,
invocation_id: Some(mark.uuid().to_string()),
status: Some("completed".to_string()),
framework: Some("nemo_flow".to_string()),
}),
llm_request: None,
tool_ancestry: Vec::new(),
tool_invocations: None,
};
self.steps.push(AtifStep {
step_id: 0,
source: "system".to_string(),
message: mark_message(mark, data),
timestamp: Some(mark.timestamp().to_rfc3339()),
model_name: None,
reasoning_effort: None,
reasoning_content: None,
tool_calls: None,
observation: None,
metrics: None,
is_copied_context: None,
extra: serde_json::to_value(&extra).ok(),
});
}
fn finish(mut self) -> Vec<AtifStep> {
self.finalize_agent_extra();
self.flush_observations();
for (index, step) in self.steps.iter_mut().enumerate() {
step.step_id = index + 1;
}
self.steps
}
}
fn refresh_tool_call_lookup(
last_tool_call_map: &mut std::collections::HashMap<String, String>,
tool_calls: &Option<Vec<AtifToolCall>>,
) -> Vec<String> {
last_tool_call_map.clear();
let mut tool_call_order = Vec::new();
if let Some(tool_calls) = tool_calls {
for tool_call in tool_calls {
if !tool_call.function_name.is_empty() {
last_tool_call_map.insert(
tool_call.function_name.clone(),
tool_call.tool_call_id.clone(),
);
}
tool_call_order.push(tool_call.tool_call_id.clone());
}
}
tool_call_order
}
fn events_to_steps(events: &[&Event]) -> Vec<AtifStep> {
let mut sorted: Vec<&Event> = events.to_vec();
sorted.sort_by_key(|e| *e.timestamp());
let lookups = EventLookupMaps::from_events(&sorted);
let mut state = StepConversionState::default();
for event in &sorted {
match (
event.kind(),
event.scope_category(),
event.category().map(|category| category.as_str()),
) {
("scope", Some(crate::api::event::ScopeCategory::Start), Some("llm")) => {
state.handle_llm_start(event, &lookups)
}
("scope", Some(crate::api::event::ScopeCategory::End), Some("llm")) => {
state.handle_llm_end(event, &lookups)
}
("scope", Some(crate::api::event::ScopeCategory::End), Some("tool")) => {
state.handle_tool_end(event, &lookups)
}
("mark", _, _) => state.handle_mark(event, &lookups),
_ => {}
}
}
state.finish()
}
fn is_empty_mark_payload(data: &Json) -> bool {
data.is_null() || data.as_object().is_some_and(|object| object.is_empty())
}
fn mark_message(mark: &Event, data: &Json) -> Json {
let Some(object) = data.as_object() else {
return data.clone();
};
let mut message = object.clone();
if !message.contains_key("hook_event_name")
&& let Some(hook_event_name) = mark_hook_event_name(mark)
{
message.insert("hook_event_name".to_string(), Json::String(hook_event_name));
}
Json::Object(message)
}
fn mark_hook_event_name(mark: &Event) -> Option<String> {
mark.metadata()
.and_then(Json::as_object)
.and_then(|metadata| metadata.get("hook_event_name"))
.and_then(Json::as_str)
.filter(|name| !name.is_empty())
.map(ToOwned::to_owned)
.or_else(|| Some(mark.name().to_string()).filter(|name| !name.is_empty()))
}
fn is_start_event(event: &Event) -> bool {
event.scope_category() == Some(crate::api::event::ScopeCategory::Start)
}
#[cfg(test)]
#[path = "../../tests/unit/atif_tests.rs"]
mod tests;