1use rusqlite::{params, Connection};
7
8use crate::errors::AppError;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
11#[serde(rename_all = "snake_case")]
12pub enum PendingEmbeddingStatus {
13 Pending,
14 InProgress,
15 Done,
16 Abandoned,
17}
18
19impl PendingEmbeddingStatus {
20 pub fn as_str(&self) -> &'static str {
21 match self {
22 Self::Pending => "pending",
23 Self::InProgress => "in_progress",
24 Self::Done => "done",
25 Self::Abandoned => "abandoned",
26 }
27 }
28}
29
30#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
31pub struct PendingEmbedding {
32 pub pending_id: i64,
33 pub memory_id: i64,
34 pub namespace: String,
35 pub name: String,
36 pub backend_chain: String,
37 pub last_error: Option<String>,
38 pub last_exit_code: Option<i32>,
39 pub last_stderr_tail: Option<String>,
40 pub attempt_count: i32,
41 pub status: PendingEmbeddingStatus,
42 pub created_at: i64,
43 pub updated_at: i64,
44}
45
46#[allow(clippy::too_many_arguments)]
48pub fn insert(
49 conn: &Connection,
50 memory_id: i64,
51 namespace: &str,
52 name: &str,
53 backend_chain: &str,
54 last_error: Option<&str>,
55 last_exit_code: Option<i32>,
56 last_stderr_tail: Option<&str>,
57) -> Result<i64, AppError> {
58 conn.execute(
59 "INSERT INTO pending_embeddings
60 (memory_id, namespace, name, backend_chain, last_error,
61 last_exit_code, last_stderr_tail, attempt_count, status)
62 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 0, 'pending')",
63 params![
64 memory_id,
65 namespace,
66 name,
67 backend_chain,
68 last_error,
69 last_exit_code,
70 last_stderr_tail,
71 ],
72 )?;
73 Ok(conn.last_insert_rowid())
74}
75
76pub fn update_status(
77 conn: &Connection,
78 pending_id: i64,
79 status: PendingEmbeddingStatus,
80 last_error: Option<&str>,
81 last_exit_code: Option<i32>,
82 last_stderr_tail: Option<&str>,
83) -> Result<(), AppError> {
84 conn.execute(
85 "UPDATE pending_embeddings
86 SET status = ?1,
87 last_error = COALESCE(?2, last_error),
88 last_exit_code = COALESCE(?3, last_exit_code),
89 last_stderr_tail = COALESCE(?4, last_stderr_tail),
90 attempt_count = attempt_count + 1,
91 updated_at = unixepoch()
92 WHERE pending_id = ?5",
93 params![
94 status.as_str(),
95 last_error,
96 last_exit_code,
97 last_stderr_tail,
98 pending_id
99 ],
100 )?;
101 Ok(())
102}
103
104pub fn list_by_status(
105 conn: &Connection,
106 status: PendingEmbeddingStatus,
107 limit: usize,
108) -> Result<Vec<PendingEmbedding>, AppError> {
109 let mut stmt = conn.prepare(
110 "SELECT pending_id, memory_id, namespace, name, backend_chain,
111 last_error, last_exit_code, last_stderr_tail,
112 attempt_count, status, created_at, updated_at
113 FROM pending_embeddings
114 WHERE status = ?1
115 ORDER BY updated_at ASC
116 LIMIT ?2",
117 )?;
118 let rows = stmt.query_map(params![status.as_str(), limit as i64], |row| {
119 Ok(PendingEmbedding {
120 pending_id: row.get(0)?,
121 memory_id: row.get(1)?,
122 namespace: row.get(2)?,
123 name: row.get(3)?,
124 backend_chain: row.get(4)?,
125 last_error: row.get(5)?,
126 last_exit_code: row.get(6)?,
127 last_stderr_tail: row.get(7)?,
128 attempt_count: row.get(8)?,
129 status: parse_status(&row.get::<_, String>(9)?).map_err(|e| -> rusqlite::Error {
130 rusqlite::Error::FromSqlConversionFailure(
131 9,
132 rusqlite::types::Type::Text,
133 Box::new(std::io::Error::other(e.to_string())),
134 )
135 })?,
136 created_at: row.get(10)?,
137 updated_at: row.get(11)?,
138 })
139 })?;
140 let mut out = Vec::new();
141 for row in rows {
142 out.push(row?);
143 }
144 Ok(out)
145}
146
147pub fn abandon(conn: &Connection, pending_id: i64) -> Result<(), AppError> {
148 update_status(
149 conn,
150 pending_id,
151 PendingEmbeddingStatus::Abandoned,
152 None,
153 None,
154 None,
155 )
156}
157
158pub fn delete(conn: &Connection, pending_id: i64) -> Result<(), AppError> {
159 conn.execute(
160 "DELETE FROM pending_embeddings WHERE pending_id = ?1",
161 params![pending_id],
162 )?;
163 Ok(())
164}
165
166fn parse_status(s: &str) -> Result<PendingEmbeddingStatus, AppError> {
167 match s {
168 "pending" => Ok(PendingEmbeddingStatus::Pending),
169 "in_progress" => Ok(PendingEmbeddingStatus::InProgress),
170 "done" => Ok(PendingEmbeddingStatus::Done),
171 "abandoned" => Ok(PendingEmbeddingStatus::Abandoned),
172 other => Err(AppError::Validation(format!(
173 "unknown pending_embeddings status: {other}"
174 ))),
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Opens an in-memory database with foreign keys enabled and the project
    /// migrations applied.
    fn fresh_db() -> Connection {
        let mut db = Connection::open_in_memory().expect("in-memory db");
        db.execute_batch("PRAGMA foreign_keys = ON;")
            .expect("pragma");
        let runner = crate::migrations::runner();
        runner.run(&mut db).expect("migrations apply");
        db
    }

    /// Inserts a minimal `memories` row called `name` and returns its rowid,
    /// giving the pending rows a memory id to point at.
    fn insert_test_memory(conn: &Connection, name: &str) -> i64 {
        let sql = "INSERT INTO memories (name, namespace, type, description, body, body_hash, source)
             VALUES (?1, 'global', 'note', 'desc', 'body', 'h', 'agent')";
        conn.execute(sql, params![name]).unwrap();
        conn.last_insert_rowid()
    }

    /// Inserts a pending row without diagnostics for memory `memory_id`.
    fn insert_bare(conn: &Connection, memory_id: i64) -> i64 {
        insert(conn, memory_id, "global", "p", "codex", None, None, None).unwrap()
    }

    /// Looks for row `pending_id` among the first 10 rows in `status`.
    fn find_with_status(
        conn: &Connection,
        status: PendingEmbeddingStatus,
        pending_id: i64,
    ) -> Option<PendingEmbedding> {
        list_by_status(conn, status, 10)
            .unwrap()
            .into_iter()
            .find(|row| row.pending_id == pending_id)
    }

    #[test]
    fn insert_records_pending_with_full_diagnostics() {
        let db = fresh_db();
        let memory_id = insert_test_memory(&db, "p");
        let pending_id = insert(
            &db,
            memory_id,
            "global",
            "p",
            "codex,claude,none",
            Some("exit 137 SIGKILL"),
            Some(137),
            Some("OOM killed by kernel"),
        )
        .unwrap();

        let row = find_with_status(&db, PendingEmbeddingStatus::Pending, pending_id)
            .expect("pending found");

        assert_eq!(row.backend_chain, "codex,claude,none");
        assert_eq!(row.last_exit_code, Some(137));
        assert_eq!(
            row.last_stderr_tail.as_deref(),
            Some("OOM killed by kernel")
        );
    }

    #[test]
    fn update_status_increments_attempt_count() {
        let db = fresh_db();
        let memory_id = insert_test_memory(&db, "p");
        let pending_id = insert_bare(&db, memory_id);

        let next = PendingEmbeddingStatus::InProgress;
        update_status(&db, pending_id, next, None, None, None).unwrap();

        let row = find_with_status(&db, PendingEmbeddingStatus::InProgress, pending_id)
            .expect("found");
        assert_eq!(row.attempt_count, 1);
    }

    #[test]
    fn abandon_sets_status() {
        let db = fresh_db();
        let memory_id = insert_test_memory(&db, "p");
        let pending_id = insert_bare(&db, memory_id);

        abandon(&db, pending_id).unwrap();

        let hit = find_with_status(&db, PendingEmbeddingStatus::Abandoned, pending_id);
        assert!(hit.is_some());
    }

    #[test]
    fn delete_removes_row() {
        let db = fresh_db();
        let memory_id = insert_test_memory(&db, "p");
        let pending_id = insert_bare(&db, memory_id);

        delete(&db, pending_id).unwrap();

        let hit = find_with_status(&db, PendingEmbeddingStatus::Pending, pending_id);
        assert!(hit.is_none());
    }
}