codetether_agent/event_stream/
mod.rs1pub mod s3_sink;
42
43use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use std::path::PathBuf;
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "snake_case")]
50pub enum EventCategory {
51 User,
53 Assistant,
55 ToolResult,
57 Handoff,
59 Session,
61 Error,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct ChatEvent {
68 pub recorded_at: DateTime<Utc>,
70 pub workspace: PathBuf,
72 pub session_id: String,
74 pub role: String,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
78 pub agent_name: Option<String>,
79 pub message_type: String,
81 #[serde(default, skip_serializing_if = "Option::is_none")]
83 pub content: Option<String>,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
86 pub tool_name: Option<String>,
87 #[serde(default, skip_serializing_if = "Option::is_none")]
89 pub tool_success: Option<bool>,
90 #[serde(default, skip_serializing_if = "Option::is_none")]
92 pub tool_duration_ms: Option<u64>,
93 #[serde(default, skip_serializing_if = "Option::is_none")]
95 pub parent_event_id: Option<String>,
96 pub sequence: u64,
98}
99
100impl ChatEvent {
101 pub fn new(
103 workspace: PathBuf,
104 session_id: String,
105 role: &str,
106 message_type: &str,
107 sequence: u64,
108 ) -> Self {
109 Self {
110 recorded_at: Utc::now(),
111 workspace,
112 session_id,
113 role: role.to_string(),
114 agent_name: None,
115 message_type: message_type.to_string(),
116 content: None,
117 tool_name: None,
118 tool_success: None,
119 tool_duration_ms: None,
120 parent_event_id: None,
121 sequence,
122 }
123 }
124
125 pub fn tool_result(
127 workspace: PathBuf,
128 session_id: String,
129 tool_name: &str,
130 success: bool,
131 duration_ms: u64,
132 content: &str,
133 sequence: u64,
134 ) -> Self {
135 let max_content_len = 10_000;
137 let truncated_content = if content.len() > max_content_len {
138 format!(
139 "{}...[truncated {} bytes]",
140 &content[..max_content_len],
141 content.len()
142 )
143 } else {
144 content.to_string()
145 };
146
147 Self {
148 recorded_at: Utc::now(),
149 workspace,
150 session_id,
151 role: "tool".to_string(),
152 agent_name: None,
153 message_type: "tool_result".to_string(),
154 content: Some(truncated_content),
155 tool_name: Some(tool_name.to_string()),
156 tool_success: Some(success),
157 tool_duration_ms: Some(duration_ms),
158 parent_event_id: None,
159 sequence,
160 }
161 }
162
163 pub fn to_json(&self) -> String {
165 serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct EventFile {
172 pub path: PathBuf,
174 pub start_offset: u64,
176 pub end_offset: u64,
178 pub event_count: u64,
180 pub first_event_at: DateTime<Utc>,
182 pub last_event_at: DateTime<Utc>,
184}
185
186impl EventFile {
187 pub fn filename(_session_id: &str, start_offset: u64, end_offset: u64) -> String {
189 let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
190 format!(
191 "{}-chat-events-{:020}-{:020}.jsonl",
192 timestamp, start_offset, end_offset
193 )
194 }
195}
196
197pub struct EventStreamWriter {
199 session_id: String,
200 workspace: PathBuf,
201 current_path: Option<PathBuf>,
203 current_offset: u64,
205 events_in_file: u64,
207 max_events_per_file: u64,
209 max_bytes_per_file: u64,
211 sequence: u64,
213}
214
215impl EventStreamWriter {
216 pub fn new(
218 session_id: String,
219 workspace: PathBuf,
220 max_events_per_file: u64,
221 max_bytes_per_file: u64,
222 ) -> Self {
223 Self {
224 session_id,
225 workspace,
226 current_path: None,
227 current_offset: 0,
228 events_in_file: 0,
229 max_events_per_file,
230 max_bytes_per_file,
231 sequence: 0,
232 }
233 }
234
235 pub async fn write_event(&mut self, event: ChatEvent) -> std::io::Result<(u64, u64)> {
237 use tokio::io::AsyncWriteExt;
238
239 let json = event.to_json();
240 let event_size = json.len() as u64 + 1; if self.events_in_file >= self.max_events_per_file
244 || (self.current_offset + event_size) > self.max_bytes_per_file
245 {
246 self.rotate().await?;
247 }
248
249 if self.current_path.is_none() {
251 let filename = EventFile::filename(
252 &self.session_id,
253 self.current_offset,
254 self.current_offset + event_size,
255 );
256 self.current_path = Some(self.workspace.join(filename));
257 }
258
259 let start = self.current_offset;
260
261 if let Some(ref path) = self.current_path {
263 let mut file = tokio::fs::OpenOptions::new()
264 .create(true)
265 .append(true)
266 .open(path)
267 .await?;
268
269 file.write_all(json.as_bytes()).await?;
270 file.write_all(b"\n").await?;
271 }
272
273 let end = self.current_offset + event_size;
274 self.current_offset += event_size;
275 self.events_in_file += 1;
276 self.sequence += 1;
277
278 if let Some(ref path) = self.current_path {
280 if let Some(parent) = path.parent() {
281 let filename = EventFile::filename(&self.session_id, start, end);
282 let new_path = parent.join(filename);
283 if new_path != *path {
285 let _ = tokio::fs::rename(path, &new_path).await;
286 self.current_path = Some(new_path);
287 }
288 }
289 }
290
291 Ok((start, end))
292 }
293
294 async fn rotate(&mut self) -> std::io::Result<()> {
296 self.current_path = None;
297 self.events_in_file = 0;
298 Ok(())
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;

    /// A tool-result event keeps its session id and tool fields across a
    /// JSON round trip.
    #[test]
    fn test_event_serialization() {
        let workspace = PathBuf::from("/test/workspace");
        let session = "session-123".to_string();
        let original =
            ChatEvent::tool_result(workspace, session, "bash", true, 22515, "✓ bash", 1);

        let round_tripped: ChatEvent = serde_json::from_str(&original.to_json()).unwrap();

        assert_eq!(round_tripped.session_id, "session-123");
        assert_eq!(round_tripped.tool_name, Some("bash".to_string()));
        assert_eq!(round_tripped.tool_success, Some(true));
        assert_eq!(round_tripped.tool_duration_ms, Some(22515));
    }

    /// File names carry the `chat-events-` marker and both offsets zero-padded
    /// to 20 digits.
    #[test]
    fn test_filename_format() {
        let name = EventFile::filename("session-123", 1000, 2500);

        let start_followed_by_dash = name.contains("-00000000000000001000-");
        let start_followed_by_dot = name.contains("-00000000000000001000.");

        assert!(name.contains("chat-events-"));
        assert!(start_followed_by_dash || start_followed_by_dot);
        assert!(name.contains("-00000000000000002500.jsonl"));
    }
}