Skip to main content

codemem_storage/
memory.rs

1//! Memory CRUD operations on Storage.
2
3use crate::{MemoryRow, Storage};
4use codemem_core::{CodememError, MemoryNode, Repository};
5use rusqlite::{params, OptionalExtension};
6
7impl Storage {
8    /// Insert a new memory. Returns Err(Duplicate) if content hash already exists.
9    ///
10    /// Uses BEGIN IMMEDIATE to acquire a write lock before the dedup check,
11    /// ensuring the SELECT + INSERT are atomic. INSERT OR IGNORE is a safety
12    /// net against the UNIQUE constraint on content_hash.
13    pub fn insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError> {
14        let mut conn = self.conn()?;
15
16        let tx = conn
17            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
18            .map_err(|e| CodememError::Storage(e.to_string()))?;
19
20        // Check dedup inside the transaction (namespace-scoped)
21        let existing: Option<String> = tx
22            .query_row(
23                "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
24                params![memory.content_hash, memory.namespace],
25                |row| row.get(0),
26            )
27            .optional()
28            .map_err(|e| CodememError::Storage(e.to_string()))?;
29
30        if existing.is_some() {
31            tx.rollback()
32                .map_err(|e| CodememError::Storage(e.to_string()))?;
33            return Err(CodememError::Duplicate(memory.content_hash.clone()));
34        }
35
36        let tags_json = serde_json::to_string(&memory.tags)?;
37        let metadata_json = serde_json::to_string(&memory.metadata)?;
38
39        tx.execute(
40            "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, created_at, updated_at, last_accessed_at)
41             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
42            params![
43                memory.id,
44                memory.content,
45                memory.memory_type.to_string(),
46                memory.importance,
47                memory.confidence,
48                memory.access_count,
49                memory.content_hash,
50                tags_json,
51                metadata_json,
52                memory.namespace,
53                memory.created_at.timestamp(),
54                memory.updated_at.timestamp(),
55                memory.last_accessed_at.timestamp(),
56            ],
57        )
58        .map_err(|e| CodememError::Storage(e.to_string()))?;
59
60        tx.commit()
61            .map_err(|e| CodememError::Storage(e.to_string()))?;
62
63        Ok(())
64    }
65
66    /// Get a memory by ID. Updates access_count and last_accessed_at.
67    pub fn get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
68        let conn = self.conn()?;
69
70        // Bump access count first
71        let updated = conn
72            .execute(
73                "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?1 WHERE id = ?2",
74                params![chrono::Utc::now().timestamp(), id],
75            )
76            .map_err(|e| CodememError::Storage(e.to_string()))?;
77
78        if updated == 0 {
79            return Ok(None);
80        }
81
82        let result = conn
83            .query_row(
84                "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
85                params![id],
86                |row| {
87                    Ok(MemoryRow {
88                        id: row.get(0)?,
89                        content: row.get(1)?,
90                        memory_type: row.get(2)?,
91                        importance: row.get(3)?,
92                        confidence: row.get(4)?,
93                        access_count: row.get(5)?,
94                        content_hash: row.get(6)?,
95                        tags: row.get(7)?,
96                        metadata: row.get(8)?,
97                        namespace: row.get(9)?,
98                        created_at: row.get(10)?,
99                        updated_at: row.get(11)?,
100                        last_accessed_at: row.get(12)?,
101                    })
102                },
103            )
104            .optional()
105            .map_err(|e| CodememError::Storage(e.to_string()))?;
106
107        match result {
108            Some(row) => Ok(Some(row.into_memory_node()?)),
109            None => Ok(None),
110        }
111    }
112
113    /// Get a memory by ID without updating access_count or last_accessed_at.
114    /// Use for internal/system reads (consolidation, stats, batch processing).
115    pub fn get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
116        let conn = self.conn()?;
117
118        let result = conn
119            .query_row(
120                "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
121                params![id],
122                |row| {
123                    Ok(MemoryRow {
124                        id: row.get(0)?,
125                        content: row.get(1)?,
126                        memory_type: row.get(2)?,
127                        importance: row.get(3)?,
128                        confidence: row.get(4)?,
129                        access_count: row.get(5)?,
130                        content_hash: row.get(6)?,
131                        tags: row.get(7)?,
132                        metadata: row.get(8)?,
133                        namespace: row.get(9)?,
134                        created_at: row.get(10)?,
135                        updated_at: row.get(11)?,
136                        last_accessed_at: row.get(12)?,
137                    })
138                },
139            )
140            .optional()
141            .map_err(|e| CodememError::Storage(e.to_string()))?;
142
143        match result {
144            Some(row) => Ok(Some(row.into_memory_node()?)),
145            None => Ok(None),
146        }
147    }
148
149    /// Update a memory's content and re-hash. Returns `Err(NotFound)` if the ID doesn't exist.
150    pub fn update_memory(
151        &self,
152        id: &str,
153        content: &str,
154        importance: Option<f64>,
155    ) -> Result<(), CodememError> {
156        let conn = self.conn()?;
157        let hash = Self::content_hash(content);
158        let now = chrono::Utc::now().timestamp();
159
160        let rows_affected = if let Some(imp) = importance {
161            conn.execute(
162                "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3, importance = ?4 WHERE id = ?5",
163                params![content, hash, now, imp, id],
164            )
165            .map_err(|e| CodememError::Storage(e.to_string()))?
166        } else {
167            conn.execute(
168                "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3 WHERE id = ?4",
169                params![content, hash, now, id],
170            )
171            .map_err(|e| CodememError::Storage(e.to_string()))?
172        };
173
174        if rows_affected == 0 {
175            return Err(CodememError::NotFound(format!("Memory not found: {id}")));
176        }
177
178        Ok(())
179    }
180
181    /// Delete a memory by ID.
182    pub fn delete_memory(&self, id: &str) -> Result<bool, CodememError> {
183        let conn = self.conn()?;
184        let rows = conn
185            .execute("DELETE FROM memories WHERE id = ?1", params![id])
186            .map_err(|e| CodememError::Storage(e.to_string()))?;
187        Ok(rows > 0)
188    }
189
190    /// Delete a memory and all related data (embeddings, graph nodes/edges) atomically.
191    /// Returns true if the memory existed and was deleted.
192    pub fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
193        let mut conn = self.conn()?;
194        // L2: Use IMMEDIATE transaction to acquire write lock upfront,
195        // avoiding potential deadlock with DEFERRED transaction.
196        let tx = conn
197            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
198            .map_err(|e| CodememError::Storage(e.to_string()))?;
199
200        // Delete edges that reference graph nodes linked to this memory
201        tx.execute(
202            "DELETE FROM graph_edges WHERE src IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)
203             OR dst IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)",
204            params![id],
205        )
206        .map_err(|e| CodememError::Storage(e.to_string()))?;
207
208        // Delete graph nodes linked to this memory
209        tx.execute("DELETE FROM graph_nodes WHERE memory_id = ?1", params![id])
210            .map_err(|e| CodememError::Storage(e.to_string()))?;
211
212        // Delete embedding
213        tx.execute(
214            "DELETE FROM memory_embeddings WHERE memory_id = ?1",
215            params![id],
216        )
217        .map_err(|e| CodememError::Storage(e.to_string()))?;
218
219        // Delete the memory itself
220        let rows = tx
221            .execute("DELETE FROM memories WHERE id = ?1", params![id])
222            .map_err(|e| CodememError::Storage(e.to_string()))?;
223
224        tx.commit()
225            .map_err(|e| CodememError::Storage(e.to_string()))?;
226
227        Ok(rows > 0)
228    }
229
230    /// List all memory IDs with an optional limit.
231    pub fn list_memory_ids(&self) -> Result<Vec<String>, CodememError> {
232        self.list_memory_ids_limited(None)
233    }
234
235    /// List memory IDs with an optional limit.
236    pub fn list_memory_ids_limited(
237        &self,
238        limit: Option<usize>,
239    ) -> Result<Vec<String>, CodememError> {
240        let conn = self.conn()?;
241        let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) =
242            if let Some(lim) = limit {
243                (
244                    "SELECT id FROM memories ORDER BY created_at DESC LIMIT ?1",
245                    vec![Box::new(lim as i64) as Box<dyn rusqlite::types::ToSql>],
246                )
247            } else {
248                ("SELECT id FROM memories ORDER BY created_at DESC", vec![])
249            };
250
251        let mut stmt = conn
252            .prepare(sql)
253            .map_err(|e| CodememError::Storage(e.to_string()))?;
254
255        let refs: Vec<&dyn rusqlite::types::ToSql> =
256            params_vec.iter().map(|p| p.as_ref()).collect();
257
258        let ids = stmt
259            .query_map(refs.as_slice(), |row| row.get(0))
260            .map_err(|e| CodememError::Storage(e.to_string()))?
261            .collect::<Result<Vec<String>, _>>()
262            .map_err(|e| CodememError::Storage(e.to_string()))?;
263
264        Ok(ids)
265    }
266
267    /// List memory IDs scoped to a specific namespace.
268    pub fn list_memory_ids_for_namespace(
269        &self,
270        namespace: &str,
271    ) -> Result<Vec<String>, CodememError> {
272        let conn = self.conn()?;
273        let mut stmt = conn
274            .prepare("SELECT id FROM memories WHERE namespace = ?1 ORDER BY created_at DESC")
275            .map_err(|e| CodememError::Storage(e.to_string()))?;
276
277        let ids = stmt
278            .query_map(params![namespace], |row| row.get(0))
279            .map_err(|e| CodememError::Storage(e.to_string()))?
280            .collect::<Result<Vec<String>, _>>()
281            .map_err(|e| CodememError::Storage(e.to_string()))?;
282
283        Ok(ids)
284    }
285
286    /// List all distinct namespaces.
287    pub fn list_namespaces(&self) -> Result<Vec<String>, CodememError> {
288        let conn = self.conn()?;
289        let mut stmt = conn
290            .prepare(
291                "SELECT DISTINCT namespace FROM (
292                    SELECT namespace FROM memories WHERE namespace IS NOT NULL
293                    UNION
294                    SELECT namespace FROM graph_nodes WHERE namespace IS NOT NULL
295                ) ORDER BY namespace",
296            )
297            .map_err(|e| CodememError::Storage(e.to_string()))?;
298
299        let namespaces = stmt
300            .query_map([], |row| row.get(0))
301            .map_err(|e| CodememError::Storage(e.to_string()))?
302            .collect::<Result<Vec<String>, _>>()
303            .map_err(|e| CodememError::Storage(e.to_string()))?;
304
305        Ok(namespaces)
306    }
307
308    /// Get memory count.
309    pub fn memory_count(&self) -> Result<usize, CodememError> {
310        let conn = self.conn()?;
311        let count: i64 = conn
312            .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
313            .map_err(|e| CodememError::Storage(e.to_string()))?;
314        Ok(count as usize)
315    }
316
317    // ── Repository CRUD ─────────────────────────────────────────────────────
318
319    /// List all registered repositories.
320    pub fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
321        let conn = self.conn()?;
322        let mut stmt = conn
323            .prepare(
324                "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories ORDER BY created_at DESC",
325            )
326            .map_err(|e| CodememError::Storage(e.to_string()))?;
327
328        let repos = stmt
329            .query_map([], |row| {
330                let created_ts: String = row.get(4)?;
331                let indexed_ts: Option<String> = row.get(5)?;
332                Ok((
333                    row.get::<_, String>(0)?,
334                    row.get::<_, String>(1)?,
335                    row.get::<_, Option<String>>(2)?,
336                    row.get::<_, Option<String>>(3)?,
337                    created_ts,
338                    indexed_ts,
339                    row.get::<_, Option<String>>(6)?
340                        .unwrap_or_else(|| "idle".to_string()),
341                ))
342            })
343            .map_err(|e| CodememError::Storage(e.to_string()))?
344            .collect::<Result<Vec<_>, _>>()
345            .map_err(|e| CodememError::Storage(e.to_string()))?;
346
347        let mut result = Vec::new();
348        for (id, path, name, namespace, created_at, last_indexed_at, status) in repos {
349            result.push(Repository {
350                id,
351                path,
352                name,
353                namespace,
354                created_at,
355                last_indexed_at,
356                status,
357            });
358        }
359
360        Ok(result)
361    }
362
363    /// Add a new repository.
364    pub fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
365        let conn = self.conn()?;
366        conn.execute(
367            "INSERT INTO repositories (id, path, name, namespace, created_at, last_indexed_at, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
368            params![
369                repo.id,
370                repo.path,
371                repo.name,
372                repo.namespace,
373                repo.created_at,
374                repo.last_indexed_at,
375                repo.status,
376            ],
377        )
378        .map_err(|e| CodememError::Storage(e.to_string()))?;
379        Ok(())
380    }
381
382    /// Remove a repository by ID.
383    pub fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
384        let conn = self.conn()?;
385        let rows = conn
386            .execute("DELETE FROM repositories WHERE id = ?1", params![id])
387            .map_err(|e| CodememError::Storage(e.to_string()))?;
388        Ok(rows > 0)
389    }
390
391    /// Get a repository by ID.
392    pub fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
393        let conn = self.conn()?;
394        let result = conn
395            .query_row(
396                "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories WHERE id = ?1",
397                params![id],
398                |row| {
399                    Ok((
400                        row.get::<_, String>(0)?,
401                        row.get::<_, String>(1)?,
402                        row.get::<_, Option<String>>(2)?,
403                        row.get::<_, Option<String>>(3)?,
404                        row.get::<_, String>(4)?,
405                        row.get::<_, Option<String>>(5)?,
406                        row.get::<_, Option<String>>(6)?.unwrap_or_else(|| "idle".to_string()),
407                    ))
408                },
409            )
410            .optional()
411            .map_err(|e| CodememError::Storage(e.to_string()))?;
412
413        match result {
414            Some((id, path, name, namespace, created_at, last_indexed_at, status)) => {
415                Ok(Some(Repository {
416                    id,
417                    path,
418                    name,
419                    namespace,
420                    created_at,
421                    last_indexed_at,
422                    status,
423                }))
424            }
425            None => Ok(None),
426        }
427    }
428
429    /// Update a repository's status and optionally last_indexed_at.
430    pub fn update_repo_status(
431        &self,
432        id: &str,
433        status: &str,
434        indexed_at: Option<&str>,
435    ) -> Result<(), CodememError> {
436        let conn = self.conn()?;
437        if let Some(ts) = indexed_at {
438            conn.execute(
439                "UPDATE repositories SET status = ?1, last_indexed_at = ?2 WHERE id = ?3",
440                params![status, ts, id],
441            )
442            .map_err(|e| CodememError::Storage(e.to_string()))?;
443        } else {
444            conn.execute(
445                "UPDATE repositories SET status = ?1 WHERE id = ?2",
446                params![status, id],
447            )
448            .map_err(|e| CodememError::Storage(e.to_string()))?;
449        }
450        Ok(())
451    }
452}
453
454#[cfg(test)]
455#[path = "tests/memory_tests.rs"]
456mod tests;