codetether_agent/event_stream/
mod.rs1pub mod s3_sink;
42
43use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use std::path::PathBuf;
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "snake_case")]
50pub enum EventCategory {
51 User,
53 Assistant,
55 ToolResult,
57 Handoff,
59 Session,
61 Error,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct ChatEvent {
68 pub recorded_at: DateTime<Utc>,
70 pub workspace: PathBuf,
72 pub session_id: String,
74 pub role: String,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
78 pub agent_name: Option<String>,
79 pub message_type: String,
81 #[serde(default, skip_serializing_if = "Option::is_none")]
83 pub content: Option<String>,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
86 pub tool_name: Option<String>,
87 #[serde(default, skip_serializing_if = "Option::is_none")]
89 pub tool_success: Option<bool>,
90 #[serde(default, skip_serializing_if = "Option::is_none")]
92 pub tool_duration_ms: Option<u64>,
93 #[serde(default, skip_serializing_if = "Option::is_none")]
95 pub parent_event_id: Option<String>,
96 pub sequence: u64,
98 #[serde(default, skip_serializing_if = "Option::is_none")]
100 pub okr_id: Option<String>,
101 #[serde(default, skip_serializing_if = "Option::is_none")]
103 pub okr_run_id: Option<String>,
104 #[serde(default, skip_serializing_if = "Option::is_none")]
106 pub relay_id: Option<String>,
107}
108
109impl ChatEvent {
110 pub fn new(
112 workspace: PathBuf,
113 session_id: String,
114 role: &str,
115 message_type: &str,
116 sequence: u64,
117 ) -> Self {
118 Self {
119 recorded_at: Utc::now(),
120 workspace,
121 session_id,
122 role: role.to_string(),
123 agent_name: None,
124 message_type: message_type.to_string(),
125 content: None,
126 tool_name: None,
127 tool_success: None,
128 tool_duration_ms: None,
129 parent_event_id: None,
130 sequence,
131 okr_id: None,
132 okr_run_id: None,
133 relay_id: None,
134 }
135 }
136
137 pub fn with_okr(
139 mut self,
140 okr_id: Option<String>,
141 okr_run_id: Option<String>,
142 relay_id: Option<String>,
143 ) -> Self {
144 self.okr_id = okr_id;
145 self.okr_run_id = okr_run_id;
146 self.relay_id = relay_id;
147 self
148 }
149
150 pub fn tool_result(
152 workspace: PathBuf,
153 session_id: String,
154 tool_name: &str,
155 success: bool,
156 duration_ms: u64,
157 content: &str,
158 sequence: u64,
159 ) -> Self {
160 let max_content_len = 10_000;
162 let truncated_content = if content.len() > max_content_len {
163 format!(
164 "{}...[truncated {} bytes]",
165 &content[..max_content_len],
166 content.len()
167 )
168 } else {
169 content.to_string()
170 };
171
172 Self {
173 recorded_at: Utc::now(),
174 workspace,
175 session_id,
176 role: "tool".to_string(),
177 agent_name: None,
178 message_type: "tool_result".to_string(),
179 content: Some(truncated_content),
180 tool_name: Some(tool_name.to_string()),
181 tool_success: Some(success),
182 tool_duration_ms: Some(duration_ms),
183 parent_event_id: None,
184 sequence,
185 okr_id: None,
186 okr_run_id: None,
187 relay_id: None,
188 }
189 }
190
191 pub fn tool_result_with_okr(
193 workspace: PathBuf,
194 session_id: String,
195 tool_name: &str,
196 success: bool,
197 duration_ms: u64,
198 content: &str,
199 sequence: u64,
200 okr_id: Option<String>,
201 okr_run_id: Option<String>,
202 relay_id: Option<String>,
203 ) -> Self {
204 let mut event = Self::tool_result(
205 workspace,
206 session_id,
207 tool_name,
208 success,
209 duration_ms,
210 content,
211 sequence,
212 );
213 event.okr_id = okr_id;
214 event.okr_run_id = okr_run_id;
215 event.relay_id = relay_id;
216 event
217 }
218
219 pub fn to_json(&self) -> String {
221 serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
222 }
223}
224
225#[derive(Debug, Clone)]
227pub struct EventFile {
228 pub path: PathBuf,
230 pub start_offset: u64,
232 pub end_offset: u64,
234 pub event_count: u64,
236 pub first_event_at: DateTime<Utc>,
238 pub last_event_at: DateTime<Utc>,
240}
241
242impl EventFile {
243 pub fn filename(_session_id: &str, start_offset: u64, end_offset: u64) -> String {
245 let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
246 format!(
247 "{}-chat-events-{:020}-{:020}.jsonl",
248 timestamp, start_offset, end_offset
249 )
250 }
251}
252
253pub struct EventStreamWriter {
255 session_id: String,
256 workspace: PathBuf,
257 current_path: Option<PathBuf>,
259 current_offset: u64,
261 events_in_file: u64,
263 max_events_per_file: u64,
265 max_bytes_per_file: u64,
267 sequence: u64,
269}
270
271impl EventStreamWriter {
272 pub fn new(
274 session_id: String,
275 workspace: PathBuf,
276 max_events_per_file: u64,
277 max_bytes_per_file: u64,
278 ) -> Self {
279 Self {
280 session_id,
281 workspace,
282 current_path: None,
283 current_offset: 0,
284 events_in_file: 0,
285 max_events_per_file,
286 max_bytes_per_file,
287 sequence: 0,
288 }
289 }
290
291 pub async fn write_event(&mut self, event: ChatEvent) -> std::io::Result<(u64, u64)> {
293 use tokio::io::AsyncWriteExt;
294
295 let json = event.to_json();
296 let event_size = json.len() as u64 + 1; if self.events_in_file >= self.max_events_per_file
300 || (self.current_offset + event_size) > self.max_bytes_per_file
301 {
302 self.rotate().await?;
303 }
304
305 if self.current_path.is_none() {
307 let filename = EventFile::filename(
308 &self.session_id,
309 self.current_offset,
310 self.current_offset + event_size,
311 );
312 self.current_path = Some(self.workspace.join(filename));
313 }
314
315 let start = self.current_offset;
316
317 if let Some(ref path) = self.current_path {
319 let mut file = tokio::fs::OpenOptions::new()
320 .create(true)
321 .append(true)
322 .open(path)
323 .await?;
324
325 file.write_all(json.as_bytes()).await?;
326 file.write_all(b"\n").await?;
327 }
328
329 let end = self.current_offset + event_size;
330 self.current_offset += event_size;
331 self.events_in_file += 1;
332 self.sequence += 1;
333
334 if let Some(ref path) = self.current_path {
336 if let Some(parent) = path.parent() {
337 let filename = EventFile::filename(&self.session_id, start, end);
338 let new_path = parent.join(filename);
339 if new_path != *path {
341 let _ = tokio::fs::rename(path, &new_path).await;
342 self.current_path = Some(new_path);
343 }
344 }
345 }
346
347 Ok((start, end))
348 }
349
350 async fn rotate(&mut self) -> std::io::Result<()> {
352 self.current_path = None;
353 self.events_in_file = 0;
354 Ok(())
355 }
356}
357
358#[cfg(test)]
359mod tests {
360 use super::*;
361
362 #[test]
363 fn test_event_serialization() {
364 let event = ChatEvent::tool_result(
365 PathBuf::from("/test/workspace"),
366 "session-123".to_string(),
367 "bash",
368 true,
369 22515,
370 "✓ bash",
371 1,
372 );
373
374 let json = event.to_json();
375 let parsed: ChatEvent = serde_json::from_str(&json).unwrap();
376
377 assert_eq!(parsed.session_id, "session-123");
378 assert_eq!(parsed.tool_name, Some("bash".to_string()));
379 assert_eq!(parsed.tool_success, Some(true));
380 assert_eq!(parsed.tool_duration_ms, Some(22515));
381 }
382
383 #[test]
384 fn test_filename_format() {
385 let filename = EventFile::filename("session-123", 1000, 2500);
386 assert!(filename.contains("chat-events-"));
387 assert!(
389 filename.contains("-00000000000000001000-")
390 || filename.contains("-00000000000000001000.")
391 );
392 assert!(filename.contains("-00000000000000002500.jsonl"));
393 }
394}