Skip to main content

zagens_runtime_adapters/persist/
kernel_event_log.rs

1//! Append-only SQLite log for [`KernelEvent`] records — Phase 3a double-write.
2//!
3//! ## Purpose
4//! Written by the double-write adapter during Phase 3a; **not yet read** by any
5//! production code path.  Phase 3b will add the projection/replay consumer.
6//!
7//! ## Schema (additive, backward-compatible)
8//! ```sql
9//! CREATE TABLE IF NOT EXISTS kernel_events (
10//!     id      INTEGER PRIMARY KEY AUTOINCREMENT,
11//!     seq     INTEGER NOT NULL,
12//!     ts_ms   INTEGER NOT NULL,
13//!     kind    TEXT    NOT NULL,
14//!     turn_id TEXT,           -- NULL only for schema_version sentinel
15//!     payload TEXT    NOT NULL -- JSON-serialised KernelEvent
16//! );
17//! ```
18//! Adding columns in future schema versions: `ALTER TABLE ADD COLUMN … DEFAULT …`.
19//! Removing columns: increment schema_version and provide an upcast script.
20//!
21//! ## WAL / performance
22//! Inherits the WAL-mode connection from `open_sqlite_session_db`.  Each
23//! [`KernelEventLog::append`] call is a single `INSERT`; callers may batch via
24//! `append_batch` to amortise transaction overhead.
25
26use anyhow::Context as _;
27use rusqlite::{Connection, params};
28use zagens_core::engine::kernel_event::{KernelEvent, KernelEventEnvelope};
29
30// ── Schema migration ─────────────────────────────────────────────────────────
31
32/// Ensure the `kernel_events` table and index exist.
33/// Safe to call on every startup — uses `CREATE … IF NOT EXISTS`.
34pub fn ensure_kernel_events_table(db: &Connection) -> anyhow::Result<()> {
35    db.execute_batch(
36        "CREATE TABLE IF NOT EXISTS kernel_events (
37            id      INTEGER PRIMARY KEY AUTOINCREMENT,
38            seq     INTEGER NOT NULL,
39            ts_ms   INTEGER NOT NULL,
40            kind    TEXT    NOT NULL,
41            turn_id TEXT,
42            payload TEXT    NOT NULL
43        );
44        CREATE INDEX IF NOT EXISTS idx_kernel_events_turn
45            ON kernel_events(turn_id);
46        CREATE INDEX IF NOT EXISTS idx_kernel_events_seq
47            ON kernel_events(seq);",
48    )
49    .context("Failed to create kernel_events table")
50}
51
52// ── Log writer ───────────────────────────────────────────────────────────────
53
54/// Append-only writer for the `kernel_events` table.
55///
56/// Maintains a monotone `seq` counter per [`KernelEventLog`] instance
57/// (per-session / per-sidecar lifetime).  The counter is **not** persisted
58/// across restarts; callers that need stable cross-restart ordering should
59/// use the SQLite `id` column (autoincrement) instead.
60pub struct KernelEventLog<'conn> {
61    db: &'conn Connection,
62    next_seq: u64,
63}
64
65impl<'conn> KernelEventLog<'conn> {
66    /// Create a writer that shares the given (already-migrated) connection.
67    /// Call [`ensure_kernel_events_table`] before constructing this.
68    ///
69    /// Starts `seq` at 0 — suitable for empty in-memory test DBs. Production
70    /// callers should prefer [`Self::with_next_seq`] after [`peek_next_seq`].
71    pub fn new(db: &'conn Connection) -> Self {
72        Self { db, next_seq: 0 }
73    }
74
75    /// Resume appending after restart using the next free sequence number.
76    pub fn with_next_seq(db: &'conn Connection, next_seq: u64) -> Self {
77        Self { db, next_seq }
78    }
79
80    /// Next `seq` value to assign (max existing + 1, or 0 when empty).
81    pub fn peek_next_seq(db: &Connection) -> anyhow::Result<u64> {
82        let max: i64 = db.query_row(
83            "SELECT COALESCE(MAX(seq), -1) FROM kernel_events",
84            [],
85            |row| row.get(0),
86        )?;
87        Ok(max.saturating_add(1).max(0) as u64)
88    }
89
90    /// Append one event.  The `ts_ms` is the Unix time in milliseconds at
91    /// call time.
92    pub fn append(&mut self, event: KernelEvent) -> anyhow::Result<()> {
93        let seq = self.next_seq;
94        self.next_seq += 1;
95        let ts_ms = unix_ms_now();
96        let kind = event.kind_str().to_string();
97        let turn_id = event.turn_id().map(str::to_string);
98        let payload = serde_json::to_string(&event).context("KernelEvent serialization failed")?;
99        self.db
100            .execute(
101                "INSERT INTO kernel_events (seq, ts_ms, kind, turn_id, payload)
102                 VALUES (?1, ?2, ?3, ?4, ?5)",
103                params![seq, ts_ms, kind, turn_id, payload],
104            )
105            .context("INSERT INTO kernel_events failed")?;
106        Ok(())
107    }
108
109    /// Append multiple events in a single transaction.
110    pub fn append_batch(
111        &mut self,
112        events: impl IntoIterator<Item = KernelEvent>,
113    ) -> anyhow::Result<()> {
114        let tx_result: anyhow::Result<()> = (|| {
115            self.db
116                .execute_batch("BEGIN")
117                .context("BEGIN transaction")?;
118            for ev in events {
119                self.append(ev)?;
120            }
121            self.db
122                .execute_batch("COMMIT")
123                .context("COMMIT transaction")?;
124            Ok(())
125        })();
126        if tx_result.is_err() {
127            let _ = self.db.execute_batch("ROLLBACK");
128        }
129        tx_result
130    }
131
132    /// Load all events for a given `turn_id` in sequence order.
133    /// Used by Phase 3b replay and completeness verification tests.
134    pub fn load_turn_events(&self, turn_id: &str) -> anyhow::Result<Vec<KernelEventEnvelope>> {
135        let mut stmt = self.db.prepare(
136            "SELECT seq, ts_ms, kind, payload FROM kernel_events
137             WHERE turn_id = ?1 ORDER BY seq ASC",
138        )?;
139        let rows = stmt.query_map(params![turn_id], |row| {
140            Ok((
141                row.get::<_, u64>(0)?,
142                row.get::<_, u64>(1)?,
143                row.get::<_, String>(2)?,
144                row.get::<_, String>(3)?,
145            ))
146        })?;
147        let mut envelopes = Vec::new();
148        for row in rows {
149            let (seq, ts_ms, kind, payload) = row.context("row read")?;
150            let event: KernelEvent =
151                serde_json::from_str(&payload).context("KernelEvent deserialize")?;
152            envelopes.push(KernelEventEnvelope {
153                seq,
154                ts_ms,
155                kind,
156                event,
157            });
158        }
159        Ok(envelopes)
160    }
161}
162
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields 0 rather than an error.
fn unix_ms_now() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock is before 1970-01-01: report the epoch itself.
        Err(_) => 0,
    }
}
170
171// ── Tests ─────────────────────────────────────────────────────────────────────
172
#[cfg(test)]
mod tests {
    use rusqlite::Connection;
    use zagens_core::engine::kernel_event::{KernelEvent, TurnOutcome};
    use zagens_core::turn::TurnLoopMode;

    use super::*;

    /// Fresh in-memory database with the `kernel_events` schema applied.
    fn in_memory_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory DB");
        // NOTE(review): per the SQLite docs an in-memory database keeps its
        // MEMORY journal mode, so the WAL pragma is effectively a no-op here.
        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
            .expect("pragmas");
        ensure_kernel_events_table(&conn).expect("ensure table");
        conn
    }

    /// Two single appends come back in insertion order with rising `seq`.
    #[test]
    fn append_and_load_round_trip() {
        const TURN: &str = "t-rt-001";
        let db = in_memory_db();
        let mut log = KernelEventLog::new(&db);

        let started = KernelEvent::TurnStarted {
            turn_id: TURN.to_string(),
            mode: TurnLoopMode::Agent,
            input_text: "do the thing".into(),
            max_steps: 10,
        };
        let ended = KernelEvent::TurnEnded {
            turn_id: TURN.to_string(),
            outcome: TurnOutcome::Completed,
            total_steps: 1,
        };
        log.append(started).expect("append TurnStarted");
        log.append(ended).expect("append TurnEnded");

        let loaded = log.load_turn_events(TURN).expect("load");
        assert_eq!(loaded.len(), 2);
        let kinds: Vec<_> = loaded.iter().map(|env| env.event.kind_str()).collect();
        assert_eq!(kinds, ["turn_started", "turn_ended"]);
        // Sequence is monotone.
        assert!(loaded.windows(2).all(|pair| pair[0].seq < pair[1].seq));
    }

    /// A successful batch stores every event it was given.
    #[test]
    fn append_batch_is_atomic() {
        const TURN: &str = "t-batch-001";
        let db = in_memory_db();
        let mut log = KernelEventLog::new(&db);

        let batch = vec![
            KernelEvent::TurnStarted {
                turn_id: TURN.to_string(),
                mode: TurnLoopMode::Yolo,
                input_text: "batch test".into(),
                max_steps: 5,
            },
            KernelEvent::ScratchpadSummaryInjected {
                turn_id: TURN.to_string(),
                at_step: 2,
            },
            KernelEvent::TurnEnded {
                turn_id: TURN.to_string(),
                outcome: TurnOutcome::Interrupted,
                total_steps: 3,
            },
        ];
        log.append_batch(batch).expect("batch");

        let stored = log.load_turn_events(TURN).expect("load").len();
        assert_eq!(stored, 3);
    }

    /// `SchemaVersion` carries no turn, so its `turn_id` column is NULL.
    #[test]
    fn schema_version_event_has_null_turn_id() {
        let db = in_memory_db();
        let mut log = KernelEventLog::new(&db);
        log.append(KernelEvent::SchemaVersion { version: 1 })
            .expect("schema version event");

        // Read the raw column so that NULL surfaces as `None`.
        let stored_turn_id = db
            .query_row(
                "SELECT turn_id FROM kernel_events WHERE kind = 'schema_version'",
                [],
                |row| row.get::<_, Option<String>>(0),
            )
            .expect("query");
        assert_eq!(stored_turn_id, None);
    }

    /// Running the migration again on a migrated database succeeds.
    #[test]
    fn ensure_table_is_idempotent() {
        // `in_memory_db` already ran the migration once.
        let db = in_memory_db();
        ensure_kernel_events_table(&db).expect("second call");
    }
}