Skip to main content

codemem_storage/
memory.rs

1//! Memory CRUD operations on Storage.
2
3use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{CodememError, MemoryNode, Repository};
5use rusqlite::{params, OptionalExtension};
6
7/// Shared dedup-check + INSERT logic used by both transactional and
8/// non-transactional insert paths.  Accepts `&rusqlite::Connection` because
9/// both `Connection` and `Transaction` deref to it.
10fn insert_memory_inner(
11    conn: &rusqlite::Connection,
12    memory: &MemoryNode,
13) -> Result<(), CodememError> {
14    // Check dedup (namespace-scoped)
15    let existing: Option<String> = conn
16        .query_row(
17            "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
18            params![memory.content_hash, memory.namespace],
19            |row| row.get(0),
20        )
21        .optional()
22        .storage_err()?;
23
24    if existing.is_some() {
25        return Err(CodememError::Duplicate(memory.content_hash.clone()));
26    }
27
28    let tags_json = serde_json::to_string(&memory.tags)?;
29    let metadata_json = serde_json::to_string(&memory.metadata)?;
30
31    conn.execute(
32        "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at)
33         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
34        params![
35            memory.id,
36            memory.content,
37            memory.memory_type.to_string(),
38            memory.importance,
39            memory.confidence,
40            memory.access_count,
41            memory.content_hash,
42            tags_json,
43            metadata_json,
44            memory.namespace,
45            memory.session_id,
46            memory.created_at.timestamp(),
47            memory.updated_at.timestamp(),
48            memory.last_accessed_at.timestamp(),
49        ],
50    )
51    .storage_err()?;
52
53    Ok(())
54}
55
56impl Storage {
57    /// Insert a new memory. Returns Err(Duplicate) if content hash already exists.
58    ///
59    /// Uses BEGIN IMMEDIATE to acquire a write lock before the dedup check,
60    /// ensuring the SELECT + INSERT are atomic. INSERT OR IGNORE is a safety
61    /// net against the UNIQUE constraint on content_hash.
62    ///
63    /// When an outer transaction is active (via `begin_transaction`), the
64    /// method skips starting its own transaction and executes directly on the
65    /// connection, so that all operations participate in the outer transaction.
66    pub fn insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError> {
67        // Check inside conn lock to avoid TOCTOU race with begin_transaction.
68        // The conn() mutex serializes all access, so the flag check + INSERT
69        // are atomic with respect to other callers.
70        let mut conn = self.conn()?;
71        if self.has_outer_transaction() {
72            drop(conn);
73            return self.insert_memory_no_tx(memory);
74        }
75
76        let tx = conn
77            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
78            .storage_err()?;
79
80        // Dedup check + INSERT inside the transaction; on Duplicate error the
81        // transaction is rolled back automatically when `tx` is dropped.
82        insert_memory_inner(&tx, memory)?;
83
84        tx.commit().storage_err()?;
85
86        Ok(())
87    }
88
89    /// Insert a memory without starting a new transaction.
90    /// Used when an outer transaction is already active.
91    fn insert_memory_no_tx(&self, memory: &MemoryNode) -> Result<(), CodememError> {
92        let conn = self.conn()?;
93        insert_memory_inner(&conn, memory)
94    }
95
96    /// Get a memory by ID. Updates access_count and last_accessed_at.
97    pub fn get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
98        let conn = self.conn()?;
99
100        // Bump access count first
101        let updated = conn
102            .execute(
103                "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?1 WHERE id = ?2",
104                params![chrono::Utc::now().timestamp(), id],
105            )
106            .storage_err()?;
107
108        if updated == 0 {
109            return Ok(None);
110        }
111
112        let result = conn
113            .query_row(
114                "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
115                params![id],
116                MemoryRow::from_row,
117            )
118            .optional()
119            .storage_err()?;
120
121        match result {
122            Some(row) => Ok(Some(row.into_memory_node()?)),
123            None => Ok(None),
124        }
125    }
126
127    /// Get a memory by ID without updating access_count or last_accessed_at.
128    /// Use for internal/system reads (consolidation, stats, batch processing).
129    pub fn get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
130        let conn = self.conn()?;
131
132        let result = conn
133            .query_row(
134                "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
135                params![id],
136                MemoryRow::from_row,
137            )
138            .optional()
139            .storage_err()?;
140
141        match result {
142            Some(row) => Ok(Some(row.into_memory_node()?)),
143            None => Ok(None),
144        }
145    }
146
147    /// Update a memory's content and re-hash. Returns `Err(NotFound)` if the ID doesn't exist.
148    pub fn update_memory(
149        &self,
150        id: &str,
151        content: &str,
152        importance: Option<f64>,
153    ) -> Result<(), CodememError> {
154        let conn = self.conn()?;
155        let hash = Self::content_hash(content);
156        let now = chrono::Utc::now().timestamp();
157
158        let rows_affected = if let Some(imp) = importance {
159            conn.execute(
160                "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3, importance = ?4 WHERE id = ?5",
161                params![content, hash, now, imp, id],
162            )
163            .storage_err()?
164        } else {
165            conn.execute(
166                "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3 WHERE id = ?4",
167                params![content, hash, now, id],
168            )
169            .storage_err()?
170        };
171
172        if rows_affected == 0 {
173            return Err(CodememError::NotFound(format!("Memory not found: {id}")));
174        }
175
176        Ok(())
177    }
178
179    /// Delete a memory by ID.
180    pub fn delete_memory(&self, id: &str) -> Result<bool, CodememError> {
181        let conn = self.conn()?;
182        let rows = conn
183            .execute("DELETE FROM memories WHERE id = ?1", params![id])
184            .storage_err()?;
185        Ok(rows > 0)
186    }
187
188    /// Delete a memory and all related data (embeddings, graph nodes/edges) atomically.
189    /// Returns true if the memory existed and was deleted.
190    pub fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
191        let mut conn = self.conn()?;
192        // L2: Use IMMEDIATE transaction to acquire write lock upfront,
193        // avoiding potential deadlock with DEFERRED transaction.
194        let tx = conn
195            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
196            .storage_err()?;
197
198        // Delete edges that reference graph nodes linked to this memory
199        tx.execute(
200            "DELETE FROM graph_edges WHERE src IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)
201             OR dst IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)",
202            params![id],
203        )
204        .storage_err()?;
205
206        // Delete graph nodes linked to this memory
207        tx.execute("DELETE FROM graph_nodes WHERE memory_id = ?1", params![id])
208            .storage_err()?;
209
210        // Delete embedding
211        tx.execute(
212            "DELETE FROM memory_embeddings WHERE memory_id = ?1",
213            params![id],
214        )
215        .storage_err()?;
216
217        // Delete the memory itself
218        let rows = tx
219            .execute("DELETE FROM memories WHERE id = ?1", params![id])
220            .storage_err()?;
221
222        tx.commit().storage_err()?;
223
224        Ok(rows > 0)
225    }
226
227    /// Delete multiple memories and all related data (embeddings, graph nodes/edges) atomically.
228    /// Returns the number of memories that were actually deleted.
229    pub fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
230        if ids.is_empty() {
231            return Ok(0);
232        }
233
234        let mut conn = self.conn()?;
235        let tx = conn
236            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
237            .storage_err()?;
238
239        let placeholders: String = (1..=ids.len())
240            .map(|i| format!("?{i}"))
241            .collect::<Vec<_>>()
242            .join(",");
243        let params: Vec<&dyn rusqlite::types::ToSql> = ids
244            .iter()
245            .map(|id| id as &dyn rusqlite::types::ToSql)
246            .collect();
247
248        // Delete edges referencing graph nodes linked to these memories.
249        // Uses ?N numbered params which SQLite allows to be reused in the same statement.
250        let edge_sql = format!(
251            "DELETE FROM graph_edges WHERE \
252             src IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
253             OR dst IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
254             OR src IN ({placeholders}) OR dst IN ({placeholders})"
255        );
256        tx.execute(&edge_sql, params.as_slice()).storage_err()?;
257
258        // Delete graph nodes linked to these memories (by memory_id column or by id)
259        let node_sql = format!(
260            "DELETE FROM graph_nodes WHERE memory_id IN ({placeholders}) OR id IN ({placeholders})"
261        );
262        tx.execute(&node_sql, params.as_slice()).storage_err()?;
263
264        // Delete embeddings
265        let emb_sql = format!("DELETE FROM memory_embeddings WHERE memory_id IN ({placeholders})");
266        tx.execute(&emb_sql, params.as_slice()).storage_err()?;
267
268        // Delete the memories themselves
269        let mem_sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
270        let deleted = tx.execute(&mem_sql, params.as_slice()).storage_err()?;
271
272        tx.commit().storage_err()?;
273
274        Ok(deleted)
275    }
276
277    /// List all memory IDs with an optional limit.
278    pub fn list_memory_ids(&self) -> Result<Vec<String>, CodememError> {
279        self.list_memory_ids_limited(None)
280    }
281
282    /// List memory IDs with an optional limit.
283    pub fn list_memory_ids_limited(
284        &self,
285        limit: Option<usize>,
286    ) -> Result<Vec<String>, CodememError> {
287        let conn = self.conn()?;
288        let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) =
289            if let Some(lim) = limit {
290                (
291                    "SELECT id FROM memories ORDER BY created_at DESC LIMIT ?1",
292                    vec![Box::new(lim as i64) as Box<dyn rusqlite::types::ToSql>],
293                )
294            } else {
295                ("SELECT id FROM memories ORDER BY created_at DESC", vec![])
296            };
297
298        let mut stmt = conn.prepare(sql).storage_err()?;
299
300        let refs: Vec<&dyn rusqlite::types::ToSql> =
301            params_vec.iter().map(|p| p.as_ref()).collect();
302
303        let ids = stmt
304            .query_map(refs.as_slice(), |row| row.get(0))
305            .storage_err()?
306            .collect::<Result<Vec<String>, _>>()
307            .storage_err()?;
308
309        Ok(ids)
310    }
311
312    /// List memory IDs scoped to a specific namespace.
313    pub fn list_memory_ids_for_namespace(
314        &self,
315        namespace: &str,
316    ) -> Result<Vec<String>, CodememError> {
317        let conn = self.conn()?;
318        let mut stmt = conn
319            .prepare("SELECT id FROM memories WHERE namespace = ?1 ORDER BY created_at DESC")
320            .storage_err()?;
321
322        let ids = stmt
323            .query_map(params![namespace], |row| row.get(0))
324            .storage_err()?
325            .collect::<Result<Vec<String>, _>>()
326            .storage_err()?;
327
328        Ok(ids)
329    }
330
331    /// List all distinct namespaces.
332    pub fn list_namespaces(&self) -> Result<Vec<String>, CodememError> {
333        let conn = self.conn()?;
334        let mut stmt = conn
335            .prepare(
336                "SELECT DISTINCT namespace FROM (
337                    SELECT namespace FROM memories WHERE namespace IS NOT NULL
338                    UNION
339                    SELECT namespace FROM graph_nodes WHERE namespace IS NOT NULL
340                ) ORDER BY namespace",
341            )
342            .storage_err()?;
343
344        let namespaces = stmt
345            .query_map([], |row| row.get(0))
346            .storage_err()?
347            .collect::<Result<Vec<String>, _>>()
348            .storage_err()?;
349
350        Ok(namespaces)
351    }
352
353    /// Get memory count.
354    pub fn memory_count(&self) -> Result<usize, CodememError> {
355        let conn = self.conn()?;
356        let count: i64 = conn
357            .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
358            .storage_err()?;
359        Ok(count as usize)
360    }
361
362    // ── Repository CRUD ─────────────────────────────────────────────────────
363
364    /// List all registered repositories.
365    pub fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
366        let conn = self.conn()?;
367        let mut stmt = conn
368            .prepare(
369                "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories ORDER BY created_at DESC",
370            )
371            .storage_err()?;
372
373        let repos = stmt
374            .query_map([], |row| {
375                let created_ts: String = row.get(4)?;
376                let indexed_ts: Option<String> = row.get(5)?;
377                Ok((
378                    row.get::<_, String>(0)?,
379                    row.get::<_, String>(1)?,
380                    row.get::<_, Option<String>>(2)?,
381                    row.get::<_, Option<String>>(3)?,
382                    created_ts,
383                    indexed_ts,
384                    row.get::<_, Option<String>>(6)?
385                        .unwrap_or_else(|| "idle".to_string()),
386                ))
387            })
388            .storage_err()?
389            .collect::<Result<Vec<_>, _>>()
390            .storage_err()?;
391
392        let mut result = Vec::new();
393        for (id, path, name, namespace, created_at, last_indexed_at, status) in repos {
394            result.push(Repository {
395                id,
396                path,
397                name,
398                namespace,
399                created_at,
400                last_indexed_at,
401                status,
402            });
403        }
404
405        Ok(result)
406    }
407
408    /// Add a new repository.
409    pub fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
410        let conn = self.conn()?;
411        conn.execute(
412            "INSERT INTO repositories (id, path, name, namespace, created_at, last_indexed_at, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
413            params![
414                repo.id,
415                repo.path,
416                repo.name,
417                repo.namespace,
418                repo.created_at,
419                repo.last_indexed_at,
420                repo.status,
421            ],
422        )
423        .storage_err()?;
424        Ok(())
425    }
426
427    /// Remove a repository by ID.
428    pub fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
429        let conn = self.conn()?;
430        let rows = conn
431            .execute("DELETE FROM repositories WHERE id = ?1", params![id])
432            .storage_err()?;
433        Ok(rows > 0)
434    }
435
436    /// Get a repository by ID.
437    pub fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
438        let conn = self.conn()?;
439        let result = conn
440            .query_row(
441                "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories WHERE id = ?1",
442                params![id],
443                |row| {
444                    Ok((
445                        row.get::<_, String>(0)?,
446                        row.get::<_, String>(1)?,
447                        row.get::<_, Option<String>>(2)?,
448                        row.get::<_, Option<String>>(3)?,
449                        row.get::<_, String>(4)?,
450                        row.get::<_, Option<String>>(5)?,
451                        row.get::<_, Option<String>>(6)?.unwrap_or_else(|| "idle".to_string()),
452                    ))
453                },
454            )
455            .optional()
456            .storage_err()?;
457
458        match result {
459            Some((id, path, name, namespace, created_at, last_indexed_at, status)) => {
460                Ok(Some(Repository {
461                    id,
462                    path,
463                    name,
464                    namespace,
465                    created_at,
466                    last_indexed_at,
467                    status,
468                }))
469            }
470            None => Ok(None),
471        }
472    }
473
474    /// Update a repository's status and optionally last_indexed_at.
475    pub fn update_repo_status(
476        &self,
477        id: &str,
478        status: &str,
479        indexed_at: Option<&str>,
480    ) -> Result<(), CodememError> {
481        let conn = self.conn()?;
482        if let Some(ts) = indexed_at {
483            conn.execute(
484                "UPDATE repositories SET status = ?1, last_indexed_at = ?2 WHERE id = ?3",
485                params![status, ts, id],
486            )
487            .storage_err()?;
488        } else {
489            conn.execute(
490                "UPDATE repositories SET status = ?1 WHERE id = ?2",
491                params![status, id],
492            )
493            .storage_err()?;
494        }
495        Ok(())
496    }
497}
498
499#[cfg(test)]
500#[path = "tests/memory_tests.rs"]
501mod tests;