openhawk-core 0.1.0

Core runtime for OpenHawk Agent OS — agent lifecycle, orchestration, config, healing
Documentation
use rusqlite::{Connection, params};
use thiserror::Error;

#[derive(Debug, Error)]
pub enum RecorderError {
    #[error("database error: {0}")]
    Database(#[from] rusqlite::Error),
    #[error("session not found: {0}")]
    SessionNotFound(String),
}

pub struct SessionRecorder {
    db: Connection,
}

pub struct SessionAction {
    pub id: i64,
    pub session_id: String,
    pub step_number: u32,
    pub timestamp: String,
    pub action_type: String,
    pub agent_pid: u32,
    pub payload: String,
}

pub struct SessionState {
    pub session_id: String,
    pub actions_up_to_step: Vec<SessionAction>,
    pub step: u32,
}

impl SessionRecorder {
    pub fn new(db: Connection) -> Self {
        Self { db }
    }

    pub fn record_action(
        &self,
        session_id: &str,
        agent_pid: u32,
        action_type: &str,
        payload: serde_json::Value,
    ) -> Result<u32, RecorderError> {
        let next_step: u32 = self.db.query_row(
            "SELECT COALESCE(MAX(step_number), 0) + 1 FROM session_actions WHERE session_id = ?1",
            params![session_id],
            |row| row.get::<_, u32>(0),
        )?;

        let timestamp = chrono::Utc::now().to_rfc3339();
        self.db.execute(
            "INSERT INTO session_actions (session_id, step_number, timestamp, action_type, agent_pid, payload) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![session_id, next_step, timestamp, action_type, agent_pid, payload.to_string()],
        )?;

        Ok(next_step)
    }

    pub fn get_log(&self, session_id: &str) -> Result<Vec<SessionAction>, RecorderError> {
        let mut stmt = self.db.prepare(
            "SELECT id, session_id, step_number, timestamp, action_type, agent_pid, payload \
             FROM session_actions WHERE session_id = ?1 ORDER BY step_number ASC",
        )?;
        let rows = stmt.query_map(params![session_id], |row| {
            Ok(SessionAction {
                id: row.get(0)?,
                session_id: row.get(1)?,
                step_number: row.get(2)?,
                timestamp: row.get(3)?,
                action_type: row.get(4)?,
                agent_pid: row.get(5)?,
                payload: row.get(6)?,
            })
        })?;
        rows.collect::<Result<Vec<_>, _>>().map_err(RecorderError::Database)
    }

    pub fn get_state_at_step(&self, session_id: &str, step: u32) -> Result<SessionState, RecorderError> {
        let mut stmt = self.db.prepare(
            "SELECT id, session_id, step_number, timestamp, action_type, agent_pid, payload \
             FROM session_actions WHERE session_id = ?1 AND step_number <= ?2 ORDER BY step_number ASC",
        )?;
        let rows = stmt.query_map(params![session_id, step], |row| {
            Ok(SessionAction {
                id: row.get(0)?,
                session_id: row.get(1)?,
                step_number: row.get(2)?,
                timestamp: row.get(3)?,
                action_type: row.get(4)?,
                agent_pid: row.get(5)?,
                payload: row.get(6)?,
            })
        })?;
        let actions = rows.collect::<Result<Vec<_>, _>>().map_err(RecorderError::Database)?;
        Ok(SessionState { session_id: session_id.to_string(), actions_up_to_step: actions, step })
    }

    pub fn cleanup_old_sessions(&self, retention_days: u32) -> Result<u64, RecorderError> {
        let cutoff = chrono::Utc::now() - chrono::Duration::days(retention_days as i64);
        let cutoff_str = cutoff.to_rfc3339();
        let deleted = self.db.execute(
            "DELETE FROM session_actions WHERE timestamp < ?1",
            params![cutoff_str],
        )?;
        Ok(deleted as u64)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::init_database;
    use tempfile::NamedTempFile;

    fn make_recorder() -> (NamedTempFile, SessionRecorder) {
        let f = NamedTempFile::new().unwrap();
        let conn = init_database(f.path()).unwrap();
        conn.execute(
            "INSERT INTO sessions (id, started_at, status) VALUES ('sess-1', datetime('now'), 'Active')",
            [],
        ).unwrap();
        (f, SessionRecorder::new(conn))
    }

    #[test]
    fn record_action_stores_correct_data() {
        let (_f, rec) = make_recorder();
        let payload = serde_json::json!({"path": "/tmp/foo.txt"});
        let step = rec.record_action("sess-1", 42, "file_write", payload).unwrap();
        assert_eq!(step, 1);
        let log = rec.get_log("sess-1").unwrap();
        assert_eq!(log.len(), 1);
        let action = &log[0];
        assert_eq!(action.session_id, "sess-1");
        assert_eq!(action.step_number, 1);
        assert_eq!(action.action_type, "file_write");
        assert_eq!(action.agent_pid, 42);
        assert!(!action.timestamp.is_empty());
        assert!(action.payload.contains("/tmp/foo.txt"));
    }

    #[test]
    fn step_numbers_are_sequential() {
        let (_f, rec) = make_recorder();
        let s1 = rec.record_action("sess-1", 1, "file_read", serde_json::json!({})).unwrap();
        let s2 = rec.record_action("sess-1", 1, "api_call", serde_json::json!({})).unwrap();
        let s3 = rec.record_action("sess-1", 1, "msg_sent", serde_json::json!({})).unwrap();
        assert_eq!(s1, 1);
        assert_eq!(s2, 2);
        assert_eq!(s3, 3);
    }

    #[test]
    fn get_log_returns_chronological_order() {
        let (_f, rec) = make_recorder();
        for action_type in &["file_read", "api_call", "llm_prompt", "llm_response"] {
            rec.record_action("sess-1", 1, action_type, serde_json::json!({})).unwrap();
        }
        let log = rec.get_log("sess-1").unwrap();
        assert_eq!(log.len(), 4);
        for (i, action) in log.iter().enumerate() {
            assert_eq!(action.step_number, (i + 1) as u32);
        }
    }

    #[test]
    fn get_state_at_step_returns_actions_up_to_step() {
        let (_f, rec) = make_recorder();
        for action_type in &["file_read", "file_write", "api_call", "msg_sent"] {
            rec.record_action("sess-1", 1, action_type, serde_json::json!({})).unwrap();
        }
        let state = rec.get_state_at_step("sess-1", 2).unwrap();
        assert_eq!(state.step, 2);
        assert_eq!(state.actions_up_to_step.len(), 2);
        assert_eq!(state.actions_up_to_step[0].action_type, "file_read");
        assert_eq!(state.actions_up_to_step[1].action_type, "file_write");
    }

    #[test]
    fn get_state_at_step_includes_step_itself() {
        let (_f, rec) = make_recorder();
        rec.record_action("sess-1", 1, "file_read", serde_json::json!({})).unwrap();
        rec.record_action("sess-1", 1, "file_write", serde_json::json!({})).unwrap();
        let state = rec.get_state_at_step("sess-1", 1).unwrap();
        assert_eq!(state.actions_up_to_step.len(), 1);
    }

    #[test]
    fn cleanup_removes_old_actions() {
        let f = NamedTempFile::new().unwrap();
        let conn = init_database(f.path()).unwrap();
        conn.execute(
            "INSERT INTO sessions (id, started_at, status) VALUES ('sess-old', datetime('now'), 'Active')",
            [],
        ).unwrap();
        conn.execute(
            "INSERT INTO session_actions (session_id, step_number, timestamp, action_type, agent_pid, payload) \
             VALUES ('sess-old', 1, '2000-01-01T00:00:00+00:00', 'file_read', 1, '{}')",
            [],
        ).unwrap();
        let rec = SessionRecorder::new(conn);
        let deleted = rec.cleanup_old_sessions(30).unwrap();
        assert_eq!(deleted, 1);
        assert!(rec.get_log("sess-old").unwrap().is_empty());
    }

    #[test]
    fn cleanup_preserves_recent_actions() {
        let (_f, rec) = make_recorder();
        rec.record_action("sess-1", 1, "file_read", serde_json::json!({})).unwrap();
        let deleted = rec.cleanup_old_sessions(30).unwrap();
        assert_eq!(deleted, 0);
        assert_eq!(rec.get_log("sess-1").unwrap().len(), 1);
    }
}