Skip to main content

mlua_swarm/store/issue/
sqlite.rs

1//! `SqliteIssueStore` — SQLite-backed [`IssueStore`] using [`rusqlite-isle`].
2//!
3//! The `Connection` is confined to a dedicated OS thread by `AsyncIsle`; every
4//! call is a typed closure dispatched over a bounded channel. `pop_pending`
5//! runs the "pick oldest pending row + flip to InFlight" as a single
6//! transaction inside one closure, so the FIFO invariant is preserved across
7//! concurrent callers.
8//!
9//! ## Schema
10//!
11//! ```sql
12//! CREATE TABLE IF NOT EXISTS issues (
13//!   issue_id      TEXT PRIMARY KEY,
14//!   blueprint_id  TEXT NOT NULL,
15//!   intent        TEXT NOT NULL,
16//!   status_kind   TEXT NOT NULL,      -- 'pending' | 'inflight' | 'applied' | 'rejected'
17//!   status_detail TEXT,               -- new_version (applied) | reason (rejected) | NULL
18//!   created_seq   INTEGER NOT NULL,   -- insertion order for `list()`
19//!   pending_seq   INTEGER              -- FIFO for `pop_pending`, NULL when not pending
20//! );
//! CREATE INDEX IF NOT EXISTS ix_issues_pending_seq ON issues(pending_seq);
//! CREATE INDEX IF NOT EXISTS ix_issues_created_seq ON issues(created_seq);
22//! ```
23
24use super::{IssueId, IssuePayload, IssueStatus, IssueStore, IssueStoreError};
25use crate::blueprint::store::BlueprintId;
26use async_trait::async_trait;
27use rusqlite::{params, OptionalExtension};
28use rusqlite_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
29use std::path::Path;
30
/// DDL batch executed by both `open` and `open_in_memory` when the connection
/// is set up.
///
/// Every statement uses `IF NOT EXISTS`, so running the batch against an
/// already-initialised database is a no-op. The trailing `\` on each line is a
/// Rust string continuation: the newline and the next line's leading
/// whitespace are dropped, so the batch reaches SQLite as one line.
const SCHEMA_SQL: &str = "\
CREATE TABLE IF NOT EXISTS issues (\
  issue_id      TEXT PRIMARY KEY, \
  blueprint_id  TEXT NOT NULL, \
  intent        TEXT NOT NULL, \
  status_kind   TEXT NOT NULL, \
  status_detail TEXT, \
  created_seq   INTEGER NOT NULL, \
  pending_seq   INTEGER\
);\
CREATE INDEX IF NOT EXISTS ix_issues_pending_seq ON issues(pending_seq);\
CREATE INDEX IF NOT EXISTS ix_issues_created_seq ON issues(created_seq);\
";
44
// Values written to the `status_kind` column. `encode_status` and
// `decode_status` translate between these strings and `IssueStatus` variants,
// so changing a value here changes what is persisted on disk.

/// `status_kind` for `IssueStatus::Pending`.
const STATUS_PENDING: &str = "pending";
/// `status_kind` for `IssueStatus::InFlight`.
const STATUS_INFLIGHT: &str = "inflight";
/// `status_kind` for `IssueStatus::Applied`; `status_detail` holds `new_version`.
const STATUS_APPLIED: &str = "applied";
/// `status_kind` for `IssueStatus::Rejected`; `status_detail` holds `reason`.
const STATUS_REJECTED: &str = "rejected";
49
/// SQLite-backed persistent [`IssueStore`].
///
/// Open with [`SqliteIssueStore::open`] (file path) or
/// [`SqliteIssueStore::open_in_memory`] (tests). Both return the store plus an
/// [`AsyncIsleDriver`] the caller must `shutdown().await` when done — dropping
/// the driver without a shutdown call leaves the SQLite thread as-is until the
/// process exits.
pub struct SqliteIssueStore {
    /// Handle through which every store operation submits a closure to be run
    /// against the SQLite connection (see the `self.isle.call(..)` sites).
    isle: AsyncIsle,
}
60
61impl SqliteIssueStore {
62    /// Open (or create) a SQLite database file and run the schema migrations.
63    pub async fn open(path: impl AsRef<Path>) -> Result<(Self, AsyncIsleDriver), IssueStoreError> {
64        let (isle, driver) = AsyncIsle::spawn(path.as_ref().to_path_buf(), |conn| {
65            conn.execute_batch(SCHEMA_SQL)
66        })
67        .await
68        .map_err(map_isle_err)?;
69        Ok((Self { isle }, driver))
70    }
71
72    /// Open an ephemeral in-memory database (tests, doctests).
73    pub async fn open_in_memory() -> Result<(Self, AsyncIsleDriver), IssueStoreError> {
74        let (isle, driver) = AsyncIsle::open_in_memory(|conn| conn.execute_batch(SCHEMA_SQL))
75            .await
76            .map_err(map_isle_err)?;
77        Ok((Self { isle }, driver))
78    }
79}
80
81fn map_isle_err(e: IsleError) -> IssueStoreError {
82    IssueStoreError::Other(format!("sqlite: {e}"))
83}
84
85/// Encode an `IssueStatus` as `(kind, detail)` for storage.
86fn encode_status(s: &IssueStatus) -> (&'static str, Option<String>) {
87    match s {
88        IssueStatus::Pending => (STATUS_PENDING, None),
89        IssueStatus::InFlight => (STATUS_INFLIGHT, None),
90        IssueStatus::Applied { new_version } => (STATUS_APPLIED, Some(new_version.clone())),
91        IssueStatus::Rejected { reason } => (STATUS_REJECTED, Some(reason.clone())),
92    }
93}
94
95/// Decode a `(kind, detail)` row back into an `IssueStatus`.
96fn decode_status(kind: &str, detail: Option<String>) -> Result<IssueStatus, IssueStoreError> {
97    match kind {
98        STATUS_PENDING => Ok(IssueStatus::Pending),
99        STATUS_INFLIGHT => Ok(IssueStatus::InFlight),
100        STATUS_APPLIED => Ok(IssueStatus::Applied {
101            new_version: detail.unwrap_or_default(),
102        }),
103        STATUS_REJECTED => Ok(IssueStatus::Rejected {
104            reason: detail.unwrap_or_default(),
105        }),
106        other => Err(IssueStoreError::Other(format!(
107            "invalid status_kind: {other}"
108        ))),
109    }
110}
111
112#[async_trait]
113impl IssueStore for SqliteIssueStore {
    /// Backend identifier for this store; always the literal `"sqlite"`.
    fn name(&self) -> &str {
        "sqlite"
    }
117
118    async fn create(&self, payload: IssuePayload) -> Result<(), IssueStoreError> {
119        let id = payload.issue_id.0.clone();
120        let bp = payload.blueprint_id.as_str().to_string();
121        let intent = payload.intent.clone();
122
123        self.isle
124            .call(move |conn| {
125                let tx = conn.transaction()?;
126                // Duplicate check — surface as a distinct error kind at the
127                // trait layer (unique constraint violation would work too, but
128                // the explicit check keeps the error mapping trivial).
129                let exists: i64 = tx.query_row(
130                    "SELECT COUNT(*) FROM issues WHERE issue_id = ?1",
131                    params![id],
132                    |row| row.get(0),
133                )?;
134                if exists > 0 {
135                    // Signal via a sentinel rusqlite::Error; the outer layer
136                    // maps it back to Duplicate.
137                    return Err(rusqlite::Error::SqliteFailure(
138                        rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
139                        Some(format!("__mlua_swarm_duplicate:{id}")),
140                    ));
141                }
142                let created_seq: i64 = tx.query_row(
143                    "SELECT COALESCE(MAX(created_seq), 0) + 1 FROM issues",
144                    [],
145                    |row| row.get(0),
146                )?;
147                let pending_seq: i64 = tx.query_row(
148                    "SELECT COALESCE(MAX(pending_seq), 0) + 1 FROM issues",
149                    [],
150                    |row| row.get(0),
151                )?;
152                tx.execute(
153                    "INSERT INTO issues (issue_id, blueprint_id, intent, status_kind, \
154                     status_detail, created_seq, pending_seq) \
155                     VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?6)",
156                    params![id, bp, intent, STATUS_PENDING, created_seq, pending_seq],
157                )?;
158                tx.commit()?;
159                Ok(())
160            })
161            .await
162            .map_err(|e| match &e {
163                IsleError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg)))
164                    if msg.starts_with("__mlua_swarm_duplicate:") =>
165                {
166                    let id = msg.trim_start_matches("__mlua_swarm_duplicate:").to_string();
167                    IssueStoreError::Duplicate(IssueId::new(id))
168                }
169                _ => map_isle_err(e),
170            })
171    }
172
173    async fn get(&self, id: &IssueId) -> Result<IssuePayload, IssueStoreError> {
174        let id_str = id.0.clone();
175        let id_for_notfound = id.clone();
176        let row = self
177            .isle
178            .call(move |conn| {
179                conn.query_row(
180                    "SELECT blueprint_id, intent FROM issues WHERE issue_id = ?1",
181                    params![id_str],
182                    |row| {
183                        let bp: String = row.get(0)?;
184                        let intent: String = row.get(1)?;
185                        Ok((bp, intent))
186                    },
187                )
188                .optional()
189            })
190            .await
191            .map_err(map_isle_err)?;
192        match row {
193            Some((bp, intent)) => Ok(IssuePayload {
194                issue_id: id_for_notfound,
195                blueprint_id: BlueprintId::new(bp),
196                intent,
197            }),
198            None => Err(IssueStoreError::NotFound(id_for_notfound)),
199        }
200    }
201
202    async fn status(&self, id: &IssueId) -> Result<IssueStatus, IssueStoreError> {
203        let id_str = id.0.clone();
204        let id_for_notfound = id.clone();
205        let row = self
206            .isle
207            .call(move |conn| {
208                conn.query_row(
209                    "SELECT status_kind, status_detail FROM issues WHERE issue_id = ?1",
210                    params![id_str],
211                    |row| {
212                        let kind: String = row.get(0)?;
213                        let detail: Option<String> = row.get(1)?;
214                        Ok((kind, detail))
215                    },
216                )
217                .optional()
218            })
219            .await
220            .map_err(map_isle_err)?;
221        match row {
222            Some((kind, detail)) => decode_status(&kind, detail),
223            None => Err(IssueStoreError::NotFound(id_for_notfound)),
224        }
225    }
226
227    async fn list(&self) -> Result<Vec<(IssueId, IssueStatus)>, IssueStoreError> {
228        let rows = self
229            .isle
230            .call(|conn| {
231                let mut stmt = conn.prepare(
232                    "SELECT issue_id, status_kind, status_detail \
233                     FROM issues ORDER BY created_seq ASC",
234                )?;
235                let iter = stmt.query_map([], |row| {
236                    let id: String = row.get(0)?;
237                    let kind: String = row.get(1)?;
238                    let detail: Option<String> = row.get(2)?;
239                    Ok((id, kind, detail))
240                })?;
241                let mut out = Vec::new();
242                for r in iter {
243                    out.push(r?);
244                }
245                Ok(out)
246            })
247            .await
248            .map_err(map_isle_err)?;
249
250        let mut result = Vec::with_capacity(rows.len());
251        for (id, kind, detail) in rows {
252            result.push((IssueId::new(id), decode_status(&kind, detail)?));
253        }
254        Ok(result)
255    }
256
257    async fn pop_pending(&self) -> Result<Option<IssuePayload>, IssueStoreError> {
258        let picked = self
259            .isle
260            .call(move |conn| {
261                let tx = conn.transaction()?;
262                let row: Option<(String, String, String)> = tx
263                    .query_row(
264                        "SELECT issue_id, blueprint_id, intent FROM issues \
265                         WHERE pending_seq IS NOT NULL \
266                         ORDER BY pending_seq ASC LIMIT 1",
267                        [],
268                        |row| {
269                            let id: String = row.get(0)?;
270                            let bp: String = row.get(1)?;
271                            let intent: String = row.get(2)?;
272                            Ok((id, bp, intent))
273                        },
274                    )
275                    .optional()?;
276                let Some((id, bp, intent)) = row else {
277                    return Ok(None);
278                };
279                tx.execute(
280                    "UPDATE issues SET status_kind = ?1, status_detail = NULL, \
281                     pending_seq = NULL WHERE issue_id = ?2",
282                    params![STATUS_INFLIGHT, id],
283                )?;
284                tx.commit()?;
285                Ok(Some((id, bp, intent)))
286            })
287            .await
288            .map_err(map_isle_err)?;
289
290        Ok(picked.map(|(id, bp, intent)| IssuePayload {
291            issue_id: IssueId::new(id),
292            blueprint_id: BlueprintId::new(bp),
293            intent,
294        }))
295    }
296
297    async fn update_status(
298        &self,
299        id: &IssueId,
300        status: IssueStatus,
301    ) -> Result<(), IssueStoreError> {
302        let id_str = id.0.clone();
303        let id_for_notfound = id.clone();
304        let (kind, detail) = encode_status(&status);
305        let clear_pending = !matches!(status, IssueStatus::Pending);
306        let n = self
307            .isle
308            .call(move |conn| {
309                if clear_pending {
310                    conn.execute(
311                        "UPDATE issues SET status_kind = ?1, status_detail = ?2, \
312                         pending_seq = NULL WHERE issue_id = ?3",
313                        params![kind, detail, id_str],
314                    )
315                } else {
316                    conn.execute(
317                        "UPDATE issues SET status_kind = ?1, status_detail = ?2 \
318                         WHERE issue_id = ?3",
319                        params![kind, detail, id_str],
320                    )
321                }
322            })
323            .await
324            .map_err(map_isle_err)?;
325        if n == 0 {
326            Err(IssueStoreError::NotFound(id_for_notfound))
327        } else {
328            Ok(())
329        }
330    }
331}
332
333// ──────────────────────────────────────────────────────────────────────────
334// tests
335// ──────────────────────────────────────────────────────────────────────────
336
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a payload on blueprint `main` whose intent embeds the issue id.
    fn mk(id: &str) -> IssuePayload {
        let issue_id = IssueId::new(id);
        let blueprint_id = BlueprintId::new("main");
        let intent = format!("intent for {id}");
        IssuePayload {
            issue_id,
            blueprint_id,
            intent,
        }
    }

    /// A freshly created issue is retrievable and starts out `Pending`.
    #[tokio::test]
    async fn create_then_get_status() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        let key = IssueId::new("i1");

        store.create(mk("i1")).await.unwrap();

        let fetched = store.get(&key).await.unwrap();
        assert_eq!(fetched.issue_id, key);
        assert_eq!(fetched.intent, "intent for i1");
        assert_eq!(store.status(&key).await.unwrap(), IssueStatus::Pending);

        drop(store);
        driver.shutdown().await.unwrap();
    }

    /// Creating the same id twice yields `Duplicate`.
    #[tokio::test]
    async fn duplicate_create_rejected() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        store.create(mk("i1")).await.unwrap();

        let outcome = store.create(mk("i1")).await;
        assert!(
            matches!(outcome, Err(IssueStoreError::Duplicate(_))),
            "got: {outcome:?}"
        );

        drop(store);
        driver.shutdown().await.unwrap();
    }

    /// Issues pop in creation order and each popped issue becomes `InFlight`.
    #[tokio::test]
    async fn pop_pending_fifo_and_transitions_inflight() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        for name in ["a", "b"] {
            store.create(mk(name)).await.unwrap();
        }

        let first = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(first.issue_id, IssueId::new("a"));
        let first_status = store.status(&IssueId::new("a")).await.unwrap();
        assert_eq!(first_status, IssueStatus::InFlight);

        let second = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(second.issue_id, IssueId::new("b"));

        let third = store.pop_pending().await.unwrap();
        assert!(third.is_none());

        drop(store);
        driver.shutdown().await.unwrap();
    }

    /// `Applied` / `Rejected` round-trip their detail, and rejecting an
    /// un-popped issue removes it from the pending queue.
    #[tokio::test]
    async fn update_status_to_applied_and_rejected() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();

        let x = IssueId::new("x");
        store.create(mk("x")).await.unwrap();
        store.pop_pending().await.unwrap();
        let applied = IssueStatus::Applied {
            new_version: "abc123".into(),
        };
        store.update_status(&x, applied).await.unwrap();
        assert_eq!(
            store.status(&x).await.unwrap(),
            IssueStatus::Applied {
                new_version: "abc123".into(),
            }
        );

        let y = IssueId::new("y");
        store.create(mk("y")).await.unwrap();
        let rejected = IssueStatus::Rejected {
            reason: "bad shape".into(),
        };
        store.update_status(&y, rejected).await.unwrap();
        assert_eq!(
            store.status(&y).await.unwrap(),
            IssueStatus::Rejected {
                reason: "bad shape".into(),
            }
        );

        // `y` was updated to Rejected before ever being popped — verify the
        // pending queue no longer offers it.
        assert!(store.pop_pending().await.unwrap().is_none());

        drop(store);
        driver.shutdown().await.unwrap();
    }

    /// `list` reports issues in the order they were created.
    #[tokio::test]
    async fn list_returns_insertion_order() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        for name in ["a", "b", "c"] {
            store.create(mk(name)).await.unwrap();
        }

        let listed = store.list().await.unwrap();
        let ids: Vec<_> = listed.iter().map(|(issue, _)| issue.0.clone()).collect();
        assert_eq!(ids, vec!["a", "b", "c"]);

        drop(store);
        driver.shutdown().await.unwrap();
    }

    /// Updating an id that was never created yields `NotFound`.
    #[tokio::test]
    async fn update_status_unknown_fails() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();

        let outcome = store
            .update_status(&IssueId::new("nope"), IssueStatus::Pending)
            .await;
        let err = outcome.unwrap_err();
        assert!(matches!(err, IssueStoreError::NotFound(_)));

        drop(store);
        driver.shutdown().await.unwrap();
    }

    /// Data written through one store instance is visible after the file is
    /// closed and reopened.
    #[tokio::test]
    async fn persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("issues.db");

        // First session: write one issue, then shut the store down.
        {
            let (writer, driver) = SqliteIssueStore::open(&path).await.unwrap();
            writer.create(mk("keep")).await.unwrap();
            drop(writer);
            driver.shutdown().await.unwrap();
        }

        // Second session: the issue must still be there.
        let (reader, driver) = SqliteIssueStore::open(&path).await.unwrap();
        let fetched = reader.get(&IssueId::new("keep")).await.unwrap();
        assert_eq!(fetched.intent, "intent for keep");
        drop(reader);
        driver.shutdown().await.unwrap();
    }
}