Skip to main content

codetether_agent/event_stream/
mod.rs

1//! Event Stream Module
2//!
3//! Structured JSONL event sourcing for agent sessions. Every agent turn,
4//! tool call, and handoff is captured as an append-only event stream with
5//! byte-range offsets for efficient random access replay.
6//!
7//! ## Event Schema
8//!
9//! ```json
10//! {
11//!   "recorded_at": "2026-02-13T04:16:56.465006066+00:00",
12//!   "workspace": "/home/riley/A2A-Server-MCP",
13//!   "session_id": "36d04218-2a47-4fbe-8579-02c21be775bc",
14//!   "role": "tool",
15//!   "agent_name": null,
16//!   "message_type": "tool_result",
17//!   "content": "✓ bash",
18//!   "tool_name": "bash",
19//!   "tool_success": true,
20//!   "tool_duration_ms": 22515
21//! }
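//! ```
//!
//! Note: optional fields that are `None` (such as `agent_name` above) are
//! omitted from serialized output rather than written as `null`, and every
//! event additionally carries a required `sequence` number.
//!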
24//! ## File Naming Convention
25//!
26//! Files are named with byte-range offsets to enable random access:
27//! `{timestamp}-chat-events-{start_byte}-{end_byte}.jsonl`
28//!
29//! This allows seeking to any point in a session without reading the entire log.
30//!
31//! ## S3/R2 Archival
32//!
33//! The module supports automatic archival to S3-compatible storage:
34//! ```rust,ignore
35//! use codetether_agent::event_stream::s3_sink::S3Sink;
36//!
37//! let sink = S3Sink::from_env().await?;
38//! sink.upload_file(&local_path, &session_id).await?;
39//! ```
40
41pub mod s3_sink;
42
43use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use std::path::PathBuf;
46
47/// Categories of events in the stream
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "snake_case")]
50pub enum EventCategory {
51    /// User message/turn
52    User,
53    /// Assistant/agent message
54    Assistant,
55    /// Tool execution result
56    ToolResult,
57    /// Agent handoff
58    Handoff,
59    /// Session lifecycle
60    Session,
61    /// Error event
62    Error,
63}
64
65/// A single event in the JSONL event stream
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct ChatEvent {
68    /// When the event was recorded (ISO 8601)
69    pub recorded_at: DateTime<Utc>,
70    /// Workspace/working directory
71    pub workspace: PathBuf,
72    /// Session identifier
73    pub session_id: String,
74    /// Role: user, assistant, tool
75    pub role: String,
76    /// Agent name if applicable
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub agent_name: Option<String>,
79    /// Message type: message_start, message_end, tool_call, tool_result, etc.
80    pub message_type: String,
81    /// Event content (truncated for large outputs)
82    #[serde(default, skip_serializing_if = "Option::is_none")]
83    pub content: Option<String>,
84    /// Tool name if this is a tool event
85    #[serde(default, skip_serializing_if = "Option::is_none")]
86    pub tool_name: Option<String>,
87    /// Whether tool succeeded
88    #[serde(default, skip_serializing_if = "Option::is_none")]
89    pub tool_success: Option<bool>,
90    /// Tool execution duration in milliseconds
91    #[serde(default, skip_serializing_if = "Option::is_none")]
92    pub tool_duration_ms: Option<u64>,
93    /// Parent event ID for traceability
94    #[serde(default, skip_serializing_if = "Option::is_none")]
95    pub parent_event_id: Option<String>,
96    /// Sequence number in the session
97    pub sequence: u64,
98}
99
100impl ChatEvent {
101    /// Create a new chat event
102    pub fn new(
103        workspace: PathBuf,
104        session_id: String,
105        role: &str,
106        message_type: &str,
107        sequence: u64,
108    ) -> Self {
109        Self {
110            recorded_at: Utc::now(),
111            workspace,
112            session_id,
113            role: role.to_string(),
114            agent_name: None,
115            message_type: message_type.to_string(),
116            content: None,
117            tool_name: None,
118            tool_success: None,
119            tool_duration_ms: None,
120            parent_event_id: None,
121            sequence,
122        }
123    }
124
125    /// Create a tool result event
126    pub fn tool_result(
127        workspace: PathBuf,
128        session_id: String,
129        tool_name: &str,
130        success: bool,
131        duration_ms: u64,
132        content: &str,
133        sequence: u64,
134    ) -> Self {
135        // Truncate content if too long (object storage optimization)
136        let max_content_len = 10_000;
137        let truncated_content = if content.len() > max_content_len {
138            format!(
139                "{}...[truncated {} bytes]",
140                &content[..max_content_len],
141                content.len()
142            )
143        } else {
144            content.to_string()
145        };
146
147        Self {
148            recorded_at: Utc::now(),
149            workspace,
150            session_id,
151            role: "tool".to_string(),
152            agent_name: None,
153            message_type: "tool_result".to_string(),
154            content: Some(truncated_content),
155            tool_name: Some(tool_name.to_string()),
156            tool_success: Some(success),
157            tool_duration_ms: Some(duration_ms),
158            parent_event_id: None,
159            sequence,
160        }
161    }
162
163    /// Serialize to JSON for JSONL output
164    pub fn to_json(&self) -> String {
165        serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
166    }
167}
168
169/// File metadata for byte-range indexed JSONL
170#[derive(Debug, Clone)]
171pub struct EventFile {
172    /// Path to the JSONL file
173    pub path: PathBuf,
174    /// Starting byte offset
175    pub start_offset: u64,
176    /// Ending byte offset
177    pub end_offset: u64,
178    /// Number of events in this file
179    pub event_count: u64,
180    /// First event timestamp
181    pub first_event_at: DateTime<Utc>,
182    /// Last event timestamp
183    pub last_event_at: DateTime<Utc>,
184}
185
186impl EventFile {
187    /// Generate filename with byte-range offsets
188    pub fn filename(_session_id: &str, start_offset: u64, end_offset: u64) -> String {
189        let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
190        format!(
191            "{}-chat-events-{:020}-{:020}.jsonl",
192            timestamp, start_offset, end_offset
193        )
194    }
195}
196
197/// Event stream writer that manages append-only JSONL with byte-range tracking
198pub struct EventStreamWriter {
199    session_id: String,
200    workspace: PathBuf,
201    /// Current file path
202    current_path: Option<PathBuf>,
203    /// Current byte offset
204    current_offset: u64,
205    /// Events written to current file
206    events_in_file: u64,
207    /// Max events per file before rotation
208    max_events_per_file: u64,
209    /// Max bytes per file before rotation
210    max_bytes_per_file: u64,
211    /// Sequence counter
212    sequence: u64,
213}
214
215impl EventStreamWriter {
216    /// Create a new event stream writer
217    pub fn new(
218        session_id: String,
219        workspace: PathBuf,
220        max_events_per_file: u64,
221        max_bytes_per_file: u64,
222    ) -> Self {
223        Self {
224            session_id,
225            workspace,
226            current_path: None,
227            current_offset: 0,
228            events_in_file: 0,
229            max_events_per_file,
230            max_bytes_per_file,
231            sequence: 0,
232        }
233    }
234
235    /// Write an event and return the byte range
236    pub async fn write_event(&mut self, event: ChatEvent) -> std::io::Result<(u64, u64)> {
237        use tokio::io::AsyncWriteExt;
238
239        let json = event.to_json();
240        let event_size = json.len() as u64 + 1; // +1 for newline
241
242        // Check if we need to rotate
243        if self.events_in_file >= self.max_events_per_file
244            || (self.current_offset + event_size) > self.max_bytes_per_file
245        {
246            self.rotate().await?;
247        }
248
249        // Generate filename if first file
250        if self.current_path.is_none() {
251            let filename = EventFile::filename(
252                &self.session_id,
253                self.current_offset,
254                self.current_offset + event_size,
255            );
256            self.current_path = Some(self.workspace.join(filename));
257        }
258
259        let start = self.current_offset;
260
261        // Append to file
262        if let Some(ref path) = self.current_path {
263            let mut file = tokio::fs::OpenOptions::new()
264                .create(true)
265                .append(true)
266                .open(path)
267                .await?;
268
269            file.write_all(json.as_bytes()).await?;
270            file.write_all(b"\n").await?;
271        }
272
273        let end = self.current_offset + event_size;
274        self.current_offset += event_size;
275        self.events_in_file += 1;
276        self.sequence += 1;
277
278        // Update filename with new end offset
279        if let Some(ref path) = self.current_path {
280            if let Some(parent) = path.parent() {
281                let filename = EventFile::filename(&self.session_id, start, end);
282                let new_path = parent.join(filename);
283                // Rename file if offset changed significantly
284                if new_path != *path {
285                    let _ = tokio::fs::rename(path, &new_path).await;
286                    self.current_path = Some(new_path);
287                }
288            }
289        }
290
291        Ok((start, end))
292    }
293
294    /// Rotate to a new file
295    async fn rotate(&mut self) -> std::io::Result<()> {
296        self.current_path = None;
297        self.events_in_file = 0;
298        Ok(())
299    }
300}
301
#[cfg(test)]
mod tests {
    use super::*;

    /// A tool-result event must survive a JSON round trip with its fields intact.
    #[test]
    fn test_event_serialization() {
        let event = ChatEvent::tool_result(
            PathBuf::from("/test/workspace"),
            "session-123".to_string(),
            "bash",
            true,
            22515,
            "✓ bash",
            1,
        );

        let json = event.to_json();
        let parsed: ChatEvent = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.session_id, "session-123");
        assert_eq!(parsed.role, "tool");
        assert_eq!(parsed.message_type, "tool_result");
        assert_eq!(parsed.content.as_deref(), Some("✓ bash"));
        assert_eq!(parsed.tool_name, Some("bash".to_string()));
        assert_eq!(parsed.tool_success, Some(true));
        assert_eq!(parsed.tool_duration_ms, Some(22515));
        assert_eq!(parsed.agent_name, None);
        assert_eq!(parsed.sequence, 1);
    }

    /// The file name is `{timestamp}-chat-events-{start}-{end}.jsonl` with
    /// both offsets zero-padded to 20 digits.
    #[test]
    fn test_filename_format() {
        let filename = EventFile::filename("session-123", 1000, 2500);

        // Only the timestamp prefix varies between runs, so pin everything
        // after it exactly.
        let suffix = "-chat-events-00000000000000001000-00000000000000002500.jsonl";
        assert!(filename.ends_with(suffix), "unexpected name: {}", filename);

        // `%Y%m%dT%H%M%SZ` renders as 16 characters, e.g. `20260213T041656Z`.
        let timestamp = &filename[..filename.len() - suffix.len()];
        assert_eq!(timestamp.len(), 16);
        assert!(timestamp.ends_with('Z'));
    }
}