Skip to main content

codetether_agent/event_stream/
mod.rs

1//! Event Stream Module
2//!
3//! Structured JSONL event sourcing for agent sessions. Every agent turn,
4//! tool call, and handoff is captured as an append-only event stream with
5//! byte-range offsets for efficient random access replay.
6//!
7//! ## Event Schema
8//!
9//! ```json
10//! {
11//!   "recorded_at": "2026-02-13T04:16:56.465006066+00:00",
12//!   "workspace": "/home/riley/A2A-Server-MCP",
13//!   "session_id": "36d04218-2a47-4fbe-8579-02c21be775bc",
14//!   "role": "tool",
15//!   "agent_name": null,
16//!   "message_type": "tool_result",
17//!   "content": "✓ bash",
18//!   "tool_name": "bash",
//!   "tool_success": true,
//!   "tool_duration_ms": 22515,
//!   "sequence": 1
//! }
22//! ```
23//!
24//! ## File Naming Convention
25//!
26//! Files are named with byte-range offsets to enable random access:
27//! `{timestamp}-chat-events-{start_byte}-{end_byte}.jsonl`
28//!
29//! This allows seeking to any point in a session without reading the entire log.
30//!
31//! ## S3/R2 Archival
32//!
33//! The module supports automatic archival to S3-compatible storage:
34//! ```rust,ignore
35//! use codetether_agent::event_stream::s3_sink::S3Sink;
36//!
37//! let sink = S3Sink::from_env().await?;
38//! sink.upload_file(&local_path, &session_id).await?;
39//! ```
40
41pub mod s3_sink;
42
43use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use std::path::PathBuf;
46
47/// Categories of events in the stream
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "snake_case")]
50pub enum EventCategory {
51    /// User message/turn
52    User,
53    /// Assistant/agent message
54    Assistant,
55    /// Tool execution result
56    ToolResult,
57    /// Agent handoff
58    Handoff,
59    /// Session lifecycle
60    Session,
61    /// Error event
62    Error,
63}
64
65/// A single event in the JSONL event stream
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct ChatEvent {
68    /// When the event was recorded (ISO 8601)
69    pub recorded_at: DateTime<Utc>,
70    /// Workspace/working directory
71    pub workspace: PathBuf,
72    /// Session identifier
73    pub session_id: String,
74    /// Role: user, assistant, tool
75    pub role: String,
76    /// Agent name if applicable
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub agent_name: Option<String>,
79    /// Message type: message_start, message_end, tool_call, tool_result, etc.
80    pub message_type: String,
81    /// Event content (truncated for large outputs)
82    #[serde(default, skip_serializing_if = "Option::is_none")]
83    pub content: Option<String>,
84    /// Tool name if this is a tool event
85    #[serde(default, skip_serializing_if = "Option::is_none")]
86    pub tool_name: Option<String>,
87    /// Whether tool succeeded
88    #[serde(default, skip_serializing_if = "Option::is_none")]
89    pub tool_success: Option<bool>,
90    /// Tool execution duration in milliseconds
91    #[serde(default, skip_serializing_if = "Option::is_none")]
92    pub tool_duration_ms: Option<u64>,
93    /// Parent event ID for traceability
94    #[serde(default, skip_serializing_if = "Option::is_none")]
95    pub parent_event_id: Option<String>,
96    /// Sequence number in the session
97    pub sequence: u64,
98    /// OKR ID if this event is part of an OKR-gated operation
99    #[serde(default, skip_serializing_if = "Option::is_none")]
100    pub okr_id: Option<String>,
101    /// OKR run ID if this event is part of an OKR run
102    #[serde(default, skip_serializing_if = "Option::is_none")]
103    pub okr_run_id: Option<String>,
104    /// Relay ID if this event is part of a relay execution
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub relay_id: Option<String>,
107}
108
109impl ChatEvent {
110    /// Create a new chat event
111    pub fn new(
112        workspace: PathBuf,
113        session_id: String,
114        role: &str,
115        message_type: &str,
116        sequence: u64,
117    ) -> Self {
118        Self {
119            recorded_at: Utc::now(),
120            workspace,
121            session_id,
122            role: role.to_string(),
123            agent_name: None,
124            message_type: message_type.to_string(),
125            content: None,
126            tool_name: None,
127            tool_success: None,
128            tool_duration_ms: None,
129            parent_event_id: None,
130            sequence,
131            okr_id: None,
132            okr_run_id: None,
133            relay_id: None,
134        }
135    }
136
137    /// Create a chat event with OKR correlation
138    pub fn with_okr(
139        mut self,
140        okr_id: Option<String>,
141        okr_run_id: Option<String>,
142        relay_id: Option<String>,
143    ) -> Self {
144        self.okr_id = okr_id;
145        self.okr_run_id = okr_run_id;
146        self.relay_id = relay_id;
147        self
148    }
149
150    /// Create a tool result event
151    pub fn tool_result(
152        workspace: PathBuf,
153        session_id: String,
154        tool_name: &str,
155        success: bool,
156        duration_ms: u64,
157        content: &str,
158        sequence: u64,
159    ) -> Self {
160        // Truncate content if too long (object storage optimization)
161        let max_content_len = 10_000;
162        let truncated_content = if content.len() > max_content_len {
163            format!(
164                "{}...[truncated {} bytes]",
165                &content[..max_content_len],
166                content.len()
167            )
168        } else {
169            content.to_string()
170        };
171
172        Self {
173            recorded_at: Utc::now(),
174            workspace,
175            session_id,
176            role: "tool".to_string(),
177            agent_name: None,
178            message_type: "tool_result".to_string(),
179            content: Some(truncated_content),
180            tool_name: Some(tool_name.to_string()),
181            tool_success: Some(success),
182            tool_duration_ms: Some(duration_ms),
183            parent_event_id: None,
184            sequence,
185            okr_id: None,
186            okr_run_id: None,
187            relay_id: None,
188        }
189    }
190
191    /// Create a tool result event with OKR correlation
192    pub fn tool_result_with_okr(
193        workspace: PathBuf,
194        session_id: String,
195        tool_name: &str,
196        success: bool,
197        duration_ms: u64,
198        content: &str,
199        sequence: u64,
200        okr_id: Option<String>,
201        okr_run_id: Option<String>,
202        relay_id: Option<String>,
203    ) -> Self {
204        let mut event = Self::tool_result(
205            workspace,
206            session_id,
207            tool_name,
208            success,
209            duration_ms,
210            content,
211            sequence,
212        );
213        event.okr_id = okr_id;
214        event.okr_run_id = okr_run_id;
215        event.relay_id = relay_id;
216        event
217    }
218
219    /// Serialize to JSON for JSONL output
220    pub fn to_json(&self) -> String {
221        serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
222    }
223}
224
225/// File metadata for byte-range indexed JSONL
226#[derive(Debug, Clone)]
227pub struct EventFile {
228    /// Path to the JSONL file
229    pub path: PathBuf,
230    /// Starting byte offset
231    pub start_offset: u64,
232    /// Ending byte offset
233    pub end_offset: u64,
234    /// Number of events in this file
235    pub event_count: u64,
236    /// First event timestamp
237    pub first_event_at: DateTime<Utc>,
238    /// Last event timestamp
239    pub last_event_at: DateTime<Utc>,
240}
241
242impl EventFile {
243    /// Generate filename with byte-range offsets
244    pub fn filename(_session_id: &str, start_offset: u64, end_offset: u64) -> String {
245        let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
246        format!(
247            "{}-chat-events-{:020}-{:020}.jsonl",
248            timestamp, start_offset, end_offset
249        )
250    }
251}
252
253/// Event stream writer that manages append-only JSONL with byte-range tracking
254pub struct EventStreamWriter {
255    session_id: String,
256    workspace: PathBuf,
257    /// Current file path
258    current_path: Option<PathBuf>,
259    /// Current byte offset
260    current_offset: u64,
261    /// Events written to current file
262    events_in_file: u64,
263    /// Max events per file before rotation
264    max_events_per_file: u64,
265    /// Max bytes per file before rotation
266    max_bytes_per_file: u64,
267    /// Sequence counter
268    sequence: u64,
269}
270
271impl EventStreamWriter {
272    /// Create a new event stream writer
273    pub fn new(
274        session_id: String,
275        workspace: PathBuf,
276        max_events_per_file: u64,
277        max_bytes_per_file: u64,
278    ) -> Self {
279        Self {
280            session_id,
281            workspace,
282            current_path: None,
283            current_offset: 0,
284            events_in_file: 0,
285            max_events_per_file,
286            max_bytes_per_file,
287            sequence: 0,
288        }
289    }
290
291    /// Write an event and return the byte range
292    pub async fn write_event(&mut self, event: ChatEvent) -> std::io::Result<(u64, u64)> {
293        use tokio::io::AsyncWriteExt;
294
295        let json = event.to_json();
296        let event_size = json.len() as u64 + 1; // +1 for newline
297
298        // Check if we need to rotate
299        if self.events_in_file >= self.max_events_per_file
300            || (self.current_offset + event_size) > self.max_bytes_per_file
301        {
302            self.rotate().await?;
303        }
304
305        // Generate filename if first file
306        if self.current_path.is_none() {
307            let filename = EventFile::filename(
308                &self.session_id,
309                self.current_offset,
310                self.current_offset + event_size,
311            );
312            self.current_path = Some(self.workspace.join(filename));
313        }
314
315        let start = self.current_offset;
316
317        // Append to file
318        if let Some(ref path) = self.current_path {
319            let mut file = tokio::fs::OpenOptions::new()
320                .create(true)
321                .append(true)
322                .open(path)
323                .await?;
324
325            file.write_all(json.as_bytes()).await?;
326            file.write_all(b"\n").await?;
327        }
328
329        let end = self.current_offset + event_size;
330        self.current_offset += event_size;
331        self.events_in_file += 1;
332        self.sequence += 1;
333
334        // Update filename with new end offset
335        if let Some(ref path) = self.current_path {
336            if let Some(parent) = path.parent() {
337                let filename = EventFile::filename(&self.session_id, start, end);
338                let new_path = parent.join(filename);
339                // Rename file if offset changed significantly
340                if new_path != *path {
341                    let _ = tokio::fs::rename(path, &new_path).await;
342                    self.current_path = Some(new_path);
343                }
344            }
345        }
346
347        Ok((start, end))
348    }
349
350    /// Rotate to a new file
351    async fn rotate(&mut self) -> std::io::Result<()> {
352        self.current_path = None;
353        self.events_in_file = 0;
354        Ok(())
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_event_serialization() {
        let source = ChatEvent::tool_result(
            PathBuf::from("/test/workspace"),
            "session-123".to_string(),
            "bash",
            true,
            22515,
            "✓ bash",
            1,
        );

        // Round-trip through JSON and check the tool metadata survives intact.
        let decoded: ChatEvent = serde_json::from_str(&source.to_json()).unwrap();

        assert_eq!(decoded.session_id, "session-123");
        assert_eq!(decoded.tool_name.as_deref(), Some("bash"));
        assert_eq!(decoded.tool_success, Some(true));
        assert_eq!(decoded.tool_duration_ms, Some(22515));
    }

    #[test]
    fn test_filename_format() {
        let name = EventFile::filename("session-123", 1000, 2500);
        assert!(name.contains("chat-events-"));

        // Offsets are zero-padded to 20 digits; the start offset is followed
        // either by the separator before the end offset or by the extension dot.
        let start = "-00000000000000001000";
        assert!(name.contains(&format!("{start}-")) || name.contains(&format!("{start}.")));
        assert!(name.contains("-00000000000000002500.jsonl"));
    }
}