1use rusqlite::{Connection, params};
2use thiserror::Error;
3
4#[derive(Debug, Error)]
5pub enum RecorderError {
6 #[error("database error: {0}")]
7 Database(#[from] rusqlite::Error),
8 #[error("session not found: {0}")]
9 SessionNotFound(String),
10}
11
12pub struct SessionRecorder {
13 db: Connection,
14}
15
/// One recorded action, mirroring a row of the `session_actions` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionAction {
    /// Value of the row's `id` column.
    pub id: i64,
    /// Identifier of the session the action belongs to.
    pub session_id: String,
    /// Position of the action within its session; the recorder numbers from 1.
    pub step_number: u32,
    /// Time the action was recorded, as stored in the `timestamp` column
    /// (the recorder writes RFC 3339 strings in UTC).
    pub timestamp: String,
    /// Free-form label for the kind of action (e.g. `"file_write"`).
    pub action_type: String,
    /// Process id of the agent that performed the action.
    pub agent_pid: u32,
    /// Action payload as stored; the recorder writes serialized JSON text.
    pub payload: String,
}

/// The actions of one session up to and including a given step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionState {
    /// Identifier of the session this snapshot was taken from.
    pub session_id: String,
    /// Actions with `step_number <= step`, in ascending step order.
    pub actions_up_to_step: Vec<SessionAction>,
    /// The step that was requested. It is echoed back as given, even when
    /// the session has fewer recorded steps.
    pub step: u32,
}
31
32impl SessionRecorder {
33 pub fn new(db: Connection) -> Self {
34 Self { db }
35 }
36
37 pub fn record_action(
38 &self,
39 session_id: &str,
40 agent_pid: u32,
41 action_type: &str,
42 payload: serde_json::Value,
43 ) -> Result<u32, RecorderError> {
44 let next_step: u32 = self.db.query_row(
45 "SELECT COALESCE(MAX(step_number), 0) + 1 FROM session_actions WHERE session_id = ?1",
46 params![session_id],
47 |row| row.get::<_, u32>(0),
48 )?;
49
50 let timestamp = chrono::Utc::now().to_rfc3339();
51 self.db.execute(
52 "INSERT INTO session_actions (session_id, step_number, timestamp, action_type, agent_pid, payload) \
53 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
54 params![session_id, next_step, timestamp, action_type, agent_pid, payload.to_string()],
55 )?;
56
57 Ok(next_step)
58 }
59
60 pub fn get_log(&self, session_id: &str) -> Result<Vec<SessionAction>, RecorderError> {
61 let mut stmt = self.db.prepare(
62 "SELECT id, session_id, step_number, timestamp, action_type, agent_pid, payload \
63 FROM session_actions WHERE session_id = ?1 ORDER BY step_number ASC",
64 )?;
65 let rows = stmt.query_map(params![session_id], |row| {
66 Ok(SessionAction {
67 id: row.get(0)?,
68 session_id: row.get(1)?,
69 step_number: row.get(2)?,
70 timestamp: row.get(3)?,
71 action_type: row.get(4)?,
72 agent_pid: row.get(5)?,
73 payload: row.get(6)?,
74 })
75 })?;
76 rows.collect::<Result<Vec<_>, _>>().map_err(RecorderError::Database)
77 }
78
79 pub fn get_state_at_step(&self, session_id: &str, step: u32) -> Result<SessionState, RecorderError> {
80 let mut stmt = self.db.prepare(
81 "SELECT id, session_id, step_number, timestamp, action_type, agent_pid, payload \
82 FROM session_actions WHERE session_id = ?1 AND step_number <= ?2 ORDER BY step_number ASC",
83 )?;
84 let rows = stmt.query_map(params![session_id, step], |row| {
85 Ok(SessionAction {
86 id: row.get(0)?,
87 session_id: row.get(1)?,
88 step_number: row.get(2)?,
89 timestamp: row.get(3)?,
90 action_type: row.get(4)?,
91 agent_pid: row.get(5)?,
92 payload: row.get(6)?,
93 })
94 })?;
95 let actions = rows.collect::<Result<Vec<_>, _>>().map_err(RecorderError::Database)?;
96 Ok(SessionState { session_id: session_id.to_string(), actions_up_to_step: actions, step })
97 }
98
99 pub fn cleanup_old_sessions(&self, retention_days: u32) -> Result<u64, RecorderError> {
100 let cutoff = chrono::Utc::now() - chrono::Duration::days(retention_days as i64);
101 let cutoff_str = cutoff.to_rfc3339();
102 let deleted = self.db.execute(
103 "DELETE FROM session_actions WHERE timestamp < ?1",
104 params![cutoff_str],
105 )?;
106 Ok(deleted as u64)
107 }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::init_database;
    use tempfile::NamedTempFile;

    /// Builds a recorder over a fresh temp-file database that already
    /// contains the session `sess-1`.
    ///
    /// The temp-file handle is returned alongside the recorder so the caller
    /// keeps it alive for the duration of the test.
    fn make_recorder() -> (NamedTempFile, SessionRecorder) {
        let db_file = NamedTempFile::new().unwrap();
        let connection = init_database(db_file.path()).unwrap();
        connection
            .execute(
                "INSERT INTO sessions (id, started_at, status) VALUES ('sess-1', datetime('now'), 'Active')",
                [],
            )
            .unwrap();
        let recorder = SessionRecorder::new(connection);
        (db_file, recorder)
    }

    /// Records one empty-payload action per entry of `action_types` for
    /// `sess-1` (agent pid 1) and returns the step numbers handed out.
    fn record_each(recorder: &SessionRecorder, action_types: &[&str]) -> Vec<u32> {
        action_types
            .iter()
            .map(|kind| {
                recorder
                    .record_action("sess-1", 1, kind, serde_json::json!({}))
                    .unwrap()
            })
            .collect()
    }

    #[test]
    fn record_action_stores_correct_data() {
        let (_db_file, recorder) = make_recorder();

        let step = recorder
            .record_action(
                "sess-1",
                42,
                "file_write",
                serde_json::json!({"path": "/tmp/foo.txt"}),
            )
            .unwrap();
        assert_eq!(step, 1);

        let log = recorder.get_log("sess-1").unwrap();
        assert_eq!(log.len(), 1);

        let stored = &log[0];
        assert_eq!(stored.session_id, "sess-1");
        assert_eq!(stored.step_number, 1);
        assert_eq!(stored.action_type, "file_write");
        assert_eq!(stored.agent_pid, 42);
        assert!(!stored.timestamp.is_empty());
        assert!(stored.payload.contains("/tmp/foo.txt"));
    }

    #[test]
    fn step_numbers_are_sequential() {
        let (_db_file, recorder) = make_recorder();

        let steps = record_each(&recorder, &["file_read", "api_call", "msg_sent"]);

        assert_eq!(steps, vec![1, 2, 3]);
    }

    #[test]
    fn get_log_returns_chronological_order() {
        let (_db_file, recorder) = make_recorder();
        record_each(
            &recorder,
            &["file_read", "api_call", "llm_prompt", "llm_response"],
        );

        let log = recorder.get_log("sess-1").unwrap();

        assert_eq!(log.len(), 4);
        let steps: Vec<u32> = log.iter().map(|entry| entry.step_number).collect();
        assert_eq!(steps, vec![1, 2, 3, 4]);
    }

    #[test]
    fn get_state_at_step_returns_actions_up_to_step() {
        let (_db_file, recorder) = make_recorder();
        record_each(
            &recorder,
            &["file_read", "file_write", "api_call", "msg_sent"],
        );

        let state = recorder.get_state_at_step("sess-1", 2).unwrap();

        assert_eq!(state.step, 2);
        assert_eq!(state.actions_up_to_step.len(), 2);
        assert_eq!(state.actions_up_to_step[0].action_type, "file_read");
        assert_eq!(state.actions_up_to_step[1].action_type, "file_write");
    }

    #[test]
    fn get_state_at_step_includes_step_itself() {
        let (_db_file, recorder) = make_recorder();
        record_each(&recorder, &["file_read", "file_write"]);

        let state = recorder.get_state_at_step("sess-1", 1).unwrap();

        assert_eq!(state.actions_up_to_step.len(), 1);
    }

    #[test]
    fn cleanup_removes_old_actions() {
        // Seed the database by hand so the action carries a timestamp far in
        // the past; `record_action` would stamp it with the current time.
        let db_file = NamedTempFile::new().unwrap();
        let connection = init_database(db_file.path()).unwrap();
        connection
            .execute(
                "INSERT INTO sessions (id, started_at, status) VALUES ('sess-old', datetime('now'), 'Active')",
                [],
            )
            .unwrap();
        connection
            .execute(
                "INSERT INTO session_actions (session_id, step_number, timestamp, action_type, agent_pid, payload) \
                 VALUES ('sess-old', 1, '2000-01-01T00:00:00+00:00', 'file_read', 1, '{}')",
                [],
            )
            .unwrap();
        let recorder = SessionRecorder::new(connection);

        let removed = recorder.cleanup_old_sessions(30).unwrap();

        assert_eq!(removed, 1);
        assert!(recorder.get_log("sess-old").unwrap().is_empty());
    }

    #[test]
    fn cleanup_preserves_recent_actions() {
        let (_db_file, recorder) = make_recorder();
        record_each(&recorder, &["file_read"]);

        let removed = recorder.cleanup_old_sessions(30).unwrap();

        assert_eq!(removed, 0);
        assert_eq!(recorder.get_log("sess-1").unwrap().len(), 1);
    }
}