Skip to main content

sqlite_graphrag/storage/
pending_embeddings.rs

//! GAP-005 (v1.0.82): DAO for the `pending_embeddings` table.
//!
//! Queue of persisted memories whose embedding is NULL, kept for later reprocessing
//! via `embedding retry --backend <KIND>` or `enrich --operation re-embed --pending-only`.
5
6use rusqlite::{params, Connection};
7
8use crate::errors::AppError;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
11#[serde(rename_all = "snake_case")]
12pub enum PendingEmbeddingStatus {
13    Pending,
14    InProgress,
15    Done,
16    Abandoned,
17}
18
19impl PendingEmbeddingStatus {
20    pub fn as_str(&self) -> &'static str {
21        match self {
22            Self::Pending => "pending",
23            Self::InProgress => "in_progress",
24            Self::Done => "done",
25            Self::Abandoned => "abandoned",
26        }
27    }
28}
29
30#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
31pub struct PendingEmbedding {
32    pub pending_id: i64,
33    pub memory_id: i64,
34    pub namespace: String,
35    pub name: String,
36    pub backend_chain: String,
37    pub last_error: Option<String>,
38    pub last_exit_code: Option<i32>,
39    pub last_stderr_tail: Option<String>,
40    pub attempt_count: i32,
41    pub status: PendingEmbeddingStatus,
42    pub created_at: i64,
43    pub updated_at: i64,
44}
45
46/// Insere uma nova entrada `pending_embeddings` com status `pending`.
47#[allow(clippy::too_many_arguments)]
48pub fn insert(
49    conn: &Connection,
50    memory_id: i64,
51    namespace: &str,
52    name: &str,
53    backend_chain: &str,
54    last_error: Option<&str>,
55    last_exit_code: Option<i32>,
56    last_stderr_tail: Option<&str>,
57) -> Result<i64, AppError> {
58    conn.execute(
59        "INSERT INTO pending_embeddings
60            (memory_id, namespace, name, backend_chain, last_error,
61             last_exit_code, last_stderr_tail, attempt_count, status)
62         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 0, 'pending')",
63        params![
64            memory_id,
65            namespace,
66            name,
67            backend_chain,
68            last_error,
69            last_exit_code,
70            last_stderr_tail,
71        ],
72    )?;
73    Ok(conn.last_insert_rowid())
74}
75
76pub fn update_status(
77    conn: &Connection,
78    pending_id: i64,
79    status: PendingEmbeddingStatus,
80    last_error: Option<&str>,
81    last_exit_code: Option<i32>,
82    last_stderr_tail: Option<&str>,
83) -> Result<(), AppError> {
84    conn.execute(
85        "UPDATE pending_embeddings
86         SET status = ?1,
87             last_error = COALESCE(?2, last_error),
88             last_exit_code = COALESCE(?3, last_exit_code),
89             last_stderr_tail = COALESCE(?4, last_stderr_tail),
90             attempt_count = attempt_count + 1,
91             updated_at = unixepoch()
92         WHERE pending_id = ?5",
93        params![
94            status.as_str(),
95            last_error,
96            last_exit_code,
97            last_stderr_tail,
98            pending_id
99        ],
100    )?;
101    Ok(())
102}
103
104pub fn list_by_status(
105    conn: &Connection,
106    status: PendingEmbeddingStatus,
107    limit: usize,
108) -> Result<Vec<PendingEmbedding>, AppError> {
109    let mut stmt = conn.prepare(
110        "SELECT pending_id, memory_id, namespace, name, backend_chain,
111                last_error, last_exit_code, last_stderr_tail,
112                attempt_count, status, created_at, updated_at
113         FROM pending_embeddings
114         WHERE status = ?1
115         ORDER BY updated_at ASC
116         LIMIT ?2",
117    )?;
118    let rows = stmt.query_map(params![status.as_str(), limit as i64], |row| {
119        Ok(PendingEmbedding {
120            pending_id: row.get(0)?,
121            memory_id: row.get(1)?,
122            namespace: row.get(2)?,
123            name: row.get(3)?,
124            backend_chain: row.get(4)?,
125            last_error: row.get(5)?,
126            last_exit_code: row.get(6)?,
127            last_stderr_tail: row.get(7)?,
128            attempt_count: row.get(8)?,
129            status: parse_status(&row.get::<_, String>(9)?).map_err(|e| -> rusqlite::Error {
130                rusqlite::Error::FromSqlConversionFailure(
131                    9,
132                    rusqlite::types::Type::Text,
133                    Box::new(std::io::Error::other(e.to_string())),
134                )
135            })?,
136            created_at: row.get(10)?,
137            updated_at: row.get(11)?,
138        })
139    })?;
140    let mut out = Vec::new();
141    for row in rows {
142        out.push(row?);
143    }
144    Ok(out)
145}
146
147pub fn abandon(conn: &Connection, pending_id: i64) -> Result<(), AppError> {
148    update_status(
149        conn,
150        pending_id,
151        PendingEmbeddingStatus::Abandoned,
152        None,
153        None,
154        None,
155    )
156}
157
158pub fn delete(conn: &Connection, pending_id: i64) -> Result<(), AppError> {
159    conn.execute(
160        "DELETE FROM pending_embeddings WHERE pending_id = ?1",
161        params![pending_id],
162    )?;
163    Ok(())
164}
165
166fn parse_status(s: &str) -> Result<PendingEmbeddingStatus, AppError> {
167    match s {
168        "pending" => Ok(PendingEmbeddingStatus::Pending),
169        "in_progress" => Ok(PendingEmbeddingStatus::InProgress),
170        "done" => Ok(PendingEmbeddingStatus::Done),
171        "abandoned" => Ok(PendingEmbeddingStatus::Abandoned),
172        other => Err(AppError::Validation(format!(
173            "unknown pending_embeddings status: {other}"
174        ))),
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Opens an in-memory database with foreign keys enabled and all migrations applied.
    fn fresh_db() -> Connection {
        let mut conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch("PRAGMA foreign_keys = ON;")
            .expect("pragma");
        crate::migrations::runner()
            .run(&mut conn)
            .expect("migrations apply");
        conn
    }

    /// Inserts a minimal `memories` row named `name` and returns its rowid.
    fn insert_test_memory(conn: &Connection, name: &str) -> i64 {
        conn.execute(
            "INSERT INTO memories (name, namespace, type, description, body, body_hash, source)
             VALUES (?1, 'global', 'note', 'desc', 'body', 'h', 'agent')",
            params![name],
        )
        .unwrap();
        conn.last_insert_rowid()
    }

    /// Returns entry `id` among the rows currently in `status`, panicking if it is absent.
    fn find_in_status(
        conn: &Connection,
        status: PendingEmbeddingStatus,
        id: i64,
    ) -> PendingEmbedding {
        list_by_status(conn, status, 10)
            .unwrap()
            .into_iter()
            .find(|p| p.pending_id == id)
            .expect("pending entry found")
    }

    #[test]
    fn insert_records_pending_with_full_diagnostics() {
        let conn = fresh_db();
        let mid = insert_test_memory(&conn, "p");
        let id = insert(
            &conn,
            mid,
            "global",
            "p",
            "codex,claude,none",
            Some("exit 137 SIGKILL"),
            Some(137),
            Some("OOM killed by kernel"),
        )
        .unwrap();
        let p = find_in_status(&conn, PendingEmbeddingStatus::Pending, id);
        assert_eq!(p.memory_id, mid);
        assert_eq!(p.namespace, "global");
        assert_eq!(p.name, "p");
        assert_eq!(p.backend_chain, "codex,claude,none");
        assert_eq!(p.last_error.as_deref(), Some("exit 137 SIGKILL"));
        assert_eq!(p.last_exit_code, Some(137));
        assert_eq!(p.last_stderr_tail.as_deref(), Some("OOM killed by kernel"));
        // `insert` must start every entry fresh.
        assert_eq!(p.attempt_count, 0);
        assert_eq!(p.status, PendingEmbeddingStatus::Pending);
    }

    #[test]
    fn update_status_increments_attempt_count() {
        let conn = fresh_db();
        let mid = insert_test_memory(&conn, "p");
        let id = insert(&conn, mid, "global", "p", "codex", None, None, None).unwrap();
        update_status(
            &conn,
            id,
            PendingEmbeddingStatus::InProgress,
            None,
            None,
            None,
        )
        .unwrap();
        let p = find_in_status(&conn, PendingEmbeddingStatus::InProgress, id);
        assert_eq!(p.attempt_count, 1);
        assert_eq!(p.status, PendingEmbeddingStatus::InProgress);
    }

    #[test]
    fn update_status_keeps_diagnostics_on_none_and_overwrites_on_some() {
        let conn = fresh_db();
        let mid = insert_test_memory(&conn, "p");
        let id = insert(
            &conn,
            mid,
            "global",
            "p",
            "codex",
            Some("first"),
            Some(1),
            Some("tail-1"),
        )
        .unwrap();

        // `None` diagnostics must leave the stored values untouched (COALESCE).
        update_status(&conn, id, PendingEmbeddingStatus::InProgress, None, None, None).unwrap();
        let p = find_in_status(&conn, PendingEmbeddingStatus::InProgress, id);
        assert_eq!(p.last_error.as_deref(), Some("first"));
        assert_eq!(p.last_exit_code, Some(1));
        assert_eq!(p.last_stderr_tail.as_deref(), Some("tail-1"));

        // `Some` diagnostics overwrite; the remaining `None` still keeps its value.
        update_status(
            &conn,
            id,
            PendingEmbeddingStatus::InProgress,
            Some("second"),
            Some(2),
            None,
        )
        .unwrap();
        let p = find_in_status(&conn, PendingEmbeddingStatus::InProgress, id);
        assert_eq!(p.last_error.as_deref(), Some("second"));
        assert_eq!(p.last_exit_code, Some(2));
        assert_eq!(p.last_stderr_tail.as_deref(), Some("tail-1"));
        assert_eq!(p.attempt_count, 2);
    }

    #[test]
    fn abandon_sets_status() {
        let conn = fresh_db();
        let mid = insert_test_memory(&conn, "p");
        let id = insert(&conn, mid, "global", "p", "codex", None, None, None).unwrap();
        abandon(&conn, id).unwrap();
        let abandoned = list_by_status(&conn, PendingEmbeddingStatus::Abandoned, 10).unwrap();
        assert!(abandoned.iter().any(|p| p.pending_id == id));
        let pending = list_by_status(&conn, PendingEmbeddingStatus::Pending, 10).unwrap();
        assert!(pending.iter().all(|p| p.pending_id != id));
    }

    #[test]
    fn delete_removes_row() {
        let conn = fresh_db();
        let mid = insert_test_memory(&conn, "p");
        let id = insert(&conn, mid, "global", "p", "codex", None, None, None).unwrap();
        delete(&conn, id).unwrap();
        let pending = list_by_status(&conn, PendingEmbeddingStatus::Pending, 10).unwrap();
        assert!(pending.iter().all(|p| p.pending_id != id));
    }
}