1use super::{IssueId, IssuePayload, IssueStatus, IssueStore, IssueStoreError};
25use crate::blueprint::store::BlueprintId;
26use async_trait::async_trait;
27use rusqlite::{params, OptionalExtension};
28use rusqlite_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
29use std::path::Path;
30
/// DDL batch run against the connection every time a store is opened.
///
/// Every statement uses `IF NOT EXISTS`, so applying it to a database that
/// already has the table and indexes is a no-op. `pending_seq` is nullable:
/// `create` assigns it, while `pop_pending` and non-pending `update_status`
/// calls reset it to `NULL`.
const SCHEMA_SQL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS issues (",
    "issue_id TEXT PRIMARY KEY, ",
    "blueprint_id TEXT NOT NULL, ",
    "intent TEXT NOT NULL, ",
    "status_kind TEXT NOT NULL, ",
    "status_detail TEXT, ",
    "created_seq INTEGER NOT NULL, ",
    "pending_seq INTEGER",
    ");",
    "CREATE INDEX IF NOT EXISTS ix_issues_pending_seq ON issues(pending_seq);",
    "CREATE INDEX IF NOT EXISTS ix_issues_created_seq ON issues(created_seq);",
);

/// `status_kind` column value for `IssueStatus::Pending`.
const STATUS_PENDING: &str = "pending";
/// `status_kind` column value for `IssueStatus::InFlight`.
const STATUS_INFLIGHT: &str = "inflight";
/// `status_kind` column value for `IssueStatus::Applied`; `status_detail` holds the new version.
const STATUS_APPLIED: &str = "applied";
/// `status_kind` column value for `IssueStatus::Rejected`; `status_detail` holds the reason.
const STATUS_REJECTED: &str = "rejected";
49
50pub struct SqliteIssueStore {
58 isle: AsyncIsle,
59}
60
61impl SqliteIssueStore {
62 pub async fn open(path: impl AsRef<Path>) -> Result<(Self, AsyncIsleDriver), IssueStoreError> {
64 let (isle, driver) = AsyncIsle::spawn(path.as_ref().to_path_buf(), |conn| {
65 conn.execute_batch(SCHEMA_SQL)
66 })
67 .await
68 .map_err(map_isle_err)?;
69 Ok((Self { isle }, driver))
70 }
71
72 pub async fn open_in_memory() -> Result<(Self, AsyncIsleDriver), IssueStoreError> {
74 let (isle, driver) = AsyncIsle::open_in_memory(|conn| conn.execute_batch(SCHEMA_SQL))
75 .await
76 .map_err(map_isle_err)?;
77 Ok((Self { isle }, driver))
78 }
79}
80
81fn map_isle_err(e: IsleError) -> IssueStoreError {
82 IssueStoreError::Other(format!("sqlite: {e}"))
83}
84
85fn encode_status(s: &IssueStatus) -> (&'static str, Option<String>) {
87 match s {
88 IssueStatus::Pending => (STATUS_PENDING, None),
89 IssueStatus::InFlight => (STATUS_INFLIGHT, None),
90 IssueStatus::Applied { new_version } => (STATUS_APPLIED, Some(new_version.clone())),
91 IssueStatus::Rejected { reason } => (STATUS_REJECTED, Some(reason.clone())),
92 }
93}
94
95fn decode_status(kind: &str, detail: Option<String>) -> Result<IssueStatus, IssueStoreError> {
97 match kind {
98 STATUS_PENDING => Ok(IssueStatus::Pending),
99 STATUS_INFLIGHT => Ok(IssueStatus::InFlight),
100 STATUS_APPLIED => Ok(IssueStatus::Applied {
101 new_version: detail.unwrap_or_default(),
102 }),
103 STATUS_REJECTED => Ok(IssueStatus::Rejected {
104 reason: detail.unwrap_or_default(),
105 }),
106 other => Err(IssueStoreError::Other(format!(
107 "invalid status_kind: {other}"
108 ))),
109 }
110}
111
112#[async_trait]
113impl IssueStore for SqliteIssueStore {
114 fn name(&self) -> &str {
115 "sqlite"
116 }
117
118 async fn create(&self, payload: IssuePayload) -> Result<(), IssueStoreError> {
119 let id = payload.issue_id.0.clone();
120 let bp = payload.blueprint_id.as_str().to_string();
121 let intent = payload.intent.clone();
122
123 self.isle
124 .call(move |conn| {
125 let tx = conn.transaction()?;
126 let exists: i64 = tx.query_row(
130 "SELECT COUNT(*) FROM issues WHERE issue_id = ?1",
131 params![id],
132 |row| row.get(0),
133 )?;
134 if exists > 0 {
135 return Err(rusqlite::Error::SqliteFailure(
138 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
139 Some(format!("__mlua_swarm_duplicate:{id}")),
140 ));
141 }
142 let created_seq: i64 = tx.query_row(
143 "SELECT COALESCE(MAX(created_seq), 0) + 1 FROM issues",
144 [],
145 |row| row.get(0),
146 )?;
147 let pending_seq: i64 = tx.query_row(
148 "SELECT COALESCE(MAX(pending_seq), 0) + 1 FROM issues",
149 [],
150 |row| row.get(0),
151 )?;
152 tx.execute(
153 "INSERT INTO issues (issue_id, blueprint_id, intent, status_kind, \
154 status_detail, created_seq, pending_seq) \
155 VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?6)",
156 params![id, bp, intent, STATUS_PENDING, created_seq, pending_seq],
157 )?;
158 tx.commit()?;
159 Ok(())
160 })
161 .await
162 .map_err(|e| match &e {
163 IsleError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg)))
164 if msg.starts_with("__mlua_swarm_duplicate:") =>
165 {
166 let id = msg
167 .trim_start_matches("__mlua_swarm_duplicate:")
168 .to_string();
169 IssueStoreError::Duplicate(IssueId::new(id))
170 }
171 _ => map_isle_err(e),
172 })
173 }
174
175 async fn get(&self, id: &IssueId) -> Result<IssuePayload, IssueStoreError> {
176 let id_str = id.0.clone();
177 let id_for_notfound = id.clone();
178 let row = self
179 .isle
180 .call(move |conn| {
181 conn.query_row(
182 "SELECT blueprint_id, intent FROM issues WHERE issue_id = ?1",
183 params![id_str],
184 |row| {
185 let bp: String = row.get(0)?;
186 let intent: String = row.get(1)?;
187 Ok((bp, intent))
188 },
189 )
190 .optional()
191 })
192 .await
193 .map_err(map_isle_err)?;
194 match row {
195 Some((bp, intent)) => Ok(IssuePayload {
196 issue_id: id_for_notfound,
197 blueprint_id: BlueprintId::new(bp),
198 intent,
199 }),
200 None => Err(IssueStoreError::NotFound(id_for_notfound)),
201 }
202 }
203
204 async fn status(&self, id: &IssueId) -> Result<IssueStatus, IssueStoreError> {
205 let id_str = id.0.clone();
206 let id_for_notfound = id.clone();
207 let row = self
208 .isle
209 .call(move |conn| {
210 conn.query_row(
211 "SELECT status_kind, status_detail FROM issues WHERE issue_id = ?1",
212 params![id_str],
213 |row| {
214 let kind: String = row.get(0)?;
215 let detail: Option<String> = row.get(1)?;
216 Ok((kind, detail))
217 },
218 )
219 .optional()
220 })
221 .await
222 .map_err(map_isle_err)?;
223 match row {
224 Some((kind, detail)) => decode_status(&kind, detail),
225 None => Err(IssueStoreError::NotFound(id_for_notfound)),
226 }
227 }
228
229 async fn list(&self) -> Result<Vec<(IssueId, IssueStatus)>, IssueStoreError> {
230 let rows = self
231 .isle
232 .call(|conn| {
233 let mut stmt = conn.prepare(
234 "SELECT issue_id, status_kind, status_detail \
235 FROM issues ORDER BY created_seq ASC",
236 )?;
237 let iter = stmt.query_map([], |row| {
238 let id: String = row.get(0)?;
239 let kind: String = row.get(1)?;
240 let detail: Option<String> = row.get(2)?;
241 Ok((id, kind, detail))
242 })?;
243 let mut out = Vec::new();
244 for r in iter {
245 out.push(r?);
246 }
247 Ok(out)
248 })
249 .await
250 .map_err(map_isle_err)?;
251
252 let mut result = Vec::with_capacity(rows.len());
253 for (id, kind, detail) in rows {
254 result.push((IssueId::new(id), decode_status(&kind, detail)?));
255 }
256 Ok(result)
257 }
258
259 async fn pop_pending(&self) -> Result<Option<IssuePayload>, IssueStoreError> {
260 let picked = self
261 .isle
262 .call(move |conn| {
263 let tx = conn.transaction()?;
264 let row: Option<(String, String, String)> = tx
265 .query_row(
266 "SELECT issue_id, blueprint_id, intent FROM issues \
267 WHERE pending_seq IS NOT NULL \
268 ORDER BY pending_seq ASC LIMIT 1",
269 [],
270 |row| {
271 let id: String = row.get(0)?;
272 let bp: String = row.get(1)?;
273 let intent: String = row.get(2)?;
274 Ok((id, bp, intent))
275 },
276 )
277 .optional()?;
278 let Some((id, bp, intent)) = row else {
279 return Ok(None);
280 };
281 tx.execute(
282 "UPDATE issues SET status_kind = ?1, status_detail = NULL, \
283 pending_seq = NULL WHERE issue_id = ?2",
284 params![STATUS_INFLIGHT, id],
285 )?;
286 tx.commit()?;
287 Ok(Some((id, bp, intent)))
288 })
289 .await
290 .map_err(map_isle_err)?;
291
292 Ok(picked.map(|(id, bp, intent)| IssuePayload {
293 issue_id: IssueId::new(id),
294 blueprint_id: BlueprintId::new(bp),
295 intent,
296 }))
297 }
298
299 async fn update_status(
300 &self,
301 id: &IssueId,
302 status: IssueStatus,
303 ) -> Result<(), IssueStoreError> {
304 let id_str = id.0.clone();
305 let id_for_notfound = id.clone();
306 let (kind, detail) = encode_status(&status);
307 let clear_pending = !matches!(status, IssueStatus::Pending);
308 let n = self
309 .isle
310 .call(move |conn| {
311 if clear_pending {
312 conn.execute(
313 "UPDATE issues SET status_kind = ?1, status_detail = ?2, \
314 pending_seq = NULL WHERE issue_id = ?3",
315 params![kind, detail, id_str],
316 )
317 } else {
318 conn.execute(
319 "UPDATE issues SET status_kind = ?1, status_detail = ?2 \
320 WHERE issue_id = ?3",
321 params![kind, detail, id_str],
322 )
323 }
324 })
325 .await
326 .map_err(map_isle_err)?;
327 if n == 0 {
328 Err(IssueStoreError::NotFound(id_for_notfound))
329 } else {
330 Ok(())
331 }
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a payload on blueprint `"main"` whose intent text embeds `id`.
    fn mk(id: &str) -> IssuePayload {
        let intent = format!("intent for {id}");
        IssuePayload {
            issue_id: IssueId::new(id),
            blueprint_id: BlueprintId::new("main"),
            intent,
        }
    }

    /// Drops the store first, then shuts the driver down, panicking if
    /// shutdown reports an error.
    async fn teardown(store: SqliteIssueStore, driver: AsyncIsleDriver) {
        drop(store);
        driver.shutdown().await.unwrap();
    }

    /// A freshly created issue is retrievable and starts out `Pending`.
    #[tokio::test]
    async fn create_then_get_status() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        let key = IssueId::new("i1");

        store.create(mk("i1")).await.unwrap();

        let fetched = store.get(&key).await.unwrap();
        assert_eq!(fetched.issue_id, IssueId::new("i1"));
        assert_eq!(fetched.intent, "intent for i1");

        let current = store.status(&key).await.unwrap();
        assert_eq!(current, IssueStatus::Pending);

        teardown(store, driver).await;
    }

    /// Creating the same id twice yields `Duplicate`.
    #[tokio::test]
    async fn duplicate_create_rejected() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();

        store.create(mk("i1")).await.unwrap();
        let err = store.create(mk("i1")).await.unwrap_err();
        assert!(matches!(err, IssueStoreError::Duplicate(_)), "got: {err:?}");

        teardown(store, driver).await;
    }

    /// Issues are popped in creation order and become `InFlight` once popped.
    #[tokio::test]
    async fn pop_pending_fifo_and_transitions_inflight() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        for id in ["a", "b"] {
            store.create(mk(id)).await.unwrap();
        }

        let first = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(first.issue_id, IssueId::new("a"));
        let first_status = store.status(&IssueId::new("a")).await.unwrap();
        assert_eq!(first_status, IssueStatus::InFlight);

        let second = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(second.issue_id, IssueId::new("b"));

        // Queue is now drained.
        assert!(store.pop_pending().await.unwrap().is_none());

        teardown(store, driver).await;
    }

    /// `Applied` / `Rejected` round-trip their detail, and a rejected issue
    /// that was never popped no longer shows up in the pending queue.
    #[tokio::test]
    async fn update_status_to_applied_and_rejected() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();

        let x = IssueId::new("x");
        store.create(mk("x")).await.unwrap();
        store.pop_pending().await.unwrap();
        let applied = IssueStatus::Applied {
            new_version: "abc123".into(),
        };
        store.update_status(&x, applied).await.unwrap();
        match store.status(&x).await.unwrap() {
            IssueStatus::Applied { new_version } => assert_eq!(new_version, "abc123"),
            other => panic!("unexpected: {other:?}"),
        }

        let y = IssueId::new("y");
        store.create(mk("y")).await.unwrap();
        let rejected = IssueStatus::Rejected {
            reason: "bad shape".into(),
        };
        store.update_status(&y, rejected).await.unwrap();
        match store.status(&y).await.unwrap() {
            IssueStatus::Rejected { reason } => assert_eq!(reason, "bad shape"),
            other => panic!("unexpected: {other:?}"),
        }
        assert!(store.pop_pending().await.unwrap().is_none());

        teardown(store, driver).await;
    }

    /// `list` reports ids in the order they were created.
    #[tokio::test]
    async fn list_returns_insertion_order() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();
        for id in ["a", "b", "c"] {
            store.create(mk(id)).await.unwrap();
        }

        let listed = store.list().await.unwrap();
        let ids: Vec<_> = listed.iter().map(|(issue, _)| issue.0.clone()).collect();
        assert_eq!(ids, vec!["a", "b", "c"]);

        teardown(store, driver).await;
    }

    /// Updating an id that was never created yields `NotFound`.
    #[tokio::test]
    async fn update_status_unknown_fails() {
        let (store, driver) = SqliteIssueStore::open_in_memory().await.unwrap();

        let outcome = store
            .update_status(&IssueId::new("nope"), IssueStatus::Pending)
            .await;
        let err = outcome.unwrap_err();
        assert!(matches!(err, IssueStoreError::NotFound(_)));

        teardown(store, driver).await;
    }

    /// Data written through one store is visible after reopening the same file.
    #[tokio::test]
    async fn persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("issues.db");

        {
            let (writer, driver) = SqliteIssueStore::open(&path).await.unwrap();
            writer.create(mk("keep")).await.unwrap();
            teardown(writer, driver).await;
        }

        let (reader, driver) = SqliteIssueStore::open(&path).await.unwrap();
        let fetched = reader.get(&IssueId::new("keep")).await.unwrap();
        assert_eq!(fetched.intent, "intent for keep");
        teardown(reader, driver).await;
    }
}