Skip to main content

mlua_swarm/store/enhance_log/
sqlite.rs

1//! `SqliteEnhanceLogStore` — SQLite-backed [`EnhanceLogStore`].
2//!
3//! One row per `issue_id`. `verdicts` and `reasons` are stored as JSON blobs
4//! so schema evolution of `VerdictSummary` does not require a migration.
5//! Lists are ordered by `ts_ms ASC` (as promised by the trait contract).
6
7use super::{EnhanceLogEntry, EnhanceLogStore, EnhanceLogStoreError, VerdictSummary};
8use crate::blueprint::store::BlueprintId;
9use crate::store::issue::IssueId;
10use async_trait::async_trait;
11use rusqlite::{params, OptionalExtension};
12use rusqlite_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
13use std::path::Path;
14
/// DDL batch for the `enhance_log` table and its two indexes.
///
/// Every statement uses `IF NOT EXISTS`, so running the batch against a
/// database that already has the schema is a no-op. The pieces are joined at
/// compile time into a single line of SQL (no embedded newlines).
const SCHEMA_SQL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS enhance_log (",
    "issue_id     TEXT PRIMARY KEY, ",
    "blueprint_id TEXT NOT NULL, ",
    "prev_hash    TEXT NOT NULL, ",
    "new_hash     TEXT NOT NULL, ",
    "intent       TEXT NOT NULL, ",
    "rationale    TEXT NOT NULL, ",
    "verdicts_json TEXT NOT NULL, ",
    "status       TEXT NOT NULL, ",
    "reasons_json TEXT NOT NULL, ",
    "ts_ms        INTEGER NOT NULL",
    ");",
    // Serves per-blueprint listings ordered by timestamp.
    "CREATE INDEX IF NOT EXISTS ix_enhance_log_bp_ts ON enhance_log(blueprint_id, ts_ms);",
    // Serves the global listing ordered by timestamp.
    "CREATE INDEX IF NOT EXISTS ix_enhance_log_ts ON enhance_log(ts_ms);",
);
31
32/// SQLite-backed [`EnhanceLogStore`]. Append-only in the same sense as the
33/// in-memory backend: a duplicate `issue_id` returns `Conflict`, the existing
34/// row is left untouched.
35pub struct SqliteEnhanceLogStore {
36    isle: AsyncIsle,
37}
38
39impl SqliteEnhanceLogStore {
40    /// Open (or create) a SQLite file and apply the schema.
41    pub async fn open(
42        path: impl AsRef<Path>,
43    ) -> Result<(Self, AsyncIsleDriver), EnhanceLogStoreError> {
44        let (isle, driver) = AsyncIsle::spawn(path.as_ref().to_path_buf(), |conn| {
45            conn.execute_batch(SCHEMA_SQL)
46        })
47        .await
48        .map_err(map_isle_err)?;
49        Ok((Self { isle }, driver))
50    }
51
52    /// Open an ephemeral in-memory database (tests).
53    pub async fn open_in_memory() -> Result<(Self, AsyncIsleDriver), EnhanceLogStoreError> {
54        let (isle, driver) = AsyncIsle::open_in_memory(|conn| conn.execute_batch(SCHEMA_SQL))
55            .await
56            .map_err(map_isle_err)?;
57        Ok((Self { isle }, driver))
58    }
59}
60
61fn map_isle_err(e: IsleError) -> EnhanceLogStoreError {
62    EnhanceLogStoreError::Other(format!("sqlite: {e}"))
63}
64
65fn row_to_entry(
66    issue_id: String,
67    blueprint_id: String,
68    prev_hash: String,
69    new_hash: String,
70    intent: String,
71    rationale: String,
72    verdicts_json: String,
73    status: String,
74    reasons_json: String,
75    ts_ms: i64,
76) -> Result<EnhanceLogEntry, EnhanceLogStoreError> {
77    let verdicts: Vec<VerdictSummary> = serde_json::from_str(&verdicts_json)
78        .map_err(|e| EnhanceLogStoreError::Other(format!("decode verdicts: {e}")))?;
79    let reasons: Vec<String> = serde_json::from_str(&reasons_json)
80        .map_err(|e| EnhanceLogStoreError::Other(format!("decode reasons: {e}")))?;
81    Ok(EnhanceLogEntry {
82        issue_id: IssueId::new(issue_id),
83        blueprint_id: BlueprintId::new(blueprint_id),
84        prev_hash,
85        new_hash,
86        intent,
87        rationale,
88        verdicts,
89        status,
90        reasons,
91        ts_ms,
92    })
93}
94
95#[async_trait]
96impl EnhanceLogStore for SqliteEnhanceLogStore {
97    fn name(&self) -> &str {
98        "sqlite"
99    }
100
101    async fn append(&self, entry: EnhanceLogEntry) -> Result<(), EnhanceLogStoreError> {
102        let issue_id_str = entry.issue_id.0.clone();
103        let issue_id_for_conflict = entry.issue_id.clone();
104        let blueprint_id = entry.blueprint_id.as_str().to_string();
105        let prev_hash = entry.prev_hash.clone();
106        let new_hash = entry.new_hash.clone();
107        let intent = entry.intent.clone();
108        let rationale = entry.rationale.clone();
109        let verdicts_json = serde_json::to_string(&entry.verdicts)
110            .map_err(|e| EnhanceLogStoreError::Other(format!("encode verdicts: {e}")))?;
111        let status = entry.status.clone();
112        let reasons_json = serde_json::to_string(&entry.reasons)
113            .map_err(|e| EnhanceLogStoreError::Other(format!("encode reasons: {e}")))?;
114        let ts_ms = entry.ts_ms;
115
116        self.isle
117            .call(move |conn| {
118                let tx = conn.transaction()?;
119                let exists: i64 = tx.query_row(
120                    "SELECT COUNT(*) FROM enhance_log WHERE issue_id = ?1",
121                    params![issue_id_str],
122                    |row| row.get(0),
123                )?;
124                if exists > 0 {
125                    return Err(rusqlite::Error::SqliteFailure(
126                        rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
127                        Some(format!("__mlua_swarm_conflict:{issue_id_str}")),
128                    ));
129                }
130                tx.execute(
131                    "INSERT INTO enhance_log (issue_id, blueprint_id, prev_hash, new_hash, \
132                     intent, rationale, verdicts_json, status, reasons_json, ts_ms) \
133                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
134                    params![
135                        issue_id_str,
136                        blueprint_id,
137                        prev_hash,
138                        new_hash,
139                        intent,
140                        rationale,
141                        verdicts_json,
142                        status,
143                        reasons_json,
144                        ts_ms,
145                    ],
146                )?;
147                tx.commit()?;
148                Ok(())
149            })
150            .await
151            .map_err(|e| match &e {
152                IsleError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg)))
153                    if msg.starts_with("__mlua_swarm_conflict:") =>
154                {
155                    EnhanceLogStoreError::Conflict(issue_id_for_conflict.clone())
156                }
157                _ => map_isle_err(e),
158            })
159    }
160
161    async fn get(&self, issue_id: &IssueId) -> Result<EnhanceLogEntry, EnhanceLogStoreError> {
162        let id_str = issue_id.0.clone();
163        let id_for_notfound = issue_id.clone();
164        let row = self
165            .isle
166            .call(move |conn| {
167                conn.query_row(
168                    "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
169                     verdicts_json, status, reasons_json, ts_ms \
170                     FROM enhance_log WHERE issue_id = ?1",
171                    params![id_str],
172                    |row| {
173                        Ok((
174                            row.get::<_, String>(0)?,
175                            row.get::<_, String>(1)?,
176                            row.get::<_, String>(2)?,
177                            row.get::<_, String>(3)?,
178                            row.get::<_, String>(4)?,
179                            row.get::<_, String>(5)?,
180                            row.get::<_, String>(6)?,
181                            row.get::<_, String>(7)?,
182                            row.get::<_, String>(8)?,
183                            row.get::<_, i64>(9)?,
184                        ))
185                    },
186                )
187                .optional()
188            })
189            .await
190            .map_err(map_isle_err)?;
191        match row {
192            Some((iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)) => row_to_entry(
193                iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts,
194            ),
195            None => Err(EnhanceLogStoreError::NotFound(id_for_notfound)),
196        }
197    }
198
199    async fn list_by_blueprint(
200        &self,
201        blueprint_id: &BlueprintId,
202    ) -> Result<Vec<EnhanceLogEntry>, EnhanceLogStoreError> {
203        let bp_str = blueprint_id.as_str().to_string();
204        let rows = self
205            .isle
206            .call(move |conn| {
207                let mut stmt = conn.prepare(
208                    "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
209                     verdicts_json, status, reasons_json, ts_ms \
210                     FROM enhance_log WHERE blueprint_id = ?1 ORDER BY ts_ms ASC",
211                )?;
212                let iter = stmt.query_map(params![bp_str], |row| {
213                    Ok((
214                        row.get::<_, String>(0)?,
215                        row.get::<_, String>(1)?,
216                        row.get::<_, String>(2)?,
217                        row.get::<_, String>(3)?,
218                        row.get::<_, String>(4)?,
219                        row.get::<_, String>(5)?,
220                        row.get::<_, String>(6)?,
221                        row.get::<_, String>(7)?,
222                        row.get::<_, String>(8)?,
223                        row.get::<_, i64>(9)?,
224                    ))
225                })?;
226                let mut out = Vec::new();
227                for r in iter {
228                    out.push(r?);
229                }
230                Ok(out)
231            })
232            .await
233            .map_err(map_isle_err)?;
234        rows.into_iter()
235            .map(|(iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)| {
236                row_to_entry(iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)
237            })
238            .collect()
239    }
240
241    async fn list_all(&self) -> Result<Vec<EnhanceLogEntry>, EnhanceLogStoreError> {
242        let rows = self
243            .isle
244            .call(|conn| {
245                let mut stmt = conn.prepare(
246                    "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
247                     verdicts_json, status, reasons_json, ts_ms \
248                     FROM enhance_log ORDER BY ts_ms ASC",
249                )?;
250                let iter = stmt.query_map([], |row| {
251                    Ok((
252                        row.get::<_, String>(0)?,
253                        row.get::<_, String>(1)?,
254                        row.get::<_, String>(2)?,
255                        row.get::<_, String>(3)?,
256                        row.get::<_, String>(4)?,
257                        row.get::<_, String>(5)?,
258                        row.get::<_, String>(6)?,
259                        row.get::<_, String>(7)?,
260                        row.get::<_, String>(8)?,
261                        row.get::<_, i64>(9)?,
262                    ))
263                })?;
264                let mut out = Vec::new();
265                for r in iter {
266                    out.push(r?);
267                }
268                Ok(out)
269            })
270            .await
271            .map_err(map_isle_err)?;
272        rows.into_iter()
273            .map(|(iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)| {
274                row_to_entry(iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)
275            })
276            .collect()
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a fixture entry. `new_hash` is `"new"` only for `"applied"`
    /// entries, and `reasons` is non-empty only for `"rejected"` ones.
    fn mk_entry(issue: &str, bp: &str, ts_ms: i64, status: &str) -> EnhanceLogEntry {
        let new_hash = match status {
            "applied" => "new",
            _ => "",
        };
        let reasons = match status {
            "rejected" => vec!["des: broken".into()],
            _ => vec![],
        };
        let verdict = VerdictSummary {
            axis: "des".into(),
            status: "pass".into(),
            detail: "ok".into(),
        };
        EnhanceLogEntry {
            issue_id: IssueId::new(issue),
            blueprint_id: BlueprintId::new(bp),
            prev_hash: "prev".into(),
            new_hash: new_hash.into(),
            intent: format!("intent-{issue}"),
            rationale: format!("rationale-{issue}"),
            verdicts: vec![verdict],
            status: status.into(),
            reasons,
            ts_ms,
        }
    }

    /// Fresh in-memory store plus its driver.
    async fn mem_store() -> (SqliteEnhanceLogStore, AsyncIsleDriver) {
        SqliteEnhanceLogStore::open_in_memory().await.unwrap()
    }

    /// Drop the store first, then shut the driver down.
    async fn close(store: SqliteEnhanceLogStore, driver: AsyncIsleDriver) {
        drop(store);
        driver.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn append_then_get_roundtrip() {
        let (store, driver) = mem_store().await;
        let expected = mk_entry("i1", "bp-1", 100, "applied");
        store.append(expected.clone()).await.unwrap();

        let fetched = store.get(&IssueId::new("i1")).await.unwrap();
        assert_eq!(fetched, expected);

        close(store, driver).await;
    }

    #[tokio::test]
    async fn duplicate_append_returns_conflict() {
        let (store, driver) = mem_store().await;
        let first = mk_entry("i1", "bp-1", 100, "applied");
        store.append(first).await.unwrap();

        // Same issue id, different payload: must be refused.
        let second = mk_entry("i1", "bp-1", 200, "rejected");
        let err = store.append(second).await.unwrap_err();
        assert!(matches!(err, EnhanceLogStoreError::Conflict(_)));

        close(store, driver).await;
    }

    #[tokio::test]
    async fn get_missing_returns_not_found() {
        let (store, driver) = mem_store().await;

        let err = store.get(&IssueId::new("nope")).await.unwrap_err();
        assert!(matches!(err, EnhanceLogStoreError::NotFound(_)));

        close(store, driver).await;
    }

    #[tokio::test]
    async fn list_by_blueprint_ascending_ts() {
        let (store, driver) = mem_store().await;
        // Inserted out of timestamp order on purpose.
        let fixtures = [
            mk_entry("a", "bp-1", 300, "applied"),
            mk_entry("b", "bp-2", 200, "applied"),
            mk_entry("c", "bp-1", 100, "rejected"),
        ];
        for entry in fixtures {
            store.append(entry).await.unwrap();
        }

        let target = BlueprintId::new("bp-1");
        let listed = store.list_by_blueprint(&target).await.unwrap();
        let ids: Vec<_> = listed
            .iter()
            .map(|entry| entry.issue_id.0.clone())
            .collect();
        assert_eq!(ids, vec!["c", "a"]);

        close(store, driver).await;
    }

    #[tokio::test]
    async fn list_all_ascending_ts() {
        let (store, driver) = mem_store().await;
        // Inserted out of timestamp order on purpose.
        let fixtures = [
            mk_entry("a", "bp-1", 300, "applied"),
            mk_entry("b", "bp-2", 100, "applied"),
            mk_entry("c", "bp-1", 200, "applied"),
        ];
        for entry in fixtures {
            store.append(entry).await.unwrap();
        }

        let listed = store.list_all().await.unwrap();
        let ids: Vec<_> = listed
            .iter()
            .map(|entry| entry.issue_id.0.clone())
            .collect();
        assert_eq!(ids, vec!["b", "c", "a"]);

        close(store, driver).await;
    }

    #[tokio::test]
    async fn persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let db_file = dir.path().join("enhance_log.db");

        // First session: write one entry, then shut everything down.
        {
            let (store, driver) = SqliteEnhanceLogStore::open(&db_file).await.unwrap();
            let entry = mk_entry("keep", "bp-1", 42, "applied");
            store.append(entry).await.unwrap();
            close(store, driver).await;
        }

        // Second session: the entry must still be readable from the file.
        let (store, driver) = SqliteEnhanceLogStore::open(&db_file).await.unwrap();
        let fetched = store.get(&IssueId::new("keep")).await.unwrap();
        assert_eq!(fetched.blueprint_id.as_str(), "bp-1");
        assert_eq!(fetched.ts_ms, 42);
        close(store, driver).await;
    }
}