use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StateSnapshot {
pub input: serde_json::Value,
pub output: serde_json::Value,
}
impl StateSnapshot {
pub fn new(input: serde_json::Value, output: serde_json::Value) -> Self {
Self { input, output }
}
pub fn with_input(input: serde_json::Value) -> Self {
Self {
input,
output: serde_json::Value::Object(Default::default()),
}
}
pub fn extract_state_keys(&self) -> Vec<String> {
match &self.output {
serde_json::Value::Object(map) => map.keys().cloned().collect(),
_ => Vec::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerInputEvent {
#[serde(rename = "type")]
pub event_type: String,
pub trigger_id: String,
pub input: String,
pub timestamp: u64,
}
impl TriggerInputEvent {
pub fn new(trigger_id: impl Into<String>, input: impl Into<String>) -> Self {
Self {
event_type: "trigger_input".to_string(),
trigger_id: trigger_id.into(),
input: input.into(),
timestamp: current_timestamp_ms(),
}
}
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterruptEvent {
#[serde(rename = "type")]
pub event_type: String,
pub node_id: String,
pub message: String,
pub data: serde_json::Value,
pub timestamp: u64,
}
impl InterruptEvent {
pub fn new(
node_id: impl Into<String>,
message: impl Into<String>,
data: serde_json::Value,
) -> Self {
Self {
event_type: "interrupt".to_string(),
node_id: node_id.into(),
message: message.into(),
data,
timestamp: current_timestamp_ms(),
}
}
pub fn simple(node_id: impl Into<String>, message: impl Into<String>) -> Self {
Self::new(
node_id,
message,
serde_json::Value::Object(Default::default()),
)
}
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterruptResponseEvent {
#[serde(rename = "type")]
pub event_type: String,
pub node_id: String,
pub response: serde_json::Value,
pub timestamp: u64,
}
impl InterruptResponseEvent {
pub fn new(node_id: impl Into<String>, response: serde_json::Value) -> Self {
Self {
event_type: "interrupt_response".to_string(),
node_id: node_id.into(),
response,
timestamp: current_timestamp_ms(),
}
}
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeEvent {
#[serde(rename = "type")]
pub event_type: String,
pub node_id: String,
pub timestamp: u64,
}
impl ResumeEvent {
pub fn new(node_id: impl Into<String>) -> Self {
Self {
event_type: "resume".to_string(),
node_id: node_id.into(),
timestamp: current_timestamp_ms(),
}
}
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
}
}
pub struct HitlEventEmitter {
sender: tokio::sync::mpsc::Sender<String>,
}
impl HitlEventEmitter {
pub fn new(sender: tokio::sync::mpsc::Sender<String>) -> Self {
Self { sender }
}
pub async fn emit_trigger_input(
&self,
trigger_id: impl Into<String>,
input: impl Into<String>,
) {
let event = TriggerInputEvent::new(trigger_id, input);
let _ = self.sender.send(event.to_json()).await;
}
pub async fn emit_interrupt(
&self,
node_id: impl Into<String>,
message: impl Into<String>,
data: serde_json::Value,
) {
let event = InterruptEvent::new(node_id, message, data);
let _ = self.sender.send(event.to_json()).await;
}
pub async fn emit_interrupt_response(
&self,
node_id: impl Into<String>,
response: serde_json::Value,
) {
let event = InterruptResponseEvent::new(node_id, response);
let _ = self.sender.send(event.to_json()).await;
}
pub async fn emit_resume(&self, node_id: impl Into<String>) {
let event = ResumeEvent::new(node_id);
let _ = self.sender.send(event.to_json()).await;
}
}
fn current_timestamp_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebugEvent {
pub level: String,
pub category: String,
pub agent: String,
pub summary: String,
pub detail: serde_json::Value,
pub timestamp: u64,
}
impl DebugEvent {
pub fn new(
level: &str,
category: &str,
agent: &str,
summary: impl Into<String>,
detail: serde_json::Value,
) -> Self {
Self {
level: level.to_string(),
category: category.to_string(),
agent: agent.to_string(),
summary: summary.into(),
detail,
timestamp: current_timestamp_ms(),
}
}
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceEventV2 {
#[serde(rename = "type")]
pub event_type: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub node: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub step: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub duration_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_steps: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub state_snapshot: Option<StateSnapshot>,
#[serde(skip_serializing_if = "Option::is_none")]
pub state_keys: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub state: Option<serde_json::Value>,
}
impl TraceEventV2 {
pub fn node_start(node: &str, step: u32, input_state: serde_json::Value) -> Self {
let snapshot = StateSnapshot::with_input(input_state);
let state_keys = snapshot.extract_state_keys();
Self {
event_type: "node_start".to_string(),
node: Some(node.to_string()),
step: Some(step),
duration_ms: None,
total_steps: None,
state_snapshot: Some(snapshot),
state_keys: if state_keys.is_empty() {
None
} else {
Some(state_keys)
},
state: None,
}
}
pub fn node_end(
node: &str,
step: u32,
duration_ms: u64,
input_state: serde_json::Value,
output_state: serde_json::Value,
) -> Self {
let snapshot = StateSnapshot::new(input_state, output_state);
let state_keys = snapshot.extract_state_keys();
Self {
event_type: "node_end".to_string(),
node: Some(node.to_string()),
step: Some(step),
duration_ms: Some(duration_ms),
total_steps: None,
state_snapshot: Some(snapshot),
state_keys: if state_keys.is_empty() {
None
} else {
Some(state_keys)
},
state: None,
}
}
pub fn done(
total_steps: u32,
input_state: serde_json::Value,
output_state: serde_json::Value,
) -> Self {
let snapshot = StateSnapshot::new(input_state, output_state);
let state_keys = snapshot.extract_state_keys();
Self {
event_type: "done".to_string(),
node: None,
step: None,
duration_ms: None,
total_steps: Some(total_steps),
state_snapshot: Some(snapshot),
state_keys: if state_keys.is_empty() {
None
} else {
Some(state_keys)
},
state: None,
}
}
pub fn state_update(output_state: serde_json::Value) -> Self {
let snapshot =
StateSnapshot::new(serde_json::Value::Object(Default::default()), output_state);
let state_keys = snapshot.extract_state_keys();
Self {
event_type: "state".to_string(),
node: None,
step: None,
duration_ms: None,
total_steps: None,
state_snapshot: Some(snapshot),
state_keys: if state_keys.is_empty() {
None
} else {
Some(state_keys)
},
state: None,
}
}
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
}
}
#[derive(Debug, Clone, Default)]
pub struct ExecutionStateTracker {
current_state: HashMap<String, serde_json::Value>,
step: u32,
node_start_times: HashMap<String, std::time::Instant>,
}
impl ExecutionStateTracker {
pub fn new() -> Self {
Self::default()
}
pub fn node_start(&mut self, node: &str) -> TraceEventV2 {
self.step += 1;
self.node_start_times
.insert(node.to_string(), std::time::Instant::now());
let input_state = serde_json::to_value(&self.current_state).unwrap_or_default();
TraceEventV2::node_start(node, self.step, input_state)
}
pub fn node_end(&mut self, node: &str, output_state: serde_json::Value) -> TraceEventV2 {
let duration_ms = self
.node_start_times
.remove(node)
.map(|start| start.elapsed().as_millis() as u64)
.unwrap_or(0);
let input_state = serde_json::to_value(&self.current_state).unwrap_or_default();
if let serde_json::Value::Object(map) = &output_state {
for (k, v) in map {
self.current_state.insert(k.clone(), v.clone());
}
}
TraceEventV2::node_end(node, self.step, duration_ms, input_state, output_state)
}
pub fn done(&self) -> TraceEventV2 {
let output_state = serde_json::to_value(&self.current_state).unwrap_or_default();
TraceEventV2::done(
self.step,
serde_json::Value::Object(Default::default()),
output_state,
)
}
pub fn update_state(&mut self, key: &str, value: serde_json::Value) {
self.current_state.insert(key.to_string(), value);
}
pub fn current_step(&self) -> u32 {
self.step
}
pub fn current_state_value(&self) -> serde_json::Value {
serde_json::to_value(&self.current_state).unwrap_or_default()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_state_snapshot_extract_keys() {
let snapshot = StateSnapshot::new(
serde_json::json!({"input_key": "value"}),
serde_json::json!({"output_key1": "value1", "output_key2": "value2"}),
);
let keys = snapshot.extract_state_keys();
assert!(keys.contains(&"output_key1".to_string()));
assert!(keys.contains(&"output_key2".to_string()));
assert_eq!(keys.len(), 2);
}
#[test]
fn test_trace_event_node_start() {
let event = TraceEventV2::node_start("test_agent", 1, serde_json::json!({"query": "test"}));
assert_eq!(event.event_type, "node_start");
assert_eq!(event.node, Some("test_agent".to_string()));
assert_eq!(event.step, Some(1));
assert!(event.state_snapshot.is_some());
}
#[test]
fn test_trace_event_node_end() {
let event = TraceEventV2::node_end(
"test_agent",
1,
1500,
serde_json::json!({"query": "test"}),
serde_json::json!({"query": "test", "result": "answer"}),
);
assert_eq!(event.event_type, "node_end");
assert_eq!(event.duration_ms, Some(1500));
assert!(event.state_keys.is_some());
let keys = event.state_keys.unwrap();
assert!(keys.contains(&"query".to_string()));
assert!(keys.contains(&"result".to_string()));
}
#[test]
fn test_execution_state_tracker() {
let mut tracker = ExecutionStateTracker::new();
let start_event = tracker.node_start("agent1");
assert_eq!(start_event.event_type, "node_start");
assert_eq!(tracker.current_step(), 1);
let end_event = tracker.node_end("agent1", serde_json::json!({"result": "done"}));
assert_eq!(end_event.event_type, "node_end");
assert!(end_event.duration_ms.is_some());
let done_event = tracker.done();
assert_eq!(done_event.event_type, "done");
assert_eq!(done_event.total_steps, Some(1));
}
#[test]
fn test_trigger_input_event() {
let event = TriggerInputEvent::new("manual_trigger_1", "Hello, world!");
assert_eq!(event.event_type, "trigger_input");
assert_eq!(event.trigger_id, "manual_trigger_1");
assert_eq!(event.input, "Hello, world!");
assert!(event.timestamp > 0);
let json = event.to_json();
assert!(json.contains("\"type\":\"trigger_input\""));
assert!(json.contains("\"trigger_id\":\"manual_trigger_1\""));
assert!(json.contains("\"input\":\"Hello, world!\""));
}
#[test]
fn test_interrupt_event() {
let data = serde_json::json!({
"plan": "Delete all backup files",
"risk_level": "high",
"action": "Set 'approved' to true to continue"
});
let event =
InterruptEvent::new("review", "HIGH RISK: Human approval required", data.clone());
assert_eq!(event.event_type, "interrupt");
assert_eq!(event.node_id, "review");
assert_eq!(event.message, "HIGH RISK: Human approval required");
assert_eq!(event.data, data);
assert!(event.timestamp > 0);
let json = event.to_json();
assert!(json.contains("\"type\":\"interrupt\""));
assert!(json.contains("\"node_id\":\"review\""));
assert!(json.contains("\"risk_level\":\"high\""));
}
#[test]
fn test_interrupt_event_simple() {
let event = InterruptEvent::simple("approval_node", "Please approve this action");
assert_eq!(event.event_type, "interrupt");
assert_eq!(event.node_id, "approval_node");
assert_eq!(event.message, "Please approve this action");
assert_eq!(event.data, serde_json::Value::Object(Default::default()));
}
#[test]
fn test_interrupt_response_event() {
let response = serde_json::json!({ "approved": true, "comment": "Looks good" });
let event = InterruptResponseEvent::new("review", response.clone());
assert_eq!(event.event_type, "interrupt_response");
assert_eq!(event.node_id, "review");
assert_eq!(event.response, response);
assert!(event.timestamp > 0);
let json = event.to_json();
assert!(json.contains("\"type\":\"interrupt_response\""));
assert!(json.contains("\"approved\":true"));
}
#[test]
fn test_resume_event() {
let event = ResumeEvent::new("review");
assert_eq!(event.event_type, "resume");
assert_eq!(event.node_id, "review");
assert!(event.timestamp > 0);
let json = event.to_json();
assert!(json.contains("\"type\":\"resume\""));
assert!(json.contains("\"node_id\":\"review\""));
}
#[test]
fn test_hitl_event_json_format() {
let interrupt = InterruptEvent::new(
"review",
"HIGH RISK: Human approval required",
serde_json::json!({
"plan": "...",
"risk_level": "high",
"action": "Set 'approved' to true to continue"
}),
);
let parsed: serde_json::Value = serde_json::from_str(&interrupt.to_json()).unwrap();
assert_eq!(parsed["type"], "interrupt");
assert_eq!(parsed["node_id"], "review");
assert!(parsed["message"].is_string());
assert!(parsed["data"].is_object());
assert!(parsed["timestamp"].is_number());
}
#[test]
fn test_debug_event_new() {
let detail = serde_json::json!({
"model": "gemini-3.1-flash-lite-preview",
"messages": [{"role": "user", "content": "Hello"}]
});
let event = DebugEvent::new(
"debug",
"request",
"weather_agent",
"LLM request to gemini-3.1-flash-lite-preview",
detail.clone(),
);
assert_eq!(event.level, "debug");
assert_eq!(event.category, "request");
assert_eq!(event.agent, "weather_agent");
assert_eq!(event.summary, "LLM request to gemini-3.1-flash-lite-preview");
assert_eq!(event.detail, detail);
assert!(event.timestamp > 0);
}
#[test]
fn test_debug_event_to_json() {
let event = DebugEvent::new(
"error",
"error",
"tool_agent",
"Tool execution failed",
serde_json::json!({"error": "timeout"}),
);
let json = event.to_json();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["level"], "error");
assert_eq!(parsed["category"], "error");
assert_eq!(parsed["agent"], "tool_agent");
assert_eq!(parsed["summary"], "Tool execution failed");
assert_eq!(parsed["detail"]["error"], "timeout");
assert!(parsed["timestamp"].is_number());
}
#[test]
fn test_debug_event_all_categories() {
let categories = [
"request",
"response",
"error",
"state_change",
"tool_call",
"tool_result",
"lifecycle",
];
for category in categories {
let event = DebugEvent::new("info", category, "agent", "test", serde_json::json!({}));
assert_eq!(event.category, category);
let json = event.to_json();
assert!(json.contains(&format!("\"category\":\"{category}\"")));
}
}
#[test]
fn test_debug_event_all_levels() {
let levels = ["error", "warn", "info", "debug", "trace"];
for level in levels {
let event = DebugEvent::new(level, "lifecycle", "agent", "test", serde_json::json!({}));
assert_eq!(event.level, level);
let json = event.to_json();
assert!(json.contains(&format!("\"level\":\"{level}\"")));
}
}
}