Skip to main content

mlua_swarm/store/enhance_log/
sqlite.rs

1//! `SqliteEnhanceLogStore` — SQLite-backed [`EnhanceLogStore`].
2//!
3//! One row per `issue_id`. `verdicts` and `reasons` are stored as JSON blobs
4//! so schema evolution of `VerdictSummary` does not require a migration.
5//! Lists are ordered by `ts_ms ASC` (as promised by the trait contract).
6
7use super::{EnhanceLogEntry, EnhanceLogStore, EnhanceLogStoreError, VerdictSummary};
8use crate::blueprint::store::BlueprintId;
9use crate::store::issue::IssueId;
10use async_trait::async_trait;
11use rusqlite::{params, OptionalExtension};
12use rusqlite_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
13use std::path::Path;
14
15const SCHEMA_SQL: &str = "\
16CREATE TABLE IF NOT EXISTS enhance_log (\
17  issue_id     TEXT PRIMARY KEY, \
18  blueprint_id TEXT NOT NULL, \
19  prev_hash    TEXT NOT NULL, \
20  new_hash     TEXT NOT NULL, \
21  intent       TEXT NOT NULL, \
22  rationale    TEXT NOT NULL, \
23  verdicts_json TEXT NOT NULL, \
24  status       TEXT NOT NULL, \
25  reasons_json TEXT NOT NULL, \
26  ts_ms        INTEGER NOT NULL\
27);\
28CREATE INDEX IF NOT EXISTS ix_enhance_log_bp_ts ON enhance_log(blueprint_id, ts_ms);\
29CREATE INDEX IF NOT EXISTS ix_enhance_log_ts ON enhance_log(ts_ms);\
30";
31
32/// SQLite-backed [`EnhanceLogStore`]. Append-only in the same sense as the
33/// in-memory backend: a duplicate `issue_id` returns `Conflict`, the existing
34/// row is left untouched.
35pub struct SqliteEnhanceLogStore {
36    isle: AsyncIsle,
37}
38
39impl SqliteEnhanceLogStore {
40    /// Open (or create) a SQLite file and apply the schema.
41    pub async fn open(
42        path: impl AsRef<Path>,
43    ) -> Result<(Self, AsyncIsleDriver), EnhanceLogStoreError> {
44        let (isle, driver) = AsyncIsle::spawn(path.as_ref().to_path_buf(), |conn| {
45            conn.execute_batch(SCHEMA_SQL)
46        })
47        .await
48        .map_err(map_isle_err)?;
49        Ok((Self { isle }, driver))
50    }
51
52    /// Open an ephemeral in-memory database (tests).
53    pub async fn open_in_memory() -> Result<(Self, AsyncIsleDriver), EnhanceLogStoreError> {
54        let (isle, driver) = AsyncIsle::open_in_memory(|conn| conn.execute_batch(SCHEMA_SQL))
55            .await
56            .map_err(map_isle_err)?;
57        Ok((Self { isle }, driver))
58    }
59}
60
61fn map_isle_err(e: IsleError) -> EnhanceLogStoreError {
62    EnhanceLogStoreError::Other(format!("sqlite: {e}"))
63}
64
65/// One `enhance_log` SELECT row in column order: issue_id, blueprint_id,
66/// prev_hash, new_hash, intent, rationale, verdicts_json, status,
67/// reasons_json, ts_ms.
68type LogRow = (
69    String,
70    String,
71    String,
72    String,
73    String,
74    String,
75    String,
76    String,
77    String,
78    i64,
79);
80
81fn row_to_entry(row: LogRow) -> Result<EnhanceLogEntry, EnhanceLogStoreError> {
82    let (
83        issue_id,
84        blueprint_id,
85        prev_hash,
86        new_hash,
87        intent,
88        rationale,
89        verdicts_json,
90        status,
91        reasons_json,
92        ts_ms,
93    ) = row;
94    let verdicts: Vec<VerdictSummary> = serde_json::from_str(&verdicts_json)
95        .map_err(|e| EnhanceLogStoreError::Other(format!("decode verdicts: {e}")))?;
96    let reasons: Vec<String> = serde_json::from_str(&reasons_json)
97        .map_err(|e| EnhanceLogStoreError::Other(format!("decode reasons: {e}")))?;
98    Ok(EnhanceLogEntry {
99        issue_id: IssueId::new(issue_id),
100        blueprint_id: BlueprintId::new(blueprint_id),
101        prev_hash,
102        new_hash,
103        intent,
104        rationale,
105        verdicts,
106        status,
107        reasons,
108        ts_ms,
109    })
110}
111
112#[async_trait]
113impl EnhanceLogStore for SqliteEnhanceLogStore {
114    fn name(&self) -> &str {
115        "sqlite"
116    }
117
118    async fn append(&self, entry: EnhanceLogEntry) -> Result<(), EnhanceLogStoreError> {
119        let issue_id_str = entry.issue_id.0.clone();
120        let issue_id_for_conflict = entry.issue_id.clone();
121        let blueprint_id = entry.blueprint_id.as_str().to_string();
122        let prev_hash = entry.prev_hash.clone();
123        let new_hash = entry.new_hash.clone();
124        let intent = entry.intent.clone();
125        let rationale = entry.rationale.clone();
126        let verdicts_json = serde_json::to_string(&entry.verdicts)
127            .map_err(|e| EnhanceLogStoreError::Other(format!("encode verdicts: {e}")))?;
128        let status = entry.status.clone();
129        let reasons_json = serde_json::to_string(&entry.reasons)
130            .map_err(|e| EnhanceLogStoreError::Other(format!("encode reasons: {e}")))?;
131        let ts_ms = entry.ts_ms;
132
133        self.isle
134            .call(move |conn| {
135                let tx = conn.transaction()?;
136                let exists: i64 = tx.query_row(
137                    "SELECT COUNT(*) FROM enhance_log WHERE issue_id = ?1",
138                    params![issue_id_str],
139                    |row| row.get(0),
140                )?;
141                if exists > 0 {
142                    return Err(rusqlite::Error::SqliteFailure(
143                        rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
144                        Some(format!("__mlua_swarm_conflict:{issue_id_str}")),
145                    ));
146                }
147                tx.execute(
148                    "INSERT INTO enhance_log (issue_id, blueprint_id, prev_hash, new_hash, \
149                     intent, rationale, verdicts_json, status, reasons_json, ts_ms) \
150                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
151                    params![
152                        issue_id_str,
153                        blueprint_id,
154                        prev_hash,
155                        new_hash,
156                        intent,
157                        rationale,
158                        verdicts_json,
159                        status,
160                        reasons_json,
161                        ts_ms,
162                    ],
163                )?;
164                tx.commit()?;
165                Ok(())
166            })
167            .await
168            .map_err(|e| match &e {
169                IsleError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg)))
170                    if msg.starts_with("__mlua_swarm_conflict:") =>
171                {
172                    EnhanceLogStoreError::Conflict(issue_id_for_conflict.clone())
173                }
174                _ => map_isle_err(e),
175            })
176    }
177
178    async fn get(&self, issue_id: &IssueId) -> Result<EnhanceLogEntry, EnhanceLogStoreError> {
179        let id_str = issue_id.0.clone();
180        let id_for_notfound = issue_id.clone();
181        let row = self
182            .isle
183            .call(move |conn| {
184                conn.query_row(
185                    "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
186                     verdicts_json, status, reasons_json, ts_ms \
187                     FROM enhance_log WHERE issue_id = ?1",
188                    params![id_str],
189                    |row| {
190                        Ok((
191                            row.get::<_, String>(0)?,
192                            row.get::<_, String>(1)?,
193                            row.get::<_, String>(2)?,
194                            row.get::<_, String>(3)?,
195                            row.get::<_, String>(4)?,
196                            row.get::<_, String>(5)?,
197                            row.get::<_, String>(6)?,
198                            row.get::<_, String>(7)?,
199                            row.get::<_, String>(8)?,
200                            row.get::<_, i64>(9)?,
201                        ))
202                    },
203                )
204                .optional()
205            })
206            .await
207            .map_err(map_isle_err)?;
208        match row {
209            Some(row) => row_to_entry(row),
210            None => Err(EnhanceLogStoreError::NotFound(id_for_notfound)),
211        }
212    }
213
214    async fn list_by_blueprint(
215        &self,
216        blueprint_id: &BlueprintId,
217    ) -> Result<Vec<EnhanceLogEntry>, EnhanceLogStoreError> {
218        let bp_str = blueprint_id.as_str().to_string();
219        let rows = self
220            .isle
221            .call(move |conn| {
222                let mut stmt = conn.prepare(
223                    "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
224                     verdicts_json, status, reasons_json, ts_ms \
225                     FROM enhance_log WHERE blueprint_id = ?1 ORDER BY ts_ms ASC",
226                )?;
227                let iter = stmt.query_map(params![bp_str], |row| {
228                    Ok((
229                        row.get::<_, String>(0)?,
230                        row.get::<_, String>(1)?,
231                        row.get::<_, String>(2)?,
232                        row.get::<_, String>(3)?,
233                        row.get::<_, String>(4)?,
234                        row.get::<_, String>(5)?,
235                        row.get::<_, String>(6)?,
236                        row.get::<_, String>(7)?,
237                        row.get::<_, String>(8)?,
238                        row.get::<_, i64>(9)?,
239                    ))
240                })?;
241                let mut out = Vec::new();
242                for r in iter {
243                    out.push(r?);
244                }
245                Ok(out)
246            })
247            .await
248            .map_err(map_isle_err)?;
249        rows.into_iter().map(row_to_entry).collect()
250    }
251
252    async fn list_all(&self) -> Result<Vec<EnhanceLogEntry>, EnhanceLogStoreError> {
253        let rows = self
254            .isle
255            .call(|conn| {
256                let mut stmt = conn.prepare(
257                    "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
258                     verdicts_json, status, reasons_json, ts_ms \
259                     FROM enhance_log ORDER BY ts_ms ASC",
260                )?;
261                let iter = stmt.query_map([], |row| {
262                    Ok((
263                        row.get::<_, String>(0)?,
264                        row.get::<_, String>(1)?,
265                        row.get::<_, String>(2)?,
266                        row.get::<_, String>(3)?,
267                        row.get::<_, String>(4)?,
268                        row.get::<_, String>(5)?,
269                        row.get::<_, String>(6)?,
270                        row.get::<_, String>(7)?,
271                        row.get::<_, String>(8)?,
272                        row.get::<_, i64>(9)?,
273                    ))
274                })?;
275                let mut out = Vec::new();
276                for r in iter {
277                    out.push(r?);
278                }
279                Ok(out)
280            })
281            .await
282            .map_err(map_isle_err)?;
283        rows.into_iter().map(row_to_entry).collect()
284    }
285}
286
287#[cfg(test)]
288mod tests {
289    use super::*;
290
291    fn mk_entry(issue: &str, bp: &str, ts_ms: i64, status: &str) -> EnhanceLogEntry {
292        EnhanceLogEntry {
293            issue_id: IssueId::new(issue),
294            blueprint_id: BlueprintId::new(bp),
295            prev_hash: "prev".into(),
296            new_hash: if status == "applied" { "new" } else { "" }.into(),
297            intent: format!("intent-{issue}"),
298            rationale: format!("rationale-{issue}"),
299            verdicts: vec![VerdictSummary {
300                axis: "des".into(),
301                status: "pass".into(),
302                detail: "ok".into(),
303            }],
304            status: status.into(),
305            reasons: if status == "rejected" {
306                vec!["des: broken".into()]
307            } else {
308                vec![]
309            },
310            ts_ms,
311        }
312    }
313
314    #[tokio::test]
315    async fn append_then_get_roundtrip() {
316        let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
317        let e = mk_entry("i1", "bp-1", 100, "applied");
318        s.append(e.clone()).await.unwrap();
319        let got = s.get(&IssueId::new("i1")).await.unwrap();
320        assert_eq!(got, e);
321        drop(s);
322        driver.shutdown().await.unwrap();
323    }
324
325    #[tokio::test]
326    async fn duplicate_append_returns_conflict() {
327        let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
328        s.append(mk_entry("i1", "bp-1", 100, "applied"))
329            .await
330            .unwrap();
331        let err = s
332            .append(mk_entry("i1", "bp-1", 200, "rejected"))
333            .await
334            .unwrap_err();
335        assert!(matches!(err, EnhanceLogStoreError::Conflict(_)));
336        drop(s);
337        driver.shutdown().await.unwrap();
338    }
339
340    #[tokio::test]
341    async fn get_missing_returns_not_found() {
342        let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
343        let err = s.get(&IssueId::new("nope")).await.unwrap_err();
344        assert!(matches!(err, EnhanceLogStoreError::NotFound(_)));
345        drop(s);
346        driver.shutdown().await.unwrap();
347    }
348
349    #[tokio::test]
350    async fn list_by_blueprint_ascending_ts() {
351        let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
352        s.append(mk_entry("a", "bp-1", 300, "applied"))
353            .await
354            .unwrap();
355        s.append(mk_entry("b", "bp-2", 200, "applied"))
356            .await
357            .unwrap();
358        s.append(mk_entry("c", "bp-1", 100, "rejected"))
359            .await
360            .unwrap();
361        let by_bp1 = s
362            .list_by_blueprint(&BlueprintId::new("bp-1"))
363            .await
364            .unwrap();
365        let ids: Vec<_> = by_bp1.iter().map(|e| e.issue_id.0.clone()).collect();
366        assert_eq!(ids, vec!["c", "a"]);
367        drop(s);
368        driver.shutdown().await.unwrap();
369    }
370
371    #[tokio::test]
372    async fn list_all_ascending_ts() {
373        let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
374        s.append(mk_entry("a", "bp-1", 300, "applied"))
375            .await
376            .unwrap();
377        s.append(mk_entry("b", "bp-2", 100, "applied"))
378            .await
379            .unwrap();
380        s.append(mk_entry("c", "bp-1", 200, "applied"))
381            .await
382            .unwrap();
383        let all = s.list_all().await.unwrap();
384        let ids: Vec<_> = all.iter().map(|e| e.issue_id.0.clone()).collect();
385        assert_eq!(ids, vec!["b", "c", "a"]);
386        drop(s);
387        driver.shutdown().await.unwrap();
388    }
389
390    #[tokio::test]
391    async fn persists_across_reopen() {
392        let dir = tempfile::tempdir().unwrap();
393        let path = dir.path().join("enhance_log.db");
394        {
395            let (s, driver) = SqliteEnhanceLogStore::open(&path).await.unwrap();
396            s.append(mk_entry("keep", "bp-1", 42, "applied"))
397                .await
398                .unwrap();
399            drop(s);
400            driver.shutdown().await.unwrap();
401        }
402        let (s, driver) = SqliteEnhanceLogStore::open(&path).await.unwrap();
403        let got = s.get(&IssueId::new("keep")).await.unwrap();
404        assert_eq!(got.blueprint_id.as_str(), "bp-1");
405        assert_eq!(got.ts_ms, 42);
406        drop(s);
407        driver.shutdown().await.unwrap();
408    }
409}