Skip to main content

codemem_storage/
memory.rs

//! Memory and repository CRUD operations on Storage.
2
3use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{CodememError, MemoryNode, Repository};
5use rusqlite::{params, OptionalExtension};
6
7/// Shared dedup-check + INSERT logic used by both transactional and
8/// non-transactional insert paths.  Accepts `&rusqlite::Connection` because
9/// both `Connection` and `Transaction` deref to it.
10fn insert_memory_inner(
11    conn: &rusqlite::Connection,
12    memory: &MemoryNode,
13) -> Result<(), CodememError> {
14    // Check dedup (namespace-scoped)
15    let existing: Option<String> = conn
16        .query_row(
17            "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
18            params![memory.content_hash, memory.namespace],
19            |row| row.get(0),
20        )
21        .optional()
22        .storage_err()?;
23
24    if existing.is_some() {
25        return Err(CodememError::Duplicate(memory.content_hash.clone()));
26    }
27
28    let tags_json = serde_json::to_string(&memory.tags)?;
29    let metadata_json = serde_json::to_string(&memory.metadata)?;
30
31    conn.execute(
32        "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at)
33         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
34        params![
35            memory.id,
36            memory.content,
37            memory.memory_type.to_string(),
38            memory.importance,
39            memory.confidence,
40            memory.access_count,
41            memory.content_hash,
42            tags_json,
43            metadata_json,
44            memory.namespace,
45            memory.session_id,
46            memory.repo,
47            memory.git_ref,
48            memory.expires_at.map(|dt| dt.timestamp()),
49            memory.created_at.timestamp(),
50            memory.updated_at.timestamp(),
51            memory.last_accessed_at.timestamp(),
52        ],
53    )
54    .storage_err()?;
55
56    Ok(())
57}
58
59impl Storage {
60    /// Insert a new memory. Returns Err(Duplicate) if content hash already exists.
61    ///
62    /// Uses BEGIN IMMEDIATE to acquire a write lock before the dedup check,
63    /// ensuring the SELECT + INSERT are atomic. INSERT OR IGNORE is a safety
64    /// net against the UNIQUE constraint on content_hash.
65    ///
66    /// When an outer transaction is active (via `begin_transaction`), the
67    /// method skips starting its own transaction and executes directly on the
68    /// connection, so that all operations participate in the outer transaction.
69    pub fn insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError> {
70        // Check inside conn lock to avoid TOCTOU race with begin_transaction.
71        // The conn() mutex serializes all access, so the flag check + INSERT
72        // are atomic with respect to other callers.
73        let mut conn = self.conn()?;
74        if self.has_outer_transaction() {
75            drop(conn);
76            return self.insert_memory_no_tx(memory);
77        }
78
79        let tx = conn
80            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
81            .storage_err()?;
82
83        // Dedup check + INSERT inside the transaction; on Duplicate error the
84        // transaction is rolled back automatically when `tx` is dropped.
85        insert_memory_inner(&tx, memory)?;
86
87        tx.commit().storage_err()?;
88
89        Ok(())
90    }
91
92    /// Insert a memory without starting a new transaction.
93    /// Used when an outer transaction is already active.
94    fn insert_memory_no_tx(&self, memory: &MemoryNode) -> Result<(), CodememError> {
95        let conn = self.conn()?;
96        insert_memory_inner(&conn, memory)
97    }
98
99    /// Get a memory by ID. Updates access_count and last_accessed_at.
100    ///
101    /// Note: returns expired memories intentionally — direct ID lookups should
102    /// always succeed for debugging and internal use. Expiry filtering happens
103    /// in `list_memories_filtered` (bulk queries) and the opportunistic sweep
104    /// in `delete_expired_memories` handles actual cleanup.
105    pub fn get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
106        let conn = self.conn()?;
107
108        // Bump access count first
109        let updated = conn
110            .execute(
111                "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?1 WHERE id = ?2",
112                params![chrono::Utc::now().timestamp(), id],
113            )
114            .storage_err()?;
115
116        if updated == 0 {
117            return Ok(None);
118        }
119
120        let result = conn
121            .query_row(
122                "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
123                params![id],
124                MemoryRow::from_row,
125            )
126            .optional()
127            .storage_err()?;
128
129        match result {
130            Some(row) => Ok(Some(row.into_memory_node()?)),
131            None => Ok(None),
132        }
133    }
134
135    /// Get a memory by ID without updating access_count or last_accessed_at.
136    /// Use for internal/system reads (consolidation, stats, batch processing).
137    pub fn get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
138        let conn = self.conn()?;
139
140        let result = conn
141            .query_row(
142                "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
143                params![id],
144                MemoryRow::from_row,
145            )
146            .optional()
147            .storage_err()?;
148
149        match result {
150            Some(row) => Ok(Some(row.into_memory_node()?)),
151            None => Ok(None),
152        }
153    }
154
155    /// Update a memory's content and re-hash. Returns `Err(NotFound)` if the ID doesn't exist.
156    pub fn update_memory(
157        &self,
158        id: &str,
159        content: &str,
160        importance: Option<f64>,
161    ) -> Result<(), CodememError> {
162        let conn = self.conn()?;
163        let hash = Self::content_hash(content);
164        let now = chrono::Utc::now().timestamp();
165
166        let rows_affected = if let Some(imp) = importance {
167            conn.execute(
168                "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3, importance = ?4 WHERE id = ?5",
169                params![content, hash, now, imp, id],
170            )
171            .storage_err()?
172        } else {
173            conn.execute(
174                "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3 WHERE id = ?4",
175                params![content, hash, now, id],
176            )
177            .storage_err()?
178        };
179
180        if rows_affected == 0 {
181            return Err(CodememError::NotFound(format!("Memory not found: {id}")));
182        }
183
184        Ok(())
185    }
186
187    /// Delete a memory by ID.
188    pub fn delete_memory(&self, id: &str) -> Result<bool, CodememError> {
189        let conn = self.conn()?;
190        let rows = conn
191            .execute("DELETE FROM memories WHERE id = ?1", params![id])
192            .storage_err()?;
193        Ok(rows > 0)
194    }
195
196    /// Delete a memory and all related data (embeddings, graph nodes/edges) atomically.
197    /// Returns true if the memory existed and was deleted.
198    pub fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
199        let mut conn = self.conn()?;
200        // L2: Use IMMEDIATE transaction to acquire write lock upfront,
201        // avoiding potential deadlock with DEFERRED transaction.
202        let tx = conn
203            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
204            .storage_err()?;
205
206        // Delete edges that reference graph nodes linked to this memory
207        tx.execute(
208            "DELETE FROM graph_edges WHERE src IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)
209             OR dst IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)",
210            params![id],
211        )
212        .storage_err()?;
213
214        // Delete graph nodes linked to this memory
215        tx.execute("DELETE FROM graph_nodes WHERE memory_id = ?1", params![id])
216            .storage_err()?;
217
218        // Delete embedding
219        tx.execute(
220            "DELETE FROM memory_embeddings WHERE memory_id = ?1",
221            params![id],
222        )
223        .storage_err()?;
224
225        // Delete the memory itself
226        let rows = tx
227            .execute("DELETE FROM memories WHERE id = ?1", params![id])
228            .storage_err()?;
229
230        tx.commit().storage_err()?;
231
232        Ok(rows > 0)
233    }
234
235    /// Delete multiple memories and all related data (embeddings, graph nodes/edges) atomically.
236    /// Returns the number of memories that were actually deleted.
237    pub fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
238        if ids.is_empty() {
239            return Ok(0);
240        }
241
242        let mut conn = self.conn()?;
243        let tx = conn
244            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
245            .storage_err()?;
246
247        let placeholders: String = (1..=ids.len())
248            .map(|i| format!("?{i}"))
249            .collect::<Vec<_>>()
250            .join(",");
251        let params: Vec<&dyn rusqlite::types::ToSql> = ids
252            .iter()
253            .map(|id| id as &dyn rusqlite::types::ToSql)
254            .collect();
255
256        // Delete edges referencing graph nodes linked to these memories.
257        // Uses ?N numbered params which SQLite allows to be reused in the same statement.
258        let edge_sql = format!(
259            "DELETE FROM graph_edges WHERE \
260             src IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
261             OR dst IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
262             OR src IN ({placeholders}) OR dst IN ({placeholders})"
263        );
264        tx.execute(&edge_sql, params.as_slice()).storage_err()?;
265
266        // Delete graph nodes linked to these memories (by memory_id column or by id)
267        let node_sql = format!(
268            "DELETE FROM graph_nodes WHERE memory_id IN ({placeholders}) OR id IN ({placeholders})"
269        );
270        tx.execute(&node_sql, params.as_slice()).storage_err()?;
271
272        // Delete embeddings
273        let emb_sql = format!("DELETE FROM memory_embeddings WHERE memory_id IN ({placeholders})");
274        tx.execute(&emb_sql, params.as_slice()).storage_err()?;
275
276        // Delete the memories themselves
277        let mem_sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
278        let deleted = tx.execute(&mem_sql, params.as_slice()).storage_err()?;
279
280        tx.commit().storage_err()?;
281
282        Ok(deleted)
283    }
284
285    /// List all memory IDs with an optional limit.
286    pub fn list_memory_ids(&self) -> Result<Vec<String>, CodememError> {
287        self.list_memory_ids_limited(None)
288    }
289
290    /// List memory IDs with an optional limit.
291    pub fn list_memory_ids_limited(
292        &self,
293        limit: Option<usize>,
294    ) -> Result<Vec<String>, CodememError> {
295        let conn = self.conn()?;
296        let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) =
297            if let Some(lim) = limit {
298                (
299                    "SELECT id FROM memories ORDER BY created_at DESC LIMIT ?1",
300                    vec![Box::new(lim as i64) as Box<dyn rusqlite::types::ToSql>],
301                )
302            } else {
303                ("SELECT id FROM memories ORDER BY created_at DESC", vec![])
304            };
305
306        let mut stmt = conn.prepare(sql).storage_err()?;
307
308        let refs: Vec<&dyn rusqlite::types::ToSql> =
309            params_vec.iter().map(|p| p.as_ref()).collect();
310
311        let ids = stmt
312            .query_map(refs.as_slice(), |row| row.get(0))
313            .storage_err()?
314            .collect::<Result<Vec<String>, _>>()
315            .storage_err()?;
316
317        Ok(ids)
318    }
319
320    /// List memory IDs scoped to a specific namespace.
321    pub fn list_memory_ids_for_namespace(
322        &self,
323        namespace: &str,
324    ) -> Result<Vec<String>, CodememError> {
325        let conn = self.conn()?;
326        let mut stmt = conn
327            .prepare("SELECT id FROM memories WHERE namespace = ?1 ORDER BY created_at DESC")
328            .storage_err()?;
329
330        let ids = stmt
331            .query_map(params![namespace], |row| row.get(0))
332            .storage_err()?
333            .collect::<Result<Vec<String>, _>>()
334            .storage_err()?;
335
336        Ok(ids)
337    }
338
339    /// List all distinct namespaces.
340    pub fn list_namespaces(&self) -> Result<Vec<String>, CodememError> {
341        let conn = self.conn()?;
342        let mut stmt = conn
343            .prepare(
344                "SELECT DISTINCT namespace FROM (
345                    SELECT namespace FROM memories WHERE namespace IS NOT NULL
346                    UNION
347                    SELECT namespace FROM graph_nodes WHERE namespace IS NOT NULL
348                ) ORDER BY namespace",
349            )
350            .storage_err()?;
351
352        let namespaces = stmt
353            .query_map([], |row| row.get(0))
354            .storage_err()?
355            .collect::<Result<Vec<String>, _>>()
356            .storage_err()?;
357
358        Ok(namespaces)
359    }
360
361    /// Get memory count.
362    pub fn memory_count(&self) -> Result<usize, CodememError> {
363        let conn = self.conn()?;
364        let count: i64 = conn
365            .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
366            .storage_err()?;
367        Ok(count as usize)
368    }
369
370    // ── Repository CRUD ─────────────────────────────────────────────────────
371
372    /// List all registered repositories.
373    pub fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
374        let conn = self.conn()?;
375        let mut stmt = conn
376            .prepare(
377                "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories ORDER BY created_at DESC",
378            )
379            .storage_err()?;
380
381        let repos = stmt
382            .query_map([], |row| {
383                let created_ts: String = row.get(4)?;
384                let indexed_ts: Option<String> = row.get(5)?;
385                Ok((
386                    row.get::<_, String>(0)?,
387                    row.get::<_, String>(1)?,
388                    row.get::<_, Option<String>>(2)?,
389                    row.get::<_, Option<String>>(3)?,
390                    created_ts,
391                    indexed_ts,
392                    row.get::<_, Option<String>>(6)?
393                        .unwrap_or_else(|| "idle".to_string()),
394                ))
395            })
396            .storage_err()?
397            .collect::<Result<Vec<_>, _>>()
398            .storage_err()?;
399
400        let mut result = Vec::new();
401        for (id, path, name, namespace, created_at, last_indexed_at, status) in repos {
402            result.push(Repository {
403                id,
404                path,
405                name,
406                namespace,
407                created_at,
408                last_indexed_at,
409                status,
410            });
411        }
412
413        Ok(result)
414    }
415
416    /// Add a new repository.
417    pub fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
418        let conn = self.conn()?;
419        conn.execute(
420            "INSERT INTO repositories (id, path, name, namespace, created_at, last_indexed_at, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
421            params![
422                repo.id,
423                repo.path,
424                repo.name,
425                repo.namespace,
426                repo.created_at,
427                repo.last_indexed_at,
428                repo.status,
429            ],
430        )
431        .storage_err()?;
432        Ok(())
433    }
434
435    /// Remove a repository by ID.
436    pub fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
437        let conn = self.conn()?;
438        let rows = conn
439            .execute("DELETE FROM repositories WHERE id = ?1", params![id])
440            .storage_err()?;
441        Ok(rows > 0)
442    }
443
444    /// Get a repository by ID.
445    pub fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
446        let conn = self.conn()?;
447        let result = conn
448            .query_row(
449                "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories WHERE id = ?1",
450                params![id],
451                |row| {
452                    Ok((
453                        row.get::<_, String>(0)?,
454                        row.get::<_, String>(1)?,
455                        row.get::<_, Option<String>>(2)?,
456                        row.get::<_, Option<String>>(3)?,
457                        row.get::<_, String>(4)?,
458                        row.get::<_, Option<String>>(5)?,
459                        row.get::<_, Option<String>>(6)?.unwrap_or_else(|| "idle".to_string()),
460                    ))
461                },
462            )
463            .optional()
464            .storage_err()?;
465
466        match result {
467            Some((id, path, name, namespace, created_at, last_indexed_at, status)) => {
468                Ok(Some(Repository {
469                    id,
470                    path,
471                    name,
472                    namespace,
473                    created_at,
474                    last_indexed_at,
475                    status,
476                }))
477            }
478            None => Ok(None),
479        }
480    }
481
482    /// Update a repository's status and optionally last_indexed_at.
483    pub fn update_repo_status(
484        &self,
485        id: &str,
486        status: &str,
487        indexed_at: Option<&str>,
488    ) -> Result<(), CodememError> {
489        let conn = self.conn()?;
490        if let Some(ts) = indexed_at {
491            conn.execute(
492                "UPDATE repositories SET status = ?1, last_indexed_at = ?2 WHERE id = ?3",
493                params![status, ts, id],
494            )
495            .storage_err()?;
496        } else {
497            conn.execute(
498                "UPDATE repositories SET status = ?1 WHERE id = ?2",
499                params![status, id],
500            )
501            .storage_err()?;
502        }
503        Ok(())
504    }
505
506    /// Delete all expired memories (where `expires_at <= now`).
507    /// Returns the number of memories deleted.
508    pub fn delete_expired_memories(&self) -> Result<usize, CodememError> {
509        let conn = self.conn()?;
510        let now = chrono::Utc::now().timestamp();
511
512        // Collect IDs of expired memories first, then delete embeddings for exactly
513        // those IDs (avoids O(all-embeddings) NOT IN subquery).
514        let mut stmt = conn
515            .prepare("SELECT id FROM memories WHERE expires_at IS NOT NULL AND expires_at <= ?1")
516            .storage_err()?;
517        let expired_ids: Vec<String> = stmt
518            .query_map(params![now], |row| row.get(0))
519            .storage_err()?
520            .collect::<Result<Vec<String>, _>>()
521            .storage_err()?;
522
523        if expired_ids.is_empty() {
524            return Ok(0);
525        }
526
527        // Batch in chunks of 999 to respect SQLite's parameter limit.
528        let mut total_deleted = 0usize;
529        for chunk in expired_ids.chunks(999) {
530            let placeholders: String = (1..=chunk.len())
531                .map(|i| format!("?{i}"))
532                .collect::<Vec<_>>()
533                .join(",");
534            let params: Vec<&dyn rusqlite::types::ToSql> = chunk
535                .iter()
536                .map(|id| id as &dyn rusqlite::types::ToSql)
537                .collect();
538
539            let emb_sql =
540                format!("DELETE FROM memory_embeddings WHERE memory_id IN ({placeholders})");
541            conn.execute(&emb_sql, params.as_slice()).storage_err()?;
542
543            let mem_sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
544            total_deleted += conn.execute(&mem_sql, params.as_slice()).storage_err()?;
545        }
546
547        Ok(total_deleted)
548    }
549
550    /// Mark memories as expired for symbols in files whose content hash changed.
551    /// Targets memories with `static-analysis` tag linked (via graph) to symbols
552    /// in the given file — found via `graph_nodes.memory_id` (primary link) or
553    /// `RELATES_TO` edges in `graph_edges` (secondary link for SCIP doc memories).
554    /// Sets `expires_at` to now so they'll be cleaned up on next opportunistic sweep.
555    pub fn expire_memories_for_file(&self, file_path: &str) -> Result<usize, CodememError> {
556        let conn = self.conn()?;
557        let now = chrono::Utc::now().timestamp();
558        let expired = conn
559            .execute(
560                "UPDATE memories SET expires_at = ?1
561                 WHERE expires_at IS NULL
562                   AND id IN (SELECT m2.id FROM memories m2, json_each(m2.tags) jt
563                              WHERE jt.value = 'static-analysis')
564                   AND (
565                     id IN (
566                       SELECT gn.memory_id FROM graph_nodes gn
567                       WHERE gn.memory_id IS NOT NULL
568                         AND json_extract(gn.payload, '$.file_path') = ?2
569                     )
570                     OR id IN (
571                       SELECT REPLACE(ge.dst, 'mem:', '')
572                       FROM graph_edges ge
573                       JOIN graph_nodes gn ON ge.src = gn.id
574                       WHERE ge.relationship = 'RELATES_TO'
575                         AND ge.dst LIKE 'mem:%'
576                         AND json_extract(gn.payload, '$.file_path') = ?2
577                     )
578                   )",
579                params![now, file_path],
580            )
581            .storage_err()?;
582        Ok(expired)
583    }
584}
585
586#[cfg(test)]
587#[path = "tests/memory_tests.rs"]
588mod tests;