Skip to main content

codemem_storage/
memory.rs

1//! Memory CRUD operations on Storage.
2
3use crate::{MemoryRow, Storage};
4use codemem_core::{CodememError, MemoryNode, Repository};
5use rusqlite::{params, OptionalExtension};
6
7impl Storage {
8    /// Insert a new memory. Returns Err(Duplicate) if content hash already exists.
9    ///
10    /// Uses BEGIN IMMEDIATE to acquire a write lock before the dedup check,
11    /// ensuring the SELECT + INSERT are atomic. INSERT OR IGNORE is a safety
12    /// net against the UNIQUE constraint on content_hash.
13    ///
14    /// When an outer transaction is active (via `begin_transaction`), the
15    /// method skips starting its own transaction and executes directly on the
16    /// connection, so that all operations participate in the outer transaction.
17    pub fn insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError> {
18        // Check inside conn lock to avoid TOCTOU race with begin_transaction.
19        // The conn() mutex serializes all access, so the flag check + INSERT
20        // are atomic with respect to other callers.
21        let mut conn = self.conn()?;
22        if self.has_outer_transaction() {
23            drop(conn);
24            return self.insert_memory_no_tx(memory);
25        }
26
27        let tx = conn
28            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
29            .map_err(|e| CodememError::Storage(e.to_string()))?;
30
31        // Check dedup inside the transaction (namespace-scoped)
32        let existing: Option<String> = tx
33            .query_row(
34                "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
35                params![memory.content_hash, memory.namespace],
36                |row| row.get(0),
37            )
38            .optional()
39            .map_err(|e| CodememError::Storage(e.to_string()))?;
40
41        if existing.is_some() {
42            tx.rollback()
43                .map_err(|e| CodememError::Storage(e.to_string()))?;
44            return Err(CodememError::Duplicate(memory.content_hash.clone()));
45        }
46
47        let tags_json = serde_json::to_string(&memory.tags)?;
48        let metadata_json = serde_json::to_string(&memory.metadata)?;
49
50        tx.execute(
51            "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at)
52             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
53            params![
54                memory.id,
55                memory.content,
56                memory.memory_type.to_string(),
57                memory.importance,
58                memory.confidence,
59                memory.access_count,
60                memory.content_hash,
61                tags_json,
62                metadata_json,
63                memory.namespace,
64                memory.session_id,
65                memory.created_at.timestamp(),
66                memory.updated_at.timestamp(),
67                memory.last_accessed_at.timestamp(),
68            ],
69        )
70        .map_err(|e| CodememError::Storage(e.to_string()))?;
71
72        tx.commit()
73            .map_err(|e| CodememError::Storage(e.to_string()))?;
74
75        Ok(())
76    }
77
78    /// Insert a memory without starting a new transaction.
79    /// Used when an outer transaction is already active.
80    fn insert_memory_no_tx(&self, memory: &MemoryNode) -> Result<(), CodememError> {
81        let conn = self.conn()?;
82
83        // Check dedup (namespace-scoped) — outer transaction provides atomicity
84        let existing: Option<String> = conn
85            .query_row(
86                "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
87                params![memory.content_hash, memory.namespace],
88                |row| row.get(0),
89            )
90            .optional()
91            .map_err(|e| CodememError::Storage(e.to_string()))?;
92
93        if existing.is_some() {
94            return Err(CodememError::Duplicate(memory.content_hash.clone()));
95        }
96
97        let tags_json = serde_json::to_string(&memory.tags)?;
98        let metadata_json = serde_json::to_string(&memory.metadata)?;
99
100        conn.execute(
101            "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at)
102             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
103            params![
104                memory.id,
105                memory.content,
106                memory.memory_type.to_string(),
107                memory.importance,
108                memory.confidence,
109                memory.access_count,
110                memory.content_hash,
111                tags_json,
112                metadata_json,
113                memory.namespace,
114                memory.session_id,
115                memory.created_at.timestamp(),
116                memory.updated_at.timestamp(),
117                memory.last_accessed_at.timestamp(),
118            ],
119        )
120        .map_err(|e| CodememError::Storage(e.to_string()))?;
121
122        Ok(())
123    }
124
125    /// Get a memory by ID. Updates access_count and last_accessed_at.
126    pub fn get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
127        let conn = self.conn()?;
128
129        // Bump access count first
130        let updated = conn
131            .execute(
132                "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?1 WHERE id = ?2",
133                params![chrono::Utc::now().timestamp(), id],
134            )
135            .map_err(|e| CodememError::Storage(e.to_string()))?;
136
137        if updated == 0 {
138            return Ok(None);
139        }
140
141        let result = conn
142            .query_row(
143                "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
144                params![id],
145                |row| {
146                    Ok(MemoryRow {
147                        id: row.get(0)?,
148                        content: row.get(1)?,
149                        memory_type: row.get(2)?,
150                        importance: row.get(3)?,
151                        confidence: row.get(4)?,
152                        access_count: row.get(5)?,
153                        content_hash: row.get(6)?,
154                        tags: row.get(7)?,
155                        metadata: row.get(8)?,
156                        namespace: row.get(9)?,
157                        session_id: row.get(10)?,
158                        created_at: row.get(11)?,
159                        updated_at: row.get(12)?,
160                        last_accessed_at: row.get(13)?,
161                    })
162                },
163            )
164            .optional()
165            .map_err(|e| CodememError::Storage(e.to_string()))?;
166
167        match result {
168            Some(row) => Ok(Some(row.into_memory_node()?)),
169            None => Ok(None),
170        }
171    }
172
173    /// Get a memory by ID without updating access_count or last_accessed_at.
174    /// Use for internal/system reads (consolidation, stats, batch processing).
175    pub fn get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
176        let conn = self.conn()?;
177
178        let result = conn
179            .query_row(
180                "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
181                params![id],
182                |row| {
183                    Ok(MemoryRow {
184                        id: row.get(0)?,
185                        content: row.get(1)?,
186                        memory_type: row.get(2)?,
187                        importance: row.get(3)?,
188                        confidence: row.get(4)?,
189                        access_count: row.get(5)?,
190                        content_hash: row.get(6)?,
191                        tags: row.get(7)?,
192                        metadata: row.get(8)?,
193                        namespace: row.get(9)?,
194                        session_id: row.get(10)?,
195                        created_at: row.get(11)?,
196                        updated_at: row.get(12)?,
197                        last_accessed_at: row.get(13)?,
198                    })
199                },
200            )
201            .optional()
202            .map_err(|e| CodememError::Storage(e.to_string()))?;
203
204        match result {
205            Some(row) => Ok(Some(row.into_memory_node()?)),
206            None => Ok(None),
207        }
208    }
209
210    /// Update a memory's content and re-hash. Returns `Err(NotFound)` if the ID doesn't exist.
211    pub fn update_memory(
212        &self,
213        id: &str,
214        content: &str,
215        importance: Option<f64>,
216    ) -> Result<(), CodememError> {
217        let conn = self.conn()?;
218        let hash = Self::content_hash(content);
219        let now = chrono::Utc::now().timestamp();
220
221        let rows_affected = if let Some(imp) = importance {
222            conn.execute(
223                "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3, importance = ?4 WHERE id = ?5",
224                params![content, hash, now, imp, id],
225            )
226            .map_err(|e| CodememError::Storage(e.to_string()))?
227        } else {
228            conn.execute(
229                "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3 WHERE id = ?4",
230                params![content, hash, now, id],
231            )
232            .map_err(|e| CodememError::Storage(e.to_string()))?
233        };
234
235        if rows_affected == 0 {
236            return Err(CodememError::NotFound(format!("Memory not found: {id}")));
237        }
238
239        Ok(())
240    }
241
242    /// Delete a memory by ID.
243    pub fn delete_memory(&self, id: &str) -> Result<bool, CodememError> {
244        let conn = self.conn()?;
245        let rows = conn
246            .execute("DELETE FROM memories WHERE id = ?1", params![id])
247            .map_err(|e| CodememError::Storage(e.to_string()))?;
248        Ok(rows > 0)
249    }
250
251    /// Delete a memory and all related data (embeddings, graph nodes/edges) atomically.
252    /// Returns true if the memory existed and was deleted.
253    pub fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
254        let mut conn = self.conn()?;
255        // L2: Use IMMEDIATE transaction to acquire write lock upfront,
256        // avoiding potential deadlock with DEFERRED transaction.
257        let tx = conn
258            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
259            .map_err(|e| CodememError::Storage(e.to_string()))?;
260
261        // Delete edges that reference graph nodes linked to this memory
262        tx.execute(
263            "DELETE FROM graph_edges WHERE src IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)
264             OR dst IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)",
265            params![id],
266        )
267        .map_err(|e| CodememError::Storage(e.to_string()))?;
268
269        // Delete graph nodes linked to this memory
270        tx.execute("DELETE FROM graph_nodes WHERE memory_id = ?1", params![id])
271            .map_err(|e| CodememError::Storage(e.to_string()))?;
272
273        // Delete embedding
274        tx.execute(
275            "DELETE FROM memory_embeddings WHERE memory_id = ?1",
276            params![id],
277        )
278        .map_err(|e| CodememError::Storage(e.to_string()))?;
279
280        // Delete the memory itself
281        let rows = tx
282            .execute("DELETE FROM memories WHERE id = ?1", params![id])
283            .map_err(|e| CodememError::Storage(e.to_string()))?;
284
285        tx.commit()
286            .map_err(|e| CodememError::Storage(e.to_string()))?;
287
288        Ok(rows > 0)
289    }
290
291    /// Delete multiple memories and all related data (embeddings, graph nodes/edges) atomically.
292    /// Returns the number of memories that were actually deleted.
293    pub fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
294        if ids.is_empty() {
295            return Ok(0);
296        }
297
298        let mut conn = self.conn()?;
299        let tx = conn
300            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
301            .map_err(|e| CodememError::Storage(e.to_string()))?;
302
303        let placeholders: String = (1..=ids.len())
304            .map(|i| format!("?{i}"))
305            .collect::<Vec<_>>()
306            .join(",");
307        let params: Vec<&dyn rusqlite::types::ToSql> = ids
308            .iter()
309            .map(|id| id as &dyn rusqlite::types::ToSql)
310            .collect();
311
312        // Delete edges referencing graph nodes linked to these memories.
313        // Uses ?N numbered params which SQLite allows to be reused in the same statement.
314        let edge_sql = format!(
315            "DELETE FROM graph_edges WHERE \
316             src IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
317             OR dst IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
318             OR src IN ({placeholders}) OR dst IN ({placeholders})"
319        );
320        tx.execute(&edge_sql, params.as_slice())
321            .map_err(|e| CodememError::Storage(e.to_string()))?;
322
323        // Delete graph nodes linked to these memories (by memory_id column or by id)
324        let node_sql = format!(
325            "DELETE FROM graph_nodes WHERE memory_id IN ({placeholders}) OR id IN ({placeholders})"
326        );
327        tx.execute(&node_sql, params.as_slice())
328            .map_err(|e| CodememError::Storage(e.to_string()))?;
329
330        // Delete embeddings
331        let emb_sql = format!("DELETE FROM memory_embeddings WHERE memory_id IN ({placeholders})");
332        tx.execute(&emb_sql, params.as_slice())
333            .map_err(|e| CodememError::Storage(e.to_string()))?;
334
335        // Delete the memories themselves
336        let mem_sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
337        let deleted = tx
338            .execute(&mem_sql, params.as_slice())
339            .map_err(|e| CodememError::Storage(e.to_string()))?;
340
341        tx.commit()
342            .map_err(|e| CodememError::Storage(e.to_string()))?;
343
344        Ok(deleted)
345    }
346
347    /// List all memory IDs with an optional limit.
348    pub fn list_memory_ids(&self) -> Result<Vec<String>, CodememError> {
349        self.list_memory_ids_limited(None)
350    }
351
352    /// List memory IDs with an optional limit.
353    pub fn list_memory_ids_limited(
354        &self,
355        limit: Option<usize>,
356    ) -> Result<Vec<String>, CodememError> {
357        let conn = self.conn()?;
358        let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) =
359            if let Some(lim) = limit {
360                (
361                    "SELECT id FROM memories ORDER BY created_at DESC LIMIT ?1",
362                    vec![Box::new(lim as i64) as Box<dyn rusqlite::types::ToSql>],
363                )
364            } else {
365                ("SELECT id FROM memories ORDER BY created_at DESC", vec![])
366            };
367
368        let mut stmt = conn
369            .prepare(sql)
370            .map_err(|e| CodememError::Storage(e.to_string()))?;
371
372        let refs: Vec<&dyn rusqlite::types::ToSql> =
373            params_vec.iter().map(|p| p.as_ref()).collect();
374
375        let ids = stmt
376            .query_map(refs.as_slice(), |row| row.get(0))
377            .map_err(|e| CodememError::Storage(e.to_string()))?
378            .collect::<Result<Vec<String>, _>>()
379            .map_err(|e| CodememError::Storage(e.to_string()))?;
380
381        Ok(ids)
382    }
383
384    /// List memory IDs scoped to a specific namespace.
385    pub fn list_memory_ids_for_namespace(
386        &self,
387        namespace: &str,
388    ) -> Result<Vec<String>, CodememError> {
389        let conn = self.conn()?;
390        let mut stmt = conn
391            .prepare("SELECT id FROM memories WHERE namespace = ?1 ORDER BY created_at DESC")
392            .map_err(|e| CodememError::Storage(e.to_string()))?;
393
394        let ids = stmt
395            .query_map(params![namespace], |row| row.get(0))
396            .map_err(|e| CodememError::Storage(e.to_string()))?
397            .collect::<Result<Vec<String>, _>>()
398            .map_err(|e| CodememError::Storage(e.to_string()))?;
399
400        Ok(ids)
401    }
402
403    /// List all distinct namespaces.
404    pub fn list_namespaces(&self) -> Result<Vec<String>, CodememError> {
405        let conn = self.conn()?;
406        let mut stmt = conn
407            .prepare(
408                "SELECT DISTINCT namespace FROM (
409                    SELECT namespace FROM memories WHERE namespace IS NOT NULL
410                    UNION
411                    SELECT namespace FROM graph_nodes WHERE namespace IS NOT NULL
412                ) ORDER BY namespace",
413            )
414            .map_err(|e| CodememError::Storage(e.to_string()))?;
415
416        let namespaces = stmt
417            .query_map([], |row| row.get(0))
418            .map_err(|e| CodememError::Storage(e.to_string()))?
419            .collect::<Result<Vec<String>, _>>()
420            .map_err(|e| CodememError::Storage(e.to_string()))?;
421
422        Ok(namespaces)
423    }
424
425    /// Get memory count.
426    pub fn memory_count(&self) -> Result<usize, CodememError> {
427        let conn = self.conn()?;
428        let count: i64 = conn
429            .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
430            .map_err(|e| CodememError::Storage(e.to_string()))?;
431        Ok(count as usize)
432    }
433
434    // ── Repository CRUD ─────────────────────────────────────────────────────
435
436    /// List all registered repositories.
437    pub fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
438        let conn = self.conn()?;
439        let mut stmt = conn
440            .prepare(
441                "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories ORDER BY created_at DESC",
442            )
443            .map_err(|e| CodememError::Storage(e.to_string()))?;
444
445        let repos = stmt
446            .query_map([], |row| {
447                let created_ts: String = row.get(4)?;
448                let indexed_ts: Option<String> = row.get(5)?;
449                Ok((
450                    row.get::<_, String>(0)?,
451                    row.get::<_, String>(1)?,
452                    row.get::<_, Option<String>>(2)?,
453                    row.get::<_, Option<String>>(3)?,
454                    created_ts,
455                    indexed_ts,
456                    row.get::<_, Option<String>>(6)?
457                        .unwrap_or_else(|| "idle".to_string()),
458                ))
459            })
460            .map_err(|e| CodememError::Storage(e.to_string()))?
461            .collect::<Result<Vec<_>, _>>()
462            .map_err(|e| CodememError::Storage(e.to_string()))?;
463
464        let mut result = Vec::new();
465        for (id, path, name, namespace, created_at, last_indexed_at, status) in repos {
466            result.push(Repository {
467                id,
468                path,
469                name,
470                namespace,
471                created_at,
472                last_indexed_at,
473                status,
474            });
475        }
476
477        Ok(result)
478    }
479
480    /// Add a new repository.
481    pub fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
482        let conn = self.conn()?;
483        conn.execute(
484            "INSERT INTO repositories (id, path, name, namespace, created_at, last_indexed_at, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
485            params![
486                repo.id,
487                repo.path,
488                repo.name,
489                repo.namespace,
490                repo.created_at,
491                repo.last_indexed_at,
492                repo.status,
493            ],
494        )
495        .map_err(|e| CodememError::Storage(e.to_string()))?;
496        Ok(())
497    }
498
499    /// Remove a repository by ID.
500    pub fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
501        let conn = self.conn()?;
502        let rows = conn
503            .execute("DELETE FROM repositories WHERE id = ?1", params![id])
504            .map_err(|e| CodememError::Storage(e.to_string()))?;
505        Ok(rows > 0)
506    }
507
508    /// Get a repository by ID.
509    pub fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
510        let conn = self.conn()?;
511        let result = conn
512            .query_row(
513                "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories WHERE id = ?1",
514                params![id],
515                |row| {
516                    Ok((
517                        row.get::<_, String>(0)?,
518                        row.get::<_, String>(1)?,
519                        row.get::<_, Option<String>>(2)?,
520                        row.get::<_, Option<String>>(3)?,
521                        row.get::<_, String>(4)?,
522                        row.get::<_, Option<String>>(5)?,
523                        row.get::<_, Option<String>>(6)?.unwrap_or_else(|| "idle".to_string()),
524                    ))
525                },
526            )
527            .optional()
528            .map_err(|e| CodememError::Storage(e.to_string()))?;
529
530        match result {
531            Some((id, path, name, namespace, created_at, last_indexed_at, status)) => {
532                Ok(Some(Repository {
533                    id,
534                    path,
535                    name,
536                    namespace,
537                    created_at,
538                    last_indexed_at,
539                    status,
540                }))
541            }
542            None => Ok(None),
543        }
544    }
545
546    /// Update a repository's status and optionally last_indexed_at.
547    pub fn update_repo_status(
548        &self,
549        id: &str,
550        status: &str,
551        indexed_at: Option<&str>,
552    ) -> Result<(), CodememError> {
553        let conn = self.conn()?;
554        if let Some(ts) = indexed_at {
555            conn.execute(
556                "UPDATE repositories SET status = ?1, last_indexed_at = ?2 WHERE id = ?3",
557                params![status, ts, id],
558            )
559            .map_err(|e| CodememError::Storage(e.to_string()))?;
560        } else {
561            conn.execute(
562                "UPDATE repositories SET status = ?1 WHERE id = ?2",
563                params![status, id],
564            )
565            .map_err(|e| CodememError::Storage(e.to_string()))?;
566        }
567        Ok(())
568    }
569}
570
571#[cfg(test)]
572#[path = "tests/memory_tests.rs"]
573mod tests;