zagens_runtime_adapters/persist/
kernel_event_log.rs1use anyhow::Context as _;
27use rusqlite::{Connection, params};
28use zagens_core::engine::kernel_event::{KernelEvent, KernelEventEnvelope};
29
30pub fn ensure_kernel_events_table(db: &Connection) -> anyhow::Result<()> {
35 db.execute_batch(
36 "CREATE TABLE IF NOT EXISTS kernel_events (
37 id INTEGER PRIMARY KEY AUTOINCREMENT,
38 seq INTEGER NOT NULL,
39 ts_ms INTEGER NOT NULL,
40 kind TEXT NOT NULL,
41 turn_id TEXT,
42 payload TEXT NOT NULL
43 );
44 CREATE INDEX IF NOT EXISTS idx_kernel_events_turn
45 ON kernel_events(turn_id);
46 CREATE INDEX IF NOT EXISTS idx_kernel_events_seq
47 ON kernel_events(seq);",
48 )
49 .context("Failed to create kernel_events table")
50}
51
52pub struct KernelEventLog<'conn> {
61 db: &'conn Connection,
62 next_seq: u64,
63}
64
65impl<'conn> KernelEventLog<'conn> {
66 pub fn new(db: &'conn Connection) -> Self {
72 Self { db, next_seq: 0 }
73 }
74
75 pub fn with_next_seq(db: &'conn Connection, next_seq: u64) -> Self {
77 Self { db, next_seq }
78 }
79
80 pub fn peek_next_seq(db: &Connection) -> anyhow::Result<u64> {
82 let max: i64 = db.query_row(
83 "SELECT COALESCE(MAX(seq), -1) FROM kernel_events",
84 [],
85 |row| row.get(0),
86 )?;
87 Ok(max.saturating_add(1).max(0) as u64)
88 }
89
90 pub fn append(&mut self, event: KernelEvent) -> anyhow::Result<()> {
93 let seq = self.next_seq;
94 self.next_seq += 1;
95 let ts_ms = unix_ms_now();
96 let kind = event.kind_str().to_string();
97 let turn_id = event.turn_id().map(str::to_string);
98 let payload = serde_json::to_string(&event).context("KernelEvent serialization failed")?;
99 self.db
100 .execute(
101 "INSERT INTO kernel_events (seq, ts_ms, kind, turn_id, payload)
102 VALUES (?1, ?2, ?3, ?4, ?5)",
103 params![seq, ts_ms, kind, turn_id, payload],
104 )
105 .context("INSERT INTO kernel_events failed")?;
106 Ok(())
107 }
108
109 pub fn append_batch(
111 &mut self,
112 events: impl IntoIterator<Item = KernelEvent>,
113 ) -> anyhow::Result<()> {
114 let tx_result: anyhow::Result<()> = (|| {
115 self.db
116 .execute_batch("BEGIN")
117 .context("BEGIN transaction")?;
118 for ev in events {
119 self.append(ev)?;
120 }
121 self.db
122 .execute_batch("COMMIT")
123 .context("COMMIT transaction")?;
124 Ok(())
125 })();
126 if tx_result.is_err() {
127 let _ = self.db.execute_batch("ROLLBACK");
128 }
129 tx_result
130 }
131
132 pub fn load_turn_events(&self, turn_id: &str) -> anyhow::Result<Vec<KernelEventEnvelope>> {
135 let mut stmt = self.db.prepare(
136 "SELECT seq, ts_ms, kind, payload FROM kernel_events
137 WHERE turn_id = ?1 ORDER BY seq ASC",
138 )?;
139 let rows = stmt.query_map(params![turn_id], |row| {
140 Ok((
141 row.get::<_, u64>(0)?,
142 row.get::<_, u64>(1)?,
143 row.get::<_, String>(2)?,
144 row.get::<_, String>(3)?,
145 ))
146 })?;
147 let mut envelopes = Vec::new();
148 for row in rows {
149 let (seq, ts_ms, kind, payload) = row.context("row read")?;
150 let event: KernelEvent =
151 serde_json::from_str(&payload).context("KernelEvent deserialize")?;
152 envelopes.push(KernelEventEnvelope {
153 seq,
154 ts_ms,
155 kind,
156 event,
157 });
158 }
159 Ok(envelopes)
160 }
161}
162
163fn unix_ms_now() -> u64 {
164 use std::time::{SystemTime, UNIX_EPOCH};
165 SystemTime::now()
166 .duration_since(UNIX_EPOCH)
167 .unwrap_or_default()
168 .as_millis() as u64
169}
170
171#[cfg(test)]
174mod tests {
175 use rusqlite::Connection;
176 use zagens_core::engine::kernel_event::{KernelEvent, TurnOutcome};
177 use zagens_core::turn::TurnLoopMode;
178
179 use super::*;
180
181 fn in_memory_db() -> Connection {
182 let db = Connection::open_in_memory().expect("in-memory DB");
183 db.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
184 .expect("pragmas");
185 ensure_kernel_events_table(&db).expect("ensure table");
186 db
187 }
188
189 #[test]
190 fn append_and_load_round_trip() {
191 let db = in_memory_db();
192 let mut log = KernelEventLog::new(&db);
193 let tid = "t-rt-001".to_string();
194
195 log.append(KernelEvent::TurnStarted {
196 turn_id: tid.clone(),
197 mode: TurnLoopMode::Agent,
198 input_text: "do the thing".into(),
199 max_steps: 10,
200 })
201 .expect("append TurnStarted");
202
203 log.append(KernelEvent::TurnEnded {
204 turn_id: tid.clone(),
205 outcome: TurnOutcome::Completed,
206 total_steps: 1,
207 })
208 .expect("append TurnEnded");
209
210 let envelopes = log.load_turn_events(&tid).expect("load");
211 assert_eq!(envelopes.len(), 2);
212 assert_eq!(envelopes[0].event.kind_str(), "turn_started");
213 assert_eq!(envelopes[1].event.kind_str(), "turn_ended");
214 assert!(envelopes[0].seq < envelopes[1].seq);
216 }
217
218 #[test]
219 fn append_batch_is_atomic() {
220 let db = in_memory_db();
221 let mut log = KernelEventLog::new(&db);
222 let tid = "t-batch-001".to_string();
223
224 let events = vec![
225 KernelEvent::TurnStarted {
226 turn_id: tid.clone(),
227 mode: TurnLoopMode::Yolo,
228 input_text: "batch test".into(),
229 max_steps: 5,
230 },
231 KernelEvent::ScratchpadSummaryInjected {
232 turn_id: tid.clone(),
233 at_step: 2,
234 },
235 KernelEvent::TurnEnded {
236 turn_id: tid.clone(),
237 outcome: TurnOutcome::Interrupted,
238 total_steps: 3,
239 },
240 ];
241
242 log.append_batch(events).expect("batch");
243
244 let envelopes = log.load_turn_events(&tid).expect("load");
245 assert_eq!(envelopes.len(), 3);
246 }
247
248 #[test]
249 fn schema_version_event_has_null_turn_id() {
250 let db = in_memory_db();
251 let mut log = KernelEventLog::new(&db);
252
253 log.append(KernelEvent::SchemaVersion { version: 1 })
255 .expect("schema version event");
256
257 let turn_id_val: Option<String> = db
259 .query_row(
260 "SELECT turn_id FROM kernel_events WHERE kind = 'schema_version'",
261 [],
262 |row| row.get(0),
263 )
264 .expect("query");
265 assert!(turn_id_val.is_none());
266 }
267
268 #[test]
269 fn ensure_table_is_idempotent() {
270 let db = in_memory_db();
271 ensure_kernel_events_table(&db).expect("second call");
273 }
274}