Skip to main content

agent_sdk/
events.rs

1//! Agent events for real-time streaming.
2//!
3//! The [`AgentEvent`] enum represents all events that can occur during agent
4//! execution. These events are streamed via an async channel for real-time
5//! UI updates and logging.
6//!
7//! # Event Flow
8//!
9//! A typical event sequence looks like:
10//! 1. `Start` - Agent begins processing
11//! 2. `Text` / `ToolCallStart` / `ToolCallEnd` - Processing events
12//! 3. `TurnComplete` - One LLM round-trip finished
13//! 4. `Done` - Agent completed successfully, or `Error` if failed
14
15use crate::types::{ThreadId, TokenUsage, ToolResult, ToolTier};
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::Duration;
20use time::OffsetDateTime;
21
22/// Events emitted by the agent loop during execution.
23/// These are streamed to the client for real-time UI updates.
24#[derive(Clone, Debug, Serialize, Deserialize)]
25#[serde(tag = "type", rename_all = "snake_case")]
26pub enum AgentEvent {
27    /// Agent loop has started
28    Start { thread_id: ThreadId, turn: usize },
29
30    /// Agent is "thinking" - complete thinking text after stream ends
31    Thinking { message_id: String, text: String },
32
33    /// A thinking delta for streaming thinking content
34    ThinkingDelta { message_id: String, delta: String },
35
36    /// A text delta for streaming responses
37    TextDelta { message_id: String, delta: String },
38
39    /// Complete text block from the agent
40    Text { message_id: String, text: String },
41
42    /// Agent is about to call a tool
43    ToolCallStart {
44        id: String,
45        name: String,
46        display_name: String,
47        input: serde_json::Value,
48        tier: ToolTier,
49    },
50
51    /// Tool execution completed
52    ToolCallEnd {
53        id: String,
54        name: String,
55        display_name: String,
56        result: ToolResult,
57    },
58
59    /// Progress update from an async tool operation
60    ToolProgress {
61        /// Tool call ID
62        id: String,
63        /// Tool name
64        name: String,
65        /// Human-readable display name
66        display_name: String,
67        /// Progress stage
68        stage: String,
69        /// Human-readable progress message
70        message: String,
71        /// Optional tool-specific data
72        data: Option<serde_json::Value>,
73    },
74
75    /// Tool requires confirmation before execution.
76    /// The application determines the confirmation type (normal, PIN, biometric).
77    ToolRequiresConfirmation {
78        id: String,
79        name: String,
80        input: serde_json::Value,
81        description: String,
82    },
83
84    /// Agent turn completed (one LLM round-trip)
85    TurnComplete { turn: usize, usage: TokenUsage },
86
87    /// Agent loop completed successfully
88    Done {
89        thread_id: ThreadId,
90        total_turns: usize,
91        total_usage: TokenUsage,
92        duration: Duration,
93    },
94
95    /// An error occurred during execution
96    Error { message: String, recoverable: bool },
97
98    /// The model refused the request (safety/policy).
99    Refusal {
100        message_id: String,
101        text: Option<String>,
102    },
103
104    /// Context was compacted to reduce size
105    ContextCompacted {
106        /// Number of messages before compaction
107        original_count: usize,
108        /// Number of messages after compaction
109        new_count: usize,
110        /// Estimated tokens before compaction
111        original_tokens: usize,
112        /// Estimated tokens after compaction
113        new_tokens: usize,
114    },
115
116    /// Progress update from a running subagent
117    SubagentProgress {
118        /// ID of the parent tool call that spawned this subagent
119        subagent_id: String,
120        /// Name of the subagent (e.g., "explore", "plan")
121        subagent_name: String,
122        /// Tool name that just started or completed
123        tool_name: String,
124        /// Brief context for the tool (e.g., file path, pattern)
125        tool_context: String,
126        /// Whether the tool completed (false = started, true = ended)
127        completed: bool,
128        /// Whether the tool succeeded (only meaningful if completed)
129        success: bool,
130        /// Current total tool count for this subagent
131        tool_count: u32,
132        /// Current total tokens used by this subagent
133        total_tokens: u64,
134    },
135}
136
137impl AgentEvent {
138    #[must_use]
139    pub const fn start(thread_id: ThreadId, turn: usize) -> Self {
140        Self::Start { thread_id, turn }
141    }
142
143    #[must_use]
144    pub fn thinking(message_id: impl Into<String>, text: impl Into<String>) -> Self {
145        Self::Thinking {
146            message_id: message_id.into(),
147            text: text.into(),
148        }
149    }
150
151    #[must_use]
152    pub fn thinking_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
153        Self::ThinkingDelta {
154            message_id: message_id.into(),
155            delta: delta.into(),
156        }
157    }
158
159    #[must_use]
160    pub fn text_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
161        Self::TextDelta {
162            message_id: message_id.into(),
163            delta: delta.into(),
164        }
165    }
166
167    #[must_use]
168    pub fn text(message_id: impl Into<String>, text: impl Into<String>) -> Self {
169        Self::Text {
170            message_id: message_id.into(),
171            text: text.into(),
172        }
173    }
174
175    #[must_use]
176    pub fn tool_call_start(
177        id: impl Into<String>,
178        name: impl Into<String>,
179        display_name: impl Into<String>,
180        input: serde_json::Value,
181        tier: ToolTier,
182    ) -> Self {
183        Self::ToolCallStart {
184            id: id.into(),
185            name: name.into(),
186            display_name: display_name.into(),
187            input,
188            tier,
189        }
190    }
191
192    #[must_use]
193    pub fn tool_call_end(
194        id: impl Into<String>,
195        name: impl Into<String>,
196        display_name: impl Into<String>,
197        result: ToolResult,
198    ) -> Self {
199        Self::ToolCallEnd {
200            id: id.into(),
201            name: name.into(),
202            display_name: display_name.into(),
203            result,
204        }
205    }
206
207    #[must_use]
208    pub fn tool_progress(
209        id: impl Into<String>,
210        name: impl Into<String>,
211        display_name: impl Into<String>,
212        stage: impl Into<String>,
213        message: impl Into<String>,
214        data: Option<serde_json::Value>,
215    ) -> Self {
216        Self::ToolProgress {
217            id: id.into(),
218            name: name.into(),
219            display_name: display_name.into(),
220            stage: stage.into(),
221            message: message.into(),
222            data,
223        }
224    }
225
226    #[must_use]
227    pub const fn done(
228        thread_id: ThreadId,
229        total_turns: usize,
230        total_usage: TokenUsage,
231        duration: Duration,
232    ) -> Self {
233        Self::Done {
234            thread_id,
235            total_turns,
236            total_usage,
237            duration,
238        }
239    }
240
241    #[must_use]
242    pub fn error(message: impl Into<String>, recoverable: bool) -> Self {
243        Self::Error {
244            message: message.into(),
245            recoverable,
246        }
247    }
248
249    #[must_use]
250    pub fn refusal(message_id: impl Into<String>, text: Option<String>) -> Self {
251        Self::Refusal {
252            message_id: message_id.into(),
253            text,
254        }
255    }
256
257    #[must_use]
258    pub const fn context_compacted(
259        original_count: usize,
260        new_count: usize,
261        original_tokens: usize,
262        new_tokens: usize,
263    ) -> Self {
264        Self::ContextCompacted {
265            original_count,
266            new_count,
267            original_tokens,
268            new_tokens,
269        }
270    }
271}
272
273/// Monotonically increasing per-run counter for event ordering.
274///
275/// Each `run()` or `run_turn()` call creates a fresh counter starting at 0.
276/// The counter is `Arc`-wrapped so it can be shared across tasks (e.g., subagent
277/// progress events sent from child tokio tasks).
278///
279/// `Ordering::Relaxed` is sufficient because the mpsc channel provides the
280/// happens-before ordering guarantee between sender and receiver.
281#[derive(Clone, Debug)]
282pub struct SequenceCounter(Arc<AtomicU64>);
283
284impl SequenceCounter {
285    /// Create a new counter starting at 0.
286    #[must_use]
287    pub fn new() -> Self {
288        Self(Arc::new(AtomicU64::new(0)))
289    }
290
291    /// Get the next sequence number, incrementing the counter.
292    #[must_use]
293    pub fn next(&self) -> u64 {
294        self.0.fetch_add(1, Ordering::Relaxed)
295    }
296}
297
298impl Default for SequenceCounter {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
304/// Envelope wrapping every [`AgentEvent`] with idempotency metadata.
305///
306/// Mobile clients can use `event_id` for deduplication on retry, `sequence`
307/// for ordering after persistence, and `timestamp` for display.
308///
309/// The `event` field is flattened in JSON so that `event_id`, `sequence`,
310/// `timestamp`, and the event's `type` discriminant all appear at the same level.
311#[derive(Clone, Debug, Serialize, Deserialize)]
312pub struct AgentEventEnvelope {
313    /// Unique identifier (UUID v4) for this event emission.
314    pub event_id: uuid::Uuid,
315    /// Monotonically increasing sequence number within a single run.
316    pub sequence: u64,
317    /// UTC timestamp of when the event was emitted.
318    #[serde(with = "time::serde::rfc3339")]
319    pub timestamp: OffsetDateTime,
320    /// The actual event payload.
321    #[serde(flatten)]
322    pub event: AgentEvent,
323}
324
325impl AgentEventEnvelope {
326    /// Wrap an [`AgentEvent`] in an envelope, assigning it a unique ID,
327    /// the next sequence number, and the current UTC timestamp.
328    #[must_use]
329    pub fn wrap(event: AgentEvent, seq: &SequenceCounter) -> Self {
330        Self {
331            event_id: uuid::Uuid::new_v4(),
332            sequence: seq.next(),
333            timestamp: OffsetDateTime::now_utc(),
334            event,
335        }
336    }
337}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342    use std::collections::HashSet;
343
344    // ===================
345    // SequenceCounter
346    // ===================
347
348    #[test]
349    fn sequence_counter_starts_at_zero() {
350        let seq = SequenceCounter::new();
351        assert_eq!(seq.next(), 0);
352    }
353
354    #[test]
355    fn sequence_counter_increments_monotonically() {
356        let seq = SequenceCounter::new();
357        for expected in 0..100 {
358            assert_eq!(seq.next(), expected);
359        }
360    }
361
362    #[test]
363    fn sequence_counter_no_gaps() {
364        let seq = SequenceCounter::new();
365        let values: Vec<u64> = (0..50).map(|_| seq.next()).collect();
366        let expected: Vec<u64> = (0..50).collect();
367        assert_eq!(values, expected);
368    }
369
370    #[test]
371    fn sequence_counter_clones_share_state() {
372        let seq = SequenceCounter::new();
373        let clone = seq.clone();
374
375        assert_eq!(seq.next(), 0);
376        assert_eq!(clone.next(), 1);
377        assert_eq!(seq.next(), 2);
378    }
379
380    #[test]
381    fn sequence_counter_default_starts_at_zero() {
382        let seq = SequenceCounter::default();
383        assert_eq!(seq.next(), 0);
384    }
385
386    #[tokio::test]
387    async fn sequence_counter_unique_across_concurrent_tasks() {
388        let seq = SequenceCounter::new();
389        let n = 1000;
390
391        let mut handles = Vec::new();
392        for _ in 0..n {
393            let seq_clone = seq.clone();
394            handles.push(tokio::spawn(async move { seq_clone.next() }));
395        }
396
397        let mut values = HashSet::new();
398        for handle in handles {
399            let val = handle.await.unwrap();
400            assert!(values.insert(val), "duplicate sequence number: {val}");
401        }
402
403        assert_eq!(values.len(), n);
404        // All values should be in [0, n)
405        for v in &values {
406            assert!(*v < n as u64);
407        }
408    }
409
410    // ===================
411    // AgentEventEnvelope
412    // ===================
413
414    fn sample_event() -> AgentEvent {
415        AgentEvent::text("msg_1", "hello")
416    }
417
418    #[test]
419    fn wrap_assigns_unique_event_ids() {
420        let seq = SequenceCounter::new();
421        let ids: HashSet<uuid::Uuid> = (0..100)
422            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq).event_id)
423            .collect();
424        assert_eq!(ids.len(), 100);
425    }
426
427    #[test]
428    fn wrap_event_id_is_valid_uuid_v4() {
429        let seq = SequenceCounter::new();
430        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
431        assert_eq!(envelope.event_id.get_version(), Some(uuid::Version::Random));
432    }
433
434    #[test]
435    fn wrap_assigns_incrementing_sequences() {
436        let seq = SequenceCounter::new();
437        let envelopes: Vec<AgentEventEnvelope> = (0..10)
438            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq))
439            .collect();
440
441        for (i, env) in envelopes.iter().enumerate() {
442            assert_eq!(env.sequence, i as u64);
443        }
444    }
445
446    #[test]
447    fn wrap_timestamps_are_non_decreasing() {
448        let seq = SequenceCounter::new();
449        let envelopes: Vec<AgentEventEnvelope> = (0..20)
450            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq))
451            .collect();
452
453        for pair in envelopes.windows(2) {
454            assert!(pair[1].timestamp >= pair[0].timestamp);
455        }
456    }
457
458    #[test]
459    fn wrap_preserves_inner_event() {
460        let seq = SequenceCounter::new();
461        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_42", "content"), &seq);
462        match &envelope.event {
463            AgentEvent::Text { message_id, text } => {
464                assert_eq!(message_id, "msg_42");
465                assert_eq!(text, "content");
466            }
467            other => panic!("expected Text, got {other:?}"),
468        }
469    }
470
471    #[test]
472    fn separate_counters_produce_independent_sequences() {
473        let seq_a = SequenceCounter::new();
474        let seq_b = SequenceCounter::new();
475
476        let a0 = AgentEventEnvelope::wrap(sample_event(), &seq_a);
477        let b0 = AgentEventEnvelope::wrap(sample_event(), &seq_b);
478        let a1 = AgentEventEnvelope::wrap(sample_event(), &seq_a);
479        let b1 = AgentEventEnvelope::wrap(sample_event(), &seq_b);
480
481        // Both start at 0 independently
482        assert_eq!(a0.sequence, 0);
483        assert_eq!(b0.sequence, 0);
484        assert_eq!(a1.sequence, 1);
485        assert_eq!(b1.sequence, 1);
486
487        // But event_ids are still globally unique
488        let ids: HashSet<uuid::Uuid> = [&a0, &b0, &a1, &b1].iter().map(|e| e.event_id).collect();
489        assert_eq!(ids.len(), 4);
490    }
491
492    // ===================
493    // Serialization
494    // ===================
495
496    #[test]
497    fn envelope_serializes_flat_json() {
498        let seq = SequenceCounter::new();
499        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hi"), &seq);
500        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");
501
502        // Top-level fields from the envelope
503        assert!(json.get("event_id").is_some());
504        assert!(json.get("sequence").is_some());
505        assert!(json.get("timestamp").is_some());
506
507        // Flattened event fields at the same level
508        assert_eq!(json.get("type").and_then(|v| v.as_str()), Some("text"));
509        assert_eq!(
510            json.get("message_id").and_then(|v| v.as_str()),
511            Some("msg_1")
512        );
513        assert_eq!(json.get("text").and_then(|v| v.as_str()), Some("hi"));
514
515        // No nested "event" key
516        assert!(json.get("event").is_none());
517    }
518
519    #[test]
520    fn envelope_event_id_does_not_collide_with_tool_id() {
521        let seq = SequenceCounter::new();
522        let envelope = AgentEventEnvelope::wrap(
523            AgentEvent::tool_call_start(
524                "tool_123",
525                "bash",
526                "Bash",
527                serde_json::json!({}),
528                ToolTier::Observe,
529            ),
530            &seq,
531        );
532        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");
533
534        // Both `event_id` and tool `id` are present and distinct
535        let event_id = json.get("event_id").and_then(|v| v.as_str()).unwrap();
536        let tool_id = json.get("id").and_then(|v| v.as_str()).unwrap();
537        assert_ne!(event_id, tool_id);
538        assert_eq!(tool_id, "tool_123");
539    }
540
541    #[test]
542    fn envelope_roundtrip_serde() {
543        let seq = SequenceCounter::new();
544        let original = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hello"), &seq);
545
546        let json_str = serde_json::to_string(&original).expect("serialize");
547        let restored: AgentEventEnvelope = serde_json::from_str(&json_str).expect("deserialize");
548
549        assert_eq!(restored.event_id, original.event_id);
550        assert_eq!(restored.sequence, original.sequence);
551        assert_eq!(restored.timestamp, original.timestamp);
552        match &restored.event {
553            AgentEvent::Text { message_id, text } => {
554                assert_eq!(message_id, "msg_1");
555                assert_eq!(text, "hello");
556            }
557            other => panic!("expected Text, got {other:?}"),
558        }
559    }
560
561    #[test]
562    fn envelope_sequence_is_u64_in_json() {
563        let seq = SequenceCounter::new();
564        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
565        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");
566
567        assert!(json.get("sequence").unwrap().is_u64());
568        assert_eq!(json.get("sequence").unwrap().as_u64(), Some(0));
569    }
570
571    #[test]
572    fn envelope_timestamp_is_rfc3339_string() {
573        let seq = SequenceCounter::new();
574        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
575        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");
576
577        let ts_str = json.get("timestamp").unwrap().as_str().unwrap();
578        // Should parse as RFC 3339
579        time::OffsetDateTime::parse(ts_str, &time::format_description::well_known::Rfc3339)
580            .expect("timestamp should be valid RFC 3339");
581    }
582}