use crate::types::{ThreadId, TokenUsage, ToolResult, ToolTier};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use time::OffsetDateTime;
/// Events reported by the agent, one variant per kind of event.
///
/// Serde represents this enum as internally tagged: the variant name,
/// converted to snake_case, is stored under the `type` key next to the
/// variant's own fields (for example, `Text` is written with `"type": "text"`).
// NOTE(review): the per-variant descriptions below are inferred from variant
// and field names only; the code that emits these events is not visible in
// this file — confirm against the producer before relying on them.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
/// Presumably marks the start of turn `turn` on thread `thread_id`.
Start { thread_id: ThreadId, turn: usize },
/// Thinking text associated with message `message_id`.
Thinking { message_id: String, text: String },
/// Presumably an incremental fragment of thinking text for `message_id`.
ThinkingDelta { message_id: String, delta: String },
/// Presumably an incremental fragment of response text for `message_id`.
TextDelta { message_id: String, delta: String },
/// Response text associated with message `message_id`.
Text { message_id: String, text: String },
/// Presumably emitted when a tool invocation begins.
ToolCallStart {
/// Identifier of the tool call (distinct from the envelope's `event_id`).
id: String,
name: String,
display_name: String,
/// Arbitrary JSON input passed along with the call.
input: serde_json::Value,
tier: ToolTier,
},
/// Presumably emitted when a tool invocation finishes; carries its result.
ToolCallEnd {
id: String,
name: String,
display_name: String,
result: ToolResult,
},
/// Progress report for a tool call, with a stage label and a message.
ToolProgress {
id: String,
name: String,
display_name: String,
stage: String,
message: String,
/// Optional arbitrary JSON payload accompanying the progress report.
data: Option<serde_json::Value>,
},
/// Presumably signals that a tool call is waiting for confirmation.
ToolRequiresConfirmation {
id: String,
name: String,
input: serde_json::Value,
description: String,
},
/// Presumably emitted at the end of turn `turn`, with that turn's token usage.
TurnComplete { turn: usize, usage: TokenUsage },
/// Presumably the final event of a run, carrying totals and elapsed time.
Done {
thread_id: ThreadId,
total_turns: usize,
total_usage: TokenUsage,
duration: Duration,
},
/// An error message together with a flag saying whether it is recoverable.
Error { message: String, recoverable: bool },
/// Presumably reports a refusal for message `message_id`; `text` is optional.
Refusal {
message_id: String,
text: Option<String>,
},
/// Before/after sizes for a context compaction.
// NOTE(review): what `*_count` counts (messages? items?) is not visible here.
ContextCompacted {
original_count: usize,
new_count: usize,
original_tokens: usize,
new_tokens: usize,
},
/// Progress report concerning a subagent and the tool it is using.
// NOTE(review): whether `success` is meaningful while `completed` is false
// cannot be determined from this file.
SubagentProgress {
subagent_id: String,
subagent_name: String,
tool_name: String,
tool_context: String,
completed: bool,
success: bool,
tool_count: u32,
total_tokens: u64,
},
}
impl AgentEvent {
#[must_use]
pub const fn start(thread_id: ThreadId, turn: usize) -> Self {
Self::Start { thread_id, turn }
}
#[must_use]
pub fn thinking(message_id: impl Into<String>, text: impl Into<String>) -> Self {
Self::Thinking {
message_id: message_id.into(),
text: text.into(),
}
}
#[must_use]
pub fn thinking_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
Self::ThinkingDelta {
message_id: message_id.into(),
delta: delta.into(),
}
}
#[must_use]
pub fn text_delta(message_id: impl Into<String>, delta: impl Into<String>) -> Self {
Self::TextDelta {
message_id: message_id.into(),
delta: delta.into(),
}
}
#[must_use]
pub fn text(message_id: impl Into<String>, text: impl Into<String>) -> Self {
Self::Text {
message_id: message_id.into(),
text: text.into(),
}
}
#[must_use]
pub fn tool_call_start(
id: impl Into<String>,
name: impl Into<String>,
display_name: impl Into<String>,
input: serde_json::Value,
tier: ToolTier,
) -> Self {
Self::ToolCallStart {
id: id.into(),
name: name.into(),
display_name: display_name.into(),
input,
tier,
}
}
#[must_use]
pub fn tool_call_end(
id: impl Into<String>,
name: impl Into<String>,
display_name: impl Into<String>,
result: ToolResult,
) -> Self {
Self::ToolCallEnd {
id: id.into(),
name: name.into(),
display_name: display_name.into(),
result,
}
}
#[must_use]
pub fn tool_progress(
id: impl Into<String>,
name: impl Into<String>,
display_name: impl Into<String>,
stage: impl Into<String>,
message: impl Into<String>,
data: Option<serde_json::Value>,
) -> Self {
Self::ToolProgress {
id: id.into(),
name: name.into(),
display_name: display_name.into(),
stage: stage.into(),
message: message.into(),
data,
}
}
#[must_use]
pub const fn done(
thread_id: ThreadId,
total_turns: usize,
total_usage: TokenUsage,
duration: Duration,
) -> Self {
Self::Done {
thread_id,
total_turns,
total_usage,
duration,
}
}
#[must_use]
pub fn error(message: impl Into<String>, recoverable: bool) -> Self {
Self::Error {
message: message.into(),
recoverable,
}
}
#[must_use]
pub fn refusal(message_id: impl Into<String>, text: Option<String>) -> Self {
Self::Refusal {
message_id: message_id.into(),
text,
}
}
#[must_use]
pub const fn context_compacted(
original_count: usize,
new_count: usize,
original_tokens: usize,
new_tokens: usize,
) -> Self {
Self::ContextCompacted {
original_count,
new_count,
original_tokens,
new_tokens,
}
}
}
/// A cheaply cloneable source of sequence numbers.
///
/// The count lives in an `Arc<AtomicU64>`, so every clone draws from the same
/// underlying counter. The default value is a fresh counter starting at `0`.
#[derive(Clone, Debug, Default)]
pub struct SequenceCounter(Arc<AtomicU64>);

impl SequenceCounter {
    /// Creates a counter whose first call to [`next`](Self::next) returns `0`.
    #[must_use]
    pub fn new() -> Self {
        // `AtomicU64::default()` is zero, so the derived `Default` is exactly
        // a fresh, unshared counter at 0.
        Self::default()
    }

    /// Returns the current value and advances the shared counter by one.
    ///
    /// Uses `Ordering::Relaxed`: the atomic read-modify-write gives each
    /// caller a distinct value, but it establishes no ordering with other
    /// memory operations.
    #[must_use]
    pub fn next(&self) -> u64 {
        let Self(shared) = self;
        shared.fetch_add(1, Ordering::Relaxed)
    }
}
/// Wrapper that attaches an id, a sequence number and a timestamp to an
/// [`AgentEvent`].
///
/// Because `event` is `#[serde(flatten)]`ed, the event's `type` tag and its
/// fields are serialized at the same level as the envelope's own fields
/// rather than nested under an `event` key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AgentEventEnvelope {
/// Identifier of this envelope; `wrap` fills it with a random (v4) UUID.
pub event_id: uuid::Uuid,
/// Sequence number; `wrap` takes it from the supplied `SequenceCounter`.
pub sequence: u64,
/// Time attached to the envelope; `wrap` uses the current UTC time.
/// Serialized as an RFC 3339 string.
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
/// The wrapped event, flattened into the envelope when (de)serialized.
#[serde(flatten)]
pub event: AgentEvent,
}
impl AgentEventEnvelope {
    /// Wraps `event` in a new envelope.
    ///
    /// The envelope gets a freshly generated v4 UUID, the next value drawn
    /// from `seq`, and the current UTC time; `event` is stored unchanged.
    #[must_use]
    pub fn wrap(event: AgentEvent, seq: &SequenceCounter) -> Self {
        // Bound in field order so the id, sequence and timestamp are produced
        // in the same order as they appear in the struct.
        let event_id = uuid::Uuid::new_v4();
        let sequence = seq.next();
        let timestamp = OffsetDateTime::now_utc();
        Self {
            event_id,
            sequence,
            timestamp,
            event,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    // ---- SequenceCounter -------------------------------------------------

    /// A counter from `new()` hands out `0` first.
    #[test]
    fn sequence_counter_starts_at_zero() {
        let counter = SequenceCounter::new();
        let first = counter.next();
        assert_eq!(first, 0);
    }

    /// One hundred successive calls yield 0, 1, 2, … in order.
    #[test]
    fn sequence_counter_increments_monotonically() {
        let counter = SequenceCounter::new();
        (0..100u64).for_each(|expected| assert_eq!(counter.next(), expected));
    }

    /// The produced values form the contiguous range `0..50`.
    #[test]
    fn sequence_counter_no_gaps() {
        let counter = SequenceCounter::new();
        let mut produced: Vec<u64> = Vec::new();
        for _ in 0..50 {
            produced.push(counter.next());
        }
        let wanted: Vec<u64> = (0..50).collect();
        assert_eq!(produced, wanted);
    }

    /// A clone draws from the same underlying counter as the original.
    #[test]
    fn sequence_counter_clones_share_state() {
        let original = SequenceCounter::new();
        let twin = original.clone();
        assert_eq!(original.next(), 0);
        assert_eq!(twin.next(), 1);
        assert_eq!(original.next(), 2);
    }

    /// `Default` behaves like `new()`.
    #[test]
    fn sequence_counter_default_starts_at_zero() {
        let counter = SequenceCounter::default();
        let first = counter.next();
        assert_eq!(first, 0);
    }

    /// Values drawn from many spawned tasks are distinct and cover `0..n`.
    // NOTE(review): `#[tokio::test]` uses a current-thread runtime by default,
    // so these tasks are scheduled on one thread; this checks uniqueness
    // across tasks, not true parallel access — confirm whether a multi-thread
    // flavor is wanted.
    #[tokio::test]
    async fn sequence_counter_unique_across_concurrent_tasks() {
        let counter = SequenceCounter::new();
        let n: usize = 1000;
        let handles: Vec<_> = (0..n)
            .map(|_| {
                let shared = counter.clone();
                tokio::spawn(async move { shared.next() })
            })
            .collect();

        let mut values = HashSet::new();
        for handle in handles {
            let val = handle.await.unwrap();
            assert!(values.insert(val), "duplicate sequence number: {val}");
        }

        assert_eq!(values.len(), n);
        for v in &values {
            assert!(*v < n as u64);
        }
    }

    // ---- AgentEventEnvelope::wrap ----------------------------------------

    /// Small text event used wherever the payload itself does not matter.
    fn sample_event() -> AgentEvent {
        AgentEvent::text("msg_1", "hello")
    }

    /// Every wrapped envelope gets its own `event_id`.
    #[test]
    fn wrap_assigns_unique_event_ids() {
        let counter = SequenceCounter::new();
        let mut ids: HashSet<uuid::Uuid> = HashSet::new();
        for _ in 0..100 {
            let envelope = AgentEventEnvelope::wrap(sample_event(), &counter);
            ids.insert(envelope.event_id);
        }
        assert_eq!(ids.len(), 100);
    }

    /// The generated id reports the random (v4) UUID version.
    #[test]
    fn wrap_event_id_is_valid_uuid_v4() {
        let counter = SequenceCounter::new();
        let AgentEventEnvelope { event_id, .. } =
            AgentEventEnvelope::wrap(sample_event(), &counter);
        assert_eq!(event_id.get_version(), Some(uuid::Version::Random));
    }

    /// Envelopes wrapped with one counter are numbered 0, 1, 2, ….
    #[test]
    fn wrap_assigns_incrementing_sequences() {
        let counter = SequenceCounter::new();
        let mut envelopes: Vec<AgentEventEnvelope> = Vec::new();
        for _ in 0..10 {
            envelopes.push(AgentEventEnvelope::wrap(sample_event(), &counter));
        }
        for (expected, envelope) in (0u64..).zip(&envelopes) {
            assert_eq!(envelope.sequence, expected);
        }
    }

    /// Timestamps of successively wrapped envelopes never go backwards.
    #[test]
    fn wrap_timestamps_are_non_decreasing() {
        let counter = SequenceCounter::new();
        let mut envelopes: Vec<AgentEventEnvelope> = Vec::new();
        for _ in 0..20 {
            envelopes.push(AgentEventEnvelope::wrap(sample_event(), &counter));
        }
        for pair in envelopes.windows(2) {
            assert!(pair[1].timestamp >= pair[0].timestamp);
        }
    }

    /// The wrapped event is stored unchanged.
    #[test]
    fn wrap_preserves_inner_event() {
        let counter = SequenceCounter::new();
        let inner = AgentEvent::text("msg_42", "content");
        let envelope = AgentEventEnvelope::wrap(inner, &counter);
        if let AgentEvent::Text { message_id, text } = &envelope.event {
            assert_eq!(message_id, "msg_42");
            assert_eq!(text, "content");
        } else {
            let other = &envelope.event;
            panic!("expected Text, got {other:?}");
        }
    }

    /// Two independently created counters do not influence each other.
    #[test]
    fn separate_counters_produce_independent_sequences() {
        let counter_a = SequenceCounter::new();
        let counter_b = SequenceCounter::new();

        // Interleave the two counters on purpose.
        let a0 = AgentEventEnvelope::wrap(sample_event(), &counter_a);
        let b0 = AgentEventEnvelope::wrap(sample_event(), &counter_b);
        let a1 = AgentEventEnvelope::wrap(sample_event(), &counter_a);
        let b1 = AgentEventEnvelope::wrap(sample_event(), &counter_b);

        assert_eq!(a0.sequence, 0);
        assert_eq!(b0.sequence, 0);
        assert_eq!(a1.sequence, 1);
        assert_eq!(b1.sequence, 1);

        let mut ids: HashSet<uuid::Uuid> = HashSet::new();
        for envelope in [&a0, &b0, &a1, &b1] {
            ids.insert(envelope.event_id);
        }
        assert_eq!(ids.len(), 4);
    }

    // ---- Serialization ---------------------------------------------------

    /// Envelope and event fields share one JSON object; there is no `event` key.
    #[test]
    fn envelope_serializes_flat_json() {
        let counter = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hi"), &counter);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        // Envelope-level fields.
        assert!(json.get("event_id").is_some());
        assert!(json.get("sequence").is_some());
        assert!(json.get("timestamp").is_some());

        // Flattened event fields, including the `type` tag.
        let tag = json.get("type").and_then(serde_json::Value::as_str);
        assert_eq!(tag, Some("text"));
        let message_id = json.get("message_id").and_then(serde_json::Value::as_str);
        assert_eq!(message_id, Some("msg_1"));
        let text = json.get("text").and_then(serde_json::Value::as_str);
        assert_eq!(text, Some("hi"));

        assert!(json.get("event").is_none());
    }

    /// The envelope's `event_id` and a tool call's `id` are separate JSON keys.
    #[test]
    fn envelope_event_id_does_not_collide_with_tool_id() {
        let counter = SequenceCounter::new();
        let tool_event = AgentEvent::tool_call_start(
            "tool_123",
            "bash",
            "Bash",
            serde_json::json!({}),
            ToolTier::Observe,
        );
        let envelope = AgentEventEnvelope::wrap(tool_event, &counter);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        let event_id = json
            .get("event_id")
            .and_then(serde_json::Value::as_str)
            .unwrap();
        let tool_id = json.get("id").and_then(serde_json::Value::as_str).unwrap();

        assert_ne!(event_id, tool_id);
        assert_eq!(tool_id, "tool_123");
    }

    /// Serializing to a string and parsing it back restores every field.
    #[test]
    fn envelope_roundtrip_serde() {
        let counter = SequenceCounter::new();
        let original = AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hello"), &counter);

        let encoded = serde_json::to_string(&original).expect("serialize");
        let restored: AgentEventEnvelope = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(restored.event_id, original.event_id);
        assert_eq!(restored.sequence, original.sequence);
        assert_eq!(restored.timestamp, original.timestamp);

        if let AgentEvent::Text { message_id, text } = &restored.event {
            assert_eq!(message_id, "msg_1");
            assert_eq!(text, "hello");
        } else {
            let other = &restored.event;
            panic!("expected Text, got {other:?}");
        }
    }

    /// `sequence` is emitted as an unsigned JSON number.
    #[test]
    fn envelope_sequence_is_u64_in_json() {
        let counter = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(sample_event(), &counter);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        assert!(json.get("sequence").unwrap().is_u64());
        assert_eq!(json.get("sequence").unwrap().as_u64(), Some(0));
    }

    /// `timestamp` is emitted as a string that parses as RFC 3339.
    #[test]
    fn envelope_timestamp_is_rfc3339_string() {
        let counter = SequenceCounter::new();
        let envelope = AgentEventEnvelope::wrap(sample_event(), &counter);
        let json: serde_json::Value = serde_json::to_value(&envelope).expect("serialize");

        let ts_str = json.get("timestamp").unwrap().as_str().unwrap();
        let format = time::format_description::well_known::Rfc3339;
        let _parsed = time::OffsetDateTime::parse(ts_str, &format)
            .expect("timestamp should be valid RFC 3339");
    }
}