Skip to main content

meerkat_core/
event.rs

1//! Agent events for streaming output
2//!
3//! These events form the streaming API for consumers.
4
5use crate::hooks::{HookPatch, HookPatchEnvelope, HookPoint, HookReasonCode};
6use crate::time_compat::SystemTime;
7use crate::types::{SessionId, StopReason, Usage};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::cmp::Ordering;
11
/// Canonical event envelope for stream transport and ordering.
///
/// Wraps a payload of type `T` together with the identity and ordering
/// metadata that [`compare_event_envelopes`] sorts on.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventEnvelope<T> {
    /// Unique id of this event. [`EventEnvelope::new`] assigns a UUIDv7; it is
    /// the final tie-breaker in [`compare_event_envelopes`].
    pub event_id: uuid::Uuid,
    /// Caller-supplied identifier of the event source (second ordering key).
    pub source_id: String,
    /// Caller-supplied sequence number (third ordering key).
    /// NOTE(review): presumably monotonic per `source_id` — confirm with producers.
    pub seq: u64,
    /// Optional mob identifier. Left out of the serialized form when `None`
    /// and read back as `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mob_id: Option<String>,
    /// Milliseconds since the Unix epoch (first ordering key).
    /// [`EventEnvelope::new`] stores `u64::MAX` when the clock reads earlier
    /// than the epoch.
    pub timestamp_ms: u64,
    /// The wrapped event payload.
    pub payload: T,
}
23
24impl<T> EventEnvelope<T> {
25    /// Create a new envelope with a UUIDv7 id and current wall-clock timestamp.
26    pub fn new(source_id: impl Into<String>, seq: u64, mob_id: Option<String>, payload: T) -> Self {
27        let timestamp_ms = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
28            Ok(duration) => duration.as_millis() as u64,
29            Err(_) => u64::MAX,
30        };
31        Self {
32            event_id: uuid::Uuid::now_v7(),
33            source_id: source_id.into(),
34            seq,
35            mob_id,
36            timestamp_ms,
37            payload,
38        }
39    }
40}
41
42/// Canonical serialized event kind for SSE/RPC discriminators.
43///
44/// Intentionally exhaustive: when a new `AgentEvent` variant is added, this
45/// match should fail to compile until that variant gets an explicit wire name.
46pub fn agent_event_type(event: &AgentEvent) -> &'static str {
47    match event {
48        AgentEvent::RunStarted { .. } => "run_started",
49        AgentEvent::RunCompleted { .. } => "run_completed",
50        AgentEvent::RunFailed { .. } => "run_failed",
51        AgentEvent::HookStarted { .. } => "hook_started",
52        AgentEvent::HookCompleted { .. } => "hook_completed",
53        AgentEvent::HookFailed { .. } => "hook_failed",
54        AgentEvent::HookDenied { .. } => "hook_denied",
55        AgentEvent::HookRewriteApplied { .. } => "hook_rewrite_applied",
56        AgentEvent::HookPatchPublished { .. } => "hook_patch_published",
57        AgentEvent::TurnStarted { .. } => "turn_started",
58        AgentEvent::ReasoningDelta { .. } => "reasoning_delta",
59        AgentEvent::ReasoningComplete { .. } => "reasoning_complete",
60        AgentEvent::TextDelta { .. } => "text_delta",
61        AgentEvent::TextComplete { .. } => "text_complete",
62        AgentEvent::ToolCallRequested { .. } => "tool_call_requested",
63        AgentEvent::ToolResultReceived { .. } => "tool_result_received",
64        AgentEvent::TurnCompleted { .. } => "turn_completed",
65        AgentEvent::ToolExecutionStarted { .. } => "tool_execution_started",
66        AgentEvent::ToolExecutionCompleted { .. } => "tool_execution_completed",
67        AgentEvent::ToolExecutionTimedOut { .. } => "tool_execution_timed_out",
68        AgentEvent::CompactionStarted { .. } => "compaction_started",
69        AgentEvent::CompactionCompleted { .. } => "compaction_completed",
70        AgentEvent::CompactionFailed { .. } => "compaction_failed",
71        AgentEvent::BudgetWarning { .. } => "budget_warning",
72        AgentEvent::Retrying { .. } => "retrying",
73        AgentEvent::SkillsResolved { .. } => "skills_resolved",
74        AgentEvent::SkillResolutionFailed { .. } => "skill_resolution_failed",
75        AgentEvent::InteractionComplete { .. } => "interaction_complete",
76        AgentEvent::InteractionCallbackPending { .. } => "interaction_callback_pending",
77        AgentEvent::InteractionFailed { .. } => "interaction_failed",
78        AgentEvent::StreamTruncated { .. } => "stream_truncated",
79        AgentEvent::ToolConfigChanged { .. } => "tool_config_changed",
80        AgentEvent::BackgroundJobCompleted { .. } => "background_job_completed",
81    }
82}
83
84/// Deterministic total ordering comparator for event envelopes.
85pub fn compare_event_envelopes<T>(a: &EventEnvelope<T>, b: &EventEnvelope<T>) -> Ordering {
86    a.timestamp_ms
87        .cmp(&b.timestamp_ms)
88        .then_with(|| a.source_id.cmp(&b.source_id))
89        .then_with(|| a.seq.cmp(&b.seq))
90        .then_with(|| a.event_id.cmp(&b.event_id))
91}
92
/// Payload for tool configuration change notifications.
///
/// Carried by [`AgentEvent::ToolConfigChanged`].
/// [`ExternalToolDelta::to_tool_config_changed_payload`] builds one from a
/// lifecycle delta, leaving `domain` and `deferred_catalog_delta` unset.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ToolConfigChangedPayload {
    /// Kind of change (add / remove / reload).
    pub operation: ToolConfigChangeOperation,
    /// What the change applies to.
    /// NOTE(review): presumably a tool or tool-source name — confirm with emitters.
    pub target: String,
    /// Free-form status text. Deltas fill this from
    /// [`ExternalToolDelta::status_text`] (phase name, plus the failure detail
    /// for a failed phase).
    pub status: String,
    /// Persistence flag copied through from the producer.
    /// NOTE(review): exact meaning is not established in this file — confirm.
    pub persisted: bool,
    /// Turn number associated with the change, if any. Left out of the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub applied_at_turn: Option<u32>,
    /// Optional typed domain of the change. Left out of the serialized form
    /// when `None`; read back as `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub domain: Option<ToolConfigChangeDomain>,
    /// Optional hidden-catalog delta metadata. Left out of the serialized form
    /// when `None`; read back as `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deferred_catalog_delta: Option<DeferredCatalogDelta>,
}
108
/// Optional typed domain for tool-configuration change payloads.
///
/// Serialized in snake_case: `tool_scope`, `deferred_catalog`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ToolConfigChangeDomain {
    /// Serialized as `tool_scope`.
    ToolScope,
    /// Serialized as `deferred_catalog`.
    DeferredCatalog,
}
117
/// Additive hidden-catalog delta metadata for runtime notices.
///
/// Every list is left out of the serialized form when empty and read back as
/// an empty list when its key is absent.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DeferredCatalogDelta {
    /// Names added to the hidden catalog.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub added_hidden_names: Vec<String>,
    /// Names removed from the hidden catalog.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub removed_hidden_names: Vec<String>,
    /// Sources still pending.
    /// NOTE(review): what "pending" means for a source is not shown here — confirm.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub pending_sources: Vec<String>,
}
129
/// Operation kind for live tool configuration changes.
///
/// Serialized in snake_case: `add`, `remove`, `reload`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ToolConfigChangeOperation {
    /// Serialized as `add`.
    Add,
    /// Serialized as `remove`.
    Remove,
    /// Serialized as `reload`.
    Reload,
}
139
/// Canonical lifecycle phase for external-tool boundary deltas.
///
/// Serialized in snake_case. [`ExternalToolDeltaPhase::as_status`] returns the
/// same spelling as a `&'static str`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum ExternalToolDeltaPhase {
    /// `pending`. [`ExternalToolDelta::new`] marks this phase as not persisted.
    Pending,
    /// `applied`.
    Applied,
    /// `draining`. [`ExternalToolDelta::new`] marks this phase as not persisted.
    Draining,
    /// `forced`.
    Forced,
    /// `failed`. [`ExternalToolDelta::status_text`] appends the delta's detail,
    /// when present, for this phase only.
    Failed,
}
150
151impl ExternalToolDeltaPhase {
152    #[must_use]
153    pub fn as_status(self) -> &'static str {
154        match self {
155            Self::Pending => "pending",
156            Self::Applied => "applied",
157            Self::Draining => "draining",
158            Self::Forced => "forced",
159            Self::Failed => "failed",
160        }
161    }
162}
163
/// Canonical outward lifecycle delta for external-tool surface changes.
///
/// Build one with [`ExternalToolDelta::new`] and convert it for event emission
/// with [`ExternalToolDelta::to_tool_config_changed_payload`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExternalToolDelta {
    /// What the change applies to.
    /// NOTE(review): presumably an external tool or server name — confirm with emitters.
    pub target: String,
    /// Kind of change (add / remove / reload).
    pub operation: ToolConfigChangeOperation,
    /// Lifecycle phase this delta reports.
    pub phase: ExternalToolDeltaPhase,
    /// Persistence flag. `new` derives it from `phase`: `false` for `Pending`
    /// and `Draining`, `true` otherwise.
    pub persisted: bool,
    /// Turn number associated with the change, if any. `new` leaves it `None`.
    /// Left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub applied_at_turn: Option<u32>,
    /// Optional tool count, set through `with_tool_count`. Left out of the
    /// serialized form when `None`; read back as `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_count: Option<usize>,
    /// Optional detail text, set through `with_detail`. `status_text` appends
    /// it for the `Failed` phase. Left out of the serialized form when `None`;
    /// read back as `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
178
179impl ExternalToolDelta {
180    #[must_use]
181    pub fn new(
182        target: impl Into<String>,
183        operation: ToolConfigChangeOperation,
184        phase: ExternalToolDeltaPhase,
185    ) -> Self {
186        Self {
187            target: target.into(),
188            operation,
189            phase,
190            persisted: !matches!(
191                phase,
192                ExternalToolDeltaPhase::Pending | ExternalToolDeltaPhase::Draining
193            ),
194            applied_at_turn: None,
195            tool_count: None,
196            detail: None,
197        }
198    }
199
200    #[must_use]
201    pub fn with_tool_count(mut self, tool_count: Option<usize>) -> Self {
202        self.tool_count = tool_count;
203        self
204    }
205
206    #[must_use]
207    pub fn with_detail(mut self, detail: Option<String>) -> Self {
208        self.detail = detail;
209        self
210    }
211
212    #[must_use]
213    pub fn status_text(&self) -> String {
214        let mut status = self.phase.as_status().to_string();
215        if self.phase == ExternalToolDeltaPhase::Failed
216            && let Some(detail) = &self.detail
217        {
218            status = format!("{status}: {detail}");
219        }
220        status
221    }
222
223    #[must_use]
224    pub fn to_tool_config_changed_payload(&self) -> ToolConfigChangedPayload {
225        ToolConfigChangedPayload {
226            operation: self.operation.clone(),
227            target: self.target.clone(),
228            status: self.status_text(),
229            persisted: self.persisted,
230            applied_at_turn: self.applied_at_turn,
231            domain: None,
232            deferred_catalog_delta: None,
233        }
234    }
235}
236
/// Events emitted during agent execution
///
/// These events form the streaming API for consumers.
///
/// Serialized as an internally tagged object: the variant name, in snake_case,
/// is stored under the `type` key next to the variant's fields. The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum AgentEvent {
    // === Session Lifecycle ===
    /// Agent run started
    RunStarted {
        session_id: SessionId,
        prompt: String,
    },

    /// Agent run completed successfully
    RunCompleted {
        session_id: SessionId,
        result: String,
        usage: Usage,
    },

    /// Agent run failed
    RunFailed {
        session_id: SessionId,
        error: String,
    },

    // === Hook Lifecycle ===
    /// Hook invocation started.
    HookStarted { hook_id: String, point: HookPoint },

    /// Hook invocation completed.
    HookCompleted {
        hook_id: String,
        point: HookPoint,
        duration_ms: u64,
    },

    /// Hook invocation failed.
    HookFailed {
        hook_id: String,
        point: HookPoint,
        error: String,
    },

    /// Hook denied an action.
    HookDenied {
        hook_id: String,
        point: HookPoint,
        reason_code: HookReasonCode,
        message: String,
        /// Optional extra data attached to the denial. Left out of the
        /// serialized form when `None`; read back as `None` when absent.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        payload: Option<Value>,
    },

    /// A rewrite patch was applied synchronously.
    HookRewriteApplied {
        hook_id: String,
        point: HookPoint,
        patch: HookPatch,
    },

    /// A background patch was published for downstream surfaces.
    HookPatchPublished {
        hook_id: String,
        point: HookPoint,
        envelope: HookPatchEnvelope,
    },

    // === LLM Interaction ===
    /// New turn started (calling LLM)
    ///
    /// NOTE(review): the verbose formatter prints `turn_number + 1`, which
    /// suggests `turn_number` is zero-based — confirm with emitters.
    TurnStarted { turn_number: u32 },

    /// Streaming reasoning/thinking from the model
    ReasoningDelta { delta: String },

    /// Reasoning/thinking complete for this block
    ReasoningComplete { content: String },

    /// Streaming text from the model
    TextDelta { delta: String },

    /// Text generation complete for this turn
    TextComplete { content: String },

    /// Model requested a tool call
    ToolCallRequested {
        id: String,
        name: String,
        args: Value,
    },

    /// Tool result received (injected into conversation)
    ToolResultReceived {
        id: String,
        name: String,
        is_error: bool,
    },

    /// Turn completed
    TurnCompleted {
        stop_reason: StopReason,
        usage: Usage,
    },

    // === Tool Execution ===
    /// Starting tool execution
    ToolExecutionStarted { id: String, name: String },

    /// Tool execution completed
    ToolExecutionCompleted {
        id: String,
        name: String,
        result: String,
        is_error: bool,
        duration_ms: u64,
        /// Whether the tool result contains image content blocks.
        /// Read back as `false` when the key is absent.
        #[serde(default)]
        has_images: bool,
    },

    /// Tool execution timed out
    ToolExecutionTimedOut {
        id: String,
        name: String,
        timeout_ms: u64,
    },

    // === Compaction ===
    /// Context compaction started.
    CompactionStarted {
        /// Input tokens from the last LLM call that triggered compaction.
        input_tokens: u64,
        /// Estimated total history tokens before compaction.
        estimated_history_tokens: u64,
        /// Number of messages before compaction.
        message_count: usize,
    },

    /// Context compaction completed successfully.
    CompactionCompleted {
        /// Tokens consumed by the summary.
        summary_tokens: u64,
        /// Messages before compaction.
        messages_before: usize,
        /// Messages after compaction.
        messages_after: usize,
    },

    /// Context compaction failed (non-fatal — agent continues with uncompacted history).
    CompactionFailed { error: String },

    // === Budget ===
    /// Budget warning (approaching limits)
    ///
    /// NOTE(review): the unit of `percent` is not fixed by this file. The
    /// verbose formatter multiplies it by 100 before printing, which suggests
    /// a 0.0–1.0 fraction — confirm with emitters.
    BudgetWarning {
        budget_type: BudgetType,
        used: u64,
        limit: u64,
        percent: f32,
    },

    // === Retry Events ===
    /// Retrying after error
    Retrying {
        attempt: u32,
        max_attempts: u32,
        error: String,
        delay_ms: u64,
    },

    // === Skill Events ===
    /// Skills resolved for this turn.
    SkillsResolved {
        skills: Vec<crate::skills::SkillId>,
        injection_bytes: usize,
    },

    /// A skill reference could not be resolved.
    SkillResolutionFailed { reference: String, error: String },

    // === Interaction-Scoped Streaming ===
    /// An interaction completed successfully (terminal event for tap subscribers).
    InteractionComplete {
        interaction_id: crate::interaction::InteractionId,
        result: String,
    },

    /// An interaction reached an external callback boundary and is waiting for
    /// tool results before the session can continue.
    InteractionCallbackPending {
        interaction_id: crate::interaction::InteractionId,
        tool_name: String,
        args: Value,
    },

    /// An interaction failed (terminal event for tap subscribers).
    InteractionFailed {
        interaction_id: crate::interaction::InteractionId,
        error: String,
    },

    /// Some streaming events were dropped due to channel backpressure.
    /// Best-effort marker — the terminal event is authoritative.
    StreamTruncated { reason: String },

    /// Live tool configuration changed for this session.
    ToolConfigChanged { payload: ToolConfigChangedPayload },

    /// A background shell job completed (or failed/cancelled/timed out).
    BackgroundJobCompleted {
        job_id: String,
        display_name: String,
        status: String,
        detail: String,
    },
}
454
/// Scope attribution frame for multi-agent streaming.
///
/// Serialized as an internally tagged object: the variant name, in snake_case
/// (`primary`, `mob_member`), is stored under the `scope` key. The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "scope", rename_all = "snake_case")]
#[non_exhaustive]
pub enum StreamScopeFrame {
    /// Top-level primary session scope.
    Primary { session_id: String },
    /// Mob member scope for flow dispatch turns.
    ///
    /// Only `member_ref` contributes to the scope id computed by
    /// [`ScopedAgentEvent::scope_id_from_path`].
    MobMember {
        flow_run_id: String,
        member_ref: String,
        session_id: String,
    },
}
470
/// Attributed stream event wrapper for multi-agent streaming.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScopedAgentEvent {
    /// Selector string derived from `scope_path` by
    /// [`ScopedAgentEvent::scope_id_from_path`]. The constructors and
    /// `append_scope` keep it in sync; writing the public fields directly does not.
    pub scope_id: String,
    /// Scope frames this event is attributed to, in the order they were added.
    pub scope_path: Vec<StreamScopeFrame>,
    /// The wrapped agent event.
    pub event: AgentEvent,
}
479
480impl ScopedAgentEvent {
481    /// Build a scoped event from a scope path and payload event.
482    pub fn new(scope_path: Vec<StreamScopeFrame>, event: AgentEvent) -> Self {
483        let scope_id = Self::scope_id_from_path(&scope_path);
484        Self {
485            scope_id,
486            scope_path,
487            event,
488        }
489    }
490
491    /// Build a primary-scoped event for a top-level session event.
492    pub fn primary(session_id: impl Into<String>, event: AgentEvent) -> Self {
493        Self::new(
494            vec![StreamScopeFrame::Primary {
495                session_id: session_id.into(),
496            }],
497            event,
498        )
499    }
500
501    /// Convenience alias for converting a legacy event into primary scope.
502    pub fn from_agent_event_primary(session_id: impl Into<String>, event: AgentEvent) -> Self {
503        Self::primary(session_id, event)
504    }
505
506    /// Append one scope frame and recompute scope_id deterministically.
507    pub fn append_scope(mut self, frame: StreamScopeFrame) -> Self {
508        self.scope_path.push(frame);
509        self.scope_id = Self::scope_id_from_path(&self.scope_path);
510        self
511    }
512
513    /// Deterministic canonical selector from scope path.
514    ///
515    /// Formats:
516    /// - `primary`
517    /// - `mob:<member_ref>`
518    pub fn scope_id_from_path(path: &[StreamScopeFrame]) -> String {
519        if path.is_empty() {
520            return "primary".to_string();
521        }
522        let mut segments: Vec<String> = Vec::with_capacity(path.len());
523        for frame in path {
524            match frame {
525                StreamScopeFrame::Primary { .. } => segments.push("primary".to_string()),
526                StreamScopeFrame::MobMember { member_ref, .. } => {
527                    segments.push(format!("mob:{member_ref}"));
528                }
529            }
530        }
531        segments.join("/")
532    }
533}
534
/// Type of budget being tracked
///
/// Serialized in snake_case: `tokens`, `time`, `tool_calls`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BudgetType {
    /// Serialized as `tokens`.
    Tokens,
    /// Serialized as `time`.
    Time,
    /// Serialized as `tool_calls`.
    ToolCalls,
}
544
/// Configuration for formatting verbose event output.
///
/// Each field is a byte budget handed to the preview truncation used by
/// `format_verbose_event_with_config`.
#[derive(Debug, Clone, Copy)]
pub struct VerboseEventConfig {
    /// Budget for rendered tool-call arguments (tool call requests and
    /// pending interaction callbacks).
    pub max_tool_args_bytes: usize,
    /// Budget for tool execution results.
    pub max_tool_result_bytes: usize,
    /// Budget for completed text and reasoning content.
    pub max_text_bytes: usize,
}

impl Default for VerboseEventConfig {
    /// Defaults: 100 bytes of tool arguments, 200 bytes of tool result,
    /// 500 bytes of text.
    fn default() -> Self {
        let (max_tool_args_bytes, max_tool_result_bytes, max_text_bytes) = (100, 200, 500);
        VerboseEventConfig {
            max_tool_args_bytes,
            max_tool_result_bytes,
            max_text_bytes,
        }
    }
}
562
563/// Format an agent event using default verbose formatting rules.
564pub fn format_verbose_event(event: &AgentEvent) -> Option<String> {
565    format_verbose_event_with_config(event, &VerboseEventConfig::default())
566}
567
568/// Format an agent event using custom verbose formatting rules.
569pub fn format_verbose_event_with_config(
570    event: &AgentEvent,
571    config: &VerboseEventConfig,
572) -> Option<String> {
573    match event {
574        AgentEvent::TurnStarted { turn_number } => {
575            Some(format!("\n━━━ Turn {} ━━━", turn_number + 1))
576        }
577        AgentEvent::ToolCallRequested { name, args, .. } => {
578            let args_str = serde_json::to_string(args).unwrap_or_default();
579            let args_preview = truncate_preview(&args_str, config.max_tool_args_bytes);
580            Some(format!("  → Calling tool: {name} {args_preview}"))
581        }
582        AgentEvent::ToolExecutionCompleted {
583            name,
584            result,
585            is_error,
586            duration_ms,
587            ..
588        } => {
589            let status = if *is_error { "✗" } else { "✓" };
590            let result_preview = truncate_preview(result, config.max_tool_result_bytes);
591            Some(format!(
592                "  {status} {name} ({duration_ms}ms): {result_preview}"
593            ))
594        }
595        AgentEvent::TurnCompleted { stop_reason, usage } => Some(format!(
596            "  ── Turn complete: {:?} ({} in / {} out tokens)",
597            stop_reason, usage.input_tokens, usage.output_tokens
598        )),
599        AgentEvent::TextComplete { content } => {
600            if content.is_empty() {
601                None
602            } else {
603                let preview = truncate_preview(content, config.max_text_bytes);
604                Some(format!("  💬 Response: {preview}"))
605            }
606        }
607        AgentEvent::ReasoningComplete { content } => {
608            if content.is_empty() {
609                None
610            } else {
611                let preview = truncate_preview(content, config.max_text_bytes);
612                Some(format!("  💭 Thinking: {preview}"))
613            }
614        }
615        AgentEvent::Retrying {
616            attempt,
617            max_attempts,
618            error,
619            delay_ms,
620        } => Some(format!(
621            "  ⟳ Retry {attempt}/{max_attempts}: {error} (waiting {delay_ms}ms)"
622        )),
623        AgentEvent::BudgetWarning {
624            budget_type,
625            used,
626            limit,
627            percent,
628        } => Some(format!(
629            "  ⚠ Budget warning: {:?} at {:.0}% ({}/{})",
630            budget_type,
631            percent * 100.0,
632            used,
633            limit
634        )),
635        AgentEvent::CompactionStarted {
636            input_tokens,
637            estimated_history_tokens,
638            message_count,
639        } => Some(format!(
640            "  ⟳ Compaction started: {input_tokens} input tokens, ~{estimated_history_tokens} history tokens, {message_count} messages"
641        )),
642        AgentEvent::CompactionCompleted {
643            summary_tokens,
644            messages_before,
645            messages_after,
646        } => Some(format!(
647            "  ✓ Compaction complete: {messages_before} → {messages_after} messages, {summary_tokens} summary tokens"
648        )),
649        AgentEvent::CompactionFailed { error } => {
650            Some(format!("  ✗ Compaction failed (continuing): {error}"))
651        }
652        AgentEvent::BackgroundJobCompleted {
653            job_id,
654            display_name,
655            status,
656            detail,
657        } => Some(format!(
658            "  BG job {job_id} ({display_name}) {status}: {detail}"
659        )),
660        AgentEvent::InteractionCallbackPending {
661            tool_name, args, ..
662        } => Some(format!(
663            "  ⧖ Callback pending: {tool_name} {}",
664            truncate_preview(&args.to_string(), config.max_tool_args_bytes)
665        )),
666        _ => None,
667    }
668}
669
670fn truncate_preview(input: &str, max_bytes: usize) -> String {
671    if input.len() <= max_bytes {
672        return input.to_string();
673    }
674    format!("{}...", truncate_str(input, max_bytes))
675}
676
/// Return the longest prefix of `s` that is at most `max_bytes` bytes long and
/// ends on a UTF-8 character boundary.
///
/// A string that already fits is returned unchanged. When `max_bytes` falls in
/// the middle of a multibyte character, that character is dropped entirely, so
/// the result never exceeds `max_bytes` and is always strictly shorter than an
/// over-long input.
fn truncate_str(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    // Here `max_bytes < s.len()`, so it is a valid index to probe. Offset 0 is
    // always a boundary, so the walk back terminates (after at most 3 steps).
    let mut end = max_bytes;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
688
689#[cfg(test)]
690#[allow(clippy::unwrap_used, clippy::expect_used)]
691mod tests {
692    use super::*;
693
694    #[test]
695    fn test_agent_event_json_schema() {
696        // Test all event variants serialize correctly
697        let events = vec![
698            AgentEvent::RunStarted {
699                session_id: SessionId::new(),
700                prompt: "Hello".to_string(),
701            },
702            AgentEvent::TextDelta {
703                delta: "chunk".to_string(),
704            },
705            AgentEvent::TurnStarted { turn_number: 1 },
706            AgentEvent::TurnCompleted {
707                stop_reason: StopReason::EndTurn,
708                usage: Usage::default(),
709            },
710            AgentEvent::ToolCallRequested {
711                id: "tc_1".to_string(),
712                name: "read_file".to_string(),
713                args: serde_json::json!({"path": "/tmp/test"}),
714            },
715            AgentEvent::ToolResultReceived {
716                id: "tc_1".to_string(),
717                name: "read_file".to_string(),
718                is_error: false,
719            },
720            AgentEvent::BudgetWarning {
721                budget_type: BudgetType::Tokens,
722                used: 8000,
723                limit: 10000,
724                percent: 0.8,
725            },
726            AgentEvent::Retrying {
727                attempt: 1,
728                max_attempts: 3,
729                error: "Rate limited".to_string(),
730                delay_ms: 1000,
731            },
732            AgentEvent::RunCompleted {
733                session_id: SessionId::new(),
734                result: "Done".to_string(),
735                usage: Usage {
736                    input_tokens: 100,
737                    output_tokens: 50,
738                    cache_creation_tokens: None,
739                    cache_read_tokens: None,
740                },
741            },
742            AgentEvent::RunFailed {
743                session_id: SessionId::new(),
744                error: "Budget exceeded".to_string(),
745            },
746            AgentEvent::CompactionStarted {
747                input_tokens: 120_000,
748                estimated_history_tokens: 150_000,
749                message_count: 42,
750            },
751            AgentEvent::CompactionCompleted {
752                summary_tokens: 2048,
753                messages_before: 42,
754                messages_after: 8,
755            },
756            AgentEvent::CompactionFailed {
757                error: "LLM request failed".to_string(),
758            },
759            AgentEvent::InteractionComplete {
760                interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
761                result: "agent response".to_string(),
762            },
763            AgentEvent::InteractionCallbackPending {
764                interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
765                tool_name: "external_mock".to_string(),
766                args: serde_json::json!({"value": "browser"}),
767            },
768            AgentEvent::InteractionFailed {
769                interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
770                error: "LLM failure".to_string(),
771            },
772            AgentEvent::StreamTruncated {
773                reason: "channel full".to_string(),
774            },
775            AgentEvent::ToolConfigChanged {
776                payload: ToolConfigChangedPayload {
777                    operation: ToolConfigChangeOperation::Remove,
778                    target: "filesystem".to_string(),
779                    status: "staged".to_string(),
780                    persisted: false,
781                    applied_at_turn: Some(12),
782                    domain: None,
783                    deferred_catalog_delta: None,
784                },
785            },
786            AgentEvent::BackgroundJobCompleted {
787                job_id: "j_123".to_string(),
788                display_name: "sleep 2".to_string(),
789                status: "completed".to_string(),
790                detail: "exit_code: 0".to_string(),
791            },
792        ];
793
794        for event in events {
795            let json = serde_json::to_value(&event).unwrap();
796
797            // All events should have a "type" field
798            assert!(
799                json.get("type").is_some(),
800                "Event missing type field: {event:?}"
801            );
802
803            // Should roundtrip
804            let roundtrip: AgentEvent = serde_json::from_value(json.clone()).unwrap();
805            let json2 = serde_json::to_value(&roundtrip).unwrap();
806            assert_eq!(json, json2);
807        }
808    }
809
810    #[test]
811    fn test_agent_event_type_mapping_is_total_for_all_variants() {
812        let events = vec![
813            AgentEvent::RunStarted {
814                session_id: SessionId::new(),
815                prompt: "Hello".to_string(),
816            },
817            AgentEvent::RunCompleted {
818                session_id: SessionId::new(),
819                result: "Done".to_string(),
820                usage: Usage::default(),
821            },
822            AgentEvent::RunFailed {
823                session_id: SessionId::new(),
824                error: "failed".to_string(),
825            },
826            AgentEvent::HookStarted {
827                hook_id: "hook-1".to_string(),
828                point: HookPoint::RunStarted,
829            },
830            AgentEvent::HookCompleted {
831                hook_id: "hook-1".to_string(),
832                point: HookPoint::RunStarted,
833                duration_ms: 1,
834            },
835            AgentEvent::HookFailed {
836                hook_id: "hook-1".to_string(),
837                point: HookPoint::RunStarted,
838                error: "failed".to_string(),
839            },
840            AgentEvent::HookDenied {
841                hook_id: "hook-1".to_string(),
842                point: HookPoint::RunStarted,
843                reason_code: HookReasonCode::PolicyViolation,
844                message: "nope".to_string(),
845                payload: None,
846            },
847            AgentEvent::HookRewriteApplied {
848                hook_id: "hook-1".to_string(),
849                point: HookPoint::RunStarted,
850                patch: HookPatch::AssistantText {
851                    text: "patched".to_string(),
852                },
853            },
854            AgentEvent::HookPatchPublished {
855                hook_id: "hook-1".to_string(),
856                point: HookPoint::RunStarted,
857                envelope: HookPatchEnvelope {
858                    revision: crate::hooks::HookRevision(1),
859                    hook_id: crate::hooks::HookId("hook-1".to_string()),
860                    point: HookPoint::RunStarted,
861                    patch: HookPatch::AssistantText {
862                        text: "patched".to_string(),
863                    },
864                    published_at: chrono::Utc::now(),
865                },
866            },
867            AgentEvent::TurnStarted { turn_number: 1 },
868            AgentEvent::ReasoningDelta {
869                delta: "think".to_string(),
870            },
871            AgentEvent::ReasoningComplete {
872                content: "done".to_string(),
873            },
874            AgentEvent::TextDelta {
875                delta: "chunk".to_string(),
876            },
877            AgentEvent::TextComplete {
878                content: "done".to_string(),
879            },
880            AgentEvent::ToolCallRequested {
881                id: "tool-1".to_string(),
882                name: "search".to_string(),
883                args: serde_json::json!({}),
884            },
885            AgentEvent::ToolResultReceived {
886                id: "tool-1".to_string(),
887                name: "search".to_string(),
888                is_error: false,
889            },
890            AgentEvent::TurnCompleted {
891                stop_reason: StopReason::EndTurn,
892                usage: Usage::default(),
893            },
894            AgentEvent::ToolExecutionStarted {
895                id: "tool-1".to_string(),
896                name: "search".to_string(),
897            },
898            AgentEvent::ToolExecutionCompleted {
899                id: "tool-1".to_string(),
900                name: "search".to_string(),
901                result: "ok".to_string(),
902                is_error: false,
903                duration_ms: 1,
904                has_images: false,
905            },
906            AgentEvent::ToolExecutionTimedOut {
907                id: "tool-1".to_string(),
908                name: "search".to_string(),
909                timeout_ms: 1000,
910            },
911            AgentEvent::CompactionStarted {
912                input_tokens: 1,
913                estimated_history_tokens: 2,
914                message_count: 3,
915            },
916            AgentEvent::CompactionCompleted {
917                summary_tokens: 1,
918                messages_before: 3,
919                messages_after: 1,
920            },
921            AgentEvent::CompactionFailed {
922                error: "failed".to_string(),
923            },
924            AgentEvent::BudgetWarning {
925                budget_type: BudgetType::Time,
926                used: 1,
927                limit: 2,
928                percent: 50.0,
929            },
930            AgentEvent::Retrying {
931                attempt: 1,
932                max_attempts: 2,
933                error: "retry".to_string(),
934                delay_ms: 100,
935            },
936            AgentEvent::SkillsResolved {
937                skills: vec![],
938                injection_bytes: 0,
939            },
940            AgentEvent::SkillResolutionFailed {
941                reference: "skill".to_string(),
942                error: "missing".to_string(),
943            },
944            AgentEvent::InteractionComplete {
945                interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
946                result: "ok".to_string(),
947            },
948            AgentEvent::InteractionCallbackPending {
949                interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
950                tool_name: "external_mock".to_string(),
951                args: serde_json::json!({"value": "browser"}),
952            },
953            AgentEvent::InteractionFailed {
954                interaction_id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
955                error: "failed".to_string(),
956            },
957            AgentEvent::StreamTruncated {
958                reason: "lag".to_string(),
959            },
960            AgentEvent::ToolConfigChanged {
961                payload: ToolConfigChangedPayload {
962                    operation: ToolConfigChangeOperation::Reload,
963                    target: "external".to_string(),
964                    status: "applied".to_string(),
965                    persisted: true,
966                    applied_at_turn: Some(1),
967                    domain: None,
968                    deferred_catalog_delta: None,
969                },
970            },
971            AgentEvent::BackgroundJobCompleted {
972                job_id: "j_123".to_string(),
973                display_name: "sleep 2".to_string(),
974                status: "completed".to_string(),
975                detail: "exit_code: 0".to_string(),
976            },
977        ];
978
979        let mut kinds = std::collections::BTreeSet::new();
980        for event in events {
981            let kind = agent_event_type(&event);
982            assert!(
983                !kind.is_empty(),
984                "event type mapping returned empty discriminator"
985            );
986            kinds.insert(kind);
987        }
988        assert!(
989            kinds.len() >= 33,
990            "expected at least one discriminator per covered event variant"
991        );
992    }
993
994    #[test]
995    fn test_budget_type_serialization() {
996        assert_eq!(serde_json::to_value(BudgetType::Tokens).unwrap(), "tokens");
997        assert_eq!(serde_json::to_value(BudgetType::Time).unwrap(), "time");
998        assert_eq!(
999            serde_json::to_value(BudgetType::ToolCalls).unwrap(),
1000            "tool_calls"
1001        );
1002    }
1003
1004    #[test]
1005    fn test_scoped_agent_event_roundtrip() {
1006        let event = ScopedAgentEvent::new(
1007            vec![StreamScopeFrame::MobMember {
1008                flow_run_id: "run_123".to_string(),
1009                member_ref: "writer".to_string(),
1010                session_id: "sid_1".to_string(),
1011            }],
1012            AgentEvent::TextDelta {
1013                delta: "hello".to_string(),
1014            },
1015        );
1016
1017        assert_eq!(event.scope_id, "mob:writer");
1018
1019        let json = serde_json::to_value(&event).unwrap();
1020        let roundtrip: ScopedAgentEvent = serde_json::from_value(json).unwrap();
1021        assert_eq!(roundtrip.scope_id, "mob:writer");
1022        assert!(matches!(
1023            roundtrip.event,
1024            AgentEvent::TextDelta { ref delta } if delta == "hello"
1025        ));
1026    }
1027
1028    #[test]
1029    fn test_scope_id_from_path_formats() {
1030        let primary = vec![StreamScopeFrame::Primary {
1031            session_id: "sid_x".to_string(),
1032        }];
1033        assert_eq!(ScopedAgentEvent::scope_id_from_path(&primary), "primary");
1034
1035        let mob = vec![StreamScopeFrame::MobMember {
1036            flow_run_id: "run_1".to_string(),
1037            member_ref: "planner".to_string(),
1038            session_id: "sid_m".to_string(),
1039        }];
1040        assert_eq!(ScopedAgentEvent::scope_id_from_path(&mob), "mob:planner");
1041    }
1042
1043    #[test]
1044    fn test_event_envelope_roundtrip() {
1045        let envelope = EventEnvelope::new(
1046            "session:sid_test",
1047            7,
1048            Some("mob_1".to_string()),
1049            AgentEvent::TextDelta {
1050                delta: "hello".to_string(),
1051            },
1052        );
1053        let value = serde_json::to_value(&envelope).expect("serialize envelope");
1054        let parsed: EventEnvelope<AgentEvent> =
1055            serde_json::from_value(value).expect("deserialize envelope");
1056        assert_eq!(parsed.source_id, "session:sid_test");
1057        assert_eq!(parsed.seq, 7);
1058        assert_eq!(parsed.mob_id.as_deref(), Some("mob_1"));
1059        assert!(parsed.timestamp_ms > 0);
1060        assert!(matches!(
1061            parsed.payload,
1062            AgentEvent::TextDelta { delta } if delta == "hello"
1063        ));
1064    }
1065
1066    #[test]
1067    fn test_compare_event_envelopes_total_order() {
1068        let mut a = EventEnvelope::new("a", 1, None, AgentEvent::TurnStarted { turn_number: 1 });
1069        let mut b = EventEnvelope::new("a", 2, None, AgentEvent::TurnStarted { turn_number: 2 });
1070        a.timestamp_ms = 10;
1071        b.timestamp_ms = 10;
1072        assert_eq!(compare_event_envelopes(&a, &b), Ordering::Less);
1073        assert_eq!(compare_event_envelopes(&b, &a), Ordering::Greater);
1074    }
1075}