1use super::{IssueId, IssuePayload, IssueStatus, IssueStore, IssueStoreError};
25use crate::blueprint::store::BlueprintId;
26use async_trait::async_trait;
27use rusqlite::{params, OptionalExtension};
28use rusqlite_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
29use std::path::Path;
30
/// DDL batch handed to `execute_batch` by both `SqliteIssueStore::open` and
/// `SqliteIssueStore::open_in_memory`.
///
/// Every statement is guarded by `IF NOT EXISTS`, so the batch can be run
/// against a database file that already contains the table and indexes.
///
/// Columns of `issues` as used by this file:
/// - `issue_id`: primary key; the string inside `IssueId`.
/// - `blueprint_id`, `intent`: the remaining `IssuePayload` fields.
/// - `status_kind` / `status_detail`: the pair produced by `encode_status`.
/// - `created_seq`: insertion counter; `list` orders by it.
/// - `pending_seq`: queue position; `pop_pending` only considers rows where
///   it is non-NULL and takes the smallest value first.
const SCHEMA_SQL: &str = "\
CREATE TABLE IF NOT EXISTS issues (\
    issue_id TEXT PRIMARY KEY, \
    blueprint_id TEXT NOT NULL, \
    intent TEXT NOT NULL, \
    status_kind TEXT NOT NULL, \
    status_detail TEXT, \
    created_seq INTEGER NOT NULL, \
    pending_seq INTEGER\
);\
CREATE INDEX IF NOT EXISTS ix_issues_pending_seq ON issues(pending_seq);\
CREATE INDEX IF NOT EXISTS ix_issues_created_seq ON issues(created_seq);\
";
44
// Values written to the `status_kind` column, one per `IssueStatus` variant.
// `encode_status` and `decode_status` are the only translators between these
// strings and the enum.

/// Stored for `IssueStatus::Pending`.
const STATUS_PENDING: &str = "pending";
/// Stored for `IssueStatus::InFlight`.
const STATUS_INFLIGHT: &str = "inflight";
/// Stored for `IssueStatus::Applied`; `status_detail` then holds `new_version`.
const STATUS_APPLIED: &str = "applied";
/// Stored for `IssueStatus::Rejected`; `status_detail` then holds `reason`.
const STATUS_REJECTED: &str = "rejected";
49
/// `IssueStore` implementation that keeps issues in a SQLite `issues` table.
///
/// All database work is submitted as closures through the wrapped `AsyncIsle`
/// handle; this type holds no other state.
pub struct SqliteIssueStore {
    /// Handle whose `call` method runs a closure against the SQLite connection.
    isle: AsyncIsle,
}
60
61impl SqliteIssueStore {
62 pub async fn open(path: impl AsRef<Path>) -> Result<(Self, AsyncIsleDriver), IssueStoreError> {
64 let (isle, driver) = AsyncIsle::spawn(path.as_ref().to_path_buf(), |conn| {
65 conn.execute_batch(SCHEMA_SQL)
66 })
67 .await
68 .map_err(map_isle_err)?;
69 Ok((Self { isle }, driver))
70 }
71
72 pub async fn open_in_memory() -> Result<(Self, AsyncIsleDriver), IssueStoreError> {
74 let (isle, driver) = AsyncIsle::open_in_memory(|conn| conn.execute_batch(SCHEMA_SQL))
75 .await
76 .map_err(map_isle_err)?;
77 Ok((Self { isle }, driver))
78 }
79}
80
81fn map_isle_err(e: IsleError) -> IssueStoreError {
82 IssueStoreError::Other(format!("sqlite: {e}"))
83}
84
85fn encode_status(s: &IssueStatus) -> (&'static str, Option<String>) {
87 match s {
88 IssueStatus::Pending => (STATUS_PENDING, None),
89 IssueStatus::InFlight => (STATUS_INFLIGHT, None),
90 IssueStatus::Applied { new_version } => (STATUS_APPLIED, Some(new_version.clone())),
91 IssueStatus::Rejected { reason } => (STATUS_REJECTED, Some(reason.clone())),
92 }
93}
94
95fn decode_status(kind: &str, detail: Option<String>) -> Result<IssueStatus, IssueStoreError> {
97 match kind {
98 STATUS_PENDING => Ok(IssueStatus::Pending),
99 STATUS_INFLIGHT => Ok(IssueStatus::InFlight),
100 STATUS_APPLIED => Ok(IssueStatus::Applied {
101 new_version: detail.unwrap_or_default(),
102 }),
103 STATUS_REJECTED => Ok(IssueStatus::Rejected {
104 reason: detail.unwrap_or_default(),
105 }),
106 other => Err(IssueStoreError::Other(format!(
107 "invalid status_kind: {other}"
108 ))),
109 }
110}
111
112#[async_trait]
113impl IssueStore for SqliteIssueStore {
    /// Returns the fixed label for this store implementation: `"sqlite"`.
    fn name(&self) -> &str {
        "sqlite"
    }
117
118 async fn create(&self, payload: IssuePayload) -> Result<(), IssueStoreError> {
119 let id = payload.issue_id.0.clone();
120 let bp = payload.blueprint_id.as_str().to_string();
121 let intent = payload.intent.clone();
122
123 self.isle
124 .call(move |conn| {
125 let tx = conn.transaction()?;
126 let exists: i64 = tx.query_row(
130 "SELECT COUNT(*) FROM issues WHERE issue_id = ?1",
131 params![id],
132 |row| row.get(0),
133 )?;
134 if exists > 0 {
135 return Err(rusqlite::Error::SqliteFailure(
138 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
139 Some(format!("__mlua_swarm_duplicate:{id}")),
140 ));
141 }
142 let created_seq: i64 = tx.query_row(
143 "SELECT COALESCE(MAX(created_seq), 0) + 1 FROM issues",
144 [],
145 |row| row.get(0),
146 )?;
147 let pending_seq: i64 = tx.query_row(
148 "SELECT COALESCE(MAX(pending_seq), 0) + 1 FROM issues",
149 [],
150 |row| row.get(0),
151 )?;
152 tx.execute(
153 "INSERT INTO issues (issue_id, blueprint_id, intent, status_kind, \
154 status_detail, created_seq, pending_seq) \
155 VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?6)",
156 params![id, bp, intent, STATUS_PENDING, created_seq, pending_seq],
157 )?;
158 tx.commit()?;
159 Ok(())
160 })
161 .await
162 .map_err(|e| match &e {
163 IsleError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg)))
164 if msg.starts_with("__mlua_swarm_duplicate:") =>
165 {
166 let id = msg.trim_start_matches("__mlua_swarm_duplicate:").to_string();
167 IssueStoreError::Duplicate(IssueId::new(id))
168 }
169 _ => map_isle_err(e),
170 })
171 }
172
173 async fn get(&self, id: &IssueId) -> Result<IssuePayload, IssueStoreError> {
174 let id_str = id.0.clone();
175 let id_for_notfound = id.clone();
176 let row = self
177 .isle
178 .call(move |conn| {
179 conn.query_row(
180 "SELECT blueprint_id, intent FROM issues WHERE issue_id = ?1",
181 params![id_str],
182 |row| {
183 let bp: String = row.get(0)?;
184 let intent: String = row.get(1)?;
185 Ok((bp, intent))
186 },
187 )
188 .optional()
189 })
190 .await
191 .map_err(map_isle_err)?;
192 match row {
193 Some((bp, intent)) => Ok(IssuePayload {
194 issue_id: id_for_notfound,
195 blueprint_id: BlueprintId::new(bp),
196 intent,
197 }),
198 None => Err(IssueStoreError::NotFound(id_for_notfound)),
199 }
200 }
201
202 async fn status(&self, id: &IssueId) -> Result<IssueStatus, IssueStoreError> {
203 let id_str = id.0.clone();
204 let id_for_notfound = id.clone();
205 let row = self
206 .isle
207 .call(move |conn| {
208 conn.query_row(
209 "SELECT status_kind, status_detail FROM issues WHERE issue_id = ?1",
210 params![id_str],
211 |row| {
212 let kind: String = row.get(0)?;
213 let detail: Option<String> = row.get(1)?;
214 Ok((kind, detail))
215 },
216 )
217 .optional()
218 })
219 .await
220 .map_err(map_isle_err)?;
221 match row {
222 Some((kind, detail)) => decode_status(&kind, detail),
223 None => Err(IssueStoreError::NotFound(id_for_notfound)),
224 }
225 }
226
227 async fn list(&self) -> Result<Vec<(IssueId, IssueStatus)>, IssueStoreError> {
228 let rows = self
229 .isle
230 .call(|conn| {
231 let mut stmt = conn.prepare(
232 "SELECT issue_id, status_kind, status_detail \
233 FROM issues ORDER BY created_seq ASC",
234 )?;
235 let iter = stmt.query_map([], |row| {
236 let id: String = row.get(0)?;
237 let kind: String = row.get(1)?;
238 let detail: Option<String> = row.get(2)?;
239 Ok((id, kind, detail))
240 })?;
241 let mut out = Vec::new();
242 for r in iter {
243 out.push(r?);
244 }
245 Ok(out)
246 })
247 .await
248 .map_err(map_isle_err)?;
249
250 let mut result = Vec::with_capacity(rows.len());
251 for (id, kind, detail) in rows {
252 result.push((IssueId::new(id), decode_status(&kind, detail)?));
253 }
254 Ok(result)
255 }
256
257 async fn pop_pending(&self) -> Result<Option<IssuePayload>, IssueStoreError> {
258 let picked = self
259 .isle
260 .call(move |conn| {
261 let tx = conn.transaction()?;
262 let row: Option<(String, String, String)> = tx
263 .query_row(
264 "SELECT issue_id, blueprint_id, intent FROM issues \
265 WHERE pending_seq IS NOT NULL \
266 ORDER BY pending_seq ASC LIMIT 1",
267 [],
268 |row| {
269 let id: String = row.get(0)?;
270 let bp: String = row.get(1)?;
271 let intent: String = row.get(2)?;
272 Ok((id, bp, intent))
273 },
274 )
275 .optional()?;
276 let Some((id, bp, intent)) = row else {
277 return Ok(None);
278 };
279 tx.execute(
280 "UPDATE issues SET status_kind = ?1, status_detail = NULL, \
281 pending_seq = NULL WHERE issue_id = ?2",
282 params![STATUS_INFLIGHT, id],
283 )?;
284 tx.commit()?;
285 Ok(Some((id, bp, intent)))
286 })
287 .await
288 .map_err(map_isle_err)?;
289
290 Ok(picked.map(|(id, bp, intent)| IssuePayload {
291 issue_id: IssueId::new(id),
292 blueprint_id: BlueprintId::new(bp),
293 intent,
294 }))
295 }
296
297 async fn update_status(
298 &self,
299 id: &IssueId,
300 status: IssueStatus,
301 ) -> Result<(), IssueStoreError> {
302 let id_str = id.0.clone();
303 let id_for_notfound = id.clone();
304 let (kind, detail) = encode_status(&status);
305 let clear_pending = !matches!(status, IssueStatus::Pending);
306 let n = self
307 .isle
308 .call(move |conn| {
309 if clear_pending {
310 conn.execute(
311 "UPDATE issues SET status_kind = ?1, status_detail = ?2, \
312 pending_seq = NULL WHERE issue_id = ?3",
313 params![kind, detail, id_str],
314 )
315 } else {
316 conn.execute(
317 "UPDATE issues SET status_kind = ?1, status_detail = ?2 \
318 WHERE issue_id = ?3",
319 params![kind, detail, id_str],
320 )
321 }
322 })
323 .await
324 .map_err(map_isle_err)?;
325 if n == 0 {
326 Err(IssueStoreError::NotFound(id_for_notfound))
327 } else {
328 Ok(())
329 }
330 }
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a payload for `id` on the `"main"` blueprint with a predictable
    /// intent string.
    fn mk(id: &str) -> IssuePayload {
        let intent = format!("intent for {id}");
        IssuePayload {
            issue_id: IssueId::new(id),
            blueprint_id: BlueprintId::new("main"),
            intent,
        }
    }

    /// Drops the store first, then shuts the driver down and asserts that the
    /// shutdown succeeded.
    async fn close(store: SqliteIssueStore, driver: AsyncIsleDriver) {
        drop(store);
        driver.shutdown().await.unwrap();
    }

    /// A freshly created issue can be read back and starts out `Pending`.
    #[tokio::test]
    async fn create_then_get_status() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        let id = IssueId::new("i1");

        store.create(mk("i1")).await.unwrap();

        let fetched = store.get(&id).await.unwrap();
        assert_eq!(fetched.issue_id, IssueId::new("i1"));
        assert_eq!(fetched.intent, "intent for i1");
        assert_eq!(store.status(&id).await.unwrap(), IssueStatus::Pending);

        close(store, driver).await;
    }

    /// Creating the same id twice yields `Duplicate`.
    #[tokio::test]
    async fn duplicate_create_rejected() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();

        store.create(mk("i1")).await.unwrap();
        let err = store.create(mk("i1")).await.unwrap_err();
        assert!(matches!(err, IssueStoreError::Duplicate(_)), "got: {err:?}");

        close(store, driver).await;
    }

    /// Issues are popped in creation order and become `InFlight` once popped.
    #[tokio::test]
    async fn pop_pending_fifo_and_transitions_inflight() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        store.create(mk("a")).await.unwrap();
        store.create(mk("b")).await.unwrap();

        let first = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(first.issue_id, IssueId::new("a"));
        let status_a = store.status(&IssueId::new("a")).await.unwrap();
        assert_eq!(status_a, IssueStatus::InFlight);

        let second = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(second.issue_id, IssueId::new("b"));

        // Both issues have been taken, so the queue is now empty.
        assert!(store.pop_pending().await.unwrap().is_none());

        close(store, driver).await;
    }

    /// `Applied` and `Rejected` round-trip their detail, and a directly
    /// rejected issue is no longer poppable.
    #[tokio::test]
    async fn update_status_to_applied_and_rejected() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        let x = IssueId::new("x");
        let y = IssueId::new("y");

        store.create(mk("x")).await.unwrap();
        store.pop_pending().await.unwrap();
        let applied = IssueStatus::Applied {
            new_version: "abc123".into(),
        };
        store.update_status(&x, applied).await.unwrap();
        match store.status(&x).await.unwrap() {
            IssueStatus::Applied { new_version } => assert_eq!(new_version, "abc123"),
            other => panic!("unexpected: {other:?}"),
        }

        store.create(mk("y")).await.unwrap();
        let rejected = IssueStatus::Rejected {
            reason: "bad shape".into(),
        };
        store.update_status(&y, rejected).await.unwrap();
        match store.status(&y).await.unwrap() {
            IssueStatus::Rejected { reason } => assert_eq!(reason, "bad shape"),
            other => panic!("unexpected: {other:?}"),
        }

        // "y" was rejected without being popped; it must have left the queue.
        assert!(store.pop_pending().await.unwrap().is_none());

        close(store, driver).await;
    }

    /// `list` reports issues in the order they were created.
    #[tokio::test]
    async fn list_returns_insertion_order() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        for id in ["a", "b", "c"] {
            store.create(mk(id)).await.unwrap();
        }

        let listed = store.list().await.unwrap();
        let ids: Vec<_> = listed.iter().map(|(id, _)| id.0.clone()).collect();
        assert_eq!(ids, vec!["a", "b", "c"]);

        close(store, driver).await;
    }

    /// Updating an id that was never created yields `NotFound`.
    #[tokio::test]
    async fn update_status_unknown_fails() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();

        let missing = IssueId::new("nope");
        let err = store
            .update_status(&missing, IssueStatus::Pending)
            .await
            .unwrap_err();
        assert!(matches!(err, IssueStoreError::NotFound(_)));

        close(store, driver).await;
    }

    /// Data written through one store instance is visible after the file is
    /// reopened by a second instance.
    #[tokio::test]
    async fn persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("issues.db");

        {
            let (writer, writer_driver) = SqliteIssueStore::open(&path).await.unwrap();
            writer.create(mk("keep")).await.unwrap();
            close(writer, writer_driver).await;
        }

        let (reader, reader_driver) = SqliteIssueStore::open(&path).await.unwrap();
        let fetched = reader.get(&IssueId::new("keep")).await.unwrap();
        assert_eq!(fetched.intent, "intent for keep");

        close(reader, reader_driver).await;
    }
}