1use super::{EnhanceLogEntry, EnhanceLogStore, EnhanceLogStoreError, VerdictSummary};
8use crate::blueprint::store::BlueprintId;
9use crate::store::issue::IssueId;
10use async_trait::async_trait;
11use rusqlite::{params, OptionalExtension};
12use rusqlite_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
13use std::path::Path;
14
/// DDL executed as one batch every time a store is opened.
///
/// Every statement uses `IF NOT EXISTS`, so re-running it against an existing
/// database is harmless. `issue_id` is the primary key (one log row per
/// issue); the two indexes serve the `blueprint_id`-filtered and the
/// unfiltered `ORDER BY ts_ms` listings.
const SCHEMA_SQL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS enhance_log (",
    "issue_id TEXT PRIMARY KEY, ",
    "blueprint_id TEXT NOT NULL, ",
    "prev_hash TEXT NOT NULL, ",
    "new_hash TEXT NOT NULL, ",
    "intent TEXT NOT NULL, ",
    "rationale TEXT NOT NULL, ",
    "verdicts_json TEXT NOT NULL, ",
    "status TEXT NOT NULL, ",
    "reasons_json TEXT NOT NULL, ",
    "ts_ms INTEGER NOT NULL",
    ");",
    "CREATE INDEX IF NOT EXISTS ix_enhance_log_bp_ts ON enhance_log(blueprint_id, ts_ms);",
    "CREATE INDEX IF NOT EXISTS ix_enhance_log_ts ON enhance_log(ts_ms);",
);
31
/// SQLite-backed [`EnhanceLogStore`].
///
/// Every database operation is submitted as a closure through the wrapped
/// [`AsyncIsle`] handle; the store itself holds no other state.
pub struct SqliteEnhanceLogStore {
    /// Handle used to run closures against the underlying SQLite connection.
    isle: AsyncIsle,
}
38
39impl SqliteEnhanceLogStore {
40 pub async fn open(
42 path: impl AsRef<Path>,
43 ) -> Result<(Self, AsyncIsleDriver), EnhanceLogStoreError> {
44 let (isle, driver) = AsyncIsle::spawn(path.as_ref().to_path_buf(), |conn| {
45 conn.execute_batch(SCHEMA_SQL)
46 })
47 .await
48 .map_err(map_isle_err)?;
49 Ok((Self { isle }, driver))
50 }
51
52 pub async fn open_in_memory() -> Result<(Self, AsyncIsleDriver), EnhanceLogStoreError> {
54 let (isle, driver) = AsyncIsle::open_in_memory(|conn| conn.execute_batch(SCHEMA_SQL))
55 .await
56 .map_err(map_isle_err)?;
57 Ok((Self { isle }, driver))
58 }
59}
60
61fn map_isle_err(e: IsleError) -> EnhanceLogStoreError {
62 EnhanceLogStoreError::Other(format!("sqlite: {e}"))
63}
64
/// One raw `enhance_log` row exactly as read from SQLite, before the JSON
/// columns are decoded. Slots follow the column order used by every SELECT
/// in this file.
type LogRow = (
    String, // issue_id
    String, // blueprint_id
    String, // prev_hash
    String, // new_hash
    String, // intent
    String, // rationale
    String, // verdicts_json
    String, // status
    String, // reasons_json
    i64,    // ts_ms
);
80
81fn row_to_entry(row: LogRow) -> Result<EnhanceLogEntry, EnhanceLogStoreError> {
82 let (
83 issue_id,
84 blueprint_id,
85 prev_hash,
86 new_hash,
87 intent,
88 rationale,
89 verdicts_json,
90 status,
91 reasons_json,
92 ts_ms,
93 ) = row;
94 let verdicts: Vec<VerdictSummary> = serde_json::from_str(&verdicts_json)
95 .map_err(|e| EnhanceLogStoreError::Other(format!("decode verdicts: {e}")))?;
96 let reasons: Vec<String> = serde_json::from_str(&reasons_json)
97 .map_err(|e| EnhanceLogStoreError::Other(format!("decode reasons: {e}")))?;
98 Ok(EnhanceLogEntry {
99 issue_id: IssueId::new(issue_id),
100 blueprint_id: BlueprintId::new(blueprint_id),
101 prev_hash,
102 new_hash,
103 intent,
104 rationale,
105 verdicts,
106 status,
107 reasons,
108 ts_ms,
109 })
110}
111
112#[async_trait]
113impl EnhanceLogStore for SqliteEnhanceLogStore {
114 fn name(&self) -> &str {
115 "sqlite"
116 }
117
118 async fn append(&self, entry: EnhanceLogEntry) -> Result<(), EnhanceLogStoreError> {
119 let issue_id_str = entry.issue_id.0.clone();
120 let issue_id_for_conflict = entry.issue_id.clone();
121 let blueprint_id = entry.blueprint_id.as_str().to_string();
122 let prev_hash = entry.prev_hash.clone();
123 let new_hash = entry.new_hash.clone();
124 let intent = entry.intent.clone();
125 let rationale = entry.rationale.clone();
126 let verdicts_json = serde_json::to_string(&entry.verdicts)
127 .map_err(|e| EnhanceLogStoreError::Other(format!("encode verdicts: {e}")))?;
128 let status = entry.status.clone();
129 let reasons_json = serde_json::to_string(&entry.reasons)
130 .map_err(|e| EnhanceLogStoreError::Other(format!("encode reasons: {e}")))?;
131 let ts_ms = entry.ts_ms;
132
133 self.isle
134 .call(move |conn| {
135 let tx = conn.transaction()?;
136 let exists: i64 = tx.query_row(
137 "SELECT COUNT(*) FROM enhance_log WHERE issue_id = ?1",
138 params![issue_id_str],
139 |row| row.get(0),
140 )?;
141 if exists > 0 {
142 return Err(rusqlite::Error::SqliteFailure(
143 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT),
144 Some(format!("__mlua_swarm_conflict:{issue_id_str}")),
145 ));
146 }
147 tx.execute(
148 "INSERT INTO enhance_log (issue_id, blueprint_id, prev_hash, new_hash, \
149 intent, rationale, verdicts_json, status, reasons_json, ts_ms) \
150 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
151 params![
152 issue_id_str,
153 blueprint_id,
154 prev_hash,
155 new_hash,
156 intent,
157 rationale,
158 verdicts_json,
159 status,
160 reasons_json,
161 ts_ms,
162 ],
163 )?;
164 tx.commit()?;
165 Ok(())
166 })
167 .await
168 .map_err(|e| match &e {
169 IsleError::Sqlite(rusqlite::Error::SqliteFailure(_, Some(msg)))
170 if msg.starts_with("__mlua_swarm_conflict:") =>
171 {
172 EnhanceLogStoreError::Conflict(issue_id_for_conflict.clone())
173 }
174 _ => map_isle_err(e),
175 })
176 }
177
178 async fn get(&self, issue_id: &IssueId) -> Result<EnhanceLogEntry, EnhanceLogStoreError> {
179 let id_str = issue_id.0.clone();
180 let id_for_notfound = issue_id.clone();
181 let row = self
182 .isle
183 .call(move |conn| {
184 conn.query_row(
185 "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
186 verdicts_json, status, reasons_json, ts_ms \
187 FROM enhance_log WHERE issue_id = ?1",
188 params![id_str],
189 |row| {
190 Ok((
191 row.get::<_, String>(0)?,
192 row.get::<_, String>(1)?,
193 row.get::<_, String>(2)?,
194 row.get::<_, String>(3)?,
195 row.get::<_, String>(4)?,
196 row.get::<_, String>(5)?,
197 row.get::<_, String>(6)?,
198 row.get::<_, String>(7)?,
199 row.get::<_, String>(8)?,
200 row.get::<_, i64>(9)?,
201 ))
202 },
203 )
204 .optional()
205 })
206 .await
207 .map_err(map_isle_err)?;
208 match row {
209 Some(row) => row_to_entry(row),
210 None => Err(EnhanceLogStoreError::NotFound(id_for_notfound)),
211 }
212 }
213
214 async fn list_by_blueprint(
215 &self,
216 blueprint_id: &BlueprintId,
217 ) -> Result<Vec<EnhanceLogEntry>, EnhanceLogStoreError> {
218 let bp_str = blueprint_id.as_str().to_string();
219 let rows = self
220 .isle
221 .call(move |conn| {
222 let mut stmt = conn.prepare(
223 "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
224 verdicts_json, status, reasons_json, ts_ms \
225 FROM enhance_log WHERE blueprint_id = ?1 ORDER BY ts_ms ASC",
226 )?;
227 let iter = stmt.query_map(params![bp_str], |row| {
228 Ok((
229 row.get::<_, String>(0)?,
230 row.get::<_, String>(1)?,
231 row.get::<_, String>(2)?,
232 row.get::<_, String>(3)?,
233 row.get::<_, String>(4)?,
234 row.get::<_, String>(5)?,
235 row.get::<_, String>(6)?,
236 row.get::<_, String>(7)?,
237 row.get::<_, String>(8)?,
238 row.get::<_, i64>(9)?,
239 ))
240 })?;
241 let mut out = Vec::new();
242 for r in iter {
243 out.push(r?);
244 }
245 Ok(out)
246 })
247 .await
248 .map_err(map_isle_err)?;
249 rows.into_iter().map(row_to_entry).collect()
250 }
251
252 async fn list_all(&self) -> Result<Vec<EnhanceLogEntry>, EnhanceLogStoreError> {
253 let rows = self
254 .isle
255 .call(|conn| {
256 let mut stmt = conn.prepare(
257 "SELECT issue_id, blueprint_id, prev_hash, new_hash, intent, rationale, \
258 verdicts_json, status, reasons_json, ts_ms \
259 FROM enhance_log ORDER BY ts_ms ASC",
260 )?;
261 let iter = stmt.query_map([], |row| {
262 Ok((
263 row.get::<_, String>(0)?,
264 row.get::<_, String>(1)?,
265 row.get::<_, String>(2)?,
266 row.get::<_, String>(3)?,
267 row.get::<_, String>(4)?,
268 row.get::<_, String>(5)?,
269 row.get::<_, String>(6)?,
270 row.get::<_, String>(7)?,
271 row.get::<_, String>(8)?,
272 row.get::<_, i64>(9)?,
273 ))
274 })?;
275 let mut out = Vec::new();
276 for r in iter {
277 out.push(r?);
278 }
279 Ok(out)
280 })
281 .await
282 .map_err(map_isle_err)?;
283 rows.into_iter().map(row_to_entry).collect()
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fixture entry. `new_hash` is `"new"` only for status
    /// `"applied"`, and `reasons` is non-empty only for status `"rejected"`.
    fn mk_entry(issue: &str, bp: &str, ts_ms: i64, status: &str) -> EnhanceLogEntry {
        let new_hash = match status {
            "applied" => "new",
            _ => "",
        };
        let reasons: Vec<String> = match status {
            "rejected" => vec!["des: broken".into()],
            _ => vec![],
        };
        let verdict = VerdictSummary {
            axis: "des".into(),
            status: "pass".into(),
            detail: "ok".into(),
        };
        EnhanceLogEntry {
            issue_id: IssueId::new(issue),
            blueprint_id: BlueprintId::new(bp),
            prev_hash: "prev".into(),
            new_hash: new_hash.into(),
            intent: format!("intent-{issue}"),
            rationale: format!("rationale-{issue}"),
            verdicts: vec![verdict],
            status: status.into(),
            reasons,
            ts_ms,
        }
    }

    /// Issue ids of `entries`, in the order given.
    fn issue_ids(entries: &[EnhanceLogEntry]) -> Vec<String> {
        entries.iter().map(|e| e.issue_id.0.clone()).collect()
    }

    #[tokio::test]
    async fn append_then_get_roundtrip() {
        let (store, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
        let original = mk_entry("i1", "bp-1", 100, "applied");
        store.append(original.clone()).await.unwrap();

        let fetched = store.get(&IssueId::new("i1")).await.unwrap();
        assert_eq!(fetched, original);

        drop(store);
        driver.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn duplicate_append_returns_conflict() {
        let (store, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
        let first = mk_entry("i1", "bp-1", 100, "applied");
        let second = mk_entry("i1", "bp-1", 200, "rejected");

        store.append(first).await.unwrap();
        let err = store.append(second).await.unwrap_err();
        assert!(matches!(err, EnhanceLogStoreError::Conflict(_)));

        drop(store);
        driver.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn get_missing_returns_not_found() {
        let (store, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();

        let err = store.get(&IssueId::new("nope")).await.unwrap_err();
        assert!(matches!(err, EnhanceLogStoreError::NotFound(_)));

        drop(store);
        driver.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn list_by_blueprint_ascending_ts() {
        let (store, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
        let fixtures = [
            ("a", "bp-1", 300, "applied"),
            ("b", "bp-2", 200, "applied"),
            ("c", "bp-1", 100, "rejected"),
        ];
        for (issue, bp, ts_ms, status) in fixtures {
            store
                .append(mk_entry(issue, bp, ts_ms, status))
                .await
                .unwrap();
        }

        let by_bp1 = store
            .list_by_blueprint(&BlueprintId::new("bp-1"))
            .await
            .unwrap();
        assert_eq!(issue_ids(&by_bp1), vec!["c", "a"]);

        drop(store);
        driver.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn list_all_ascending_ts() {
        let (store, driver) = SqliteEnhanceLogStore::open_in_memory().await.unwrap();
        let fixtures = [
            ("a", "bp-1", 300, "applied"),
            ("b", "bp-2", 100, "applied"),
            ("c", "bp-1", 200, "applied"),
        ];
        for (issue, bp, ts_ms, status) in fixtures {
            store
                .append(mk_entry(issue, bp, ts_ms, status))
                .await
                .unwrap();
        }

        let all = store.list_all().await.unwrap();
        assert_eq!(issue_ids(&all), vec!["b", "c", "a"]);

        drop(store);
        driver.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("enhance_log.db");

        // First session: write one entry, then shut the connection down.
        {
            let (writer, driver) = SqliteEnhanceLogStore::open(&path).await.unwrap();
            writer
                .append(mk_entry("keep", "bp-1", 42, "applied"))
                .await
                .unwrap();
            drop(writer);
            driver.shutdown().await.unwrap();
        }

        // Second session: the entry must still be readable from the same file.
        let (reader, driver) = SqliteEnhanceLogStore::open(&path).await.unwrap();
        let got = reader.get(&IssueId::new("keep")).await.unwrap();
        assert_eq!(got.blueprint_id.as_str(), "bp-1");
        assert_eq!(got.ts_ms, 42);

        drop(reader);
        driver.shutdown().await.unwrap();
    }
}