1use crate::llm::ContentBlock;
16use crate::types::{ThreadId, TokenUsage, ToolResult, ToolTier};
17use serde::{Deserialize, Serialize};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::Duration;
21use time::OffsetDateTime;
22
23mod duration_ms_serde {
26 use serde::{Deserialize, Deserializer, Serializer};
27 use std::time::Duration;
28
29 pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
30 where
31 S: Serializer,
32 {
33 let ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
34 serializer.serialize_u64(ms)
35 }
36
37 pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
38 where
39 D: Deserializer<'de>,
40 {
41 let ms = u64::deserialize(deserializer)?;
42 Ok(Duration::from_millis(ms))
43 }
44}
45
/// Events describing the progress of an agent run.
///
/// Serialized as an internally tagged object: the variant name, converted to
/// `snake_case`, is stored under the `type` key next to the variant's fields.
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum AgentEvent {
    /// Start marker carrying the thread identifier and a turn number.
    Start { thread_id: ThreadId, turn: usize },

    /// User-supplied input for a thread, as a list of content blocks.
    UserInput {
        thread_id: ThreadId,
        content: Vec<ContentBlock>,
    },

    /// Thinking text associated with the message `message_id`.
    // NOTE(review): presumably the complete thinking text, in contrast to
    // `ThinkingDelta` — confirm against the emitter.
    Thinking { message_id: String, text: String },

    /// A thinking `delta` associated with the message `message_id`.
    // NOTE(review): presumably an incremental streaming chunk — confirm.
    ThinkingDelta { message_id: String, delta: String },

    /// A text `delta` associated with the message `message_id`.
    // NOTE(review): presumably an incremental streaming chunk — confirm.
    TextDelta { message_id: String, delta: String },

    /// Text associated with the message `message_id`.
    // NOTE(review): presumably the complete text, in contrast to `TextDelta`
    // — confirm against the emitter.
    Text { message_id: String, text: String },

    /// Start of a tool call: call id, tool name, display name, JSON input and
    /// the tool's tier.
    ToolCallStart {
        id: String,
        name: String,
        display_name: String,
        input: serde_json::Value,
        tier: ToolTier,
    },

    /// End of a tool call: call id, tool name, display name and the result.
    ToolCallEnd {
        id: String,
        name: String,
        display_name: String,
        result: ToolResult,
    },

    /// Progress report for a tool call.
    ToolProgress {
        id: String,
        name: String,
        display_name: String,
        /// Free-form stage label.
        stage: String,
        /// Free-form progress message.
        message: String,
        /// Optional arbitrary JSON payload.
        data: Option<serde_json::Value>,
    },

    /// A tool call that requires confirmation; carries the call's JSON input
    /// and a textual description.
    ToolRequiresConfirmation {
        id: String,
        name: String,
        display_name: String,
        input: serde_json::Value,
        description: String,
    },

    /// Completion of turn number `turn`, with a token-usage record.
    // NOTE(review): whether `usage` is per-turn or cumulative is not visible
    // here — confirm.
    TurnComplete { turn: usize, usage: TokenUsage },

    /// Completion marker carrying totals for turns and token usage.
    Done {
        thread_id: ThreadId,
        total_turns: usize,
        total_usage: TokenUsage,
        /// Serialized under the key `duration_ms` as an integer number of
        /// milliseconds (see `duration_ms_serde`).
        #[serde(rename = "duration_ms", with = "duration_ms_serde")]
        duration: Duration,
    },

    /// An error with a message and a flag saying whether it is recoverable.
    Error { message: String, recoverable: bool },

    /// Start of an automatic retry.
    AutoRetryStart {
        /// Attempt number. NOTE(review): 0- or 1-based is not visible here.
        attempt: u32,
        max_attempts: u32,
        /// Delay value; milliseconds according to the field name.
        delay_ms: u64,
        error_message: String,
    },

    /// End of an automatic retry.
    AutoRetryEnd {
        attempt: u32,
        success: bool,
        /// Optional error text. NOTE(review): presumably set only when
        /// `success` is false — confirm.
        final_error: Option<String>,
    },

    /// A refusal associated with the message `message_id`, with optional text.
    Refusal {
        message_id: String,
        text: Option<String>,
    },

    /// Cancellation marker with the turn number and a token-usage record.
    Cancelled { turn: usize, usage: TokenUsage },

    /// Context compaction report: item and token counts before (`original_*`)
    /// and after (`new_*`).
    // NOTE(review): what the counts enumerate (messages?) is not visible here.
    ContextCompacted {
        original_count: usize,
        new_count: usize,
        original_tokens: usize,
        new_tokens: usize,
    },

    /// Progress report for a subagent.
    // NOTE(review): the exact meaning of the optional identifiers and the
    // counters below is defined by the emitter, which is not visible here.
    SubagentProgress {
        subagent_id: String,
        subagent_name: String,
        nickname: Option<String>,
        child_thread_id: Option<ThreadId>,
        child_root_task_id: Option<String>,
        subagent_task_id: Option<String>,
        max_turns: Option<u32>,
        current_turn: Option<u32>,
        model: Option<String>,
        tool_name: String,
        tool_context: String,
        completed: bool,
        success: bool,
        tool_count: u32,
        total_tokens: u64,
    },
}
260
261impl AgentEvent {
262 #[must_use]
263 pub const fn start(thread_id: ThreadId, turn: usize) -> Self {
264 Self::Start { thread_id, turn }
265 }
266
267 #[must_use]
268 pub const fn user_input(thread_id: ThreadId, content: Vec<ContentBlock>) -> Self {
269 Self::UserInput { thread_id, content }
270 }
271
272 #[must_use]
273 pub fn thinking(message_id: impl Into<String>, text: impl Into<String>) -> Self {
274 Self::Thinking {
275 message_id: message_id.into(),
276 text: text.into(),
277 }
278 }
279
280 #[must_use]
281 pub fn thinking_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
282 Self::ThinkingDelta {
283 message_id: message_id.into(),
284 delta: delta.into(),
285 }
286 }
287
288 #[must_use]
289 pub fn text_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
290 Self::TextDelta {
291 message_id: message_id.into(),
292 delta: delta.into(),
293 }
294 }
295
296 #[must_use]
297 pub fn text(message_id: impl Into<String>, text: impl Into<String>) -> Self {
298 Self::Text {
299 message_id: message_id.into(),
300 text: text.into(),
301 }
302 }
303
304 #[must_use]
305 pub fn tool_call_start(
306 id: impl Into<String>,
307 name: impl Into<String>,
308 display_name: impl Into<String>,
309 input: serde_json::Value,
310 tier: ToolTier,
311 ) -> Self {
312 Self::ToolCallStart {
313 id: id.into(),
314 name: name.into(),
315 display_name: display_name.into(),
316 input,
317 tier,
318 }
319 }
320
321 #[must_use]
322 pub fn tool_call_end(
323 id: impl Into<String>,
324 name: impl Into<String>,
325 display_name: impl Into<String>,
326 result: ToolResult,
327 ) -> Self {
328 Self::ToolCallEnd {
329 id: id.into(),
330 name: name.into(),
331 display_name: display_name.into(),
332 result,
333 }
334 }
335
336 #[must_use]
337 pub fn tool_progress(
338 id: impl Into<String>,
339 name: impl Into<String>,
340 display_name: impl Into<String>,
341 stage: impl Into<String>,
342 message: impl Into<String>,
343 data: Option<serde_json::Value>,
344 ) -> Self {
345 Self::ToolProgress {
346 id: id.into(),
347 name: name.into(),
348 display_name: display_name.into(),
349 stage: stage.into(),
350 message: message.into(),
351 data,
352 }
353 }
354
355 #[must_use]
356 pub fn tool_requires_confirmation(
357 id: impl Into<String>,
358 name: impl Into<String>,
359 display_name: impl Into<String>,
360 input: serde_json::Value,
361 description: impl Into<String>,
362 ) -> Self {
363 Self::ToolRequiresConfirmation {
364 id: id.into(),
365 name: name.into(),
366 display_name: display_name.into(),
367 input,
368 description: description.into(),
369 }
370 }
371
372 #[must_use]
373 pub const fn done(
374 thread_id: ThreadId,
375 total_turns: usize,
376 total_usage: TokenUsage,
377 duration: Duration,
378 ) -> Self {
379 Self::Done {
380 thread_id,
381 total_turns,
382 total_usage,
383 duration,
384 }
385 }
386
387 #[must_use]
388 pub fn error(message: impl Into<String>, recoverable: bool) -> Self {
389 Self::Error {
390 message: message.into(),
391 recoverable,
392 }
393 }
394
395 #[must_use]
396 pub fn refusal(message_id: impl Into<String>, text: Option<String>) -> Self {
397 Self::Refusal {
398 message_id: message_id.into(),
399 text,
400 }
401 }
402
403 #[must_use]
404 pub const fn cancelled(turn: usize, usage: TokenUsage) -> Self {
405 Self::Cancelled { turn, usage }
406 }
407
408 #[must_use]
409 pub const fn context_compacted(
410 original_count: usize,
411 new_count: usize,
412 original_tokens: usize,
413 new_tokens: usize,
414 ) -> Self {
415 Self::ContextCompacted {
416 original_count,
417 new_count,
418 original_tokens,
419 new_tokens,
420 }
421 }
422}
423
/// Cheaply cloneable `u64` counter that hands out consecutive values.
///
/// The count is stored in an `Arc<AtomicU64>`, so every clone of a counter
/// draws from the same underlying value.
#[derive(Clone, Debug)]
pub struct SequenceCounter(Arc<AtomicU64>);

impl SequenceCounter {
    /// Creates a counter whose first [`next`](Self::next) call returns `0`.
    #[must_use]
    pub fn new() -> Self {
        Self::with_offset(0)
    }

    /// Creates a counter whose first [`next`](Self::next) call returns `start`.
    #[must_use]
    pub fn with_offset(start: u64) -> Self {
        let shared = Arc::new(AtomicU64::new(start));
        Self(shared)
    }

    /// Returns the current value and advances the counter by one.
    ///
    /// The increment is a single atomic `fetch_add`, which wraps around on
    /// overflow.
    #[must_use]
    pub fn next(&self) -> u64 {
        let Self(shared) = self;
        // Relaxed is enough here: only the atomicity of the read-modify-write
        // is relied on, not any ordering with other memory operations.
        shared.fetch_add(1, Ordering::Relaxed)
    }
}

impl Default for SequenceCounter {
    /// Same as [`SequenceCounter::new`]: the counter starts at `0`.
    fn default() -> Self {
        Self::with_offset(0)
    }
}
464
/// An [`AgentEvent`] together with identification and ordering metadata.
///
/// Because `event` is flattened, the serialized form is a single flat object:
/// the metadata keys below sit alongside the event's own `type` tag and
/// fields instead of under a nested `event` key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AgentEventEnvelope {
    /// Identifier of this envelope.
    pub event_id: uuid::Uuid,
    /// Sequence number of this envelope.
    pub sequence: u64,
    /// Timestamp of this envelope, serialized as an RFC 3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub timestamp: OffsetDateTime,
    /// The wrapped event; its fields are inlined into the envelope object.
    #[serde(flatten)]
    pub event: AgentEvent,
}
488
489impl AgentEventEnvelope {
490 #[must_use]
493 pub fn wrap(event: AgentEvent, seq: &SequenceCounter) -> Self {
494 Self {
495 event_id: uuid::Uuid::new_v4(),
496 sequence: seq.next(),
497 timestamp: OffsetDateTime::now_utc(),
498 event,
499 }
500 }
501}
502
/// Unit tests for `SequenceCounter`, `AgentEventEnvelope::wrap`, and the JSON
/// wire form of enveloped events.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// A counter created with `new` returns 0 from its first `next` call.
    #[test]
    fn sequence_counter_starts_at_zero() {
        let seq = SequenceCounter::new();
        assert_eq!(seq.next(), 0);
    }

    /// One hundred consecutive `next` calls return 0 through 99 in order.
    #[test]
    fn sequence_counter_increments_monotonically() {
        let seq = SequenceCounter::new();
        for expected in 0..100 {
            assert_eq!(seq.next(), expected);
        }
    }

    /// Fifty collected values are exactly `0..50`, with nothing skipped.
    #[test]
    fn sequence_counter_no_gaps() {
        let seq = SequenceCounter::new();
        let values: Vec<u64> = (0..50).map(|_| seq.next()).collect();
        let expected: Vec<u64> = (0..50).collect();
        assert_eq!(values, expected);
    }

    /// A clone draws from the same underlying value as the original.
    #[test]
    fn sequence_counter_clones_share_state() {
        let seq = SequenceCounter::new();
        let clone = seq.clone();

        assert_eq!(seq.next(), 0);
        assert_eq!(clone.next(), 1);
        assert_eq!(seq.next(), 2);
    }

    /// `Default` produces a counter that starts at 0, like `new`.
    #[test]
    fn sequence_counter_default_starts_at_zero() {
        let seq = SequenceCounter::default();
        assert_eq!(seq.next(), 0);
    }

    /// `with_offset(42)` yields 42, 43, 44 on successive calls.
    #[test]
    fn sequence_counter_with_offset_starts_at_given_value() {
        let seq = SequenceCounter::with_offset(42);
        assert_eq!(seq.next(), 42);
        assert_eq!(seq.next(), 43);
        assert_eq!(seq.next(), 44);
    }

    /// `with_offset(0)` behaves like `new`.
    #[test]
    fn sequence_counter_with_offset_zero_same_as_new() {
        let seq = SequenceCounter::with_offset(0);
        assert_eq!(seq.next(), 0);
        assert_eq!(seq.next(), 1);
    }

    /// 1000 spawned tasks, each calling `next` once on a clone, receive 1000
    /// distinct values that all lie below 1000.
    #[tokio::test]
    async fn sequence_counter_unique_across_concurrent_tasks() {
        let seq = SequenceCounter::new();
        let n = 1000;

        let mut handles = Vec::new();
        for _ in 0..n {
            let seq_clone = seq.clone();
            handles.push(tokio::spawn(async move { seq_clone.next() }));
        }

        let mut values = HashSet::new();
        for handle in handles {
            let val = handle.await.unwrap();
            // `insert` returns false for a value already in the set.
            assert!(values.insert(val), "duplicate sequence number: {val}");
        }

        assert_eq!(values.len(), n);
        for v in &values {
            // Together with the uniqueness check above, this bounds the
            // values to exactly the range 0..n.
            assert!(*v < n as u64);
        }
    }

    /// Small text event used wherever the event payload is irrelevant.
    fn sample_event() -> AgentEvent {
        AgentEvent::text("msg_1", "hello")
    }

    /// One hundred wrapped envelopes carry one hundred distinct event ids.
    #[test]
    fn wrap_assigns_unique_event_ids() {
        let seq = SequenceCounter::new();
        let ids: HashSet<uuid::Uuid> = (0..100)
            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq).event_id)
            .collect();
        assert_eq!(ids.len(), 100);
    }

    /// The generated event id reports the random (v4) UUID version.
    #[test]
    fn wrap_event_id_is_valid_uuid_v4() {
        let seq = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
        assert_eq!(envelope.event_id.get_version(), Some(uuid::Version::Random));
    }

    /// Envelopes wrapped one after another receive sequences 0, 1, 2, ...
    #[test]
    fn wrap_assigns_incrementing_sequences() {
        let seq = SequenceCounter::new();
        let envelopes: Vec<AgentEventEnvelope> = (0..10)
            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq))
            .collect();

        for (i, env) in envelopes.iter().enumerate() {
            assert_eq!(env.sequence, i as u64);
        }
    }

    /// Timestamps of envelopes wrapped one after another never go backwards.
    // NOTE(review): timestamps come from `OffsetDateTime::now_utc()`; if that
    // reads an adjustable wall clock, a clock step during the test could make
    // this flaky — confirm.
    #[test]
    fn wrap_timestamps_are_non_decreasing() {
        let seq = SequenceCounter::new();
        let envelopes: Vec<AgentEventEnvelope> = (0..20)
            .map(|_| AgentEventEnvelope::wrap(sample_event(), &seq))
            .collect();

        for pair in envelopes.windows(2) {
            assert!(pair[1].timestamp >= pair[0].timestamp);
        }
    }

    /// Wrapping leaves the inner event's variant and fields untouched.
    #[test]
    fn wrap_preserves_inner_event() {
        let seq = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_42", "content"), &seq);
        match &envelope.event {
            AgentEvent::Text { message_id, text } => {
                assert_eq!(message_id, "msg_42");
                assert_eq!(text, "content");
            }
            other => panic!("expected Text, got {other:?}"),
        }
    }

    /// Two independently created counters each number their envelopes from
    /// 0, while event ids remain distinct across both.
    #[test]
    fn separate_counters_produce_independent_sequences() {
        let seq_a = SequenceCounter::new();
        let seq_b = SequenceCounter::new();

        let a0 = AgentEventEnvelope::wrap(sample_event(), &seq_a);
        let b0 = AgentEventEnvelope::wrap(sample_event(), &seq_b);
        let a1 = AgentEventEnvelope::wrap(sample_event(), &seq_a);
        let b1 = AgentEventEnvelope::wrap(sample_event(), &seq_b);

        assert_eq!(a0.sequence, 0);
        assert_eq!(b0.sequence, 0);
        assert_eq!(a1.sequence, 1);
        assert_eq!(b1.sequence, 1);

        let ids: HashSet<uuid::Uuid> = [&a0, &b0, &a1, &b1].iter().map(|e| e.event_id).collect();
        assert_eq!(ids.len(), 4);
    }

    /// The serialized envelope is one flat object: metadata keys and the
    /// event's `type` tag and fields are siblings, with no nested `event` key.
    #[test]
    fn envelope_serializes_flat_json() {
        let seq = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hi"), &seq);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        assert!(json.get("event_id").is_some());
        assert!(json.get("sequence").is_some());
        assert!(json.get("timestamp").is_some());

        assert_eq!(json.get("type").and_then(|v| v.as_str()), Some("text"));
        assert_eq!(
            json.get("message_id").and_then(|v| v.as_str()),
            Some("msg_1")
        );
        assert_eq!(json.get("text").and_then(|v| v.as_str()), Some("hi"));

        assert!(json.get("event").is_none());
    }

    /// The envelope's `event_id` key and a tool event's `id` key coexist in
    /// the flat object without overwriting each other.
    #[test]
    fn envelope_event_id_does_not_collide_with_tool_id() {
        let seq = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(
            AgentEvent::tool_call_start(
                "tool_123",
                "bash",
                "Bash",
                serde_json::json!({}),
                ToolTier::Observe,
            ),
            &seq,
        );
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        let event_id = json.get("event_id").and_then(|v| v.as_str()).unwrap();
        let tool_id = json.get("id").and_then(|v| v.as_str()).unwrap();
        assert_ne!(event_id, tool_id);
        assert_eq!(tool_id, "tool_123");
    }

    /// Serializing to a JSON string and parsing it back preserves the
    /// metadata and the inner text event.
    #[test]
    fn envelope_roundtrip_serde() {
        let seq = SequenceCounter::new();
        let original = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hello"), &seq);

        let json_str = serde_json::to_string(&original).expect("serialize");
        let restored: AgentEventEnvelope = serde_json::from_str(&json_str).expect("deserialize");

        assert_eq!(restored.event_id, original.event_id);
        assert_eq!(restored.sequence, original.sequence);
        assert_eq!(restored.timestamp, original.timestamp);
        match &restored.event {
            AgentEvent::Text { message_id, text } => {
                assert_eq!(message_id, "msg_1");
                assert_eq!(text, "hello");
            }
            other => panic!("expected Text, got {other:?}"),
        }
    }

    /// `sequence` is emitted as an unsigned JSON integer.
    #[test]
    fn envelope_sequence_is_u64_in_json() {
        let seq = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        assert!(json.get("sequence").unwrap().is_u64());
        assert_eq!(json.get("sequence").unwrap().as_u64(), Some(0));
    }

    /// `timestamp` is emitted as a string that parses as RFC 3339.
    #[test]
    fn envelope_timestamp_is_rfc3339_string() {
        let seq = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(sample_event(), &seq);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        let ts_str = json.get("timestamp").unwrap().as_str().unwrap();
        time::OffsetDateTime::parse(ts_str, &time::format_description::well_known::Rfc3339)
            .expect("timestamp should be valid RFC 3339");
    }

    /// A `Done` event's duration is written as integer milliseconds under
    /// `duration_ms`, has no `duration` key, and deserializes back to the
    /// same `Duration`.
    #[test]
    fn done_event_serializes_duration_as_millis() -> serde_json::Result<()> {
        let seq = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(
            AgentEvent::done(
                ThreadId::from_string("t"),
                3,
                TokenUsage::default(),
                Duration::from_millis(2500),
            ),
            &seq,
        );
        let json = serde_json::to_value(&envelope)?;

        assert_eq!(
            json.get("duration_ms").and_then(serde_json::Value::as_u64),
            Some(2500)
        );
        assert!(
            json.get("duration").is_none(),
            "old `duration` key must be gone: {json}"
        );

        let restored: AgentEventEnvelope = serde_json::from_value(json)?;
        match restored.event {
            AgentEvent::Done { duration, .. } => {
                assert_eq!(duration, Duration::from_millis(2500));
            }
            other => panic!("expected Done, got {other:?}"),
        }
        Ok(())
    }

    /// Gathers the sample events produced by the helper functions below into
    /// one list.
    ///
    /// Nothing enforces at compile time that every `AgentEvent` variant is
    /// represented; a newly added variant has to be added to a helper by hand.
    fn sample_all_variants() -> Vec<AgentEvent> {
        let thread = ThreadId::from_string("thread-1");
        let usage = TokenUsage::default();
        let mut events = session_open_events(&thread);
        events.extend(streamed_content_events());
        events.extend(tool_call_events());
        events.extend(turn_completion_events(&thread, &usage));
        events.extend(failure_and_retry_events());
        events.extend(auxiliary_events(&usage));
        events
    }

    /// Samples of `Start` and `UserInput`.
    fn session_open_events(thread: &ThreadId) -> Vec<AgentEvent> {
        vec![
            AgentEvent::Start {
                thread_id: thread.clone(),
                turn: 1,
            },
            AgentEvent::UserInput {
                thread_id: thread.clone(),
                content: vec![ContentBlock::Text { text: "hi".into() }],
            },
        ]
    }

    /// Samples of `Thinking`, `ThinkingDelta`, `TextDelta` and `Text`.
    fn streamed_content_events() -> Vec<AgentEvent> {
        vec![
            AgentEvent::Thinking {
                message_id: "m".into(),
                text: "t".into(),
            },
            AgentEvent::ThinkingDelta {
                message_id: "m".into(),
                delta: "d".into(),
            },
            AgentEvent::TextDelta {
                message_id: "m".into(),
                delta: "d".into(),
            },
            AgentEvent::Text {
                message_id: "m".into(),
                text: "t".into(),
            },
        ]
    }

    /// Samples of `ToolCallStart`, `ToolCallEnd`, `ToolProgress` and
    /// `ToolRequiresConfirmation`.
    fn tool_call_events() -> Vec<AgentEvent> {
        vec![
            AgentEvent::ToolCallStart {
                id: "id".into(),
                name: "n".into(),
                display_name: "N".into(),
                input: serde_json::json!({}),
                tier: ToolTier::Observe,
            },
            AgentEvent::ToolCallEnd {
                id: "id".into(),
                name: "n".into(),
                display_name: "N".into(),
                result: ToolResult::success("ok"),
            },
            AgentEvent::ToolProgress {
                id: "id".into(),
                name: "n".into(),
                display_name: "N".into(),
                stage: "s".into(),
                message: "m".into(),
                data: None,
            },
            AgentEvent::ToolRequiresConfirmation {
                id: "id".into(),
                name: "n".into(),
                display_name: "N".into(),
                input: serde_json::json!({}),
                description: "d".into(),
            },
        ]
    }

    /// Samples of `TurnComplete` and `Done`.
    fn turn_completion_events(thread: &ThreadId, usage: &TokenUsage) -> Vec<AgentEvent> {
        vec![
            AgentEvent::TurnComplete {
                turn: 1,
                usage: usage.clone(),
            },
            AgentEvent::Done {
                thread_id: thread.clone(),
                total_turns: 2,
                total_usage: usage.clone(),
                duration: Duration::from_millis(1500),
            },
        ]
    }

    /// Samples of `Error`, `AutoRetryStart` and `AutoRetryEnd`.
    fn failure_and_retry_events() -> Vec<AgentEvent> {
        vec![
            AgentEvent::Error {
                message: "e".into(),
                recoverable: true,
            },
            AgentEvent::AutoRetryStart {
                attempt: 1,
                max_attempts: 5,
                delay_ms: 100,
                error_message: "rate limited".into(),
            },
            AgentEvent::AutoRetryEnd {
                attempt: 1,
                success: true,
                final_error: None,
            },
        ]
    }

    /// Samples of `Refusal`, `Cancelled`, `ContextCompacted` and
    /// `SubagentProgress`.
    fn auxiliary_events(usage: &TokenUsage) -> Vec<AgentEvent> {
        vec![
            AgentEvent::Refusal {
                message_id: "m".into(),
                text: Some("no".into()),
            },
            AgentEvent::Cancelled {
                turn: 1,
                usage: usage.clone(),
            },
            AgentEvent::ContextCompacted {
                original_count: 10,
                new_count: 5,
                original_tokens: 100,
                new_tokens: 50,
            },
            AgentEvent::SubagentProgress {
                subagent_id: "s".into(),
                subagent_name: "explore".into(),
                nickname: None,
                child_thread_id: None,
                child_root_task_id: None,
                subagent_task_id: None,
                max_turns: None,
                current_turn: None,
                model: None,
                tool_name: "t".into(),
                tool_context: "c".into(),
                completed: false,
                success: false,
                tool_count: 0,
                total_tokens: 0,
            },
        ]
    }

    /// For every sample event: the envelope serializes with flat metadata
    /// keys, has no nested `event` key, and re-serializing the deserialized
    /// envelope reproduces the same JSON value.
    #[test]
    fn every_variant_envelope_has_flat_keys_and_round_trips() -> serde_json::Result<()> {
        let seq = SequenceCounter::new();
        for event in sample_all_variants() {
            // Captured before `event` is moved into the envelope so failure
            // messages can name the offending variant.
            let label = format!("{event:?}");
            let envelope = AgentEventEnvelope::wrap(event, &seq);
            let json = serde_json::to_value(&envelope)?;

            for key in ["event_id", "sequence", "timestamp", "type"] {
                assert!(
                    json.get(key).is_some(),
                    "{label}: missing flat key `{key}` in {json}"
                );
            }
            assert!(
                json.get("event").is_none(),
                "{label}: unexpected nested `event` key in {json}"
            );

            // Compare JSON values rather than the structs themselves.
            let restored: AgentEventEnvelope = serde_json::from_value(json.clone())?;
            assert_eq!(
                serde_json::to_value(&restored)?,
                json,
                "{label}: envelope round-trip changed the wire form"
            );
        }
        Ok(())
    }
}