1use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{CodememError, MemoryNode, Repository};
5use rusqlite::{params, OptionalExtension};
6
7fn insert_memory_inner(
11 conn: &rusqlite::Connection,
12 memory: &MemoryNode,
13) -> Result<(), CodememError> {
14 let existing: Option<String> = conn
16 .query_row(
17 "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
18 params![memory.content_hash, memory.namespace],
19 |row| row.get(0),
20 )
21 .optional()
22 .storage_err()?;
23
24 if existing.is_some() {
25 return Err(CodememError::Duplicate(memory.content_hash.clone()));
26 }
27
28 let tags_json = serde_json::to_string(&memory.tags)?;
29 let metadata_json = serde_json::to_string(&memory.metadata)?;
30
31 conn.execute(
32 "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at)
33 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
34 params![
35 memory.id,
36 memory.content,
37 memory.memory_type.to_string(),
38 memory.importance,
39 memory.confidence,
40 memory.access_count,
41 memory.content_hash,
42 tags_json,
43 metadata_json,
44 memory.namespace,
45 memory.session_id,
46 memory.repo,
47 memory.git_ref,
48 memory.expires_at.map(|dt| dt.timestamp()),
49 memory.created_at.timestamp(),
50 memory.updated_at.timestamp(),
51 memory.last_accessed_at.timestamp(),
52 ],
53 )
54 .storage_err()?;
55
56 Ok(())
57}
58
59impl Storage {
60 pub fn insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError> {
70 let mut conn = self.conn()?;
74 if self.has_outer_transaction() {
75 drop(conn);
76 return self.insert_memory_no_tx(memory);
77 }
78
79 let tx = conn
80 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
81 .storage_err()?;
82
83 insert_memory_inner(&tx, memory)?;
86
87 tx.commit().storage_err()?;
88
89 Ok(())
90 }
91
92 fn insert_memory_no_tx(&self, memory: &MemoryNode) -> Result<(), CodememError> {
95 let conn = self.conn()?;
96 insert_memory_inner(&conn, memory)
97 }
98
99 pub fn get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
106 let conn = self.conn()?;
107
108 let updated = conn
110 .execute(
111 "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?1 WHERE id = ?2",
112 params![chrono::Utc::now().timestamp(), id],
113 )
114 .storage_err()?;
115
116 if updated == 0 {
117 return Ok(None);
118 }
119
120 let result = conn
121 .query_row(
122 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
123 params![id],
124 MemoryRow::from_row,
125 )
126 .optional()
127 .storage_err()?;
128
129 match result {
130 Some(row) => Ok(Some(row.into_memory_node()?)),
131 None => Ok(None),
132 }
133 }
134
135 pub fn get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
138 let conn = self.conn()?;
139
140 let result = conn
141 .query_row(
142 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
143 params![id],
144 MemoryRow::from_row,
145 )
146 .optional()
147 .storage_err()?;
148
149 match result {
150 Some(row) => Ok(Some(row.into_memory_node()?)),
151 None => Ok(None),
152 }
153 }
154
155 pub fn update_memory(
157 &self,
158 id: &str,
159 content: &str,
160 importance: Option<f64>,
161 ) -> Result<(), CodememError> {
162 let conn = self.conn()?;
163 let hash = Self::content_hash(content);
164 let now = chrono::Utc::now().timestamp();
165
166 let rows_affected = if let Some(imp) = importance {
167 conn.execute(
168 "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3, importance = ?4 WHERE id = ?5",
169 params![content, hash, now, imp, id],
170 )
171 .storage_err()?
172 } else {
173 conn.execute(
174 "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3 WHERE id = ?4",
175 params![content, hash, now, id],
176 )
177 .storage_err()?
178 };
179
180 if rows_affected == 0 {
181 return Err(CodememError::NotFound(format!("Memory not found: {id}")));
182 }
183
184 Ok(())
185 }
186
187 pub fn delete_memory(&self, id: &str) -> Result<bool, CodememError> {
189 let conn = self.conn()?;
190 let rows = conn
191 .execute("DELETE FROM memories WHERE id = ?1", params![id])
192 .storage_err()?;
193 Ok(rows > 0)
194 }
195
196 pub fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
199 let mut conn = self.conn()?;
200 let tx = conn
203 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
204 .storage_err()?;
205
206 tx.execute(
208 "DELETE FROM graph_edges WHERE src IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)
209 OR dst IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)",
210 params![id],
211 )
212 .storage_err()?;
213
214 tx.execute("DELETE FROM graph_nodes WHERE memory_id = ?1", params![id])
216 .storage_err()?;
217
218 tx.execute(
220 "DELETE FROM memory_embeddings WHERE memory_id = ?1",
221 params![id],
222 )
223 .storage_err()?;
224
225 let rows = tx
227 .execute("DELETE FROM memories WHERE id = ?1", params![id])
228 .storage_err()?;
229
230 tx.commit().storage_err()?;
231
232 Ok(rows > 0)
233 }
234
235 pub fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
238 if ids.is_empty() {
239 return Ok(0);
240 }
241
242 let mut conn = self.conn()?;
243 let tx = conn
244 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
245 .storage_err()?;
246
247 let placeholders: String = (1..=ids.len())
248 .map(|i| format!("?{i}"))
249 .collect::<Vec<_>>()
250 .join(",");
251 let params: Vec<&dyn rusqlite::types::ToSql> = ids
252 .iter()
253 .map(|id| id as &dyn rusqlite::types::ToSql)
254 .collect();
255
256 let edge_sql = format!(
259 "DELETE FROM graph_edges WHERE \
260 src IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
261 OR dst IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
262 OR src IN ({placeholders}) OR dst IN ({placeholders})"
263 );
264 tx.execute(&edge_sql, params.as_slice()).storage_err()?;
265
266 let node_sql = format!(
268 "DELETE FROM graph_nodes WHERE memory_id IN ({placeholders}) OR id IN ({placeholders})"
269 );
270 tx.execute(&node_sql, params.as_slice()).storage_err()?;
271
272 let emb_sql = format!("DELETE FROM memory_embeddings WHERE memory_id IN ({placeholders})");
274 tx.execute(&emb_sql, params.as_slice()).storage_err()?;
275
276 let mem_sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
278 let deleted = tx.execute(&mem_sql, params.as_slice()).storage_err()?;
279
280 tx.commit().storage_err()?;
281
282 Ok(deleted)
283 }
284
285 pub fn list_memory_ids(&self) -> Result<Vec<String>, CodememError> {
287 self.list_memory_ids_limited(None)
288 }
289
290 pub fn list_memory_ids_limited(
292 &self,
293 limit: Option<usize>,
294 ) -> Result<Vec<String>, CodememError> {
295 let conn = self.conn()?;
296 let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) =
297 if let Some(lim) = limit {
298 (
299 "SELECT id FROM memories ORDER BY created_at DESC LIMIT ?1",
300 vec![Box::new(lim as i64) as Box<dyn rusqlite::types::ToSql>],
301 )
302 } else {
303 ("SELECT id FROM memories ORDER BY created_at DESC", vec![])
304 };
305
306 let mut stmt = conn.prepare(sql).storage_err()?;
307
308 let refs: Vec<&dyn rusqlite::types::ToSql> =
309 params_vec.iter().map(|p| p.as_ref()).collect();
310
311 let ids = stmt
312 .query_map(refs.as_slice(), |row| row.get(0))
313 .storage_err()?
314 .collect::<Result<Vec<String>, _>>()
315 .storage_err()?;
316
317 Ok(ids)
318 }
319
320 pub fn list_memory_ids_for_namespace(
322 &self,
323 namespace: &str,
324 ) -> Result<Vec<String>, CodememError> {
325 let conn = self.conn()?;
326 let mut stmt = conn
327 .prepare("SELECT id FROM memories WHERE namespace = ?1 ORDER BY created_at DESC")
328 .storage_err()?;
329
330 let ids = stmt
331 .query_map(params![namespace], |row| row.get(0))
332 .storage_err()?
333 .collect::<Result<Vec<String>, _>>()
334 .storage_err()?;
335
336 Ok(ids)
337 }
338
339 pub fn list_namespaces(&self) -> Result<Vec<String>, CodememError> {
341 let conn = self.conn()?;
342 let mut stmt = conn
343 .prepare(
344 "SELECT DISTINCT namespace FROM (
345 SELECT namespace FROM memories WHERE namespace IS NOT NULL
346 UNION
347 SELECT namespace FROM graph_nodes WHERE namespace IS NOT NULL
348 ) ORDER BY namespace",
349 )
350 .storage_err()?;
351
352 let namespaces = stmt
353 .query_map([], |row| row.get(0))
354 .storage_err()?
355 .collect::<Result<Vec<String>, _>>()
356 .storage_err()?;
357
358 Ok(namespaces)
359 }
360
361 pub fn memory_count(&self) -> Result<usize, CodememError> {
363 let conn = self.conn()?;
364 let count: i64 = conn
365 .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
366 .storage_err()?;
367 Ok(count as usize)
368 }
369
370 pub fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
374 let conn = self.conn()?;
375 let mut stmt = conn
376 .prepare(
377 "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories ORDER BY created_at DESC",
378 )
379 .storage_err()?;
380
381 let repos = stmt
382 .query_map([], |row| {
383 let created_ts: String = row.get(4)?;
384 let indexed_ts: Option<String> = row.get(5)?;
385 Ok((
386 row.get::<_, String>(0)?,
387 row.get::<_, String>(1)?,
388 row.get::<_, Option<String>>(2)?,
389 row.get::<_, Option<String>>(3)?,
390 created_ts,
391 indexed_ts,
392 row.get::<_, Option<String>>(6)?
393 .unwrap_or_else(|| "idle".to_string()),
394 ))
395 })
396 .storage_err()?
397 .collect::<Result<Vec<_>, _>>()
398 .storage_err()?;
399
400 let mut result = Vec::new();
401 for (id, path, name, namespace, created_at, last_indexed_at, status) in repos {
402 result.push(Repository {
403 id,
404 path,
405 name,
406 namespace,
407 created_at,
408 last_indexed_at,
409 status,
410 });
411 }
412
413 Ok(result)
414 }
415
416 pub fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
418 let conn = self.conn()?;
419 conn.execute(
420 "INSERT INTO repositories (id, path, name, namespace, created_at, last_indexed_at, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
421 params![
422 repo.id,
423 repo.path,
424 repo.name,
425 repo.namespace,
426 repo.created_at,
427 repo.last_indexed_at,
428 repo.status,
429 ],
430 )
431 .storage_err()?;
432 Ok(())
433 }
434
435 pub fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
437 let conn = self.conn()?;
438 let rows = conn
439 .execute("DELETE FROM repositories WHERE id = ?1", params![id])
440 .storage_err()?;
441 Ok(rows > 0)
442 }
443
444 pub fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
446 let conn = self.conn()?;
447 let result = conn
448 .query_row(
449 "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories WHERE id = ?1",
450 params![id],
451 |row| {
452 Ok((
453 row.get::<_, String>(0)?,
454 row.get::<_, String>(1)?,
455 row.get::<_, Option<String>>(2)?,
456 row.get::<_, Option<String>>(3)?,
457 row.get::<_, String>(4)?,
458 row.get::<_, Option<String>>(5)?,
459 row.get::<_, Option<String>>(6)?.unwrap_or_else(|| "idle".to_string()),
460 ))
461 },
462 )
463 .optional()
464 .storage_err()?;
465
466 match result {
467 Some((id, path, name, namespace, created_at, last_indexed_at, status)) => {
468 Ok(Some(Repository {
469 id,
470 path,
471 name,
472 namespace,
473 created_at,
474 last_indexed_at,
475 status,
476 }))
477 }
478 None => Ok(None),
479 }
480 }
481
482 pub fn update_repo_status(
484 &self,
485 id: &str,
486 status: &str,
487 indexed_at: Option<&str>,
488 ) -> Result<(), CodememError> {
489 let conn = self.conn()?;
490 if let Some(ts) = indexed_at {
491 conn.execute(
492 "UPDATE repositories SET status = ?1, last_indexed_at = ?2 WHERE id = ?3",
493 params![status, ts, id],
494 )
495 .storage_err()?;
496 } else {
497 conn.execute(
498 "UPDATE repositories SET status = ?1 WHERE id = ?2",
499 params![status, id],
500 )
501 .storage_err()?;
502 }
503 Ok(())
504 }
505
506 pub fn delete_expired_memories(&self) -> Result<usize, CodememError> {
509 let conn = self.conn()?;
510 let now = chrono::Utc::now().timestamp();
511
512 let mut stmt = conn
515 .prepare("SELECT id FROM memories WHERE expires_at IS NOT NULL AND expires_at <= ?1")
516 .storage_err()?;
517 let expired_ids: Vec<String> = stmt
518 .query_map(params![now], |row| row.get(0))
519 .storage_err()?
520 .collect::<Result<Vec<String>, _>>()
521 .storage_err()?;
522
523 if expired_ids.is_empty() {
524 return Ok(0);
525 }
526
527 let mut total_deleted = 0usize;
529 for chunk in expired_ids.chunks(999) {
530 let placeholders: String = (1..=chunk.len())
531 .map(|i| format!("?{i}"))
532 .collect::<Vec<_>>()
533 .join(",");
534 let params: Vec<&dyn rusqlite::types::ToSql> = chunk
535 .iter()
536 .map(|id| id as &dyn rusqlite::types::ToSql)
537 .collect();
538
539 let emb_sql =
540 format!("DELETE FROM memory_embeddings WHERE memory_id IN ({placeholders})");
541 conn.execute(&emb_sql, params.as_slice()).storage_err()?;
542
543 let mem_sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
544 total_deleted += conn.execute(&mem_sql, params.as_slice()).storage_err()?;
545 }
546
547 Ok(total_deleted)
548 }
549
550 pub fn expire_memories_for_file(&self, file_path: &str) -> Result<usize, CodememError> {
556 let conn = self.conn()?;
557 let now = chrono::Utc::now().timestamp();
558 let expired = conn
559 .execute(
560 "UPDATE memories SET expires_at = ?1
561 WHERE expires_at IS NULL
562 AND id IN (SELECT m2.id FROM memories m2, json_each(m2.tags) jt
563 WHERE jt.value = 'static-analysis')
564 AND (
565 id IN (
566 SELECT gn.memory_id FROM graph_nodes gn
567 WHERE gn.memory_id IS NOT NULL
568 AND json_extract(gn.payload, '$.file_path') = ?2
569 )
570 OR id IN (
571 SELECT REPLACE(ge.dst, 'mem:', '')
572 FROM graph_edges ge
573 JOIN graph_nodes gn ON ge.src = gn.id
574 WHERE ge.relationship = 'RELATES_TO'
575 AND ge.dst LIKE 'mem:%'
576 AND json_extract(gn.payload, '$.file_path') = ?2
577 )
578 )",
579 params![now, file_path],
580 )
581 .storage_err()?;
582 Ok(expired)
583 }
584}
585
586#[cfg(test)]
587#[path = "tests/memory_tests.rs"]
588mod tests;