1use crate::{MemoryRow, Storage};
4use codemem_core::{CodememError, MemoryNode, Repository};
5use rusqlite::{params, OptionalExtension};
6
7impl Storage {
8 pub fn insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError> {
18 let mut conn = self.conn()?;
22 if self.has_outer_transaction() {
23 drop(conn);
24 return self.insert_memory_no_tx(memory);
25 }
26
27 let tx = conn
28 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
29 .map_err(|e| CodememError::Storage(e.to_string()))?;
30
31 let existing: Option<String> = tx
33 .query_row(
34 "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
35 params![memory.content_hash, memory.namespace],
36 |row| row.get(0),
37 )
38 .optional()
39 .map_err(|e| CodememError::Storage(e.to_string()))?;
40
41 if existing.is_some() {
42 tx.rollback()
43 .map_err(|e| CodememError::Storage(e.to_string()))?;
44 return Err(CodememError::Duplicate(memory.content_hash.clone()));
45 }
46
47 let tags_json = serde_json::to_string(&memory.tags)?;
48 let metadata_json = serde_json::to_string(&memory.metadata)?;
49
50 tx.execute(
51 "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at)
52 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
53 params![
54 memory.id,
55 memory.content,
56 memory.memory_type.to_string(),
57 memory.importance,
58 memory.confidence,
59 memory.access_count,
60 memory.content_hash,
61 tags_json,
62 metadata_json,
63 memory.namespace,
64 memory.session_id,
65 memory.created_at.timestamp(),
66 memory.updated_at.timestamp(),
67 memory.last_accessed_at.timestamp(),
68 ],
69 )
70 .map_err(|e| CodememError::Storage(e.to_string()))?;
71
72 tx.commit()
73 .map_err(|e| CodememError::Storage(e.to_string()))?;
74
75 Ok(())
76 }
77
78 fn insert_memory_no_tx(&self, memory: &MemoryNode) -> Result<(), CodememError> {
81 let conn = self.conn()?;
82
83 let existing: Option<String> = conn
85 .query_row(
86 "SELECT id FROM memories WHERE content_hash = ?1 AND namespace IS ?2",
87 params![memory.content_hash, memory.namespace],
88 |row| row.get(0),
89 )
90 .optional()
91 .map_err(|e| CodememError::Storage(e.to_string()))?;
92
93 if existing.is_some() {
94 return Err(CodememError::Duplicate(memory.content_hash.clone()));
95 }
96
97 let tags_json = serde_json::to_string(&memory.tags)?;
98 let metadata_json = serde_json::to_string(&memory.metadata)?;
99
100 conn.execute(
101 "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at)
102 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
103 params![
104 memory.id,
105 memory.content,
106 memory.memory_type.to_string(),
107 memory.importance,
108 memory.confidence,
109 memory.access_count,
110 memory.content_hash,
111 tags_json,
112 metadata_json,
113 memory.namespace,
114 memory.session_id,
115 memory.created_at.timestamp(),
116 memory.updated_at.timestamp(),
117 memory.last_accessed_at.timestamp(),
118 ],
119 )
120 .map_err(|e| CodememError::Storage(e.to_string()))?;
121
122 Ok(())
123 }
124
125 pub fn get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
127 let conn = self.conn()?;
128
129 let updated = conn
131 .execute(
132 "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?1 WHERE id = ?2",
133 params![chrono::Utc::now().timestamp(), id],
134 )
135 .map_err(|e| CodememError::Storage(e.to_string()))?;
136
137 if updated == 0 {
138 return Ok(None);
139 }
140
141 let result = conn
142 .query_row(
143 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
144 params![id],
145 |row| {
146 Ok(MemoryRow {
147 id: row.get(0)?,
148 content: row.get(1)?,
149 memory_type: row.get(2)?,
150 importance: row.get(3)?,
151 confidence: row.get(4)?,
152 access_count: row.get(5)?,
153 content_hash: row.get(6)?,
154 tags: row.get(7)?,
155 metadata: row.get(8)?,
156 namespace: row.get(9)?,
157 session_id: row.get(10)?,
158 created_at: row.get(11)?,
159 updated_at: row.get(12)?,
160 last_accessed_at: row.get(13)?,
161 })
162 },
163 )
164 .optional()
165 .map_err(|e| CodememError::Storage(e.to_string()))?;
166
167 match result {
168 Some(row) => Ok(Some(row.into_memory_node()?)),
169 None => Ok(None),
170 }
171 }
172
173 pub fn get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError> {
176 let conn = self.conn()?;
177
178 let result = conn
179 .query_row(
180 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id = ?1",
181 params![id],
182 |row| {
183 Ok(MemoryRow {
184 id: row.get(0)?,
185 content: row.get(1)?,
186 memory_type: row.get(2)?,
187 importance: row.get(3)?,
188 confidence: row.get(4)?,
189 access_count: row.get(5)?,
190 content_hash: row.get(6)?,
191 tags: row.get(7)?,
192 metadata: row.get(8)?,
193 namespace: row.get(9)?,
194 session_id: row.get(10)?,
195 created_at: row.get(11)?,
196 updated_at: row.get(12)?,
197 last_accessed_at: row.get(13)?,
198 })
199 },
200 )
201 .optional()
202 .map_err(|e| CodememError::Storage(e.to_string()))?;
203
204 match result {
205 Some(row) => Ok(Some(row.into_memory_node()?)),
206 None => Ok(None),
207 }
208 }
209
210 pub fn update_memory(
212 &self,
213 id: &str,
214 content: &str,
215 importance: Option<f64>,
216 ) -> Result<(), CodememError> {
217 let conn = self.conn()?;
218 let hash = Self::content_hash(content);
219 let now = chrono::Utc::now().timestamp();
220
221 let rows_affected = if let Some(imp) = importance {
222 conn.execute(
223 "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3, importance = ?4 WHERE id = ?5",
224 params![content, hash, now, imp, id],
225 )
226 .map_err(|e| CodememError::Storage(e.to_string()))?
227 } else {
228 conn.execute(
229 "UPDATE memories SET content = ?1, content_hash = ?2, updated_at = ?3 WHERE id = ?4",
230 params![content, hash, now, id],
231 )
232 .map_err(|e| CodememError::Storage(e.to_string()))?
233 };
234
235 if rows_affected == 0 {
236 return Err(CodememError::NotFound(format!("Memory not found: {id}")));
237 }
238
239 Ok(())
240 }
241
242 pub fn delete_memory(&self, id: &str) -> Result<bool, CodememError> {
244 let conn = self.conn()?;
245 let rows = conn
246 .execute("DELETE FROM memories WHERE id = ?1", params![id])
247 .map_err(|e| CodememError::Storage(e.to_string()))?;
248 Ok(rows > 0)
249 }
250
251 pub fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
254 let mut conn = self.conn()?;
255 let tx = conn
258 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
259 .map_err(|e| CodememError::Storage(e.to_string()))?;
260
261 tx.execute(
263 "DELETE FROM graph_edges WHERE src IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)
264 OR dst IN (SELECT id FROM graph_nodes WHERE memory_id = ?1)",
265 params![id],
266 )
267 .map_err(|e| CodememError::Storage(e.to_string()))?;
268
269 tx.execute("DELETE FROM graph_nodes WHERE memory_id = ?1", params![id])
271 .map_err(|e| CodememError::Storage(e.to_string()))?;
272
273 tx.execute(
275 "DELETE FROM memory_embeddings WHERE memory_id = ?1",
276 params![id],
277 )
278 .map_err(|e| CodememError::Storage(e.to_string()))?;
279
280 let rows = tx
282 .execute("DELETE FROM memories WHERE id = ?1", params![id])
283 .map_err(|e| CodememError::Storage(e.to_string()))?;
284
285 tx.commit()
286 .map_err(|e| CodememError::Storage(e.to_string()))?;
287
288 Ok(rows > 0)
289 }
290
291 pub fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
294 if ids.is_empty() {
295 return Ok(0);
296 }
297
298 let mut conn = self.conn()?;
299 let tx = conn
300 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
301 .map_err(|e| CodememError::Storage(e.to_string()))?;
302
303 let placeholders: String = (1..=ids.len())
304 .map(|i| format!("?{i}"))
305 .collect::<Vec<_>>()
306 .join(",");
307 let params: Vec<&dyn rusqlite::types::ToSql> = ids
308 .iter()
309 .map(|id| id as &dyn rusqlite::types::ToSql)
310 .collect();
311
312 let edge_sql = format!(
315 "DELETE FROM graph_edges WHERE \
316 src IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
317 OR dst IN (SELECT id FROM graph_nodes WHERE memory_id IN ({placeholders})) \
318 OR src IN ({placeholders}) OR dst IN ({placeholders})"
319 );
320 tx.execute(&edge_sql, params.as_slice())
321 .map_err(|e| CodememError::Storage(e.to_string()))?;
322
323 let node_sql = format!(
325 "DELETE FROM graph_nodes WHERE memory_id IN ({placeholders}) OR id IN ({placeholders})"
326 );
327 tx.execute(&node_sql, params.as_slice())
328 .map_err(|e| CodememError::Storage(e.to_string()))?;
329
330 let emb_sql = format!("DELETE FROM memory_embeddings WHERE memory_id IN ({placeholders})");
332 tx.execute(&emb_sql, params.as_slice())
333 .map_err(|e| CodememError::Storage(e.to_string()))?;
334
335 let mem_sql = format!("DELETE FROM memories WHERE id IN ({placeholders})");
337 let deleted = tx
338 .execute(&mem_sql, params.as_slice())
339 .map_err(|e| CodememError::Storage(e.to_string()))?;
340
341 tx.commit()
342 .map_err(|e| CodememError::Storage(e.to_string()))?;
343
344 Ok(deleted)
345 }
346
347 pub fn list_memory_ids(&self) -> Result<Vec<String>, CodememError> {
349 self.list_memory_ids_limited(None)
350 }
351
352 pub fn list_memory_ids_limited(
354 &self,
355 limit: Option<usize>,
356 ) -> Result<Vec<String>, CodememError> {
357 let conn = self.conn()?;
358 let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) =
359 if let Some(lim) = limit {
360 (
361 "SELECT id FROM memories ORDER BY created_at DESC LIMIT ?1",
362 vec![Box::new(lim as i64) as Box<dyn rusqlite::types::ToSql>],
363 )
364 } else {
365 ("SELECT id FROM memories ORDER BY created_at DESC", vec![])
366 };
367
368 let mut stmt = conn
369 .prepare(sql)
370 .map_err(|e| CodememError::Storage(e.to_string()))?;
371
372 let refs: Vec<&dyn rusqlite::types::ToSql> =
373 params_vec.iter().map(|p| p.as_ref()).collect();
374
375 let ids = stmt
376 .query_map(refs.as_slice(), |row| row.get(0))
377 .map_err(|e| CodememError::Storage(e.to_string()))?
378 .collect::<Result<Vec<String>, _>>()
379 .map_err(|e| CodememError::Storage(e.to_string()))?;
380
381 Ok(ids)
382 }
383
384 pub fn list_memory_ids_for_namespace(
386 &self,
387 namespace: &str,
388 ) -> Result<Vec<String>, CodememError> {
389 let conn = self.conn()?;
390 let mut stmt = conn
391 .prepare("SELECT id FROM memories WHERE namespace = ?1 ORDER BY created_at DESC")
392 .map_err(|e| CodememError::Storage(e.to_string()))?;
393
394 let ids = stmt
395 .query_map(params![namespace], |row| row.get(0))
396 .map_err(|e| CodememError::Storage(e.to_string()))?
397 .collect::<Result<Vec<String>, _>>()
398 .map_err(|e| CodememError::Storage(e.to_string()))?;
399
400 Ok(ids)
401 }
402
403 pub fn list_namespaces(&self) -> Result<Vec<String>, CodememError> {
405 let conn = self.conn()?;
406 let mut stmt = conn
407 .prepare(
408 "SELECT DISTINCT namespace FROM (
409 SELECT namespace FROM memories WHERE namespace IS NOT NULL
410 UNION
411 SELECT namespace FROM graph_nodes WHERE namespace IS NOT NULL
412 ) ORDER BY namespace",
413 )
414 .map_err(|e| CodememError::Storage(e.to_string()))?;
415
416 let namespaces = stmt
417 .query_map([], |row| row.get(0))
418 .map_err(|e| CodememError::Storage(e.to_string()))?
419 .collect::<Result<Vec<String>, _>>()
420 .map_err(|e| CodememError::Storage(e.to_string()))?;
421
422 Ok(namespaces)
423 }
424
425 pub fn memory_count(&self) -> Result<usize, CodememError> {
427 let conn = self.conn()?;
428 let count: i64 = conn
429 .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
430 .map_err(|e| CodememError::Storage(e.to_string()))?;
431 Ok(count as usize)
432 }
433
434 pub fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
438 let conn = self.conn()?;
439 let mut stmt = conn
440 .prepare(
441 "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories ORDER BY created_at DESC",
442 )
443 .map_err(|e| CodememError::Storage(e.to_string()))?;
444
445 let repos = stmt
446 .query_map([], |row| {
447 let created_ts: String = row.get(4)?;
448 let indexed_ts: Option<String> = row.get(5)?;
449 Ok((
450 row.get::<_, String>(0)?,
451 row.get::<_, String>(1)?,
452 row.get::<_, Option<String>>(2)?,
453 row.get::<_, Option<String>>(3)?,
454 created_ts,
455 indexed_ts,
456 row.get::<_, Option<String>>(6)?
457 .unwrap_or_else(|| "idle".to_string()),
458 ))
459 })
460 .map_err(|e| CodememError::Storage(e.to_string()))?
461 .collect::<Result<Vec<_>, _>>()
462 .map_err(|e| CodememError::Storage(e.to_string()))?;
463
464 let mut result = Vec::new();
465 for (id, path, name, namespace, created_at, last_indexed_at, status) in repos {
466 result.push(Repository {
467 id,
468 path,
469 name,
470 namespace,
471 created_at,
472 last_indexed_at,
473 status,
474 });
475 }
476
477 Ok(result)
478 }
479
480 pub fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
482 let conn = self.conn()?;
483 conn.execute(
484 "INSERT INTO repositories (id, path, name, namespace, created_at, last_indexed_at, status) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
485 params![
486 repo.id,
487 repo.path,
488 repo.name,
489 repo.namespace,
490 repo.created_at,
491 repo.last_indexed_at,
492 repo.status,
493 ],
494 )
495 .map_err(|e| CodememError::Storage(e.to_string()))?;
496 Ok(())
497 }
498
499 pub fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
501 let conn = self.conn()?;
502 let rows = conn
503 .execute("DELETE FROM repositories WHERE id = ?1", params![id])
504 .map_err(|e| CodememError::Storage(e.to_string()))?;
505 Ok(rows > 0)
506 }
507
508 pub fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
510 let conn = self.conn()?;
511 let result = conn
512 .query_row(
513 "SELECT id, path, name, namespace, created_at, last_indexed_at, status FROM repositories WHERE id = ?1",
514 params![id],
515 |row| {
516 Ok((
517 row.get::<_, String>(0)?,
518 row.get::<_, String>(1)?,
519 row.get::<_, Option<String>>(2)?,
520 row.get::<_, Option<String>>(3)?,
521 row.get::<_, String>(4)?,
522 row.get::<_, Option<String>>(5)?,
523 row.get::<_, Option<String>>(6)?.unwrap_or_else(|| "idle".to_string()),
524 ))
525 },
526 )
527 .optional()
528 .map_err(|e| CodememError::Storage(e.to_string()))?;
529
530 match result {
531 Some((id, path, name, namespace, created_at, last_indexed_at, status)) => {
532 Ok(Some(Repository {
533 id,
534 path,
535 name,
536 namespace,
537 created_at,
538 last_indexed_at,
539 status,
540 }))
541 }
542 None => Ok(None),
543 }
544 }
545
546 pub fn update_repo_status(
548 &self,
549 id: &str,
550 status: &str,
551 indexed_at: Option<&str>,
552 ) -> Result<(), CodememError> {
553 let conn = self.conn()?;
554 if let Some(ts) = indexed_at {
555 conn.execute(
556 "UPDATE repositories SET status = ?1, last_indexed_at = ?2 WHERE id = ?3",
557 params![status, ts, id],
558 )
559 .map_err(|e| CodememError::Storage(e.to_string()))?;
560 } else {
561 conn.execute(
562 "UPDATE repositories SET status = ?1 WHERE id = ?2",
563 params![status, id],
564 )
565 .map_err(|e| CodememError::Storage(e.to_string()))?;
566 }
567 Ok(())
568 }
569}
570
571#[cfg(test)]
572#[path = "tests/memory_tests.rs"]
573mod tests;