use crate::hooks::{HookPatch, HookPatchEnvelope, HookPoint, HookReasonCode};
use crate::time_compat::SystemTime;
use crate::types::{SessionId, StopReason, Usage};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::cmp::Ordering;
/// Generic envelope that attaches identity, ordering, and timing metadata to
/// an event payload of type `T`.
///
/// [`EventEnvelope::new`] fills `event_id` and `timestamp_ms` automatically;
/// [`compare_event_envelopes`] orders envelopes by
/// `(timestamp_ms, source_id, seq, event_id)`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventEnvelope<T> {
/// Unique identifier of this envelope (a UUID v7 when built via `new`).
pub event_id: uuid::Uuid,
/// Caller-supplied identifier of the event source.
pub source_id: String,
/// Caller-supplied sequence number; used as a tie-breaker when ordering.
pub seq: u64,
/// Optional mob identifier. Omitted from serialized output when `None`
/// and defaulted to `None` when absent on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mob_id: Option<String>,
/// Creation time in milliseconds since the Unix epoch (when built via `new`).
pub timestamp_ms: u64,
/// The wrapped event.
pub payload: T,
}
impl<T> EventEnvelope<T> {
    /// Wraps `payload` in a new envelope stamped with a fresh UUID v7 and the
    /// current wall-clock time.
    ///
    /// `timestamp_ms` is the number of milliseconds elapsed since the Unix
    /// epoch. It is set to `u64::MAX` when the clock reports a time before
    /// the epoch, and saturates at `u64::MAX` if the millisecond count does
    /// not fit in a `u64`.
    ///
    /// `source_id`, `seq`, and `mob_id` are stored as given.
    pub fn new(source_id: impl Into<String>, seq: u64, mob_id: Option<String>, payload: T) -> Self {
        let timestamp_ms = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
            // `as_millis` yields a u128; convert with saturation instead of
            // an `as` cast, which would silently drop the high bits.
            Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
            Err(_) => u64::MAX,
        };
        Self {
            event_id: uuid::Uuid::now_v7(),
            source_id: source_id.into(),
            seq,
            mob_id,
            timestamp_ms,
            payload,
        }
    }
}
pub fn agent_event_type(event: &AgentEvent) -> &'static str {
match event {
AgentEvent::RunStarted { .. } => "run_started",
AgentEvent::RunCompleted { .. } => "run_completed",
AgentEvent::RunFailed { .. } => "run_failed",
AgentEvent::HookStarted { .. } => "hook_started",
AgentEvent::HookCompleted { .. } => "hook_completed",
AgentEvent::HookFailed { .. } => "hook_failed",
AgentEvent::HookDenied { .. } => "hook_denied",
AgentEvent::HookRewriteApplied { .. } => "hook_rewrite_applied",
AgentEvent::HookPatchPublished { .. } => "hook_patch_published",
AgentEvent::TurnStarted { .. } => "turn_started",
AgentEvent::ReasoningDelta { .. } => "reasoning_delta",
AgentEvent::ReasoningComplete { .. } => "reasoning_complete",
AgentEvent::TextDelta { .. } => "text_delta",
AgentEvent::TextComplete { .. } => "text_complete",
AgentEvent::ToolCallRequested { .. } => "tool_call_requested",
AgentEvent::ToolResultReceived { .. } => "tool_result_received",
AgentEvent::TurnCompleted { .. } => "turn_completed",
AgentEvent::ToolExecutionStarted { .. } => "tool_execution_started",
AgentEvent::ToolExecutionCompleted { .. } => "tool_execution_completed",
AgentEvent::ToolExecutionTimedOut { .. } => "tool_execution_timed_out",
AgentEvent::CompactionStarted { .. } => "compaction_started",
AgentEvent::CompactionCompleted { .. } => "compaction_completed",
AgentEvent::CompactionFailed { .. } => "compaction_failed",
AgentEvent::BudgetWarning { .. } => "budget_warning",
AgentEvent::Retrying { .. } => "retrying",
AgentEvent::SkillsResolved { .. } => "skills_resolved",
AgentEvent::SkillResolutionFailed { .. } => "skill_resolution_failed",
AgentEvent::InteractionComplete { .. } => "interaction_complete",
AgentEvent::InteractionCallbackPending { .. } => "interaction_callback_pending",
AgentEvent::InteractionFailed { .. } => "interaction_failed",
AgentEvent::StreamTruncated { .. } => "stream_truncated",
AgentEvent::ToolConfigChanged { .. } => "tool_config_changed",
AgentEvent::BackgroundJobCompleted { .. } => "background_job_completed",
}
}
/// Orders two envelopes by `timestamp_ms`, then `source_id`, then `seq`, and
/// finally `event_id`.
///
/// The payload does not take part in the comparison, so `T` needs no bounds.
pub fn compare_event_envelopes<T>(a: &EventEnvelope<T>, b: &EventEnvelope<T>) -> Ordering {
    // Tuples compare lexicographically, which yields the same precedence as
    // chaining the individual field comparisons.
    let left = (a.timestamp_ms, &a.source_id, a.seq, &a.event_id);
    let right = (b.timestamp_ms, &b.source_id, b.seq, &b.event_id);
    left.cmp(&right)
}
/// Payload carried by `AgentEvent::ToolConfigChanged`.
///
/// `ExternalToolDelta::to_tool_config_changed_payload` builds one of these
/// with `domain` and `deferred_catalog_delta` left as `None`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ToolConfigChangedPayload {
/// Which kind of change was requested (add, remove, or reload).
pub operation: ToolConfigChangeOperation,
/// Name of the thing the operation applies to.
pub target: String,
/// Free-form status text for the change.
pub status: String,
/// Whether the change is reported as persisted.
pub persisted: bool,
/// Omitted from serialized output when `None`.
// NOTE(review): presumably the turn at which the change took effect — confirm with producers.
#[serde(skip_serializing_if = "Option::is_none")]
pub applied_at_turn: Option<u32>,
/// Optional domain of the change. Omitted when `None`, defaults to `None`
/// when absent on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub domain: Option<ToolConfigChangeDomain>,
/// Optional deferred-catalog detail. Omitted when `None`, defaults to
/// `None` when absent on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deferred_catalog_delta: Option<DeferredCatalogDelta>,
}
/// Domain tag for a tool configuration change.
///
/// Serialized in snake_case (`tool_scope`, `deferred_catalog`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ToolConfigChangeDomain {
/// Serialized as `tool_scope`.
ToolScope,
/// Serialized as `deferred_catalog`.
DeferredCatalog,
}
/// Optional detail attached to a `ToolConfigChangedPayload`.
///
/// Every list is skipped during serialization when empty and defaults to an
/// empty list when absent on input.
// NOTE(review): the exact meaning of "hidden names" and "pending sources" is
// defined by the producers of this payload, not in this file — confirm there.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DeferredCatalogDelta {
/// Names reported as added.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub added_hidden_names: Vec<String>,
/// Names reported as removed.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub removed_hidden_names: Vec<String>,
/// Sources reported as still pending.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub pending_sources: Vec<String>,
}
/// Kind of tool configuration change.
///
/// Serialized in snake_case (`add`, `remove`, `reload`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ToolConfigChangeOperation {
/// Serialized as `add`.
Add,
/// Serialized as `remove`.
Remove,
/// Serialized as `reload`.
Reload,
}
/// Phase of an [`ExternalToolDelta`].
///
/// Serialized in snake_case; [`ExternalToolDeltaPhase::as_status`] returns
/// the same lowercase spelling as a static string.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum ExternalToolDeltaPhase {
/// `ExternalToolDelta::new` marks deltas in this phase as not persisted.
Pending,
/// `ExternalToolDelta::new` marks deltas in this phase as persisted.
Applied,
/// `ExternalToolDelta::new` marks deltas in this phase as not persisted.
Draining,
/// `ExternalToolDelta::new` marks deltas in this phase as persisted.
Forced,
/// `ExternalToolDelta::new` marks deltas in this phase as persisted;
/// `ExternalToolDelta::status_text` appends the detail text for it.
Failed,
}
impl ExternalToolDeltaPhase {
#[must_use]
pub fn as_status(self) -> &'static str {
match self {
Self::Pending => "pending",
Self::Applied => "applied",
Self::Draining => "draining",
Self::Forced => "forced",
Self::Failed => "failed",
}
}
}
/// Description of a change to an external tool target, convertible into a
/// [`ToolConfigChangedPayload`] via `to_tool_config_changed_payload`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExternalToolDelta {
/// Name of the thing the operation applies to.
pub target: String,
/// Which kind of change this is.
pub operation: ToolConfigChangeOperation,
/// Current phase of the change.
pub phase: ExternalToolDeltaPhase,
/// Whether the change is reported as persisted. `new` derives this from
/// `phase` (false for `Pending` and `Draining`, true otherwise).
pub persisted: bool,
/// Omitted from serialized output when `None`; `new` leaves it as `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub applied_at_turn: Option<u32>,
/// Optional tool count. Omitted when `None`, defaults to `None` when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_count: Option<usize>,
/// Optional detail text. Omitted when `None`, defaults to `None` when
/// absent. `status_text` appends it when the phase is `Failed`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub detail: Option<String>,
}
impl ExternalToolDelta {
    /// Creates a delta for `target` with the given operation and phase.
    ///
    /// `persisted` is derived from the phase: `Pending` and `Draining` are
    /// not persisted, every other phase is. `applied_at_turn`, `tool_count`,
    /// and `detail` start out as `None`.
    #[must_use]
    pub fn new(
        target: impl Into<String>,
        operation: ToolConfigChangeOperation,
        phase: ExternalToolDeltaPhase,
    ) -> Self {
        let persisted = match phase {
            ExternalToolDeltaPhase::Pending | ExternalToolDeltaPhase::Draining => false,
            ExternalToolDeltaPhase::Applied
            | ExternalToolDeltaPhase::Forced
            | ExternalToolDeltaPhase::Failed => true,
        };
        Self {
            target: target.into(),
            operation,
            phase,
            persisted,
            applied_at_turn: None,
            tool_count: None,
            detail: None,
        }
    }

    /// Returns the delta with `tool_count` replaced by the given value.
    #[must_use]
    pub fn with_tool_count(self, tool_count: Option<usize>) -> Self {
        Self { tool_count, ..self }
    }

    /// Returns the delta with `detail` replaced by the given value.
    #[must_use]
    pub fn with_detail(self, detail: Option<String>) -> Self {
        Self { detail, ..self }
    }

    /// Renders the phase as status text.
    ///
    /// The result is the phase label from `as_status`; for a `Failed` phase
    /// that carries a detail, the detail is appended as `"failed: <detail>"`.
    #[must_use]
    pub fn status_text(&self) -> String {
        let label = self.phase.as_status();
        match (self.phase, self.detail.as_deref()) {
            (ExternalToolDeltaPhase::Failed, Some(detail)) => format!("{label}: {detail}"),
            _ => label.to_string(),
        }
    }

    /// Builds the `ToolConfigChangedPayload` equivalent of this delta.
    ///
    /// `status` comes from [`Self::status_text`]; `domain` and
    /// `deferred_catalog_delta` are left unset.
    #[must_use]
    pub fn to_tool_config_changed_payload(&self) -> ToolConfigChangedPayload {
        let Self {
            target,
            operation,
            persisted,
            applied_at_turn,
            ..
        } = self;
        ToolConfigChangedPayload {
            operation: operation.clone(),
            target: target.clone(),
            status: self.status_text(),
            persisted: *persisted,
            applied_at_turn: *applied_at_turn,
            domain: None,
            deferred_catalog_delta: None,
        }
    }
}
/// Events describing the progress of an agent run.
///
/// Serialized as an internally tagged object: the variant name is written in
/// snake_case under the `type` key (for example `RunStarted` becomes
/// `"type": "run_started"`), matching the labels returned by
/// [`agent_event_type`]. The enum is `#[non_exhaustive]`, so code outside
/// this crate must include a wildcard arm when matching on it.
///
/// NOTE(review): the per-variant notes below restate what the variant and
/// field names convey; where and when each event is emitted is decided
/// outside this file — confirm against the emitters.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum AgentEvent {
/// A run started for `session_id` with the given `prompt`.
RunStarted {
session_id: SessionId,
prompt: String,
},
/// A run completed with a textual `result` and its token `usage`.
RunCompleted {
session_id: SessionId,
result: String,
usage: Usage,
},
/// A run failed with an error message.
RunFailed {
session_id: SessionId,
error: String,
},
/// A hook started at the given hook point.
HookStarted { hook_id: String, point: HookPoint },
/// A hook completed; `duration_ms` is a duration in milliseconds.
HookCompleted {
hook_id: String,
point: HookPoint,
duration_ms: u64,
},
/// A hook failed with an error message.
HookFailed {
hook_id: String,
point: HookPoint,
error: String,
},
/// A hook issued a denial with a reason code and message.
HookDenied {
hook_id: String,
point: HookPoint,
reason_code: HookReasonCode,
message: String,
/// Optional JSON payload. Omitted when `None`, defaults to `None` when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
payload: Option<Value>,
},
/// A hook rewrite, carrying the patch itself.
HookRewriteApplied {
hook_id: String,
point: HookPoint,
patch: HookPatch,
},
/// A hook patch publication, carrying the patch envelope.
HookPatchPublished {
hook_id: String,
point: HookPoint,
envelope: HookPatchEnvelope,
},
/// A turn started. `format_verbose_event_with_config` displays
/// `turn_number + 1`, so the number is presumably zero-based — TODO confirm.
TurnStarted { turn_number: u32 },
/// An incremental piece of reasoning text.
ReasoningDelta { delta: String },
/// The complete reasoning text.
ReasoningComplete { content: String },
/// An incremental piece of response text.
TextDelta { delta: String },
/// The complete response text.
TextComplete { content: String },
/// A tool call request with its JSON arguments.
ToolCallRequested {
id: String,
name: String,
args: Value,
},
/// A tool result notification; `is_error` flags a failed result.
ToolResultReceived {
id: String,
name: String,
is_error: bool,
},
/// A turn completed with a stop reason and token usage.
TurnCompleted {
stop_reason: StopReason,
usage: Usage,
},
/// Execution of a tool call started.
ToolExecutionStarted { id: String, name: String },
/// Execution of a tool call completed.
ToolExecutionCompleted {
id: String,
name: String,
result: String,
is_error: bool,
duration_ms: u64,
/// Defaults to `false` when the field is absent on input.
#[serde(default)]
has_images: bool,
},
/// Execution of a tool call timed out; `timeout_ms` is in milliseconds.
ToolExecutionTimedOut {
id: String,
name: String,
timeout_ms: u64,
},
/// Compaction started, with token and message counts.
CompactionStarted {
input_tokens: u64,
estimated_history_tokens: u64,
message_count: usize,
},
/// Compaction completed, with message counts before and after.
CompactionCompleted {
summary_tokens: u64,
messages_before: usize,
messages_after: usize,
},
/// Compaction failed with an error message.
CompactionFailed { error: String },
/// A budget warning. The verbose formatter multiplies `percent` by 100
/// before printing, so it is presumably a 0.0–1.0 fraction — TODO confirm.
BudgetWarning {
budget_type: BudgetType,
used: u64,
limit: u64,
percent: f32,
},
/// A retry notice with the attempt counters, error, and delay in milliseconds.
Retrying {
attempt: u32,
max_attempts: u32,
error: String,
delay_ms: u64,
},
/// Skills were resolved; carries their ids and a byte count.
SkillsResolved {
skills: Vec<crate::skills::SkillId>,
injection_bytes: usize,
},
/// Resolution of a skill reference failed.
SkillResolutionFailed { reference: String, error: String },
/// An interaction completed with a textual result.
InteractionComplete {
interaction_id: crate::interaction::InteractionId,
result: String,
},
/// An interaction has a pending callback for a tool, with its JSON arguments.
InteractionCallbackPending {
interaction_id: crate::interaction::InteractionId,
tool_name: String,
args: Value,
},
/// An interaction failed with an error message.
InteractionFailed {
interaction_id: crate::interaction::InteractionId,
error: String,
},
/// The event stream was truncated for the given reason.
StreamTruncated { reason: String },
/// The tool configuration changed; details are in the payload.
ToolConfigChanged { payload: ToolConfigChangedPayload },
/// A background job completed, with free-form status and detail text.
BackgroundJobCompleted {
job_id: String,
display_name: String,
status: String,
detail: String,
},
}
/// One frame of a scope path attached to a [`ScopedAgentEvent`].
///
/// Serialized as an internally tagged object with the snake_case variant name
/// under the `scope` key. The enum is `#[non_exhaustive]`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "scope", rename_all = "snake_case")]
#[non_exhaustive]
pub enum StreamScopeFrame {
/// Rendered as the segment `primary` by `ScopedAgentEvent::scope_id_from_path`.
Primary { session_id: String },
/// Rendered as the segment `mob:<member_ref>` by
/// `ScopedAgentEvent::scope_id_from_path`; the other fields do not appear
/// in the scope id.
MobMember {
flow_run_id: String,
member_ref: String,
session_id: String,
},
}
/// An [`AgentEvent`] together with the scope path it belongs to.
///
/// The constructors in this file keep `scope_id` equal to
/// `ScopedAgentEvent::scope_id_from_path(&scope_path)`; deserialization
/// accepts whatever `scope_id` the input carries.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScopedAgentEvent {
/// Slash-joined rendering of `scope_path`.
pub scope_id: String,
/// Scope frames, in the order they were added.
pub scope_path: Vec<StreamScopeFrame>,
/// The wrapped event.
pub event: AgentEvent,
}
impl ScopedAgentEvent {
    /// Wraps `event` with `scope_path`, deriving `scope_id` from the path.
    pub fn new(scope_path: Vec<StreamScopeFrame>, event: AgentEvent) -> Self {
        Self {
            scope_id: Self::scope_id_from_path(&scope_path),
            scope_path,
            event,
        }
    }

    /// Wraps `event` in a single-frame path holding a `Primary` frame for
    /// `session_id`.
    pub fn primary(session_id: impl Into<String>, event: AgentEvent) -> Self {
        let frame = StreamScopeFrame::Primary {
            session_id: session_id.into(),
        };
        Self::new(vec![frame], event)
    }

    /// Alias for [`Self::primary`].
    pub fn from_agent_event_primary(session_id: impl Into<String>, event: AgentEvent) -> Self {
        Self::primary(session_id, event)
    }

    /// Appends `frame` to the end of the scope path and recomputes `scope_id`.
    pub fn append_scope(self, frame: StreamScopeFrame) -> Self {
        let Self {
            mut scope_path,
            event,
            ..
        } = self;
        scope_path.push(frame);
        // `new` recomputes the id from the extended path.
        Self::new(scope_path, event)
    }

    /// Renders a scope path as a `/`-joined identifier.
    ///
    /// `Primary` frames become `primary`, `MobMember` frames become
    /// `mob:<member_ref>`. An empty path renders as `primary`.
    pub fn scope_id_from_path(path: &[StreamScopeFrame]) -> String {
        if path.is_empty() {
            return String::from("primary");
        }
        path.iter()
            .map(|frame| match frame {
                StreamScopeFrame::Primary { .. } => String::from("primary"),
                StreamScopeFrame::MobMember { member_ref, .. } => format!("mob:{member_ref}"),
            })
            .collect::<Vec<String>>()
            .join("/")
    }
}
/// Kind of budget referenced by `AgentEvent::BudgetWarning`.
///
/// Serialized in snake_case (`tokens`, `time`, `tool_calls`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BudgetType {
/// Serialized as `tokens`.
Tokens,
/// Serialized as `time`.
Time,
/// Serialized as `tool_calls`.
ToolCalls,
}
/// Byte budgets used by `format_verbose_event_with_config` when previewing
/// event content. Content longer than its budget is cut and suffixed with
/// `...`.
#[derive(Debug, Clone, Copy)]
pub struct VerboseEventConfig {
    /// Budget for the JSON rendering of tool-call arguments.
    pub max_tool_args_bytes: usize,
    /// Budget for tool execution results.
    pub max_tool_result_bytes: usize,
    /// Budget for response and reasoning text.
    pub max_text_bytes: usize,
}

impl Default for VerboseEventConfig {
    /// Defaults: 100 bytes for tool arguments, 200 for tool results, and 500
    /// for text.
    fn default() -> Self {
        const TOOL_ARGS_BYTES: usize = 100;
        const TOOL_RESULT_BYTES: usize = 200;
        const TEXT_BYTES: usize = 500;

        Self {
            max_tool_args_bytes: TOOL_ARGS_BYTES,
            max_tool_result_bytes: TOOL_RESULT_BYTES,
            max_text_bytes: TEXT_BYTES,
        }
    }
}
/// Formats `event` for verbose output using the default
/// [`VerboseEventConfig`].
///
/// Returns `None` for events that have no verbose rendering.
pub fn format_verbose_event(event: &AgentEvent) -> Option<String> {
    let defaults = VerboseEventConfig::default();
    format_verbose_event_with_config(event, &defaults)
}
/// Formats `event` as a single human-readable line for verbose output.
///
/// Tool arguments, tool results, and text content are shortened to the byte
/// budgets in `config` (with a trailing `...` when cut).
///
/// Returns `None` for events that have no verbose rendering, and for
/// `TextComplete` / `ReasoningComplete` events whose content is empty.
pub fn format_verbose_event_with_config(
    event: &AgentEvent,
    config: &VerboseEventConfig,
) -> Option<String> {
    match event {
        AgentEvent::TurnStarted { turn_number } => {
            // Displayed number is `turn_number + 1`. Widen to u64 first so a
            // `u32::MAX` turn number cannot overflow (panic in debug builds,
            // wrap to 0 in release builds).
            Some(format!("\n━━━ Turn {} ━━━", u64::from(*turn_number) + 1))
        }
        AgentEvent::ToolCallRequested { name, args, .. } => {
            // A serialization failure degrades to an empty preview rather
            // than suppressing the line.
            let args_str = serde_json::to_string(args).unwrap_or_default();
            let args_preview = truncate_preview(&args_str, config.max_tool_args_bytes);
            Some(format!(" → Calling tool: {name} {args_preview}"))
        }
        AgentEvent::ToolExecutionCompleted {
            name,
            result,
            is_error,
            duration_ms,
            ..
        } => {
            let status = if *is_error { "✗" } else { "✓" };
            let result_preview = truncate_preview(result, config.max_tool_result_bytes);
            Some(format!(
                " {status} {name} ({duration_ms}ms): {result_preview}"
            ))
        }
        AgentEvent::TurnCompleted { stop_reason, usage } => Some(format!(
            " ── Turn complete: {:?} ({} in / {} out tokens)",
            stop_reason, usage.input_tokens, usage.output_tokens
        )),
        AgentEvent::TextComplete { content } => {
            if content.is_empty() {
                None
            } else {
                let preview = truncate_preview(content, config.max_text_bytes);
                Some(format!(" 💬 Response: {preview}"))
            }
        }
        AgentEvent::ReasoningComplete { content } => {
            if content.is_empty() {
                None
            } else {
                let preview = truncate_preview(content, config.max_text_bytes);
                Some(format!(" 💭 Thinking: {preview}"))
            }
        }
        AgentEvent::Retrying {
            attempt,
            max_attempts,
            error,
            delay_ms,
        } => Some(format!(
            " ⟳ Retry {attempt}/{max_attempts}: {error} (waiting {delay_ms}ms)"
        )),
        AgentEvent::BudgetWarning {
            budget_type,
            used,
            limit,
            percent,
        } => Some(format!(
            // `percent` is scaled by 100 for display.
            " ⚠ Budget warning: {:?} at {:.0}% ({}/{})",
            budget_type,
            percent * 100.0,
            used,
            limit
        )),
        AgentEvent::CompactionStarted {
            input_tokens,
            estimated_history_tokens,
            message_count,
        } => Some(format!(
            " ⟳ Compaction started: {input_tokens} input tokens, ~{estimated_history_tokens} history tokens, {message_count} messages"
        )),
        AgentEvent::CompactionCompleted {
            summary_tokens,
            messages_before,
            messages_after,
        } => Some(format!(
            " ✓ Compaction complete: {messages_before} → {messages_after} messages, {summary_tokens} summary tokens"
        )),
        AgentEvent::CompactionFailed { error } => {
            Some(format!(" ✗ Compaction failed (continuing): {error}"))
        }
        AgentEvent::BackgroundJobCompleted {
            job_id,
            display_name,
            status,
            detail,
        } => Some(format!(
            " BG job {job_id} ({display_name}) {status}: {detail}"
        )),
        AgentEvent::InteractionCallbackPending {
            tool_name, args, ..
        } => Some(format!(
            " ⧖ Callback pending: {tool_name} {}",
            truncate_preview(&args.to_string(), config.max_tool_args_bytes)
        )),
        // Every other event has no verbose rendering.
        _ => None,
    }
}
/// Returns `input` unchanged when it fits in `max_bytes`; otherwise returns
/// the prefix chosen by `truncate_str` followed by `...`.
fn truncate_preview(input: &str, max_bytes: usize) -> String {
    let fits = input.len() <= max_bytes;
    if fits {
        input.to_owned()
    } else {
        let mut preview = String::from(truncate_str(input, max_bytes));
        preview.push_str("...");
        preview
    }
}
/// Returns the longest prefix of `s` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary.
///
/// `s` is returned unchanged when it already fits. A character that would
/// straddle the limit is dropped entirely, so the result never exceeds
/// `max_bytes` (it may be empty when the first character alone is too big).
fn truncate_str(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    // Here `max_bytes < s.len()`, so every candidate index is in range.
    // Index 0 is always a char boundary, so the search cannot come up empty;
    // `unwrap_or(0)` only keeps the expression panic-free.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
// Checks that a representative set of events serializes with a `type` tag
// and that the JSON is unchanged after deserializing and serializing again.
#[test]
fn test_agent_event_json_schema() {
let events = vec![
AgentEvent::RunStarted {
session_id: SessionId::new(),
prompt: "Hello".to_string(),
},
AgentEvent::TextDelta {
delta: "chunk".to_string(),
},
AgentEvent::TurnStarted { turn_number: 1 },
AgentEvent::TurnCompleted {
stop_reason: StopReason::EndTurn,
usage: Usage::default(),
},
AgentEvent::ToolCallRequested {
id: "tc_1".to_string(),
name: "read_file".to_string(),
args: serde_json::json!({"path": "/tmp/test"}),
},
AgentEvent::ToolResultReceived {
id: "tc_1".to_string(),
name: "read_file".to_string(),
is_error: false,
},
AgentEvent::BudgetWarning {
budget_type: BudgetType::Tokens,
used: 8000,
limit: 10000,
percent: 0.8,
},
AgentEvent::Retrying {
attempt: 1,
max_attempts: 3,
error: "Rate limited".to_string(),
delay_ms: 1000,
},
AgentEvent::RunCompleted {
session_id: SessionId::new(),
result: "Done".to_string(),
usage: Usage {
input_tokens: 100,
output_tokens: 50,
cache_creation_tokens: None,
cache_read_tokens: None,
},
},
AgentEvent::RunFailed {
session_id: SessionId::new(),
error: "Budget exceeded".to_string(),
},
AgentEvent::CompactionStarted {
input_tokens: 120_000,
estimated_history_tokens: 150_000,
message_count: 42,
},
AgentEvent::CompactionCompleted {
summary_tokens: 2048,
messages_before: 42,
messages_after: 8,
},
AgentEvent::CompactionFailed {
error: "LLM request failed".to_string(),
},
AgentEvent::InteractionComplete {
interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
result: "agent response".to_string(),
},
AgentEvent::InteractionCallbackPending {
interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
tool_name: "external_mock".to_string(),
args: serde_json::json!({"value": "browser"}),
},
AgentEvent::InteractionFailed {
interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
error: "LLM failure".to_string(),
},
AgentEvent::StreamTruncated {
reason: "channel full".to_string(),
},
AgentEvent::ToolConfigChanged {
payload: ToolConfigChangedPayload {
operation: ToolConfigChangeOperation::Remove,
target: "filesystem".to_string(),
status: "staged".to_string(),
persisted: false,
applied_at_turn: Some(12),
domain: None,
deferred_catalog_delta: None,
},
},
AgentEvent::BackgroundJobCompleted {
job_id: "j_123".to_string(),
display_name: "sleep 2".to_string(),
status: "completed".to_string(),
detail: "exit_code: 0".to_string(),
},
];
for event in events {
let json = serde_json::to_value(&event).unwrap();
// The internally tagged representation must expose the discriminator.
assert!(
json.get("type").is_some(),
"Event missing type field: {event:?}"
);
// AgentEvent has no PartialEq, so the round trip is compared as JSON.
let roundtrip: AgentEvent = serde_json::from_value(json.clone()).unwrap();
let json2 = serde_json::to_value(&roundtrip).unwrap();
assert_eq!(json, json2);
}
}
// Builds one instance of each AgentEvent variant listed in agent_event_type
// and checks that every discriminator is non-empty and that at least 33
// distinct discriminators are produced.
#[test]
fn test_agent_event_type_mapping_is_total_for_all_variants() {
let events = vec![
AgentEvent::RunStarted {
session_id: SessionId::new(),
prompt: "Hello".to_string(),
},
AgentEvent::RunCompleted {
session_id: SessionId::new(),
result: "Done".to_string(),
usage: Usage::default(),
},
AgentEvent::RunFailed {
session_id: SessionId::new(),
error: "failed".to_string(),
},
AgentEvent::HookStarted {
hook_id: "hook-1".to_string(),
point: HookPoint::RunStarted,
},
AgentEvent::HookCompleted {
hook_id: "hook-1".to_string(),
point: HookPoint::RunStarted,
duration_ms: 1,
},
AgentEvent::HookFailed {
hook_id: "hook-1".to_string(),
point: HookPoint::RunStarted,
error: "failed".to_string(),
},
AgentEvent::HookDenied {
hook_id: "hook-1".to_string(),
point: HookPoint::RunStarted,
reason_code: HookReasonCode::PolicyViolation,
message: "nope".to_string(),
payload: None,
},
AgentEvent::HookRewriteApplied {
hook_id: "hook-1".to_string(),
point: HookPoint::RunStarted,
patch: HookPatch::AssistantText {
text: "patched".to_string(),
},
},
AgentEvent::HookPatchPublished {
hook_id: "hook-1".to_string(),
point: HookPoint::RunStarted,
envelope: HookPatchEnvelope {
revision: crate::hooks::HookRevision(1),
hook_id: crate::hooks::HookId("hook-1".to_string()),
point: HookPoint::RunStarted,
patch: HookPatch::AssistantText {
text: "patched".to_string(),
},
published_at: chrono::Utc::now(),
},
},
AgentEvent::TurnStarted { turn_number: 1 },
AgentEvent::ReasoningDelta {
delta: "think".to_string(),
},
AgentEvent::ReasoningComplete {
content: "done".to_string(),
},
AgentEvent::TextDelta {
delta: "chunk".to_string(),
},
AgentEvent::TextComplete {
content: "done".to_string(),
},
AgentEvent::ToolCallRequested {
id: "tool-1".to_string(),
name: "search".to_string(),
args: serde_json::json!({}),
},
AgentEvent::ToolResultReceived {
id: "tool-1".to_string(),
name: "search".to_string(),
is_error: false,
},
AgentEvent::TurnCompleted {
stop_reason: StopReason::EndTurn,
usage: Usage::default(),
},
AgentEvent::ToolExecutionStarted {
id: "tool-1".to_string(),
name: "search".to_string(),
},
AgentEvent::ToolExecutionCompleted {
id: "tool-1".to_string(),
name: "search".to_string(),
result: "ok".to_string(),
is_error: false,
duration_ms: 1,
has_images: false,
},
AgentEvent::ToolExecutionTimedOut {
id: "tool-1".to_string(),
name: "search".to_string(),
timeout_ms: 1000,
},
AgentEvent::CompactionStarted {
input_tokens: 1,
estimated_history_tokens: 2,
message_count: 3,
},
AgentEvent::CompactionCompleted {
summary_tokens: 1,
messages_before: 3,
messages_after: 1,
},
AgentEvent::CompactionFailed {
error: "failed".to_string(),
},
AgentEvent::BudgetWarning {
budget_type: BudgetType::Time,
used: 1,
limit: 2,
percent: 50.0,
},
AgentEvent::Retrying {
attempt: 1,
max_attempts: 2,
error: "retry".to_string(),
delay_ms: 100,
},
AgentEvent::SkillsResolved {
skills: vec![],
injection_bytes: 0,
},
AgentEvent::SkillResolutionFailed {
reference: "skill".to_string(),
error: "missing".to_string(),
},
AgentEvent::InteractionComplete {
interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
result: "ok".to_string(),
},
AgentEvent::InteractionCallbackPending {
interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
tool_name: "external_mock".to_string(),
args: serde_json::json!({"value": "browser"}),
},
AgentEvent::InteractionFailed {
interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
error: "failed".to_string(),
},
AgentEvent::StreamTruncated {
reason: "lag".to_string(),
},
AgentEvent::ToolConfigChanged {
payload: ToolConfigChangedPayload {
operation: ToolConfigChangeOperation::Reload,
target: "external".to_string(),
status: "applied".to_string(),
persisted: true,
applied_at_turn: Some(1),
domain: None,
deferred_catalog_delta: None,
},
},
AgentEvent::BackgroundJobCompleted {
job_id: "j_123".to_string(),
display_name: "sleep 2".to_string(),
status: "completed".to_string(),
detail: "exit_code: 0".to_string(),
},
];
// A set collapses duplicate discriminators, so its size counts distinct labels.
let mut kinds = std::collections::BTreeSet::new();
for event in events {
let kind = agent_event_type(&event);
assert!(
!kind.is_empty(),
"event type mapping returned empty discriminator"
);
kinds.insert(kind);
}
// 33 is the number of events constructed above; the hard-coded floor also
// fails if entries are removed from the list.
assert!(
kinds.len() >= 33,
"expected at least one discriminator per covered event variant"
);
}
// Each BudgetType variant must serialize to its snake_case wire name.
#[test]
fn test_budget_type_serialization() {
    let cases = [
        (BudgetType::Tokens, "tokens"),
        (BudgetType::Time, "time"),
        (BudgetType::ToolCalls, "tool_calls"),
    ];
    for (budget_type, wire_name) in cases {
        assert_eq!(serde_json::to_value(budget_type).unwrap(), wire_name);
    }
}
// A scoped event keeps its scope id and payload across a JSON round trip.
#[test]
fn test_scoped_agent_event_roundtrip() {
    let member_frame = StreamScopeFrame::MobMember {
        flow_run_id: "run_123".to_string(),
        member_ref: "writer".to_string(),
        session_id: "sid_1".to_string(),
    };
    let payload = AgentEvent::TextDelta {
        delta: "hello".to_string(),
    };
    let event = ScopedAgentEvent::new(vec![member_frame], payload);
    assert_eq!(event.scope_id, "mob:writer");

    let encoded = serde_json::to_value(&event).unwrap();
    let roundtrip: ScopedAgentEvent = serde_json::from_value(encoded).unwrap();
    assert_eq!(roundtrip.scope_id, "mob:writer");
    assert!(matches!(
        roundtrip.event,
        AgentEvent::TextDelta { ref delta } if delta == "hello"
    ));
}
// Single-frame paths render as `primary` and `mob:<member_ref>`.
#[test]
fn test_scope_id_from_path_formats() {
    let primary_id = ScopedAgentEvent::scope_id_from_path(&[StreamScopeFrame::Primary {
        session_id: "sid_x".to_string(),
    }]);
    assert_eq!(primary_id, "primary");

    let mob_id = ScopedAgentEvent::scope_id_from_path(&[StreamScopeFrame::MobMember {
        flow_run_id: "run_1".to_string(),
        member_ref: "planner".to_string(),
        session_id: "sid_m".to_string(),
    }]);
    assert_eq!(mob_id, "mob:planner");
}
// An envelope keeps its metadata and payload across a JSON round trip.
#[test]
fn test_event_envelope_roundtrip() {
    let payload = AgentEvent::TextDelta {
        delta: "hello".to_string(),
    };
    let original = EventEnvelope::new("session:sid_test", 7, Some("mob_1".to_string()), payload);

    let encoded = serde_json::to_value(&original).expect("serialize envelope");
    let decoded: EventEnvelope<AgentEvent> =
        serde_json::from_value(encoded).expect("deserialize envelope");

    assert_eq!(decoded.source_id, "session:sid_test");
    assert_eq!(decoded.seq, 7);
    assert_eq!(decoded.mob_id.as_deref(), Some("mob_1"));
    assert!(decoded.timestamp_ms > 0);
    assert!(matches!(
        decoded.payload,
        AgentEvent::TextDelta { delta } if delta == "hello"
    ));
}
// Exercises each level of the comparison key used by
// compare_event_envelopes: timestamp_ms, then source_id, then seq.
#[test]
fn test_compare_event_envelopes_total_order() {
    let mut a = EventEnvelope::new("a", 1, None, AgentEvent::TurnStarted { turn_number: 1 });
    let mut b = EventEnvelope::new("a", 2, None, AgentEvent::TurnStarted { turn_number: 2 });
    a.timestamp_ms = 10;
    b.timestamp_ms = 10;

    // Same timestamp and source: seq breaks the tie, antisymmetrically.
    assert_eq!(compare_event_envelopes(&a, &b), Ordering::Less);
    assert_eq!(compare_event_envelopes(&b, &a), Ordering::Greater);

    // An envelope compares equal to itself.
    assert_eq!(compare_event_envelopes(&a, &a), Ordering::Equal);

    // source_id outranks seq: "0" sorts before "a" even though b.seq > a.seq.
    b.source_id = "0".to_string();
    assert_eq!(compare_event_envelopes(&a, &b), Ordering::Greater);

    // timestamp_ms outranks source_id and seq.
    a.timestamp_ms = 9;
    assert_eq!(compare_event_envelopes(&a, &b), Ordering::Less);
}
}