Skip to main content

agent_sdk_foundation/
events.rs

1//! Agent events for real-time streaming.
2//!
3//! The [`AgentEvent`] enum represents all events that can occur during agent
4//! execution. These events are streamed via an async channel for real-time
5//! UI updates and logging.
6//!
7//! # Event Flow
8//!
9//! A typical event sequence looks like:
10//! 1. `Start` - Agent begins processing
11//! 2. `Text` / `ToolCallStart` / `ToolCallEnd` - Processing events
12//! 3. `TurnComplete` - One LLM round-trip finished
//! 4. `Done` - Agent completed successfully; a run can instead close with
//!    `Error` (failure), `Refusal` (model declined), or `Cancelled`
14
15use crate::llm::ContentBlock;
16use crate::types::{ThreadId, TokenUsage, ToolResult, ToolTier};
17use serde::{Deserialize, Serialize};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::Duration;
21use time::OffsetDateTime;
22
23/// Events emitted by the agent loop during execution.
24/// These are streamed to the client for real-time UI updates.
25#[derive(Clone, Debug, Serialize, Deserialize)]
26#[serde(tag = "type", rename_all = "snake_case")]
27#[non_exhaustive]
28pub enum AgentEvent {
29    /// Agent loop has started
30    Start { thread_id: ThreadId, turn: usize },
31
32    /// The user prompt that opens a turn.
33    ///
34    /// Committed by the worker on the **first attempt** of each
35    /// root-turn task, immediately before the matching
36    /// [`AgentEvent::Start`]. Carries the task's admitted
37    /// `submitted_input` lifted into the LLM-content shape
38    /// (`Vec<ContentBlock>` — text, image, document) so consumers
39    /// can render the prompt without reaching into the projection
40    /// or the task store. Retries of the same turn do not re-emit
41    /// the event; downstream readers can pair `UserInput` 1:1
42    /// with the *first* `Start { turn: N }` per turn.
43    ///
44    /// This is the durable, sequence-numbered admission event the
45    /// projection never carried — `MessageProjection::messages`
46    /// still holds the same prompt as an `llm::Message`, but it
47    /// has no sequence and commingles with tool-result and
48    /// compaction-summary user-role rows. Replay clients that
49    /// need a clean, chronological "this is what the user typed"
50    /// signal read this event instead.
51    UserInput {
52        thread_id: ThreadId,
53        /// Lifted from the admitted task's
54        /// `submitted_input`. Only `Text`, `Image`, and `Document`
55        /// blocks appear — the runtime never admits user prompts
56        /// containing tool blocks, but the broader
57        /// `ContentBlock` type lets the field round-trip through
58        /// the same wire shapes the projection uses.
59        content: Vec<ContentBlock>,
60    },
61
62    /// Agent is "thinking" - complete thinking text after stream ends
63    Thinking { message_id: String, text: String },
64
65    /// A thinking delta for streaming thinking content
66    ThinkingDelta { message_id: String, delta: String },
67
68    /// A text delta for streaming responses
69    TextDelta { message_id: String, delta: String },
70
71    /// Complete text block from the agent
72    Text { message_id: String, text: String },
73
74    /// Agent is about to call a tool
75    ToolCallStart {
76        id: String,
77        name: String,
78        display_name: String,
79        input: serde_json::Value,
80        tier: ToolTier,
81    },
82
83    /// Tool execution completed
84    ToolCallEnd {
85        id: String,
86        name: String,
87        display_name: String,
88        result: ToolResult,
89    },
90
91    /// Progress update from an async tool operation
92    ToolProgress {
93        /// Tool call ID
94        id: String,
95        /// Tool name
96        name: String,
97        /// Human-readable display name
98        display_name: String,
99        /// Progress stage
100        stage: String,
101        /// Human-readable progress message
102        message: String,
103        /// Optional tool-specific data
104        data: Option<serde_json::Value>,
105    },
106
107    /// Tool requires confirmation before execution.
108    /// The application determines the confirmation type (normal, PIN, biometric).
109    ToolRequiresConfirmation {
110        id: String,
111        name: String,
112        display_name: String,
113        input: serde_json::Value,
114        description: String,
115    },
116
117    /// Agent turn completed (one LLM round-trip)
118    TurnComplete { turn: usize, usage: TokenUsage },
119
120    /// Agent loop completed successfully
121    Done {
122        thread_id: ThreadId,
123        total_turns: usize,
124        total_usage: TokenUsage,
125        duration: Duration,
126    },
127
128    /// An error occurred during execution
129    Error { message: String, recoverable: bool },
130
131    /// Auto-retry was initiated for a transient LLM error (rate
132    /// limit, server error, etc.). The `delay_ms` field gives the
133    /// runtime's chosen backoff before re-attempting; consumers
134    /// can render a "Retrying X/N in Ys…" indicator and clear it
135    /// on the matching `AutoRetryEnd`.
136    AutoRetryStart {
137        /// 1-based retry attempt number (first retry = 1).
138        attempt: u32,
139        /// Maximum retry attempts configured for this run.
140        max_attempts: u32,
141        /// Backoff before the next attempt in milliseconds.
142        delay_ms: u64,
143        /// Human-readable reason the retry was triggered.
144        error_message: String,
145    },
146
147    /// Auto-retry settled. `success = true` means a subsequent
148    /// attempt succeeded; `success = false` means the retry budget
149    /// was exhausted and `final_error` carries the last error.
150    AutoRetryEnd {
151        /// Total attempts performed (matches the last
152        /// `AutoRetryStart`'s `attempt`).
153        attempt: u32,
154        /// Whether a follow-up attempt eventually succeeded.
155        success: bool,
156        /// Last error when the retry budget ran out.
157        final_error: Option<String>,
158    },
159
160    /// The model refused the request (safety/policy).
161    Refusal {
162        message_id: String,
163        text: Option<String>,
164    },
165
166    /// The run was cancelled via its [`CancellationToken`].
167    ///
168    /// This is a **terminal** event, emitted exactly once on every
169    /// cancellation return site (mirroring [`AgentEvent::Done`] and
170    /// [`AgentEvent::Refusal`]). Cancellation can land at the top of a
171    /// turn, mid-stream while the model is still producing tokens,
172    /// while a tool is in flight, or during context compaction — in
173    /// every case the run closes with this event so a streaming
174    /// consumer receives a closing marker and never hangs waiting for
175    /// `Done`.
176    ///
177    /// `turn` is the turn number reached when the cancel was honored
178    /// and `usage` is the partial token usage accumulated so far.
179    ///
180    /// [`CancellationToken`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html
181    Cancelled { turn: usize, usage: TokenUsage },
182
183    /// Context was compacted to reduce size
184    ContextCompacted {
185        /// Number of messages before compaction
186        original_count: usize,
187        /// Number of messages after compaction
188        new_count: usize,
189        /// Estimated tokens before compaction
190        original_tokens: usize,
191        /// Estimated tokens after compaction
192        new_tokens: usize,
193    },
194
195    /// Progress update from a running subagent
196    SubagentProgress {
197        /// ID of the parent tool call that spawned this subagent
198        subagent_id: String,
199        /// Name of the subagent (e.g., "explore", "plan")
200        subagent_name: String,
201        /// Human-friendly nickname assigned by the parent (e.g., "Zara")
202        nickname: Option<String>,
203        /// Durable child thread reference, when available.
204        child_thread_id: Option<ThreadId>,
205        /// Durable child root task reference, when available.
206        child_root_task_id: Option<String>,
207        /// Durable parent-visible invocation task reference, when available.
208        subagent_task_id: Option<String>,
209        /// Maximum turns configured for this subagent
210        max_turns: Option<u32>,
211        /// Current turn number of the subagent
212        current_turn: Option<u32>,
213        /// Model being used by the subagent
214        model: Option<String>,
215        /// Summary label associated with the latest subagent update.
216        tool_name: String,
217        /// Brief context associated with the latest subagent update.
218        tool_context: String,
219        /// Whether the summarized update represents terminal completion.
220        completed: bool,
221        /// Whether the subagent succeeded (only meaningful if completed)
222        success: bool,
223        /// Current total tool count for this subagent
224        tool_count: u32,
225        /// Current total tokens used by this subagent
226        total_tokens: u64,
227    },
228}
229
230impl AgentEvent {
231    #[must_use]
232    pub const fn start(thread_id: ThreadId, turn: usize) -> Self {
233        Self::Start { thread_id, turn }
234    }
235
236    #[must_use]
237    pub const fn user_input(thread_id: ThreadId, content: Vec<ContentBlock>) -> Self {
238        Self::UserInput { thread_id, content }
239    }
240
241    #[must_use]
242    pub fn thinking(message_id: impl Into<String>, text: impl Into<String>) -> Self {
243        Self::Thinking {
244            message_id: message_id.into(),
245            text: text.into(),
246        }
247    }
248
249    #[must_use]
250    pub fn thinking_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
251        Self::ThinkingDelta {
252            message_id: message_id.into(),
253            delta: delta.into(),
254        }
255    }
256
257    #[must_use]
258    pub fn text_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
259        Self::TextDelta {
260            message_id: message_id.into(),
261            delta: delta.into(),
262        }
263    }
264
265    #[must_use]
266    pub fn text(message_id: impl Into<String>, text: impl Into<String>) -> Self {
267        Self::Text {
268            message_id: message_id.into(),
269            text: text.into(),
270        }
271    }
272
273    #[must_use]
274    pub fn tool_call_start(
275        id: impl Into<String>,
276        name: impl Into<String>,
277        display_name: impl Into<String>,
278        input: serde_json::Value,
279        tier: ToolTier,
280    ) -> Self {
281        Self::ToolCallStart {
282            id: id.into(),
283            name: name.into(),
284            display_name: display_name.into(),
285            input,
286            tier,
287        }
288    }
289
290    #[must_use]
291    pub fn tool_call_end(
292        id: impl Into<String>,
293        name: impl Into<String>,
294        display_name: impl Into<String>,
295        result: ToolResult,
296    ) -> Self {
297        Self::ToolCallEnd {
298            id: id.into(),
299            name: name.into(),
300            display_name: display_name.into(),
301            result,
302        }
303    }
304
305    #[must_use]
306    pub fn tool_progress(
307        id: impl Into<String>,
308        name: impl Into<String>,
309        display_name: impl Into<String>,
310        stage: impl Into<String>,
311        message: impl Into<String>,
312        data: Option<serde_json::Value>,
313    ) -> Self {
314        Self::ToolProgress {
315            id: id.into(),
316            name: name.into(),
317            display_name: display_name.into(),
318            stage: stage.into(),
319            message: message.into(),
320            data,
321        }
322    }
323
324    #[must_use]
325    pub fn tool_requires_confirmation(
326        id: impl Into<String>,
327        name: impl Into<String>,
328        display_name: impl Into<String>,
329        input: serde_json::Value,
330        description: impl Into<String>,
331    ) -> Self {
332        Self::ToolRequiresConfirmation {
333            id: id.into(),
334            name: name.into(),
335            display_name: display_name.into(),
336            input,
337            description: description.into(),
338        }
339    }
340
341    #[must_use]
342    pub const fn done(
343        thread_id: ThreadId,
344        total_turns: usize,
345        total_usage: TokenUsage,
346        duration: Duration,
347    ) -> Self {
348        Self::Done {
349            thread_id,
350            total_turns,
351            total_usage,
352            duration,
353        }
354    }
355
356    #[must_use]
357    pub fn error(message: impl Into<String>, recoverable: bool) -> Self {
358        Self::Error {
359            message: message.into(),
360            recoverable,
361        }
362    }
363
364    #[must_use]
365    pub fn refusal(message_id: impl Into<String>, text: Option<String>) -> Self {
366        Self::Refusal {
367            message_id: message_id.into(),
368            text,
369        }
370    }
371
372    #[must_use]
373    pub const fn cancelled(turn: usize, usage: TokenUsage) -> Self {
374        Self::Cancelled { turn, usage }
375    }
376
377    #[must_use]
378    pub const fn context_compacted(
379        original_count: usize,
380        new_count: usize,
381        original_tokens: usize,
382        new_tokens: usize,
383    ) -> Self {
384        Self::ContextCompacted {
385            original_count,
386            new_count,
387            original_tokens,
388            new_tokens,
389        }
390    }
391}
392
/// Shared, monotonically increasing counter used to order events in a run.
///
/// Each `run()` or `run_turn()` call creates a fresh counter starting at 0.
/// The value lives behind an `Arc`, so every clone draws from the same
/// underlying counter; this lets other tasks (e.g., child tokio tasks sending
/// subagent progress events) take numbers from the same sequence.
///
/// The increment uses `Ordering::Relaxed`. That is sufficient because the
/// mpsc channel provides the happens-before ordering guarantee between
/// sender and receiver.
#[derive(Clone, Debug, Default)]
pub struct SequenceCounter(Arc<AtomicU64>);

impl SequenceCounter {
    /// Create a counter whose first issued value is 0.
    #[must_use]
    pub fn new() -> Self {
        Self::with_offset(0)
    }

    /// Create a counter whose first issued value is `start`.
    ///
    /// Used by server mode to keep numbering continuous across turns within
    /// the same thread.
    ///
    /// The first call to [`SequenceCounter::next`] returns `start` itself.
    /// NOTE(review): a caller resuming after an already-issued sequence `N`
    /// presumably needs to pass `N + 1` to avoid issuing `N` twice — confirm
    /// against the server's seeding logic.
    #[must_use]
    pub fn with_offset(start: u64) -> Self {
        let cell = AtomicU64::new(start);
        Self(Arc::new(cell))
    }

    /// Return the current value and advance the counter by one.
    #[must_use]
    pub fn next(&self) -> u64 {
        let Self(shared) = self;
        shared.fetch_add(1, Ordering::Relaxed)
    }
}
433
434/// Envelope wrapping every [`AgentEvent`] with idempotency metadata.
435///
436/// Mobile clients can use `event_id` for deduplication on retry, `sequence`
437/// for ordering after persistence, and `timestamp` for display.
438///
439/// The `event` field is flattened in JSON so that `event_id`, `sequence`,
440/// `timestamp`, and the event's `type` discriminant all appear at the same level.
441#[derive(Clone, Debug, Serialize, Deserialize)]
442pub struct AgentEventEnvelope {
443    /// Unique identifier for this event emission.
444    ///
445    /// UUID v4 when created via [`AgentEventEnvelope::wrap`] (SDK-local path),
446    /// UUID v7 when created via server-committed `CommittedEvent::into_envelope`.
447    pub event_id: uuid::Uuid,
448    /// Monotonically increasing sequence number within a single run.
449    pub sequence: u64,
450    /// UTC timestamp of when the event was emitted.
451    #[serde(with = "time::serde::rfc3339")]
452    pub timestamp: OffsetDateTime,
453    /// The actual event payload.
454    #[serde(flatten)]
455    pub event: AgentEvent,
456}
457
458impl AgentEventEnvelope {
459    /// Wrap an [`AgentEvent`] in an envelope, assigning it a unique ID,
460    /// the next sequence number, and the current UTC timestamp.
461    #[must_use]
462    pub fn wrap(event: AgentEvent, seq: &SequenceCounter) -> Self {
463        Self {
464            event_id: uuid::Uuid::new_v4(),
465            sequence: seq.next(),
466            timestamp: OffsetDateTime::now_utc(),
467            event,
468        }
469    }
470}
471
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    // -------------------------------------------------------------------
    // SequenceCounter
    // -------------------------------------------------------------------

    /// A fresh counter hands out 0 first.
    #[test]
    fn sequence_counter_starts_at_zero() {
        let counter = SequenceCounter::new();
        let first = counter.next();
        assert_eq!(first, 0);
    }

    /// Successive draws from one counter are 0, 1, 2, ...
    #[test]
    fn sequence_counter_increments_monotonically() {
        let counter = SequenceCounter::new();
        (0_u64..100).for_each(|expected| assert_eq!(counter.next(), expected));
    }

    /// Fifty draws yield exactly 0..50 with nothing skipped.
    #[test]
    fn sequence_counter_no_gaps() {
        let counter = SequenceCounter::new();
        let mut drawn: Vec<u64> = Vec::with_capacity(50);
        for _ in 0..50 {
            drawn.push(counter.next());
        }
        assert_eq!(drawn, (0..50).collect::<Vec<u64>>());
    }

    /// A clone draws from the same underlying counter as its source.
    #[test]
    fn sequence_counter_clones_share_state() {
        let source = SequenceCounter::new();
        let shared = source.clone();

        // Array elements are evaluated left to right.
        let drawn = [source.next(), shared.next(), source.next()];
        assert_eq!(drawn, [0, 1, 2]);
    }

    /// `Default` behaves like `new`.
    #[test]
    fn sequence_counter_default_starts_at_zero() {
        assert_eq!(SequenceCounter::default().next(), 0);
    }

    /// `with_offset(n)` issues `n` first and counts up from there.
    #[test]
    fn sequence_counter_with_offset_starts_at_given_value() {
        let counter = SequenceCounter::with_offset(42);
        let drawn: Vec<u64> = (0..3).map(|_| counter.next()).collect();
        assert_eq!(drawn, vec![42, 43, 44]);
    }

    /// An offset of zero is indistinguishable from `new`.
    #[test]
    fn sequence_counter_with_offset_zero_same_as_new() {
        let counter = SequenceCounter::with_offset(0);
        let drawn: Vec<u64> = (0..2).map(|_| counter.next()).collect();
        assert_eq!(drawn, vec![0, 1]);
    }

    /// Draws made from many spawned tasks never collide and stay in range.
    #[tokio::test]
    async fn sequence_counter_unique_across_concurrent_tasks() {
        let counter = SequenceCounter::new();
        let task_count = 1000;

        let handles: Vec<_> = (0..task_count)
            .map(|_| {
                let shared = counter.clone();
                tokio::spawn(async move { shared.next() })
            })
            .collect();

        let mut seen = HashSet::new();
        for handle in handles {
            let val = handle.await.unwrap();
            assert!(seen.insert(val), "duplicate sequence number: {val}");
        }

        assert_eq!(seen.len(), task_count);
        // Every value must fall in [0, task_count).
        assert!(seen.iter().all(|v| *v < task_count as u64));
    }

    // -------------------------------------------------------------------
    // AgentEventEnvelope
    // -------------------------------------------------------------------

    /// Small text event used wherever the payload itself is irrelevant.
    fn sample_event() -> AgentEvent {
        AgentEvent::text("msg_1", "hello")
    }

    /// One hundred wraps produce one hundred distinct event ids.
    #[test]
    fn wrap_assigns_unique_event_ids() {
        let counter = SequenceCounter::new();
        let mut ids = HashSet::new();
        for _ in 0..100 {
            ids.insert(AgentEventEnvelope::wrap(sample_event(), &counter).event_id);
        }
        assert_eq!(ids.len(), 100);
    }

    /// The id generated by `wrap` is a version-4 (random) UUID.
    #[test]
    fn wrap_event_id_is_valid_uuid_v4() {
        let counter = SequenceCounter::new();
        let version = AgentEventEnvelope::wrap(sample_event(), &counter)
            .event_id
            .get_version();
        assert_eq!(version, Some(uuid::Version::Random));
    }

    /// Envelopes wrapped with one counter carry sequences 0, 1, 2, ...
    #[test]
    fn wrap_assigns_incrementing_sequences() {
        let counter = SequenceCounter::new();
        let sequences: Vec<u64> = (0..10)
            .map(|_| AgentEventEnvelope::wrap(sample_event(), &counter).sequence)
            .collect();

        assert_eq!(sequences, (0..10).collect::<Vec<u64>>());
    }

    /// Timestamps of consecutively wrapped envelopes never go backwards.
    #[test]
    fn wrap_timestamps_are_non_decreasing() {
        let counter = SequenceCounter::new();
        let stamps: Vec<OffsetDateTime> = (0..20)
            .map(|_| AgentEventEnvelope::wrap(sample_event(), &counter).timestamp)
            .collect();

        assert!(stamps.windows(2).all(|pair| pair[1] >= pair[0]));
    }

    /// The payload handed to `wrap` comes back unchanged in `event`.
    #[test]
    fn wrap_preserves_inner_event() {
        let counter = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_42", "content"), &counter);

        let (message_id, text) = match &envelope.event {
            AgentEvent::Text { message_id, text } => (message_id, text),
            other => panic!("expected Text, got {other:?}"),
        };
        assert_eq!(message_id, "msg_42");
        assert_eq!(text, "content");
    }

    /// Two counters number independently, yet ids stay unique across both.
    #[test]
    fn separate_counters_produce_independent_sequences() {
        let first = SequenceCounter::new();
        let second = SequenceCounter::new();

        let a0 = AgentEventEnvelope::wrap(sample_event(), &first);
        let b0 = AgentEventEnvelope::wrap(sample_event(), &second);
        let a1 = AgentEventEnvelope::wrap(sample_event(), &first);
        let b1 = AgentEventEnvelope::wrap(sample_event(), &second);

        // Each counter starts at 0 on its own.
        let sequences = [a0.sequence, b0.sequence, a1.sequence, b1.sequence];
        assert_eq!(sequences, [0, 0, 1, 1]);

        // The event ids are nevertheless all different.
        let ids: HashSet<uuid::Uuid> = [a0.event_id, b0.event_id, a1.event_id, b1.event_id]
            .iter()
            .copied()
            .collect();
        assert_eq!(ids.len(), 4);
    }

    // -------------------------------------------------------------------
    // Serialization
    // -------------------------------------------------------------------

    /// Envelope and event fields share one flat JSON object.
    #[test]
    fn envelope_serializes_flat_json() {
        let counter = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hi"), &counter);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        // Envelope metadata sits at the top level.
        for key in ["event_id", "sequence", "timestamp"] {
            assert!(json.get(key).is_some());
        }

        // So do the flattened event fields.
        let field = |key: &str| json.get(key).and_then(|v| v.as_str());
        assert_eq!(field("type"), Some("text"));
        assert_eq!(field("message_id"), Some("msg_1"));
        assert_eq!(field("text"), Some("hi"));

        // And there is no nested "event" object.
        assert!(json.get("event").is_none());
    }

    /// The envelope's `event_id` and a tool event's `id` coexist in JSON.
    #[test]
    fn envelope_event_id_does_not_collide_with_tool_id() {
        let counter = SequenceCounter::new();
        let tool_event = AgentEvent::tool_call_start(
            "tool_123",
            "bash",
            "Bash",
            serde_json::json!({}),
            ToolTier::Observe,
        );
        let envelope = AgentEventEnvelope::wrap(tool_event, &counter);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        // Both keys are present and hold different values.
        let event_id = json.get("event_id").and_then(|v| v.as_str()).unwrap();
        let tool_id = json.get("id").and_then(|v| v.as_str()).unwrap();
        assert_ne!(event_id, tool_id);
        assert_eq!(tool_id, "tool_123");
    }

    /// Serializing then deserializing an envelope preserves every field.
    #[test]
    fn envelope_roundtrip_serde() {
        let counter = SequenceCounter::new();
        let original = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hello"), &counter);

        let encoded = serde_json::to_string(&original).expect("serialize");
        let restored: AgentEventEnvelope = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(restored.event_id, original.event_id);
        assert_eq!(restored.sequence, original.sequence);
        assert_eq!(restored.timestamp, original.timestamp);

        let (message_id, text) = match &restored.event {
            AgentEvent::Text { message_id, text } => (message_id, text),
            other => panic!("expected Text, got {other:?}"),
        };
        assert_eq!(message_id, "msg_1");
        assert_eq!(text, "hello");
    }

    /// `sequence` is emitted as a JSON unsigned integer.
    #[test]
    fn envelope_sequence_is_u64_in_json() {
        let counter = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(sample_event(), &counter);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        let sequence = json.get("sequence").unwrap();
        assert!(sequence.is_u64());
        assert_eq!(sequence.as_u64(), Some(0));
    }

    /// `timestamp` is emitted as a string that parses as RFC 3339.
    #[test]
    fn envelope_timestamp_is_rfc3339_string() {
        let counter = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(sample_event(), &counter);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        let raw = json.get("timestamp").unwrap().as_str().unwrap();
        let parsed =
            time::OffsetDateTime::parse(raw, &time::format_description::well_known::Rfc3339);
        parsed.expect("timestamp should be valid RFC 3339");
    }
}