Skip to main content

zagens_runtime_adapters/persist/
kernel_event_writer.rs

//! Background drain for [`KernelEvent`] double-write (Phase 3b batch 2).
//!
//! Opens (or creates) `sessions.db`, ensures the `kernel_events` table exists,
//! and spawns a tokio task that batches events from an unbounded channel into
//! SQLite via [`KernelEventLog`].  Turn-loop code emits through the returned
//! [`KernelEventSink`] without blocking on disk I/O.
7
8use std::path::Path;
9use std::sync::{Arc, Mutex as StdMutex};
10
11use anyhow::Context as _;
12use rusqlite::Connection;
13use tokio::sync::mpsc;
14use tokio::task::JoinHandle;
15use tracing::{debug, warn};
16use zagens_core::engine::kernel_event::KernelEvent;
17use zagens_core::engine::turn_machine::KernelEventSink;
18
19use super::kernel_event_log::{KernelEventLog, ensure_kernel_events_table};
20use super::session_manager::default_sessions_dir;
21
22/// Owns the channel sender, shared DB handle, and the background drain task handle.
23///
24/// Drop shuts down the drain loop (sender closed → task exits after flushing).
25pub struct KernelEventWriter {
26    tx: KernelEventSink,
27    db: Arc<StdMutex<Connection>>,
28    _drain: JoinHandle<()>,
29}
30
31impl KernelEventWriter {
32    /// Open the default `~/.zagens/sessions/sessions.db` and start draining.
33    /// Returns `None` when the sessions directory cannot be resolved (e.g.
34    /// headless CI without home dir) — double-write is silently disabled.
35    pub fn try_open_default() -> Option<Self> {
36        let dir = default_sessions_dir().ok()?;
37        std::fs::create_dir_all(&dir).ok()?;
38        let db_path = dir.join("sessions.db");
39        match Self::try_open(&db_path) {
40            Ok(writer) => Some(writer),
41            Err(err) => {
42                warn!(target: "kernel_event", %err, "kernel event log disabled");
43                None
44            }
45        }
46    }
47
48    /// Open (or create) `db_path` and start the drain task.
49    pub fn try_open(db_path: &Path) -> anyhow::Result<Self> {
50        let conn = Connection::open(db_path)
51            .with_context(|| format!("open kernel event db {}", db_path.display()))?;
52        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
53            .context("set SQLite pragmas")?;
54        ensure_kernel_events_table(&conn).context("ensure kernel_events table")?;
55
56        seed_schema_version_if_empty(&conn)?;
57
58        let db = Arc::new(StdMutex::new(conn));
59        let (tx, mut rx) = mpsc::unbounded_channel::<KernelEvent>();
60
61        let db_path_log = db_path.to_path_buf();
62        let db_drain = Arc::clone(&db);
63        let drain = tokio::spawn(async move {
64            while let Some(first) = rx.recv().await {
65                let mut batch = vec![first];
66                while let Ok(more) = rx.try_recv() {
67                    batch.push(more);
68                }
69                let db = Arc::clone(&db_drain);
70                let count = batch.len();
71                let write_result = tokio::task::spawn_blocking(move || append_batch(&db, batch))
72                    .await
73                    .context("kernel event drain join");
74                match write_result {
75                    Ok(Ok(())) => {
76                        debug!(
77                            target: "kernel_event",
78                            count,
79                            db = %db_path_log.display(),
80                            "appended kernel events"
81                        );
82                    }
83                    Ok(Err(err)) | Err(err) => {
84                        warn!(
85                            target: "kernel_event",
86                            %err,
87                            count,
88                            "kernel event append failed"
89                        );
90                    }
91                }
92            }
93            debug!(target: "kernel_event", "kernel event drain stopped");
94        });
95
96        Ok(Self {
97            tx,
98            db,
99            _drain: drain,
100        })
101    }
102
103    /// Load all persisted events for `turn_id` (blocking read on the shared DB).
104    pub fn load_turn_events_sync(&self, turn_id: &str) -> anyhow::Result<Vec<KernelEvent>> {
105        let db = self
106            .db
107            .lock()
108            .map_err(|_| anyhow::anyhow!("kernel event db mutex poisoned"))?;
109        let log = KernelEventLog::new(&db);
110        Ok(log
111            .load_turn_events(turn_id)?
112            .into_iter()
113            .map(|env| env.event)
114            .collect())
115    }
116
117    /// Compare in-memory turn events with the SQLite log (Phase 3b replay gate).
118    #[must_use]
119    pub fn verify_persisted_turn_matches(
120        &self,
121        turn_id: &str,
122        in_memory: &[KernelEvent],
123    ) -> Option<String> {
124        let loaded = match self.load_turn_events_sync(turn_id) {
125            Ok(events) => events,
126            Err(err) => return Some(format!("load failed: {err}")),
127        };
128        if loaded.len() != in_memory.len() {
129            return Some(format!(
130                "persist count {} != memory {}",
131                loaded.len(),
132                in_memory.len()
133            ));
134        }
135        for (idx, (a, b)) in loaded.iter().zip(in_memory.iter()).enumerate() {
136            let a_json = serde_json::to_string(a).ok();
137            let b_json = serde_json::to_string(b).ok();
138            if a_json != b_json {
139                return Some(format!(
140                    "event mismatch at index {idx}: persist={} memory={}",
141                    a.kind_str(),
142                    b.kind_str()
143                ));
144            }
145        }
146        None
147    }
148
149    #[must_use]
150    pub fn sink(&self) -> KernelEventSink {
151        self.tx.clone()
152    }
153
154    /// Borrow the live sender (for `TurnLoopHost::kernel_event_sink`).
155    #[must_use]
156    pub fn tx(&self) -> &KernelEventSink {
157        &self.tx
158    }
159}
160
161fn seed_schema_version_if_empty(db: &Connection) -> anyhow::Result<()> {
162    let count: i64 = db.query_row("SELECT COUNT(*) FROM kernel_events", [], |row| row.get(0))?;
163    if count == 0 {
164        let mut log = KernelEventLog::new(db);
165        log.append(KernelEvent::SchemaVersion { version: 1 })?;
166    }
167    Ok(())
168}
169
170fn append_batch(db: &StdMutex<Connection>, events: Vec<KernelEvent>) -> anyhow::Result<()> {
171    let db = db
172        .lock()
173        .map_err(|_| anyhow::anyhow!("kernel event db mutex poisoned"))?;
174    let next_seq = KernelEventLog::peek_next_seq(&db)?;
175    let mut log = KernelEventLog::with_next_seq(&db, next_seq);
176    log.append_batch(events)
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use zagens_core::engine::kernel_event::TurnOutcome;
    use zagens_core::turn::TurnLoopMode;

    /// End-to-end check: events sent through the sink land in SQLite, can be
    /// loaded per turn, and pass the replay-gate comparison after reopening.
    #[tokio::test]
    async fn writer_drains_events_to_sqlite() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_path: PathBuf = dir.path().join("sessions.db");
        let writer = KernelEventWriter::try_open(&db_path).expect("open writer");
        let sink = writer.sink();

        sink.send(KernelEvent::TurnStarted {
            turn_id: "t-writer-001".into(),
            mode: TurnLoopMode::Agent,
            input_text: "hello".into(),
            max_steps: 5,
        })
        .expect("send");
        sink.send(KernelEvent::TurnEnded {
            turn_id: "t-writer-001".into(),
            outcome: TurnOutcome::Completed,
            total_steps: 1,
        })
        .expect("send");

        // Close every sender, then wait for the drain task itself. Awaiting
        // the handle is deterministic, unlike sleeping for a fixed interval
        // and hoping the task has flushed by then.
        drop(sink);
        let KernelEventWriter {
            tx,
            db: writer_db,
            _drain: drain,
        } = writer;
        drop(tx);
        drain.await.expect("drain task finished");
        drop(writer_db);

        let db = Connection::open(&db_path).expect("reopen");
        let count: i64 = db
            .query_row("SELECT COUNT(*) FROM kernel_events", [], |r| r.get(0))
            .expect("count");
        // schema_version + turn_started + turn_ended
        assert_eq!(count, 3, "expected schema_version + 2 turn events");

        let log = KernelEventLog::new(&db);
        let envelopes = log
            .load_turn_events("t-writer-001")
            .expect("load turn events");
        assert_eq!(envelopes.len(), 2);

        // Reopening must not seed a second schema_version row and must see
        // the same turn events through the writer's own read path.
        let writer2 = KernelEventWriter::try_open(&db_path).expect("reopen writer");
        let loaded = writer2
            .load_turn_events_sync("t-writer-001")
            .expect("load sync");
        assert_eq!(loaded.len(), 2);
        let in_memory: Vec<KernelEvent> = envelopes.into_iter().map(|e| e.event).collect();
        assert!(
            writer2
                .verify_persisted_turn_matches("t-writer-001", &in_memory)
                .is_none()
        );
    }
}