Skip to main content

openhawk_core/
session_recorder.rs

1use rusqlite::{Connection, params};
2use thiserror::Error;
3
4#[derive(Debug, Error)]
5pub enum RecorderError {
6    #[error("database error: {0}")]
7    Database(#[from] rusqlite::Error),
8    #[error("session not found: {0}")]
9    SessionNotFound(String),
10}
11
12pub struct SessionRecorder {
13    db: Connection,
14}
15
/// One row of the `session_actions` table, with every column read back as a
/// plain owned value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionAction {
    /// Value of the row's `id` column.
    pub id: i64,
    /// Identifier of the session this action belongs to.
    pub session_id: String,
    /// Position of the action within its session.
    pub step_number: u32,
    /// Timestamp text exactly as stored in the database.
    pub timestamp: String,
    /// Free-form label describing what kind of action this was.
    pub action_type: String,
    /// Process id of the agent that performed the action.
    pub agent_pid: u32,
    /// Payload text exactly as stored in the database.
    pub payload: String,
}

/// Snapshot of a session's action log truncated at a given step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionState {
    /// Identifier of the session the snapshot was taken from.
    pub session_id: String,
    /// The actions that make up the snapshot.
    pub actions_up_to_step: Vec<SessionAction>,
    /// The step the snapshot was taken at.
    pub step: u32,
}
31
32impl SessionRecorder {
33    pub fn new(db: Connection) -> Self {
34        Self { db }
35    }
36
37    pub fn record_action(
38        &self,
39        session_id: &str,
40        agent_pid: u32,
41        action_type: &str,
42        payload: serde_json::Value,
43    ) -> Result<u32, RecorderError> {
44        let next_step: u32 = self.db.query_row(
45            "SELECT COALESCE(MAX(step_number), 0) + 1 FROM session_actions WHERE session_id = ?1",
46            params![session_id],
47            |row| row.get::<_, u32>(0),
48        )?;
49
50        let timestamp = chrono::Utc::now().to_rfc3339();
51        self.db.execute(
52            "INSERT INTO session_actions (session_id, step_number, timestamp, action_type, agent_pid, payload) \
53             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
54            params![session_id, next_step, timestamp, action_type, agent_pid, payload.to_string()],
55        )?;
56
57        Ok(next_step)
58    }
59
60    pub fn get_log(&self, session_id: &str) -> Result<Vec<SessionAction>, RecorderError> {
61        let mut stmt = self.db.prepare(
62            "SELECT id, session_id, step_number, timestamp, action_type, agent_pid, payload \
63             FROM session_actions WHERE session_id = ?1 ORDER BY step_number ASC",
64        )?;
65        let rows = stmt.query_map(params![session_id], |row| {
66            Ok(SessionAction {
67                id: row.get(0)?,
68                session_id: row.get(1)?,
69                step_number: row.get(2)?,
70                timestamp: row.get(3)?,
71                action_type: row.get(4)?,
72                agent_pid: row.get(5)?,
73                payload: row.get(6)?,
74            })
75        })?;
76        rows.collect::<Result<Vec<_>, _>>().map_err(RecorderError::Database)
77    }
78
79    pub fn get_state_at_step(&self, session_id: &str, step: u32) -> Result<SessionState, RecorderError> {
80        let mut stmt = self.db.prepare(
81            "SELECT id, session_id, step_number, timestamp, action_type, agent_pid, payload \
82             FROM session_actions WHERE session_id = ?1 AND step_number <= ?2 ORDER BY step_number ASC",
83        )?;
84        let rows = stmt.query_map(params![session_id, step], |row| {
85            Ok(SessionAction {
86                id: row.get(0)?,
87                session_id: row.get(1)?,
88                step_number: row.get(2)?,
89                timestamp: row.get(3)?,
90                action_type: row.get(4)?,
91                agent_pid: row.get(5)?,
92                payload: row.get(6)?,
93            })
94        })?;
95        let actions = rows.collect::<Result<Vec<_>, _>>().map_err(RecorderError::Database)?;
96        Ok(SessionState { session_id: session_id.to_string(), actions_up_to_step: actions, step })
97    }
98
99    pub fn cleanup_old_sessions(&self, retention_days: u32) -> Result<u64, RecorderError> {
100        let cutoff = chrono::Utc::now() - chrono::Duration::days(retention_days as i64);
101        let cutoff_str = cutoff.to_rfc3339();
102        let deleted = self.db.execute(
103            "DELETE FROM session_actions WHERE timestamp < ?1",
104            params![cutoff_str],
105        )?;
106        Ok(deleted as u64)
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::init_database;
    use tempfile::NamedTempFile;

    /// Builds a recorder over a fresh temporary database that already holds a
    /// `sess-1` session row. The temp file is returned so it outlives the test.
    fn make_recorder() -> (NamedTempFile, SessionRecorder) {
        let db_file = NamedTempFile::new().unwrap();
        let connection = init_database(db_file.path()).unwrap();
        connection
            .execute(
                "INSERT INTO sessions (id, started_at, status) VALUES ('sess-1', datetime('now'), 'Active')",
                [],
            )
            .unwrap();
        let recorder = SessionRecorder::new(connection);
        (db_file, recorder)
    }

    /// Records one empty-payload action per entry of `kinds` on `sess-1`
    /// (agent pid 1) and returns the step numbers in call order.
    fn record_all(recorder: &SessionRecorder, kinds: &[&str]) -> Vec<u32> {
        kinds
            .iter()
            .map(|kind| {
                recorder
                    .record_action("sess-1", 1, kind, serde_json::json!({}))
                    .unwrap()
            })
            .collect()
    }

    #[test]
    fn record_action_stores_correct_data() {
        let (_guard, recorder) = make_recorder();
        let step = recorder
            .record_action("sess-1", 42, "file_write", serde_json::json!({"path": "/tmp/foo.txt"}))
            .unwrap();
        assert_eq!(step, 1);

        let entries = recorder.get_log("sess-1").unwrap();
        assert_eq!(entries.len(), 1);

        let stored = &entries[0];
        assert_eq!(stored.session_id, "sess-1");
        assert_eq!(stored.step_number, 1);
        assert_eq!(stored.action_type, "file_write");
        assert_eq!(stored.agent_pid, 42);
        assert!(!stored.timestamp.is_empty());
        assert!(stored.payload.contains("/tmp/foo.txt"));
    }

    #[test]
    fn step_numbers_are_sequential() {
        let (_guard, recorder) = make_recorder();
        let steps = record_all(&recorder, &["file_read", "api_call", "msg_sent"]);
        assert_eq!(steps, vec![1, 2, 3]);
    }

    #[test]
    fn get_log_returns_chronological_order() {
        let (_guard, recorder) = make_recorder();
        record_all(&recorder, &["file_read", "api_call", "llm_prompt", "llm_response"]);

        let observed: Vec<u32> = recorder
            .get_log("sess-1")
            .unwrap()
            .iter()
            .map(|entry| entry.step_number)
            .collect();
        // Comparing whole vectors checks both the length and the ordering.
        assert_eq!(observed, vec![1, 2, 3, 4]);
    }

    #[test]
    fn get_state_at_step_returns_actions_up_to_step() {
        let (_guard, recorder) = make_recorder();
        record_all(&recorder, &["file_read", "file_write", "api_call", "msg_sent"]);

        let snapshot = recorder.get_state_at_step("sess-1", 2).unwrap();
        assert_eq!(snapshot.step, 2);

        let kinds: Vec<&str> = snapshot
            .actions_up_to_step
            .iter()
            .map(|entry| entry.action_type.as_str())
            .collect();
        assert_eq!(kinds, vec!["file_read", "file_write"]);
    }

    #[test]
    fn get_state_at_step_includes_step_itself() {
        let (_guard, recorder) = make_recorder();
        record_all(&recorder, &["file_read", "file_write"]);

        let snapshot = recorder.get_state_at_step("sess-1", 1).unwrap();
        assert_eq!(snapshot.actions_up_to_step.len(), 1);
    }

    #[test]
    fn cleanup_removes_old_actions() {
        // Built by hand because the row needs a timestamp far in the past,
        // which `record_action` cannot produce.
        let db_file = NamedTempFile::new().unwrap();
        let connection = init_database(db_file.path()).unwrap();
        connection
            .execute(
                "INSERT INTO sessions (id, started_at, status) VALUES ('sess-old', datetime('now'), 'Active')",
                [],
            )
            .unwrap();
        connection
            .execute(
                "INSERT INTO session_actions (session_id, step_number, timestamp, action_type, agent_pid, payload) \
                 VALUES ('sess-old', 1, '2000-01-01T00:00:00+00:00', 'file_read', 1, '{}')",
                [],
            )
            .unwrap();

        let recorder = SessionRecorder::new(connection);
        let removed = recorder.cleanup_old_sessions(30).unwrap();
        assert_eq!(removed, 1);
        assert!(recorder.get_log("sess-old").unwrap().is_empty());
    }

    #[test]
    fn cleanup_preserves_recent_actions() {
        let (_guard, recorder) = make_recorder();
        record_all(&recorder, &["file_read"]);

        let removed = recorder.cleanup_old_sessions(30).unwrap();
        assert_eq!(removed, 0);
        assert_eq!(recorder.get_log("sess-1").unwrap().len(), 1);
    }
}