1use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{CodememError, MemoryNode, Repository};
5use rusqlite::{params, OptionalExtension};
6
7fn insert_memory_inner(
11 conn: &rusqlite::Connection,
12 memory: &MemoryNode,
13) -> Result<(), CodememError> {
14 let existing: Option<String> = conn
16 .query_row(
17 "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
18 params![memory.content_hash, memory.namespace],
19 |row| row.get(0),
20 )
21 .optional()
22 .storage_err()?;
23
24 if existing.is_some() {
25 return Err(CodememError::Duplicate(memory.content_hash.clone()));
26 }
27
28 let tags_json = serde_json::to_string(&memory.tags)?;
29 let metadata_json = serde_json::to_string(&memory.metadata)?;
30
31 conn.execute(
32 "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at)
33 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
34 params![
35 memory.id,
36 memory.content,
37 memory.memory_type.to_string(),
38 memory.importance,
39 memory.confidence,
40 memory.access_count,
41 memory.content_hash,
42 tags_json,
43 metadata_json,
44 memory.namespace,
45 memory.session_id,
46 memory.created_at.timestamp(),
47 memory.updated_at.timestamp(),
48 memory.last_accessed_at.timestamp(),
49 ],
50 )
51 .storage_err()?;
52
53 Ok(())
54}
55
56impl Storage {
57 pub fn insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError> {
67 let mut conn = self.conn()?;
71 if self.has_outer_transaction() {
72 drop(conn);
73 return self.insert_memory_no_tx(memory);
74 }
75
76 let tx = conn
77 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
78 .storage_err()?;
79
80 insert_memory_inner(&tx, memory)?;
83
84 tx.commit().storage_err()?;
85
86 Ok(())
87 }
88
89 fn insert_memory_no_tx(&self, memory: &MemoryNode) -> Result<(), CodememError> {
92 let conn = self.conn()?;
93 insert_memory_inner(&conn, memory)
94 }
95
96 pub fn get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
98 let conn = self.conn()?;
99
100 let updated = conn
102 .execute(
103 "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?1 WHERE id = ?2",
104 params![chrono::Utc::now().timestamp(), id],
105 )
106 .storage_err()?;
107
108 if updated == 0 {
109 return Ok(None);
110 }
111
112 let result = conn
113 .query_row(
114 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
115 params![id],
116 MemoryRow::from_row,
117 )
118 .optional()
119 .storage_err()?;
120
121 match result {
122 Some(row) => Ok(Some(row.into_memory_node()?)),
123 None => Ok(None),
124 }
125 }
126
127 pub fn get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
130 let conn = self.conn()?;
131
132 let result = conn
133 .query_row(
134 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
135 params![id],
136 MemoryRow::from_row,
137 )
138 .optional()
139 .storage_err()?;
140
141 match result {
142 Some(row) => Ok(Some(row.into_memory_node()?)),
143 None => Ok(None),
144 }
145 }
146
147 pub fn update_memory(
149 &self,
150 id: &str,
151 content: &str,
152 importance: Option<f64>,
153 ) -> Result<(), CodememError> {
154 let conn = self.conn()?;
155 let hash = Self::content_hash(content);
156 let now = chrono::Utc::now().timestamp();
157
158 let rows_affected = if let Some(imp) = importance {
159 conn.execute(
160 "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3, importance = ?4 WHERE id = ?5",
161 params![content, hash, now, imp, id],
162 )
163 .storage_err()?
164 } else {
165 conn.execute(
166 "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3 WHERE id = ?4",
167 params![content, hash, now, id],
168 )
169 .storage_err()?
170 };
171
172 if rows_affected == 0 {
173 return Err(CodememError::NotFound(format!("Memory not found: {id}")));
174 }
175
176 Ok(())
177 }
178
179 pub fn delete_memory(&self, id: &str) -> Result<bool, CodememError> {
181 let conn = self.conn()?;
182 let rows = conn
183 .execute("DELETE FROM memories WHERE id = ?1", params![id])
184 .storage_err()?;
185 Ok(rows > 0)
186 }
187
188 pub fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
191 let mut conn = self.conn()?;
192 let tx = conn
195 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
196 .storage_err()?;
197
198 tx.execute(
200 "DELETE FROM graph_edges WHERE src IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)
201 OR dst IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)",
202 params![id],
203 )
204 .storage_err()?;
205
206 tx.execute("DELETE FROM graph_nodes WHERE memory_id = ?1", params![id])
208 .storage_err()?;
209
210 tx.execute(
212 "DELETE FROM memory_embeddings WHERE memory_id = ?1",
213 params![id],
214 )
215 .storage_err()?;
216
217 let rows = tx
219 .execute("DELETE FROM memories WHERE id = ?1", params![id])
220 .storage_err()?;
221
222 tx.commit().storage_err()?;
223
224 Ok(rows > 0)
225 }
226
227 pub fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
230 if ids.is_empty() {
231 return Ok(0);
232 }
233
234 let mut conn = self.conn()?;
235 let tx = conn
236 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
237 .storage_err()?;
238
239 let placeholders: String = (1..=ids.len())
240 .map(|i| format!("?{i}"))
241 .collect::<Vec<_>>()
242 .join(",");
243 let params: Vec<&dyn rusqlite::types::ToSql> = ids
244 .iter()
245 .map(|id| id as &dyn rusqlite::types::ToSql)
246 .collect();
247
248 let edge_sql = format!(
251 "DELETE FROM graph_edges WHERE \
252 src IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
253 OR dst IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
254 OR src IN ({placeholders}) OR dst IN ({placeholders})"
255 );
256 tx.execute(&edge_sql, params.as_slice()).storage_err()?;
257
258 let node_sql = format!(
260 "DELETE FROM graph_nodes WHERE memory_id IN ({placeholders}) OR id IN ({placeholders})"
261 );
262 tx.execute(&node_sql, params.as_slice()).storage_err()?;
263
264 let emb_sql = format!("DELETE FROM memory_embeddings WHERE memory_id IN ({placeholders})");
266 tx.execute(&emb_sql, params.as_slice()).storage_err()?;
267
268 let mem_sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
270 let deleted = tx.execute(&mem_sql, params.as_slice()).storage_err()?;
271
272 tx.commit().storage_err()?;
273
274 Ok(deleted)
275 }
276
277 pub fn list_memory_ids(&self) -> Result<Vec<String>, CodememError> {
279 self.list_memory_ids_limited(None)
280 }
281
282 pub fn list_memory_ids_limited(
284 &self,
285 limit: Option<usize>,
286 ) -> Result<Vec<String>, CodememError> {
287 let conn = self.conn()?;
288 let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) =
289 if let Some(lim) = limit {
290 (
291 "SELECT id FROM memories ORDER BY created_at DESC LIMIT ?1",
292 vec![Box::new(lim as i64) as Box<dyn rusqlite::types::ToSql>],
293 )
294 } else {
295 ("SELECT id FROM memories ORDER BY created_at DESC", vec![])
296 };
297
298 let mut stmt = conn.prepare(sql).storage_err()?;
299
300 let refs: Vec<&dyn rusqlite::types::ToSql> =
301 params_vec.iter().map(|p| p.as_ref()).collect();
302
303 let ids = stmt
304 .query_map(refs.as_slice(), |row| row.get(0))
305 .storage_err()?
306 .collect::<Result<Vec<String>, _>>()
307 .storage_err()?;
308
309 Ok(ids)
310 }
311
312 pub fn list_memory_ids_for_namespace(
314 &self,
315 namespace: &str,
316 ) -> Result<Vec<String>, CodememError> {
317 let conn = self.conn()?;
318 let mut stmt = conn
319 .prepare("SELECT id FROM memories WHERE namespace = ?1 ORDER BY created_at DESC")
320 .storage_err()?;
321
322 let ids = stmt
323 .query_map(params![namespace], |row| row.get(0))
324 .storage_err()?
325 .collect::<Result<Vec<String>, _>>()
326 .storage_err()?;
327
328 Ok(ids)
329 }
330
331 pub fn list_namespaces(&self) -> Result<Vec<String>, CodememError> {
333 let conn = self.conn()?;
334 let mut stmt = conn
335 .prepare(
336 "SELECT DISTINCT namespace FROM (
337 SELECT namespace FROM memories WHERE namespace IS NOT NULL
338 UNION
339 SELECT namespace FROM graph_nodes WHERE namespace IS NOT NULL
340 ) ORDER BY namespace",
341 )
342 .storage_err()?;
343
344 let namespaces = stmt
345 .query_map([], |row| row.get(0))
346 .storage_err()?
347 .collect::<Result<Vec<String>, _>>()
348 .storage_err()?;
349
350 Ok(namespaces)
351 }
352
353 pub fn memory_count(&self) -> Result<usize, CodememError> {
355 let conn = self.conn()?;
356 let count: i64 = conn
357 .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
358 .storage_err()?;
359 Ok(count as usize)
360 }
361
362 pub fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
366 let conn = self.conn()?;
367 let mut stmt = conn
368 .prepare(
369 "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories ORDER BY created_at DESC",
370 )
371 .storage_err()?;
372
373 let repos = stmt
374 .query_map([], |row| {
375 let created_ts: String = row.get(4)?;
376 let indexed_ts: Option<String> = row.get(5)?;
377 Ok((
378 row.get::<_, String>(0)?,
379 row.get::<_, String>(1)?,
380 row.get::<_, Option<String>>(2)?,
381 row.get::<_, Option<String>>(3)?,
382 created_ts,
383 indexed_ts,
384 row.get::<_, Option<String>>(6)?
385 .unwrap_or_else(|| "idle".to_string()),
386 ))
387 })
388 .storage_err()?
389 .collect::<Result<Vec<_>, _>>()
390 .storage_err()?;
391
392 let mut result = Vec::new();
393 for (id, path, name, namespace, created_at, last_indexed_at, status) in repos {
394 result.push(Repository {
395 id,
396 path,
397 name,
398 namespace,
399 created_at,
400 last_indexed_at,
401 status,
402 });
403 }
404
405 Ok(result)
406 }
407
408 pub fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
410 let conn = self.conn()?;
411 conn.execute(
412 "INSERT INTO repositories (id, path, name, namespace, created_at, last_indexed_at, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
413 params![
414 repo.id,
415 repo.path,
416 repo.name,
417 repo.namespace,
418 repo.created_at,
419 repo.last_indexed_at,
420 repo.status,
421 ],
422 )
423 .storage_err()?;
424 Ok(())
425 }
426
427 pub fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
429 let conn = self.conn()?;
430 let rows = conn
431 .execute("DELETE FROM repositories WHERE id = ?1", params![id])
432 .storage_err()?;
433 Ok(rows > 0)
434 }
435
436 pub fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
438 let conn = self.conn()?;
439 let result = conn
440 .query_row(
441 "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories WHERE id = ?1",
442 params![id],
443 |row| {
444 Ok((
445 row.get::<_, String>(0)?,
446 row.get::<_, String>(1)?,
447 row.get::<_, Option<String>>(2)?,
448 row.get::<_, Option<String>>(3)?,
449 row.get::<_, String>(4)?,
450 row.get::<_, Option<String>>(5)?,
451 row.get::<_, Option<String>>(6)?.unwrap_or_else(|| "idle".to_string()),
452 ))
453 },
454 )
455 .optional()
456 .storage_err()?;
457
458 match result {
459 Some((id, path, name, namespace, created_at, last_indexed_at, status)) => {
460 Ok(Some(Repository {
461 id,
462 path,
463 name,
464 namespace,
465 created_at,
466 last_indexed_at,
467 status,
468 }))
469 }
470 None => Ok(None),
471 }
472 }
473
474 pub fn update_repo_status(
476 &self,
477 id: &str,
478 status: &str,
479 indexed_at: Option<&str>,
480 ) -> Result<(), CodememError> {
481 let conn = self.conn()?;
482 if let Some(ts) = indexed_at {
483 conn.execute(
484 "UPDATE repositories SET status = ?1, last_indexed_at = ?2 WHERE id = ?3",
485 params![status, ts, id],
486 )
487 .storage_err()?;
488 } else {
489 conn.execute(
490 "UPDATE repositories SET status = ?1 WHERE id = ?2",
491 params![status, id],
492 )
493 .storage_err()?;
494 }
495 Ok(())
496 }
497}
498
499#[cfg(test)]
500#[path = "tests/memory_tests.rs"]
501mod tests;