Skip to main content

mlua_swarm/store/issue/
sqlite.rs

1//! `SqliteIssueStore` — SQLite-backed [`IssueStore`] using [`rusqlite-isle`].
2//!
3//! The `Connection` is confined to a dedicated OS thread by `AsyncIsle`; every
4//! call is a typed closure dispatched over a bounded channel. `pop_pending`
5//! runs the "pick oldest pending row + flip to InFlight" as a single
6//! transaction inside one closure, so the FIFO invariant is preserved across
7//! concurrent callers.
8//!
9//! ## Schema
10//!
11//! ```sql
12//! CREATE TABLE IF NOT EXISTS issues (
13//!   issue_id      TEXT PRIMARY KEY,
14//!   blueprint_id  TEXT NOT NULL,
15//!   intent        TEXT NOT NULL,
16//!   status_kind   TEXT NOT NULL,      -- 'pending' | 'inflight' | 'applied' | 'rejected'
17//!   status_detail TEXT,               -- new_version (applied) | reason (rejected) | NULL
18//!   created_seq   INTEGER NOT NULL,   -- insertion order for `list()`
19//!   pending_seq   INTEGER              -- FIFO for `pop_pending`, NULL when not pending
20//! );
//! CREATE INDEX IF NOT EXISTS ix_issues_pending_seq ON issues(pending_seq);
//! CREATE INDEX IF NOT EXISTS ix_issues_created_seq ON issues(created_seq);
22//! ```
23
24use super::{IssueId, IssuePayload, IssueStatus, IssueStore, IssueStoreError};
25use crate::blueprint::store::BlueprintId;
26use async_trait::async_trait;
27use rusqlite::{params, OptionalExtension};
28use rusqlite_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
29use std::path::Path;
30
/// Schema bootstrap script, passed to `execute_batch` by both `open` and
/// `open_in_memory`.
///
/// Every statement uses `IF NOT EXISTS`, so running it against an already
/// initialised database is harmless. Creates the `issues` table plus one index
/// on `pending_seq` (used to order `pop_pending`) and one on `created_seq`
/// (used to order `list`).
const SCHEMA_SQL: &str = "\
CREATE TABLE IF NOT EXISTS issues (\
  issue_id      TEXT PRIMARY KEY, \
  blueprint_id  TEXT NOT NULL, \
  intent        TEXT NOT NULL, \
  status_kind   TEXT NOT NULL, \
  status_detail TEXT, \
  created_seq   INTEGER NOT NULL, \
  pending_seq   INTEGER\
);\
CREATE INDEX IF NOT EXISTS ix_issues_pending_seq ON issues(pending_seq);\
CREATE INDEX IF NOT EXISTS ix_issues_created_seq ON issues(created_seq);\
";

// Values written to / read from the `status_kind` column; one per
// `IssueStatus` variant (see `encode_status` / `decode_status`).
const STATUS_PENDING: &str = "pending";
const STATUS_INFLIGHT: &str = "inflight";
const STATUS_APPLIED: &str = "applied";
const STATUS_REJECTED: &str = "rejected";
49
/// SQLite-backed persistent [`IssueStore`].
///
/// Open with [`SqliteIssueStore::open`] (file path) or
/// [`SqliteIssueStore::open_in_memory`] (tests). Both return the store plus an
/// [`AsyncIsleDriver`] the caller must `shutdown().await` when done — dropping
/// the driver without a shutdown call leaves the SQLite thread as-is until the
/// process exits.
pub struct SqliteIssueStore {
    /// Handle through which every query closure is dispatched (`isle.call`).
    isle: AsyncIsle,
}
60
61impl SqliteIssueStore {
62    /// Open (or create) a SQLite database file and run the schema migrations.
63    pub async fn open(path: impl AsRef<Path>) -> Result<(Self, AsyncIsleDriver), IssueStoreError> {
64        let (isle, driver) = AsyncIsle::spawn(path.as_ref().to_path_buf(), |conn| {
65            conn.execute_batch(SCHEMA_SQL)
66        })
67        .await
68        .map_err(map_isle_err)?;
69        Ok((Self { isle }, driver))
70    }
71
72    /// Open an ephemeral in-memory database (tests, doctests).
73    pub async fn open_in_memory() -> Result<(Self, AsyncIsleDriver), IssueStoreError> {
74        let (isle, driver) = AsyncIsle::open_in_memory(|conn| conn.execute_batch(SCHEMA_SQL))
75            .await
76            .map_err(map_isle_err)?;
77        Ok((Self { isle }, driver))
78    }
79}
80
81fn map_isle_err(e: IsleError) -> IssueStoreError {
82    IssueStoreError::Other(format!("sqlite: {e}"))
83}
84
85/// Encode an `IssueStatus` as `(kind, detail)` for storage.
86fn encode_status(s: &IssueStatus) -> (&'static str, Option<String>) {
87    match s {
88        IssueStatus::Pending => (STATUS_PENDING, None),
89        IssueStatus::InFlight => (STATUS_INFLIGHT, None),
90        IssueStatus::Applied { new_version } => (STATUS_APPLIED, Some(new_version.clone())),
91        IssueStatus::Rejected { reason } => (STATUS_REJECTED, Some(reason.clone())),
92    }
93}
94
95/// Decode a `(kind, detail)` row back into an `IssueStatus`.
96fn decode_status(kind: &str, detail: Option<String>) -> Result<IssueStatus, IssueStoreError> {
97    match kind {
98        STATUS_PENDING => Ok(IssueStatus::Pending),
99        STATUS_INFLIGHT => Ok(IssueStatus::InFlight),
100        STATUS_APPLIED => Ok(IssueStatus::Applied {
101            new_version: detail.unwrap_or_default(),
102        }),
103        STATUS_REJECTED => Ok(IssueStatus::Rejected {
104            reason: detail.unwrap_or_default(),
105        }),
106        other => Err(IssueStoreError::Other(format!(
107            "invalid status_kind: {other}"
108        ))),
109    }
110}
111
112#[async_trait]
113impl IssueStore for SqliteIssueStore {
114    fn name(&self) -> &str {
115        "sqlite"
116    }
117
118    async fn create(&self, payload: IssuePayload) -> Result<(), IssueStoreError> {
119        let id = payload.issue_id.0.clone();
120        let bp = payload.blueprint_id.as_str().to_string();
121        let intent = payload.intent.clone();
122
123        self.isle
124            .call(move |conn| {
125                let tx = conn.transaction()?;
126                // Duplicate check — surface as a distinct error kind at the
127                // trait layer (unique constraint violation would work too, but
128                // the explicit check keeps the error mapping trivial).
129                let exists: i64 = tx.query_row(
130                    "SELECT COUNT(*) FROM issues WHERE issue_id = ?1",
131                    params![id],
132                    |row| row.get(0),
133                )?;
134                if exists > 0 {
135                    // Signal via a sentinel rusqlite::Error; the outer layer
136                    // maps it back to Duplicate.
137                    return Err(rusqlite::Error::SqliteFailure(
138                        rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
139                        Some(format!("__mlua_swarm_duplicate:{id}")),
140                    ));
141                }
142                let created_seq: i64 = tx.query_row(
143                    "SELECT COALESCE(MAX(created_seq), 0) + 1 FROM issues",
144                    [],
145                    |row| row.get(0),
146                )?;
147                let pending_seq: i64 = tx.query_row(
148                    "SELECT COALESCE(MAX(pending_seq), 0) + 1 FROM issues",
149                    [],
150                    |row| row.get(0),
151                )?;
152                tx.execute(
153                    "INSERT INTO issues (issue_id, blueprint_id, intent, status_kind, \
154                     status_detail, created_seq, pending_seq) \
155                     VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?6)",
156                    params![id, bp, intent, STATUS_PENDING, created_seq, pending_seq],
157                )?;
158                tx.commit()?;
159                Ok(())
160            })
161            .await
162            .map_err(|e| match &e {
163                IsleError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg)))
164                    if msg.starts_with("__mlua_swarm_duplicate:") =>
165                {
166                    let id = msg
167                        .trim_start_matches("__mlua_swarm_duplicate:")
168                        .to_string();
169                    IssueStoreError::Duplicate(IssueId::new(id))
170                }
171                _ => map_isle_err(e),
172            })
173    }
174
175    async fn get(&self, id: &IssueId) -> Result<IssuePayload, IssueStoreError> {
176        let id_str = id.0.clone();
177        let id_for_notfound = id.clone();
178        let row = self
179            .isle
180            .call(move |conn| {
181                conn.query_row(
182                    "SELECT blueprint_id, intent FROM issues WHERE issue_id = ?1",
183                    params![id_str],
184                    |row| {
185                        let bp: String = row.get(0)?;
186                        let intent: String = row.get(1)?;
187                        Ok((bp, intent))
188                    },
189                )
190                .optional()
191            })
192            .await
193            .map_err(map_isle_err)?;
194        match row {
195            Some((bp, intent)) => Ok(IssuePayload {
196                issue_id: id_for_notfound,
197                blueprint_id: BlueprintId::new(bp),
198                intent,
199            }),
200            None => Err(IssueStoreError::NotFound(id_for_notfound)),
201        }
202    }
203
204    async fn status(&self, id: &IssueId) -> Result<IssueStatus, IssueStoreError> {
205        let id_str = id.0.clone();
206        let id_for_notfound = id.clone();
207        let row = self
208            .isle
209            .call(move |conn| {
210                conn.query_row(
211                    "SELECT status_kind, status_detail FROM issues WHERE issue_id = ?1",
212                    params![id_str],
213                    |row| {
214                        let kind: String = row.get(0)?;
215                        let detail: Option<String> = row.get(1)?;
216                        Ok((kind, detail))
217                    },
218                )
219                .optional()
220            })
221            .await
222            .map_err(map_isle_err)?;
223        match row {
224            Some((kind, detail)) => decode_status(&kind, detail),
225            None => Err(IssueStoreError::NotFound(id_for_notfound)),
226        }
227    }
228
229    async fn list(&self) -> Result<Vec<(IssueId, IssueStatus)>, IssueStoreError> {
230        let rows = self
231            .isle
232            .call(|conn| {
233                let mut stmt = conn.prepare(
234                    "SELECT issue_id, status_kind, status_detail \
235                     FROM issues ORDER BY created_seq ASC",
236                )?;
237                let iter = stmt.query_map([], |row| {
238                    let id: String = row.get(0)?;
239                    let kind: String = row.get(1)?;
240                    let detail: Option<String> = row.get(2)?;
241                    Ok((id, kind, detail))
242                })?;
243                let mut out = Vec::new();
244                for r in iter {
245                    out.push(r?);
246                }
247                Ok(out)
248            })
249            .await
250            .map_err(map_isle_err)?;
251
252        let mut result = Vec::with_capacity(rows.len());
253        for (id, kind, detail) in rows {
254            result.push((IssueId::new(id), decode_status(&kind, detail)?));
255        }
256        Ok(result)
257    }
258
259    async fn pop_pending(&self) -> Result<Option<IssuePayload>, IssueStoreError> {
260        let picked = self
261            .isle
262            .call(move |conn| {
263                let tx = conn.transaction()?;
264                let row: Option<(String, String, String)> = tx
265                    .query_row(
266                        "SELECT issue_id, blueprint_id, intent FROM issues \
267                         WHERE pending_seq IS NOT NULL \
268                         ORDER BY pending_seq ASC LIMIT 1",
269                        [],
270                        |row| {
271                            let id: String = row.get(0)?;
272                            let bp: String = row.get(1)?;
273                            let intent: String = row.get(2)?;
274                            Ok((id, bp, intent))
275                        },
276                    )
277                    .optional()?;
278                let Some((id, bp, intent)) = row else {
279                    return Ok(None);
280                };
281                tx.execute(
282                    "UPDATE issues SET status_kind = ?1, status_detail = NULL, \
283                     pending_seq = NULL WHERE issue_id = ?2",
284                    params![STATUS_INFLIGHT, id],
285                )?;
286                tx.commit()?;
287                Ok(Some((id, bp, intent)))
288            })
289            .await
290            .map_err(map_isle_err)?;
291
292        Ok(picked.map(|(id, bp, intent)| IssuePayload {
293            issue_id: IssueId::new(id),
294            blueprint_id: BlueprintId::new(bp),
295            intent,
296        }))
297    }
298
299    async fn update_status(
300        &self,
301        id: &IssueId,
302        status: IssueStatus,
303    ) -> Result<(), IssueStoreError> {
304        let id_str = id.0.clone();
305        let id_for_notfound = id.clone();
306        let (kind, detail) = encode_status(&status);
307        let clear_pending = !matches!(status, IssueStatus::Pending);
308        let n = self
309            .isle
310            .call(move |conn| {
311                if clear_pending {
312                    conn.execute(
313                        "UPDATE issues SET status_kind = ?1, status_detail = ?2, \
314                         pending_seq = NULL WHERE issue_id = ?3",
315                        params![kind, detail, id_str],
316                    )
317                } else {
318                    conn.execute(
319                        "UPDATE issues SET status_kind = ?1, status_detail = ?2 \
320                         WHERE issue_id = ?3",
321                        params![kind, detail, id_str],
322                    )
323                }
324            })
325            .await
326            .map_err(map_isle_err)?;
327        if n == 0 {
328            Err(IssueStoreError::NotFound(id_for_notfound))
329        } else {
330            Ok(())
331        }
332    }
333}
334
335// ──────────────────────────────────────────────────────────────────────────
336// tests
337// ──────────────────────────────────────────────────────────────────────────
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a payload for `id` on the "main" blueprint with a predictable
    /// intent string.
    fn mk(id: &str) -> IssuePayload {
        let intent = format!("intent for {id}");
        IssuePayload {
            issue_id: IssueId::new(id),
            blueprint_id: BlueprintId::new("main"),
            intent,
        }
    }

    /// Opens a fresh in-memory store, panicking if that fails.
    async fn open_mem() -> (SqliteIssueStore, AsyncIsleDriver) {
        SqliteIssueStore::open_in_memory().await.unwrap()
    }

    /// Drops the store handle first, then shuts the driver down.
    async fn close(store: SqliteIssueStore, driver: AsyncIsleDriver) {
        drop(store);
        driver.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn create_then_get_status() {
        let (store, driver) = open_mem().await;
        let id = IssueId::new("i1");

        store.create(mk("i1")).await.unwrap();

        let fetched = store.get(&id).await.unwrap();
        assert_eq!(fetched.issue_id, id);
        assert_eq!(fetched.intent, "intent for i1");

        let current = store.status(&id).await.unwrap();
        assert_eq!(current, IssueStatus::Pending);

        close(store, driver).await;
    }

    #[tokio::test]
    async fn duplicate_create_rejected() {
        let (store, driver) = open_mem().await;

        store.create(mk("i1")).await.unwrap();
        let err = store.create(mk("i1")).await.unwrap_err();
        assert!(matches!(err, IssueStoreError::Duplicate(_)), "got: {err:?}");

        close(store, driver).await;
    }

    #[tokio::test]
    async fn pop_pending_fifo_and_transitions_inflight() {
        let (store, driver) = open_mem().await;
        for id in ["a", "b"] {
            store.create(mk(id)).await.unwrap();
        }

        // Oldest first, and popping flips the row to InFlight.
        let first = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(first.issue_id, IssueId::new("a"));
        let status_a = store.status(&IssueId::new("a")).await.unwrap();
        assert_eq!(status_a, IssueStatus::InFlight);

        let second = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(second.issue_id, IssueId::new("b"));

        // Queue is now drained.
        let third = store.pop_pending().await.unwrap();
        assert!(third.is_none());

        close(store, driver).await;
    }

    #[tokio::test]
    async fn update_status_to_applied_and_rejected() {
        let (store, driver) = open_mem().await;

        let x = IssueId::new("x");
        store.create(mk("x")).await.unwrap();
        store.pop_pending().await.unwrap();
        let applied = IssueStatus::Applied {
            new_version: "abc123".into(),
        };
        store.update_status(&x, applied).await.unwrap();
        match store.status(&x).await.unwrap() {
            IssueStatus::Applied { new_version } => assert_eq!(new_version, "abc123"),
            other => panic!("unexpected: {other:?}"),
        }

        let y = IssueId::new("y");
        store.create(mk("y")).await.unwrap();
        let rejected = IssueStatus::Rejected {
            reason: "bad shape".into(),
        };
        store.update_status(&y, rejected).await.unwrap();
        match store.status(&y).await.unwrap() {
            IssueStatus::Rejected { reason } => assert_eq!(reason, "bad shape"),
            other => panic!("unexpected: {other:?}"),
        }

        // `y` went straight to Rejected without being popped, so the queue
        // must not hand it out any more.
        let leftover = store.pop_pending().await.unwrap();
        assert!(leftover.is_none());

        close(store, driver).await;
    }

    #[tokio::test]
    async fn list_returns_insertion_order() {
        let (store, driver) = open_mem().await;
        for id in ["a", "b", "c"] {
            store.create(mk(id)).await.unwrap();
        }

        let listed = store.list().await.unwrap();
        let mut ids = Vec::with_capacity(listed.len());
        for (issue_id, _status) in &listed {
            ids.push(issue_id.0.clone());
        }
        assert_eq!(ids, vec!["a", "b", "c"]);

        close(store, driver).await;
    }

    #[tokio::test]
    async fn update_status_unknown_fails() {
        let (store, driver) = open_mem().await;

        let missing = IssueId::new("nope");
        let outcome = store.update_status(&missing, IssueStatus::Pending).await;
        let err = outcome.unwrap_err();
        assert!(matches!(err, IssueStoreError::NotFound(_)));

        close(store, driver).await;
    }

    #[tokio::test]
    async fn persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let db_file = dir.path().join("issues.db");

        // First session: write one issue and shut down cleanly.
        let (writer, writer_driver) = SqliteIssueStore::open(&db_file).await.unwrap();
        writer.create(mk("keep")).await.unwrap();
        close(writer, writer_driver).await;

        // Second session: the issue must still be readable.
        let (reader, reader_driver) = SqliteIssueStore::open(&db_file).await.unwrap();
        let fetched = reader.get(&IssueId::new("keep")).await.unwrap();
        assert_eq!(fetched.intent, "intent for keep");
        close(reader, reader_driver).await;
    }
}