zagens_runtime_adapters/persist/
kernel_event_writer.rs1use std::path::Path;
9use std::sync::{Arc, Mutex as StdMutex};
10
11use anyhow::Context as _;
12use rusqlite::Connection;
13use tokio::sync::mpsc;
14use tokio::task::JoinHandle;
15use tracing::{debug, warn};
16use zagens_core::engine::kernel_event::KernelEvent;
17use zagens_core::engine::turn_machine::KernelEventSink;
18
19use super::kernel_event_log::{KernelEventLog, ensure_kernel_events_table};
20use super::session_manager::default_sessions_dir;
21
22pub struct KernelEventWriter {
26 tx: KernelEventSink,
27 db: Arc<StdMutex<Connection>>,
28 _drain: JoinHandle<()>,
29}
30
31impl KernelEventWriter {
32 pub fn try_open_default() -> Option<Self> {
36 let dir = default_sessions_dir().ok()?;
37 std::fs::create_dir_all(&dir).ok()?;
38 let db_path = dir.join("sessions.db");
39 match Self::try_open(&db_path) {
40 Ok(writer) => Some(writer),
41 Err(err) => {
42 warn!(target: "kernel_event", %err, "kernel event log disabled");
43 None
44 }
45 }
46 }
47
48 pub fn try_open(db_path: &Path) -> anyhow::Result<Self> {
50 let conn = Connection::open(db_path)
51 .with_context(|| format!("open kernel event db {}", db_path.display()))?;
52 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
53 .context("set SQLite pragmas")?;
54 ensure_kernel_events_table(&conn).context("ensure kernel_events table")?;
55
56 seed_schema_version_if_empty(&conn)?;
57
58 let db = Arc::new(StdMutex::new(conn));
59 let (tx, mut rx) = mpsc::unbounded_channel::<KernelEvent>();
60
61 let db_path_log = db_path.to_path_buf();
62 let db_drain = Arc::clone(&db);
63 let drain = tokio::spawn(async move {
64 while let Some(first) = rx.recv().await {
65 let mut batch = vec![first];
66 while let Ok(more) = rx.try_recv() {
67 batch.push(more);
68 }
69 let db = Arc::clone(&db_drain);
70 let count = batch.len();
71 let write_result = tokio::task::spawn_blocking(move || append_batch(&db, batch))
72 .await
73 .context("kernel event drain join");
74 match write_result {
75 Ok(Ok(())) => {
76 debug!(
77 target: "kernel_event",
78 count,
79 db = %db_path_log.display(),
80 "appended kernel events"
81 );
82 }
83 Ok(Err(err)) | Err(err) => {
84 warn!(
85 target: "kernel_event",
86 %err,
87 count,
88 "kernel event append failed"
89 );
90 }
91 }
92 }
93 debug!(target: "kernel_event", "kernel event drain stopped");
94 });
95
96 Ok(Self {
97 tx,
98 db,
99 _drain: drain,
100 })
101 }
102
103 pub fn load_turn_events_sync(&self, turn_id: &str) -> anyhow::Result<Vec<KernelEvent>> {
105 let db = self
106 .db
107 .lock()
108 .map_err(|_| anyhow::anyhow!("kernel event db mutex poisoned"))?;
109 let log = KernelEventLog::new(&db);
110 Ok(log
111 .load_turn_events(turn_id)?
112 .into_iter()
113 .map(|env| env.event)
114 .collect())
115 }
116
117 #[must_use]
119 pub fn verify_persisted_turn_matches(
120 &self,
121 turn_id: &str,
122 in_memory: &[KernelEvent],
123 ) -> Option<String> {
124 let loaded = match self.load_turn_events_sync(turn_id) {
125 Ok(events) => events,
126 Err(err) => return Some(format!("load failed: {err}")),
127 };
128 if loaded.len() != in_memory.len() {
129 return Some(format!(
130 "persist count {} != memory {}",
131 loaded.len(),
132 in_memory.len()
133 ));
134 }
135 for (idx, (a, b)) in loaded.iter().zip(in_memory.iter()).enumerate() {
136 let a_json = serde_json::to_string(a).ok();
137 let b_json = serde_json::to_string(b).ok();
138 if a_json != b_json {
139 return Some(format!(
140 "event mismatch at index {idx}: persist={} memory={}",
141 a.kind_str(),
142 b.kind_str()
143 ));
144 }
145 }
146 None
147 }
148
149 #[must_use]
150 pub fn sink(&self) -> KernelEventSink {
151 self.tx.clone()
152 }
153
154 #[must_use]
156 pub fn tx(&self) -> &KernelEventSink {
157 &self.tx
158 }
159}
160
161fn seed_schema_version_if_empty(db: &Connection) -> anyhow::Result<()> {
162 let count: i64 = db.query_row("SELECT COUNT(*) FROM kernel_events", [], |row| row.get(0))?;
163 if count == 0 {
164 let mut log = KernelEventLog::new(db);
165 log.append(KernelEvent::SchemaVersion { version: 1 })?;
166 }
167 Ok(())
168}
169
170fn append_batch(db: &StdMutex<Connection>, events: Vec<KernelEvent>) -> anyhow::Result<()> {
171 let db = db
172 .lock()
173 .map_err(|_| anyhow::anyhow!("kernel event db mutex poisoned"))?;
174 let next_seq = KernelEventLog::peek_next_seq(&db)?;
175 let mut log = KernelEventLog::with_next_seq(&db, next_seq);
176 log.append_batch(events)
177}
178
179#[cfg(test)]
180mod tests {
181 use super::*;
182 use std::path::PathBuf;
183 use zagens_core::engine::kernel_event::TurnOutcome;
184 use zagens_core::turn::TurnLoopMode;
185
186 #[tokio::test]
187 async fn writer_drains_events_to_sqlite() {
188 let dir = tempfile::tempdir().expect("tempdir");
189 let db_path: PathBuf = dir.path().join("sessions.db");
190 let writer = KernelEventWriter::try_open(&db_path).expect("open writer");
191 let sink = writer.sink();
192
193 sink.send(KernelEvent::TurnStarted {
194 turn_id: "t-writer-001".into(),
195 mode: TurnLoopMode::Agent,
196 input_text: "hello".into(),
197 max_steps: 5,
198 })
199 .expect("send");
200 sink.send(KernelEvent::TurnEnded {
201 turn_id: "t-writer-001".into(),
202 outcome: TurnOutcome::Completed,
203 total_steps: 1,
204 })
205 .expect("send");
206
207 drop(sink);
208 drop(writer);
209
210 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
212
213 let db = Connection::open(&db_path).expect("reopen");
214 let count: i64 = db
215 .query_row("SELECT COUNT(*) FROM kernel_events", [], |r| r.get(0))
216 .expect("count");
217 assert_eq!(count, 3, "expected schema_version + 2 turn events");
219
220 let log = KernelEventLog::new(&db);
221 let envelopes = log
222 .load_turn_events("t-writer-001")
223 .expect("load turn events");
224 assert_eq!(envelopes.len(), 2);
225
226 let writer2 = KernelEventWriter::try_open(&db_path).expect("reopen writer");
227 let loaded = writer2
228 .load_turn_events_sync("t-writer-001")
229 .expect("load sync");
230 assert_eq!(loaded.len(), 2);
231 let in_memory: Vec<KernelEvent> = envelopes.into_iter().map(|e| e.event).collect();
232 assert!(
233 writer2
234 .verify_persisted_turn_matches("t-writer-001", &in_memory)
235 .is_none()
236 );
237 }
238}