1use super::{EnhanceLogEntry, EnhanceLogStore, EnhanceLogStoreError, VerdictSummary};
8use crate::blueprint::store::BlueprintId;
9use crate::store::issue::IssueId;
10use async_trait::async_trait;
11use rusqlite::{params, OptionalExtension};
12use rusqlite_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
13use std::path::Path;
14
15const SCHEMA_SQL: &str = "\
16CREATE TABLE IF NOT EXISTS enhance_log (\
17 issue_id TEXT PRIMARY KEY, \
18 blueprint_id TEXT NOT NULL, \
19 prev_hash TEXT NOT NULL, \
20 new_hash TEXT NOT NULL, \
21 intent TEXT NOT NULL, \
22 rationale TEXT NOT NULL, \
23 verdicts_json TEXT NOT NULL, \
24 status TEXT NOT NULL, \
25 reasons_json TEXT NOT NULL, \
26 ts_ms INTEGER NOT NULL\
27);\
28CREATE INDEX IF NOT EXISTS ix_enhance_log_bp_ts ON enhance_log(blueprint_id, ts_ms);\
29CREATE INDEX IF NOT EXISTS ix_enhance_log_ts ON enhance_log(ts_ms);\
30";
31
32pub struct SqliteEnhanceLogStore {
36 isle: AsyncIsle,
37}
38
39impl SqliteEnhanceLogStore {
40 pub async fn open(
42 path: impl AsRef<Path>,
43 ) -> Result<(Self, AsyncIsleDriver), EnhanceLogStoreError> {
44 let (isle, driver) = AsyncIsle::spawn(path.as_ref().to_path_buf(), |conn| {
45 conn.execute_batch(SCHEMA_SQL)
46 })
47 .await
48 .map_err(map_isle_err)?;
49 Ok((Self { isle }, driver))
50 }
51
52 pub async fn open_in_memory() -> Result<(Self, AsyncIsleDriver), EnhanceLogStoreError> {
54 let (isle, driver) = AsyncIsle::open_in_memory(|conn| conn.execute_batch(SCHEMA_SQL))
55 .await
56 .map_err(map_isle_err)?;
57 Ok((Self { isle }, driver))
58 }
59}
60
61fn map_isle_err(e: IsleError) -> EnhanceLogStoreError {
62 EnhanceLogStoreError::Other(format!("sqlite: {e}"))
63}
64
65fn row_to_entry(
66 issue_id: String,
67 blueprint_id: String,
68 prev_hash: String,
69 new_hash: String,
70 intent: String,
71 rationale: String,
72 verdicts_json: String,
73 status: String,
74 reasons_json: String,
75 ts_ms: i64,
76) -> Result<EnhanceLogEntry, EnhanceLogStoreError> {
77 let verdicts: Vec<VerdictSummary> = serde_json::from_str(&verdicts_json)
78 .map_err(|e| EnhanceLogStoreError::Other(format!("decode verdicts: {e}")))?;
79 let reasons: Vec<String> = serde_json::from_str(&reasons_json)
80 .map_err(|e| EnhanceLogStoreError::Other(format!("decode reasons: {e}")))?;
81 Ok(EnhanceLogEntry {
82 issue_id: IssueId::new(issue_id),
83 blueprint_id: BlueprintId::new(blueprint_id),
84 prev_hash,
85 new_hash,
86 intent,
87 rationale,
88 verdicts,
89 status,
90 reasons,
91 ts_ms,
92 })
93}
94
95#[async_trait]
96impl EnhanceLogStore for SqliteEnhanceLogStore {
97 fn name(&self) -> &str {
98 "sqlite"
99 }
100
101 async fn append(&self, entry: EnhanceLogEntry) -> Result<(), EnhanceLogStoreError> {
102 let issue_id_str = entry.issue_id.0.clone();
103 let issue_id_for_conflict = entry.issue_id.clone();
104 let blueprint_id = entry.blueprint_id.as_str().to_string();
105 let prev_hash = entry.prev_hash.clone();
106 let new_hash = entry.new_hash.clone();
107 let intent = entry.intent.clone();
108 let rationale = entry.rationale.clone();
109 let verdicts_json = serde_json::to_string(&entry.verdicts)
110 .map_err(|e| EnhanceLogStoreError::Other(format!("encode verdicts: {e}")))?;
111 let status = entry.status.clone();
112 let reasons_json = serde_json::to_string(&entry.reasons)
113 .map_err(|e| EnhanceLogStoreError::Other(format!("encode reasons: {e}")))?;
114 let ts_ms = entry.ts_ms;
115
116 self.isle
117 .call(move |conn| {
118 let tx = conn.transaction()?;
119 let exists: i64 = tx.query_row(
120 "SELECT COUNT(*) FROM enhance_log WHERE issue_id = ?1",
121 params![issue_id_str],
122 |row| row.get(0),
123 )?;
124 if exists > 0 {
125 return Err(rusqlite::Error::SqliteFailure(
126 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
127 Some(format!("__mlua_swarm_conflict:{issue_id_str}")),
128 ));
129 }
130 tx.execute(
131 "INSERT INTO enhance_log (issue_id, blueprint_id, prev_hash, new_hash, \
132 intent, rationale, verdicts_json, status, reasons_json, ts_ms) \
133 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
134 params![
135 issue_id_str,
136 blueprint_id,
137 prev_hash,
138 new_hash,
139 intent,
140 rationale,
141 verdicts_json,
142 status,
143 reasons_json,
144 ts_ms,
145 ],
146 )?;
147 tx.commit()?;
148 Ok(())
149 })
150 .await
151 .map_err(|e| match &e {
152 IsleError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg)))
153 if msg.starts_with("__mlua_swarm_conflict:") =>
154 {
155 EnhanceLogStoreError::Conflict(issue_id_for_conflict.clone())
156 }
157 _ => map_isle_err(e),
158 })
159 }
160
161 async fn get(&self, issue_id: &IssueId) -> Result<EnhanceLogEntry, EnhanceLogStoreError> {
162 let id_str = issue_id.0.clone();
163 let id_for_notfound = issue_id.clone();
164 let row = self
165 .isle
166 .call(move |conn| {
167 conn.query_row(
168 "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
169 verdicts_json, status, reasons_json, ts_ms \
170 FROM enhance_log WHERE issue_id = ?1",
171 params![id_str],
172 |row| {
173 Ok((
174 row.get::<_, String>(0)?,
175 row.get::<_, String>(1)?,
176 row.get::<_, String>(2)?,
177 row.get::<_, String>(3)?,
178 row.get::<_, String>(4)?,
179 row.get::<_, String>(5)?,
180 row.get::<_, String>(6)?,
181 row.get::<_, String>(7)?,
182 row.get::<_, String>(8)?,
183 row.get::<_, i64>(9)?,
184 ))
185 },
186 )
187 .optional()
188 })
189 .await
190 .map_err(map_isle_err)?;
191 match row {
192 Some((iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)) => row_to_entry(
193 iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts,
194 ),
195 None => Err(EnhanceLogStoreError::NotFound(id_for_notfound)),
196 }
197 }
198
199 async fn list_by_blueprint(
200 &self,
201 blueprint_id: &BlueprintId,
202 ) -> Result<Vec<EnhanceLogEntry>, EnhanceLogStoreError> {
203 let bp_str = blueprint_id.as_str().to_string();
204 let rows = self
205 .isle
206 .call(move |conn| {
207 let mut stmt = conn.prepare(
208 "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
209 verdicts_json, status, reasons_json, ts_ms \
210 FROM enhance_log WHERE blueprint_id = ?1 ORDER BY ts_ms ASC",
211 )?;
212 let iter = stmt.query_map(params![bp_str], |row| {
213 Ok((
214 row.get::<_, String>(0)?,
215 row.get::<_, String>(1)?,
216 row.get::<_, String>(2)?,
217 row.get::<_, String>(3)?,
218 row.get::<_, String>(4)?,
219 row.get::<_, String>(5)?,
220 row.get::<_, String>(6)?,
221 row.get::<_, String>(7)?,
222 row.get::<_, String>(8)?,
223 row.get::<_, i64>(9)?,
224 ))
225 })?;
226 let mut out = Vec::new();
227 for r in iter {
228 out.push(r?);
229 }
230 Ok(out)
231 })
232 .await
233 .map_err(map_isle_err)?;
234 rows.into_iter()
235 .map(|(iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)| {
236 row_to_entry(iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)
237 })
238 .collect()
239 }
240
241 async fn list_all(&self) -> Result<Vec<EnhanceLogEntry>, EnhanceLogStoreError> {
242 let rows = self
243 .isle
244 .call(|conn| {
245 let mut stmt = conn.prepare(
246 "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
247 verdicts_json, status, reasons_json, ts_ms \
248 FROM enhance_log ORDER BY ts_ms ASC",
249 )?;
250 let iter = stmt.query_map([], |row| {
251 Ok((
252 row.get::<_, String>(0)?,
253 row.get::<_, String>(1)?,
254 row.get::<_, String>(2)?,
255 row.get::<_, String>(3)?,
256 row.get::<_, String>(4)?,
257 row.get::<_, String>(5)?,
258 row.get::<_, String>(6)?,
259 row.get::<_, String>(7)?,
260 row.get::<_, String>(8)?,
261 row.get::<_, i64>(9)?,
262 ))
263 })?;
264 let mut out = Vec::new();
265 for r in iter {
266 out.push(r?);
267 }
268 Ok(out)
269 })
270 .await
271 .map_err(map_isle_err)?;
272 rows.into_iter()
273 .map(|(iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)| {
274 row_to_entry(iid, bp, prev, new, intent, rat, verdicts, status, reasons, ts)
275 })
276 .collect()
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283
284 fn mk_entry(issue: &str, bp: &str, ts_ms: i64, status: &str) -> EnhanceLogEntry {
285 EnhanceLogEntry {
286 issue_id: IssueId::new(issue),
287 blueprint_id: BlueprintId::new(bp),
288 prev_hash: "prev".into(),
289 new_hash: if status == "applied" { "new" } else { "" }.into(),
290 intent: format!("intent-{issue}"),
291 rationale: format!("rationale-{issue}"),
292 verdicts: vec![VerdictSummary {
293 axis: "des".into(),
294 status: "pass".into(),
295 detail: "ok".into(),
296 }],
297 status: status.into(),
298 reasons: if status == "rejected" {
299 vec!["des: broken".into()]
300 } else {
301 vec![]
302 },
303 ts_ms,
304 }
305 }
306
307 #[tokio::test]
308 async fn append_then_get_roundtrip() {
309 let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
310 let e = mk_entry("i1", "bp-1", 100, "applied");
311 s.append(e.clone()).await.unwrap();
312 let got = s.get(&IssueId::new("i1")).await.unwrap();
313 assert_eq!(got, e);
314 drop(s);
315 driver.shutdown().await.unwrap();
316 }
317
318 #[tokio::test]
319 async fn duplicate_append_returns_conflict() {
320 let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
321 s.append(mk_entry("i1", "bp-1", 100, "applied"))
322 .await
323 .unwrap();
324 let err = s
325 .append(mk_entry("i1", "bp-1", 200, "rejected"))
326 .await
327 .unwrap_err();
328 assert!(matches!(err, EnhanceLogStoreError::Conflict(_)));
329 drop(s);
330 driver.shutdown().await.unwrap();
331 }
332
333 #[tokio::test]
334 async fn get_missing_returns_not_found() {
335 let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
336 let err = s.get(&IssueId::new("nope")).await.unwrap_err();
337 assert!(matches!(err, EnhanceLogStoreError::NotFound(_)));
338 drop(s);
339 driver.shutdown().await.unwrap();
340 }
341
342 #[tokio::test]
343 async fn list_by_blueprint_ascending_ts() {
344 let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
345 s.append(mk_entry("a", "bp-1", 300, "applied"))
346 .await
347 .unwrap();
348 s.append(mk_entry("b", "bp-2", 200, "applied"))
349 .await
350 .unwrap();
351 s.append(mk_entry("c", "bp-1", 100, "rejected"))
352 .await
353 .unwrap();
354 let by_bp1 = s
355 .list_by_blueprint(&BlueprintId::new("bp-1"))
356 .await
357 .unwrap();
358 let ids: Vec<_> = by_bp1.iter().map(|e| e.issue_id.0.clone()).collect();
359 assert_eq!(ids, vec!["c", "a"]);
360 drop(s);
361 driver.shutdown().await.unwrap();
362 }
363
364 #[tokio::test]
365 async fn list_all_ascending_ts() {
366 let (s, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
367 s.append(mk_entry("a", "bp-1", 300, "applied"))
368 .await
369 .unwrap();
370 s.append(mk_entry("b", "bp-2", 100, "applied"))
371 .await
372 .unwrap();
373 s.append(mk_entry("c", "bp-1", 200, "applied"))
374 .await
375 .unwrap();
376 let all = s.list_all().await.unwrap();
377 let ids: Vec<_> = all.iter().map(|e| e.issue_id.0.clone()).collect();
378 assert_eq!(ids, vec!["b", "c", "a"]);
379 drop(s);
380 driver.shutdown().await.unwrap();
381 }
382
383 #[tokio::test]
384 async fn persists_across_reopen() {
385 let dir = tempfile::tempdir().unwrap();
386 let path = dir.path().join("enhance_log.db");
387 {
388 let (s, driver) = SqliteEnhanceLogStore::open(&path).await.unwrap();
389 s.append(mk_entry("keep", "bp-1", 42, "applied"))
390 .await
391 .unwrap();
392 drop(s);
393 driver.shutdown().await.unwrap();
394 }
395 let (s, driver) = SqliteEnhanceLogStore::open(&path).await.unwrap();
396 let got = s.get(&IssueId::new("keep")).await.unwrap();
397 assert_eq!(got.blueprint_id.as_str(), "bp-1");
398 assert_eq!(got.ts_ms, 42);
399 drop(s);
400 driver.shutdown().await.unwrap();
401 }
402}