Skip to main content

agent_sdk_foundation/
events.rs

1//! Agent events for real-time streaming.
2//!
3//! The [`AgentEvent`] enum represents all events that can occur during agent
4//! execution. These events are streamed via an async channel for real-time
5//! UI updates and logging.
6//!
7//! # Event Flow
8//!
9//! A typical event sequence looks like:
10//! 1. `Start` - Agent begins processing
11//! 2. `Text` / `ToolCallStart` / `ToolCallEnd` - Processing events
12//! 3. `TurnComplete` - One LLM round-trip finished
13//! 4. `Done` - Agent completed successfully, or `Error` if failed
14
15use crate::llm::ContentBlock;
16use crate::types::{ThreadId, TokenUsage, ToolResult, ToolTier};
17use serde::{Deserialize, Serialize};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::Duration;
21use time::OffsetDateTime;
22
23/// Serde adapter encoding a [`Duration`] as a millisecond integer
24/// (`duration_ms`) instead of serde's default `{secs,nanos}` object.
25mod duration_ms_serde {
26    use serde::{Deserialize, Deserializer, Serializer};
27    use std::time::Duration;
28
29    pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
30    where
31        S: Serializer,
32    {
33        let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
34        serializer.serialize_u64(ms)
35    }
36
37    pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
38    where
39        D: Deserializer<'de>,
40    {
41        let ms = u64::deserialize(deserializer)?;
42        Ok(Duration::from_millis(ms))
43    }
44}
45
/// Events emitted by the agent loop during execution.
///
/// These are streamed to the client for real-time UI updates.
///
/// # Wire format
///
/// The enum is internally tagged: each serialized event is a JSON object
/// whose `type` key holds the variant name in `snake_case` (for example
/// `Text` becomes `"type": "text"`), with the variant's fields as sibling
/// keys.
///
/// The enum is `#[non_exhaustive]`, so code in other crates must keep a
/// wildcard arm when matching on it.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum AgentEvent {
    /// Agent loop has started.
    Start { thread_id: ThreadId, turn: usize },

    /// The user prompt that opens a turn.
    ///
    /// Committed by the worker on the **first attempt** of each
    /// root-turn task, immediately before the matching
    /// [`AgentEvent::Start`]. Carries the task's admitted
    /// `submitted_input` lifted into the LLM-content shape
    /// (`Vec<ContentBlock>` — text, image, document) so consumers
    /// can render the prompt without reaching into the projection
    /// or the task store. Retries of the same turn do not re-emit
    /// the event; downstream readers can pair `UserInput` 1:1
    /// with the *first* `Start { turn: N }` per turn.
    ///
    /// This is the durable, sequence-numbered admission event the
    /// projection never carried — `MessageProjection::messages`
    /// still holds the same prompt as an `llm::Message`, but it
    /// has no sequence and commingles with tool-result and
    /// compaction-summary user-role rows. Replay clients that
    /// need a clean, chronological "this is what the user typed"
    /// signal read this event instead.
    UserInput {
        thread_id: ThreadId,
        /// Lifted from the admitted task's
        /// `submitted_input`. Only `Text`, `Image`, and `Document`
        /// blocks appear — the runtime never admits user prompts
        /// containing tool blocks, but the broader
        /// `ContentBlock` type lets the field round-trip through
        /// the same wire shapes the projection uses.
        content: Vec<ContentBlock>,
    },

    /// Agent is "thinking" — the complete thinking text, available after
    /// the stream ends.
    Thinking { message_id: String, text: String },

    /// A thinking delta for streaming thinking content.
    ThinkingDelta { message_id: String, delta: String },

    /// A text delta for streaming responses.
    TextDelta { message_id: String, delta: String },

    /// Complete text block from the agent.
    Text { message_id: String, text: String },

    /// Agent is about to call a tool.
    ToolCallStart {
        id: String,
        name: String,
        display_name: String,
        input: serde_json::Value,
        tier: ToolTier,
    },

    /// Tool execution completed.
    ToolCallEnd {
        id: String,
        name: String,
        display_name: String,
        result: ToolResult,
    },

    /// Progress update from an async tool operation.
    ToolProgress {
        /// Tool call ID
        id: String,
        /// Tool name
        name: String,
        /// Human-readable display name
        display_name: String,
        /// Progress stage
        stage: String,
        /// Human-readable progress message
        message: String,
        /// Optional tool-specific data
        data: Option<serde_json::Value>,
    },

    /// Tool requires confirmation before execution.
    ///
    /// The application determines the confirmation type (normal, PIN, biometric).
    ToolRequiresConfirmation {
        id: String,
        name: String,
        display_name: String,
        input: serde_json::Value,
        description: String,
    },

    /// Agent turn completed (one LLM round-trip).
    TurnComplete { turn: usize, usage: TokenUsage },

    /// Agent loop completed successfully.
    Done {
        thread_id: ThreadId,
        total_turns: usize,
        total_usage: TokenUsage,
        /// Wall-clock run duration.
        ///
        /// Serialized on the wire as `duration_ms` (a millisecond integer) to
        /// match [`TurnSummary::duration_ms`](crate::types::TurnSummary) — the
        /// flattened envelope previously encoded this as a nested
        /// `{"secs":..,"nanos":..}` object, inconsistent with the rest of the
        /// streaming contract. The Rust field keeps the `Duration` type.
        #[serde(rename = "duration_ms", with = "duration_ms_serde")]
        duration: Duration,
    },

    /// An error occurred during execution.
    Error { message: String, recoverable: bool },

    /// Auto-retry was initiated for a transient LLM error (rate
    /// limit, server error, etc.). The `delay_ms` field gives the
    /// runtime's chosen backoff before re-attempting; consumers
    /// can render a "Retrying X/N in Ys…" indicator and clear it
    /// on the matching `AutoRetryEnd`.
    AutoRetryStart {
        /// 1-based retry attempt number (first retry = 1).
        attempt: u32,
        /// Maximum retry attempts configured for this run.
        max_attempts: u32,
        /// Backoff before the next attempt in milliseconds.
        delay_ms: u64,
        /// Human-readable reason the retry was triggered.
        error_message: String,
    },

    /// Auto-retry settled. `success = true` means a subsequent
    /// attempt succeeded; `success = false` means the retry budget
    /// was exhausted and `final_error` carries the last error.
    AutoRetryEnd {
        /// Total attempts performed (matches the last
        /// `AutoRetryStart`'s `attempt`).
        attempt: u32,
        /// Whether a follow-up attempt eventually succeeded.
        success: bool,
        /// Last error when the retry budget ran out.
        final_error: Option<String>,
    },

    /// The model refused the request (safety/policy).
    Refusal {
        message_id: String,
        text: Option<String>,
    },

    /// The run was cancelled via its [`CancellationToken`].
    ///
    /// This is a **terminal** event, emitted exactly once on every
    /// cancellation return site (mirroring [`AgentEvent::Done`] and
    /// [`AgentEvent::Refusal`]). Cancellation can land at the top of a
    /// turn, mid-stream while the model is still producing tokens,
    /// while a tool is in flight, or during context compaction — in
    /// every case the run closes with this event so a streaming
    /// consumer receives a closing marker and never hangs waiting for
    /// `Done`.
    ///
    /// `turn` is the turn number reached when the cancel was honored
    /// and `usage` is the partial token usage accumulated so far.
    ///
    /// [`CancellationToken`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html
    Cancelled { turn: usize, usage: TokenUsage },

    /// Context was compacted to reduce size.
    ContextCompacted {
        /// Number of messages before compaction
        original_count: usize,
        /// Number of messages after compaction
        new_count: usize,
        /// Estimated tokens before compaction
        original_tokens: usize,
        /// Estimated tokens after compaction
        new_tokens: usize,
    },

    /// Progress update from a running subagent.
    SubagentProgress {
        /// ID of the parent tool call that spawned this subagent
        subagent_id: String,
        /// Name of the subagent (e.g., "explore", "plan")
        subagent_name: String,
        /// Human-friendly nickname assigned by the parent (e.g., "Zara")
        nickname: Option<String>,
        /// Durable child thread reference, when available.
        child_thread_id: Option<ThreadId>,
        /// Durable child root task reference, when available.
        child_root_task_id: Option<String>,
        /// Durable parent-visible invocation task reference, when available.
        subagent_task_id: Option<String>,
        /// Maximum turns configured for this subagent
        max_turns: Option<u32>,
        /// Current turn number of the subagent
        current_turn: Option<u32>,
        /// Model being used by the subagent
        model: Option<String>,
        /// Summary label associated with the latest subagent update.
        tool_name: String,
        /// Brief context associated with the latest subagent update.
        tool_context: String,
        /// Whether the summarized update represents terminal completion.
        completed: bool,
        /// Whether the subagent succeeded (only meaningful if completed)
        success: bool,
        /// Current total tool count for this subagent
        tool_count: u32,
        /// Current total tokens used by this subagent
        total_tokens: u64,
    },
}
260
261impl AgentEvent {
262    #[must_use]
263    pub const fn start(thread_id: ThreadId, turn: usize) -> Self {
264        Self::Start { thread_id, turn }
265    }
266
267    #[must_use]
268    pub const fn user_input(thread_id: ThreadId, content: Vec<ContentBlock>) -> Self {
269        Self::UserInput { thread_id, content }
270    }
271
272    #[must_use]
273    pub fn thinking(message_id: impl Into<String>, text: impl Into<String>) -> Self {
274        Self::Thinking {
275            message_id: message_id.into(),
276            text: text.into(),
277        }
278    }
279
280    #[must_use]
281    pub fn thinking_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
282        Self::ThinkingDelta {
283            message_id: message_id.into(),
284            delta: delta.into(),
285        }
286    }
287
288    #[must_use]
289    pub fn text_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
290        Self::TextDelta {
291            message_id: message_id.into(),
292            delta: delta.into(),
293        }
294    }
295
296    #[must_use]
297    pub fn text(message_id: impl Into<String>, text: impl Into<String>) -> Self {
298        Self::Text {
299            message_id: message_id.into(),
300            text: text.into(),
301        }
302    }
303
304    #[must_use]
305    pub fn tool_call_start(
306        id: impl Into<String>,
307        name: impl Into<String>,
308        display_name: impl Into<String>,
309        input: serde_json::Value,
310        tier: ToolTier,
311    ) -> Self {
312        Self::ToolCallStart {
313            id: id.into(),
314            name: name.into(),
315            display_name: display_name.into(),
316            input,
317            tier,
318        }
319    }
320
321    #[must_use]
322    pub fn tool_call_end(
323        id: impl Into<String>,
324        name: impl Into<String>,
325        display_name: impl Into<String>,
326        result: ToolResult,
327    ) -> Self {
328        Self::ToolCallEnd {
329            id: id.into(),
330            name: name.into(),
331            display_name: display_name.into(),
332            result,
333        }
334    }
335
336    #[must_use]
337    pub fn tool_progress(
338        id: impl Into<String>,
339        name: impl Into<String>,
340        display_name: impl Into<String>,
341        stage: impl Into<String>,
342        message: impl Into<String>,
343        data: Option<serde_json::Value>,
344    ) -> Self {
345        Self::ToolProgress {
346            id: id.into(),
347            name: name.into(),
348            display_name: display_name.into(),
349            stage: stage.into(),
350            message: message.into(),
351            data,
352        }
353    }
354
355    #[must_use]
356    pub fn tool_requires_confirmation(
357        id: impl Into<String>,
358        name: impl Into<String>,
359        display_name: impl Into<String>,
360        input: serde_json::Value,
361        description: impl Into<String>,
362    ) -> Self {
363        Self::ToolRequiresConfirmation {
364            id: id.into(),
365            name: name.into(),
366            display_name: display_name.into(),
367            input,
368            description: description.into(),
369        }
370    }
371
372    #[must_use]
373    pub const fn done(
374        thread_id: ThreadId,
375        total_turns: usize,
376        total_usage: TokenUsage,
377        duration: Duration,
378    ) -> Self {
379        Self::Done {
380            thread_id,
381            total_turns,
382            total_usage,
383            duration,
384        }
385    }
386
387    #[must_use]
388    pub fn error(message: impl Into<String>, recoverable: bool) -> Self {
389        Self::Error {
390            message: message.into(),
391            recoverable,
392        }
393    }
394
395    #[must_use]
396    pub fn refusal(message_id: impl Into<String>, text: Option<String>) -> Self {
397        Self::Refusal {
398            message_id: message_id.into(),
399            text,
400        }
401    }
402
403    #[must_use]
404    pub const fn cancelled(turn: usize, usage: TokenUsage) -> Self {
405        Self::Cancelled { turn, usage }
406    }
407
408    #[must_use]
409    pub const fn context_compacted(
410        original_count: usize,
411        new_count: usize,
412        original_tokens: usize,
413        new_tokens: usize,
414    ) -> Self {
415        Self::ContextCompacted {
416            original_count,
417            new_count,
418            original_tokens,
419            new_tokens,
420        }
421    }
422}
423
/// Per-run counter that hands out increasing sequence numbers for event
/// ordering.
///
/// Each `run()` or `run_turn()` call creates a fresh counter starting at 0.
/// The inner atomic lives behind an `Arc`, so clones share one count and can
/// be handed to other tasks (e.g., subagent progress events sent from child
/// tokio tasks).
///
/// `Ordering::Relaxed` is sufficient because the mpsc channel provides the
/// happens-before ordering guarantee between sender and receiver.
#[derive(Clone, Debug)]
pub struct SequenceCounter(Arc<AtomicU64>);

impl SequenceCounter {
    /// Creates a counter whose first [`next`](Self::next) call returns 0.
    #[must_use]
    pub fn new() -> Self {
        Self::with_offset(0)
    }

    /// Creates a counter whose first [`next`](Self::next) call returns `start`.
    ///
    /// Used by server mode to resume sequencing across turns within the
    /// same thread: the server seeds the counter with the last known
    /// sequence value so numbering is continuous.
    #[must_use]
    pub fn with_offset(start: u64) -> Self {
        let cell = AtomicU64::new(start);
        Self(Arc::new(cell))
    }

    /// Returns the current sequence number and advances the counter by one.
    #[must_use]
    pub fn next(&self) -> u64 {
        let Self(shared) = self;
        // `fetch_add` returns the value held *before* the increment.
        shared.fetch_add(1, Ordering::Relaxed)
    }
}

impl Default for SequenceCounter {
    /// Same as [`SequenceCounter::new`]: starts at 0.
    fn default() -> Self {
        Self::with_offset(0)
    }
}
464
465/// Envelope wrapping every [`AgentEvent`] with idempotency metadata.
466///
467/// Mobile clients can use `event_id` for deduplication on retry, `sequence`
468/// for ordering after persistence, and `timestamp` for display.
469///
470/// The `event` field is flattened in JSON so that `event_id`, `sequence`,
471/// `timestamp`, and the event's `type` discriminant all appear at the same level.
472#[derive(Clone, Debug, Serialize, Deserialize)]
473pub struct AgentEventEnvelope {
474    /// Unique identifier for this event emission.
475    ///
476    /// UUID v4 when created via [`AgentEventEnvelope::wrap`] (SDK-local path),
477    /// UUID v7 when created via server-committed `CommittedEvent::into_envelope`.
478    pub event_id: uuid::Uuid,
479    /// Monotonically increasing sequence number within a single run.
480    pub sequence: u64,
481    /// UTC timestamp of when the event was emitted.
482    #[serde(with = "time::serde::rfc3339")]
483    pub timestamp: OffsetDateTime,
484    /// The actual event payload.
485    #[serde(flatten)]
486    pub event: AgentEvent,
487}
488
489impl AgentEventEnvelope {
490    /// Wrap an [`AgentEvent`] in an envelope, assigning it a unique ID,
491    /// the next sequence number, and the current UTC timestamp.
492    #[must_use]
493    pub fn wrap(event: AgentEvent, seq: &SequenceCounter) -> Self {
494        Self {
495            event_id: uuid::Uuid::new_v4(),
496            sequence: seq.next(),
497            timestamp: OffsetDateTime::now_utc(),
498            event,
499        }
500    }
501}
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506    use std::collections::HashSet;
507
508    // ===================
509    // SequenceCounter
510    // ===================
511
512    #[test]
513    fn sequence_counter_starts_at_zero() {
514        let seq = SequenceCounter::new();
515        assert_eq!(seq.next(), 0);
516    }
517
518    #[test]
519    fn sequence_counter_increments_monotonically() {
520        let seq = SequenceCounter::new();
521        for expected in 0..100 {
522            assert_eq!(seq.next(), expected);
523        }
524    }
525
526    #[test]
527    fn sequence_counter_no_gaps() {
528        let seq = SequenceCounter::new();
529        let values: Vec<u64> = (0..50).map(|_| seq.next()).collect();
530        let expected: Vec<u64> = (0..50).collect();
531        assert_eq!(values, expected);
532    }
533
534    #[test]
535    fn sequence_counter_clones_share_state() {
536        let seq = SequenceCounter::new();
537        let clone = seq.clone();
538
539        assert_eq!(seq.next(), 0);
540        assert_eq!(clone.next(), 1);
541        assert_eq!(seq.next(), 2);
542    }
543
544    #[test]
545    fn sequence_counter_default_starts_at_zero() {
546        let seq = SequenceCounter::default();
547        assert_eq!(seq.next(), 0);
548    }
549
550    #[test]
551    fn sequence_counter_with_offset_starts_at_given_value() {
552        let seq = SequenceCounter::with_offset(42);
553        assert_eq!(seq.next(), 42);
554        assert_eq!(seq.next(), 43);
555        assert_eq!(seq.next(), 44);
556    }
557
558    #[test]
559    fn sequence_counter_with_offset_zero_same_as_new() {
560        let seq = SequenceCounter::with_offset(0);
561        assert_eq!(seq.next(), 0);
562        assert_eq!(seq.next(), 1);
563    }
564
565    #[tokio::test]
566    async fn sequence_counter_unique_across_concurrent_tasks() {
567        let seq = SequenceCounter::new();
568        let n = 1000;
569
570        let mut handles = Vec::new();
571        for _ in 0..n {
572            let seq_clone = seq.clone();
573            handles.push(tokio::spawn(async move { seq_clone.next() }));
574        }
575
576        let mut values = HashSet::new();
577        for handle in handles {
578            let val = handle.await.unwrap();
579            assert!(values.insert(val), "duplicate sequence number: {val}");
580        }
581
582        assert_eq!(values.len(), n);
583        // All values should be in [0, n)
584        for v in &values {
585            assert!(*v < n as u64);
586        }
587    }
588
589    // ===================
590    // AgentEventEnvelope
591    // ===================
592
593    fn sample_event() -> AgentEvent {
594        AgentEvent::text("msg_1", "hello")
595    }
596
597    #[test]
598    fn wrap_assigns_unique_event_ids() {
599        let seq = SequenceCounter::new();
600        let ids: HashSet<uuid::Uuid> = (0..100)
601            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq).event_id)
602            .collect();
603        assert_eq!(ids.len(), 100);
604    }
605
606    #[test]
607    fn wrap_event_id_is_valid_uuid_v4() {
608        let seq = SequenceCounter::new();
609        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
610        assert_eq!(envelope.event_id.get_version(), Some(uuid::Version::Random));
611    }
612
613    #[test]
614    fn wrap_assigns_incrementing_sequences() {
615        let seq = SequenceCounter::new();
616        let envelopes: Vec<AgentEventEnvelope> = (0..10)
617            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq))
618            .collect();
619
620        for (i, env) in envelopes.iter().enumerate() {
621            assert_eq!(env.sequence, i as u64);
622        }
623    }
624
625    #[test]
626    fn wrap_timestamps_are_non_decreasing() {
627        let seq = SequenceCounter::new();
628        let envelopes: Vec<AgentEventEnvelope> = (0..20)
629            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq))
630            .collect();
631
632        for pair in envelopes.windows(2) {
633            assert!(pair[1].timestamp >= pair[0].timestamp);
634        }
635    }
636
637    #[test]
638    fn wrap_preserves_inner_event() {
639        let seq = SequenceCounter::new();
640        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_42", "content"), &seq);
641        match &envelope.event {
642            AgentEvent::Text { message_id, text } => {
643                assert_eq!(message_id, "msg_42");
644                assert_eq!(text, "content");
645            }
646            other => panic!("expected Text, got {other:?}"),
647        }
648    }
649
650    #[test]
651    fn separate_counters_produce_independent_sequences() {
652        let seq_a = SequenceCounter::new();
653        let seq_b = SequenceCounter::new();
654
655        let a0 = AgentEventEnvelope::wrap(sample_event(), &seq_a);
656        let b0 = AgentEventEnvelope::wrap(sample_event(), &seq_b);
657        let a1 = AgentEventEnvelope::wrap(sample_event(), &seq_a);
658        let b1 = AgentEventEnvelope::wrap(sample_event(), &seq_b);
659
660        // Both start at 0 independently
661        assert_eq!(a0.sequence, 0);
662        assert_eq!(b0.sequence, 0);
663        assert_eq!(a1.sequence, 1);
664        assert_eq!(b1.sequence, 1);
665
666        // But event_ids are still globally unique
667        let ids: HashSet<uuid::Uuid> = [&a0, &b0, &a1, &b1].iter().map(|e| e.event_id).collect();
668        assert_eq!(ids.len(), 4);
669    }
670
671    // ===================
672    // Serialization
673    // ===================
674
675    #[test]
676    fn envelope_serializes_flat_json() {
677        let seq = SequenceCounter::new();
678        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hi"), &seq);
679        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");
680
681        // Top-level fields from the envelope
682        assert!(json.get("event_id").is_some());
683        assert!(json.get("sequence").is_some());
684        assert!(json.get("timestamp").is_some());
685
686        // Flattened event fields at the same level
687        assert_eq!(json.get("type").and_then(|v| v.as_str()), Some("text"));
688        assert_eq!(
689            json.get("message_id").and_then(|v| v.as_str()),
690            Some("msg_1")
691        );
692        assert_eq!(json.get("text").and_then(|v| v.as_str()), Some("hi"));
693
694        // No nested "event" key
695        assert!(json.get("event").is_none());
696    }
697
698    #[test]
699    fn envelope_event_id_does_not_collide_with_tool_id() {
700        let seq = SequenceCounter::new();
701        let envelope = AgentEventEnvelope::wrap(
702            AgentEvent::tool_call_start(
703                "tool_123",
704                "bash",
705                "Bash",
706                serde_json::json!({}),
707                ToolTier::Observe,
708            ),
709            &seq,
710        );
711        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");
712
713        // Both `event_id` and tool `id` are present and distinct
714        let event_id = json.get("event_id").and_then(|v| v.as_str()).unwrap();
715        let tool_id = json.get("id").and_then(|v| v.as_str()).unwrap();
716        assert_ne!(event_id, tool_id);
717        assert_eq!(tool_id, "tool_123");
718    }
719
720    #[test]
721    fn envelope_roundtrip_serde() {
722        let seq = SequenceCounter::new();
723        let original = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hello"), &seq);
724
725        let json_str = serde_json::to_string(&original).expect("serialize");
726        let restored: AgentEventEnvelope = serde_json::from_str(&json_str).expect("deserialize");
727
728        assert_eq!(restored.event_id, original.event_id);
729        assert_eq!(restored.sequence, original.sequence);
730        assert_eq!(restored.timestamp, original.timestamp);
731        match &restored.event {
732            AgentEvent::Text { message_id, text } => {
733                assert_eq!(message_id, "msg_1");
734                assert_eq!(text, "hello");
735            }
736            other => panic!("expected Text, got {other:?}"),
737        }
738    }
739
740    #[test]
741    fn envelope_sequence_is_u64_in_json() {
742        let seq = SequenceCounter::new();
743        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
744        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");
745
746        assert!(json.get("sequence").unwrap().is_u64());
747        assert_eq!(json.get("sequence").unwrap().as_u64(), Some(0));
748    }
749
750    #[test]
751    fn envelope_timestamp_is_rfc3339_string() {
752        let seq = SequenceCounter::new();
753        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
754        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");
755
756        let ts_str = json.get("timestamp").unwrap().as_str().unwrap();
757        // Should parse as RFC 3339
758        time::OffsetDateTime::parse(ts_str, &time::format_description::well_known::Rfc3339)
759            .expect("timestamp should be valid RFC 3339");
760    }
761
762    #[test]
763    fn done_event_serializes_duration_as_millis() -> serde_json::Result<()> {
764        let seq = SequenceCounter::new();
765        let envelope = AgentEventEnvelope::wrap(
766            AgentEvent::done(
767                ThreadId::from_string("t"),
768                3,
769                TokenUsage::default(),
770                Duration::from_millis(2500),
771            ),
772            &seq,
773        );
774        let json = serde_json::to_value(&envelope)?;
775
776        // Flat millisecond integer, matching `TurnSummary::duration_ms` — not
777        // the old nested `{"secs":..,"nanos":..}` object under `duration`.
778        assert_eq!(
779            json.get("duration_ms").and_then(serde_json::Value::as_u64),
780            Some(2500)
781        );
782        assert!(
783            json.get("duration").is_none(),
784            "old `duration` key must be gone: {json}"
785        );
786
787        let restored: AgentEventEnvelope = serde_json::from_value(json)?;
788        match restored.event {
789            AgentEvent::Done { duration, .. } => {
790                assert_eq!(duration, Duration::from_millis(2500));
791            }
792            other => panic!("expected Done, got {other:?}"),
793        }
794        Ok(())
795    }
796
797    /// One representative value of every [`AgentEvent`] variant, so the
798    /// envelope round-trip test exercises the full streaming contract.
799    ///
800    /// The variants are produced by a few cohesive builders concatenated
801    /// in the original order, so the round-trip test still sees the exact
802    /// same set of values.
803    fn sample_all_variants() -> Vec<AgentEvent> {
804        let thread = ThreadId::from_string("thread-1");
805        let usage = TokenUsage::default();
806        let mut events = session_open_events(&thread);
807        events.extend(streamed_content_events());
808        events.extend(tool_call_events());
809        events.extend(turn_completion_events(&thread, &usage));
810        events.extend(failure_and_retry_events());
811        events.extend(auxiliary_events(&usage));
812        events
813    }
814
815    /// `Start` / `UserInput`: the events opening a thread turn.
816    fn session_open_events(thread: &ThreadId) -> Vec<AgentEvent> {
817        vec![
818            AgentEvent::Start {
819                thread_id: thread.clone(),
820                turn: 1,
821            },
822            AgentEvent::UserInput {
823                thread_id: thread.clone(),
824                content: vec![ContentBlock::Text { text: "hi".into() }],
825            },
826        ]
827    }
828
829    /// Streamed assistant content: consolidated and delta forms of
830    /// thinking and text.
831    fn streamed_content_events() -> Vec<AgentEvent> {
832        vec![
833            AgentEvent::Thinking {
834                message_id: "m".into(),
835                text: "t".into(),
836            },
837            AgentEvent::ThinkingDelta {
838                message_id: "m".into(),
839                delta: "d".into(),
840            },
841            AgentEvent::TextDelta {
842                message_id: "m".into(),
843                delta: "d".into(),
844            },
845            AgentEvent::Text {
846                message_id: "m".into(),
847                text: "t".into(),
848            },
849        ]
850    }
851
852    /// Tool-call lifecycle: start, end, progress, and confirmation.
853    fn tool_call_events() -> Vec<AgentEvent> {
854        vec![
855            AgentEvent::ToolCallStart {
856                id: "id".into(),
857                name: "n".into(),
858                display_name: "N".into(),
859                input: serde_json::json!({}),
860                tier: ToolTier::Observe,
861            },
862            AgentEvent::ToolCallEnd {
863                id: "id".into(),
864                name: "n".into(),
865                display_name: "N".into(),
866                result: ToolResult::success("ok"),
867            },
868            AgentEvent::ToolProgress {
869                id: "id".into(),
870                name: "n".into(),
871                display_name: "N".into(),
872                stage: "s".into(),
873                message: "m".into(),
874                data: None,
875            },
876            AgentEvent::ToolRequiresConfirmation {
877                id: "id".into(),
878                name: "n".into(),
879                display_name: "N".into(),
880                input: serde_json::json!({}),
881                description: "d".into(),
882            },
883        ]
884    }
885
886    /// Turn-completion summaries: `TurnComplete` and the terminal `Done`.
887    fn turn_completion_events(thread: &ThreadId, usage: &TokenUsage) -> Vec<AgentEvent> {
888        vec![
889            AgentEvent::TurnComplete {
890                turn: 1,
891                usage: usage.clone(),
892            },
893            AgentEvent::Done {
894                thread_id: thread.clone(),
895                total_turns: 2,
896                total_usage: usage.clone(),
897                duration: Duration::from_millis(1500),
898            },
899        ]
900    }
901
902    /// Error and auto-retry signalling events.
903    fn failure_and_retry_events() -> Vec<AgentEvent> {
904        vec![
905            AgentEvent::Error {
906                message: "e".into(),
907                recoverable: true,
908            },
909            AgentEvent::AutoRetryStart {
910                attempt: 1,
911                max_attempts: 5,
912                delay_ms: 100,
913                error_message: "rate limited".into(),
914            },
915            AgentEvent::AutoRetryEnd {
916                attempt: 1,
917                success: true,
918                final_error: None,
919            },
920        ]
921    }
922
923    /// Remaining auxiliary events: refusal, cancellation, compaction,
924    /// and subagent progress.
925    fn auxiliary_events(usage: &TokenUsage) -> Vec<AgentEvent> {
926        vec![
927            AgentEvent::Refusal {
928                message_id: "m".into(),
929                text: Some("no".into()),
930            },
931            AgentEvent::Cancelled {
932                turn: 1,
933                usage: usage.clone(),
934            },
935            AgentEvent::ContextCompacted {
936                original_count: 10,
937                new_count: 5,
938                original_tokens: 100,
939                new_tokens: 50,
940            },
941            AgentEvent::SubagentProgress {
942                subagent_id: "s".into(),
943                subagent_name: "explore".into(),
944                nickname: None,
945                child_thread_id: None,
946                child_root_task_id: None,
947                subagent_task_id: None,
948                max_turns: None,
949                current_turn: None,
950                model: None,
951                tool_name: "t".into(),
952                tool_context: "c".into(),
953                completed: false,
954                success: false,
955                tool_count: 0,
956                total_tokens: 0,
957            },
958        ]
959    }
960
961    #[test]
962    fn every_variant_envelope_has_flat_keys_and_round_trips() -> serde_json::Result<()> {
963        let seq = SequenceCounter::new();
964        for event in sample_all_variants() {
965            let label = format!("{event:?}");
966            let envelope = AgentEventEnvelope::wrap(event, &seq);
967            let json = serde_json::to_value(&envelope)?;
968
969            // Envelope metadata + the event discriminant are all flat keys.
970            for key in ["event_id", "sequence", "timestamp", "type"] {
971                assert!(
972                    json.get(key).is_some(),
973                    "{label}: missing flat key `{key}` in {json}"
974                );
975            }
976            // The `#[serde(flatten)]` must not leave a nested wrapper, and no
977            // variant field may collide with an envelope key.
978            assert!(
979                json.get("event").is_none(),
980                "{label}: unexpected nested `event` key in {json}"
981            );
982
983            let restored: AgentEventEnvelope = serde_json::from_value(json.clone())?;
984            assert_eq!(
985                serde_json::to_value(&restored)?,
986                json,
987                "{label}: envelope round-trip changed the wire form"
988            );
989        }
990        Ok(())
991    }
992}