1use chrono::{DateTime, Utc};
4use rusqlite::{params, Connection, OptionalExtension, Row};
5use serde::{Deserialize, Serialize};
6use sha2::{Digest, Sha256};
7use std::collections::HashMap;
8
9use crate::error::{EngramError, Result};
10use crate::types::*;
11
12pub fn memory_from_row(row: &Row) -> rusqlite::Result<Memory> {
14 let id: i64 = row.get("id")?;
15 let content: String = row.get("content")?;
16 let memory_type_str: String = row.get("memory_type")?;
17 let importance: f32 = row.get("importance")?;
18 let access_count: i32 = row.get("access_count")?;
19 let created_at: String = row.get("created_at")?;
20 let updated_at: String = row.get("updated_at")?;
21 let last_accessed_at: Option<String> = row.get("last_accessed_at")?;
22 let owner_id: Option<String> = row.get("owner_id")?;
23 let visibility_str: String = row.get("visibility")?;
24 let version: i32 = row.get("version")?;
25 let has_embedding: i32 = row.get("has_embedding")?;
26 let metadata_str: String = row.get("metadata")?;
27
28 let scope_type: String = row
30 .get("scope_type")
31 .unwrap_or_else(|_| "global".to_string());
32 let scope_id: Option<String> = row.get("scope_id").unwrap_or(None);
33
34 let expires_at: Option<String> = row.get("expires_at").unwrap_or(None);
36
37 let content_hash: Option<String> = row.get("content_hash").unwrap_or(None);
39
40 let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
41 let visibility = match visibility_str.as_str() {
42 "shared" => Visibility::Shared,
43 "public" => Visibility::Public,
44 _ => Visibility::Private,
45 };
46
47 let scope = match (scope_type.as_str(), scope_id) {
49 ("user", Some(id)) => MemoryScope::User { user_id: id },
50 ("session", Some(id)) => MemoryScope::Session { session_id: id },
51 ("agent", Some(id)) => MemoryScope::Agent { agent_id: id },
52 _ => MemoryScope::Global,
53 };
54
55 let metadata: HashMap<String, serde_json::Value> =
56 serde_json::from_str(&metadata_str).unwrap_or_default();
57
58 let workspace: String = row
60 .get("workspace")
61 .unwrap_or_else(|_| "default".to_string());
62
63 let tier_str: String = row.get("tier").unwrap_or_else(|_| "permanent".to_string());
65 let tier = tier_str.parse().unwrap_or_default();
66
67 let event_time: Option<String> = row.get("event_time").unwrap_or(None);
68 let event_duration_seconds: Option<i64> = row.get("event_duration_seconds").unwrap_or(None);
69 let trigger_pattern: Option<String> = row.get("trigger_pattern").unwrap_or(None);
70 let procedure_success_count: i32 = row.get("procedure_success_count").unwrap_or(0);
71 let procedure_failure_count: i32 = row.get("procedure_failure_count").unwrap_or(0);
72 let summary_of_id: Option<i64> = row.get("summary_of_id").unwrap_or(None);
73 let lifecycle_state_str: Option<String> = row.get("lifecycle_state").unwrap_or(None);
74
75 let lifecycle_state = lifecycle_state_str
76 .and_then(|s| s.parse().ok())
77 .unwrap_or(crate::types::LifecycleState::Active);
78
79 Ok(Memory {
80 id,
81 content,
82 memory_type,
83 tags: vec![], metadata,
85 importance,
86 access_count,
87 created_at: DateTime::parse_from_rfc3339(&created_at)
88 .map(|dt| dt.with_timezone(&Utc))
89 .unwrap_or_else(|_| Utc::now()),
90 updated_at: DateTime::parse_from_rfc3339(&updated_at)
91 .map(|dt| dt.with_timezone(&Utc))
92 .unwrap_or_else(|_| Utc::now()),
93 last_accessed_at: last_accessed_at.and_then(|s| {
94 DateTime::parse_from_rfc3339(&s)
95 .map(|dt| dt.with_timezone(&Utc))
96 .ok()
97 }),
98 owner_id,
99 visibility,
100 scope,
101 workspace,
102 tier,
103 version,
104 has_embedding: has_embedding != 0,
105 expires_at: expires_at.and_then(|s| {
106 DateTime::parse_from_rfc3339(&s)
107 .map(|dt| dt.with_timezone(&Utc))
108 .ok()
109 }),
110 content_hash,
111 event_time: event_time.and_then(|s| {
112 DateTime::parse_from_rfc3339(&s)
113 .map(|dt| dt.with_timezone(&Utc))
114 .ok()
115 }),
116 event_duration_seconds,
117 trigger_pattern,
118 procedure_success_count,
119 procedure_failure_count,
120 summary_of_id,
121 lifecycle_state,
122 })
123}
124
125pub(crate) fn metadata_value_to_param(
126 key: &str,
127 value: &serde_json::Value,
128 conditions: &mut Vec<String>,
129 params: &mut Vec<Box<dyn rusqlite::ToSql>>,
130) -> Result<()> {
131 match value {
132 serde_json::Value::String(s) => {
133 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
134 params.push(Box::new(s.clone()));
135 }
136 serde_json::Value::Number(n) => {
137 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
138 if let Some(i) = n.as_i64() {
139 params.push(Box::new(i));
140 } else if let Some(f) = n.as_f64() {
141 params.push(Box::new(f));
142 } else {
143 return Err(EngramError::InvalidInput("Invalid number".to_string()));
144 }
145 }
146 serde_json::Value::Bool(b) => {
147 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
148 params.push(Box::new(*b));
149 }
150 serde_json::Value::Null => {
151 conditions.push(format!("json_extract(m.metadata, '$.{}') IS NULL", key));
152 }
153 _ => {
154 return Err(EngramError::InvalidInput(format!(
155 "Unsupported metadata filter value for key: {}",
156 key
157 )));
158 }
159 }
160
161 Ok(())
162}
163
164fn get_memory_internal(conn: &Connection, id: i64, track_access: bool) -> Result<Memory> {
165 let now = Utc::now().to_rfc3339();
166
167 let mut stmt = conn.prepare_cached(
168 "SELECT id, content, memory_type, importance, access_count,
169 created_at, updated_at, last_accessed_at, owner_id,
170 visibility, version, has_embedding, metadata,
171 scope_type, scope_id, workspace, tier, expires_at, content_hash
172 FROM memories
173 WHERE id = ? AND valid_to IS NULL
174 AND (expires_at IS NULL OR expires_at > ?)",
175 )?;
176
177 let mut memory = stmt
178 .query_row(params![id, now], memory_from_row)
179 .map_err(|_| EngramError::NotFound(id))?;
180
181 memory.tags = load_tags(conn, id)?;
182
183 if track_access {
184 let now = Utc::now().to_rfc3339();
186 conn.execute(
187 "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?
188 WHERE id = ?",
189 params![now, id],
190 )?;
191 }
192
193 Ok(memory)
194}
195
196pub fn load_tags(conn: &Connection, memory_id: i64) -> Result<Vec<String>> {
198 let mut stmt = conn.prepare_cached(
199 "SELECT t.name FROM tags t
200 JOIN memory_tags mt ON t.id = mt.tag_id
201 WHERE mt.memory_id = ?",
202 )?;
203
204 let tags: Vec<String> = stmt
205 .query_map([memory_id], |row| row.get(0))?
206 .filter_map(|r| r.ok())
207 .collect();
208
209 Ok(tags)
210}
211
212pub fn compute_content_hash(content: &str) -> String {
214 let normalized = content
216 .to_lowercase()
217 .split_whitespace()
218 .collect::<Vec<_>>()
219 .join(" ");
220
221 let mut hasher = Sha256::new();
222 hasher.update(normalized.as_bytes());
223 format!("sha256:{}", hex::encode(hasher.finalize()))
224}
225
226pub fn find_by_content_hash(
234 conn: &Connection,
235 content_hash: &str,
236 scope: &MemoryScope,
237 workspace: Option<&str>,
238) -> Result<Option<Memory>> {
239 let now = Utc::now().to_rfc3339();
240 let scope_type = scope.scope_type();
241 let scope_id = scope.scope_id().map(|s| s.to_string());
242 let workspace = workspace.unwrap_or("default");
243
244 let mut stmt = conn.prepare_cached(
245 "SELECT id, content, memory_type, importance, access_count,
246 created_at, updated_at, last_accessed_at, owner_id,
247 visibility, version, has_embedding, metadata,
248 scope_type, scope_id, workspace, tier, expires_at, content_hash
249 FROM memories
250 WHERE content_hash = ? AND valid_to IS NULL
251 AND (expires_at IS NULL OR expires_at > ?)
252 AND scope_type = ?
253 AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
254 AND workspace = ?
255 LIMIT 1",
256 )?;
257
258 let result = stmt
259 .query_row(
260 params![content_hash, now, scope_type, scope_id, scope_id, workspace],
261 memory_from_row,
262 )
263 .ok();
264
265 if let Some(mut memory) = result {
266 memory.tags = load_tags(conn, memory.id)?;
267 Ok(Some(memory))
268 } else {
269 Ok(None)
270 }
271}
272
273pub fn find_similar_by_embedding(
278 conn: &Connection,
279 query_embedding: &[f32],
280 scope: &MemoryScope,
281 workspace: Option<&str>,
282 threshold: f32,
283) -> Result<Option<(Memory, f32)>> {
284 use crate::embedding::{cosine_similarity, get_embedding};
285
286 let now = Utc::now().to_rfc3339();
287 let scope_type = scope.scope_type();
288 let scope_id = scope.scope_id().map(|s| s.to_string());
289 let workspace = workspace.unwrap_or("default");
290
291 let mut stmt = conn.prepare_cached(
293 "SELECT id, content, memory_type, importance, access_count,
294 created_at, updated_at, last_accessed_at, owner_id,
295 visibility, version, has_embedding, metadata,
296 scope_type, scope_id, workspace, tier, expires_at, content_hash
297 FROM memories
298 WHERE has_embedding = 1 AND valid_to IS NULL
299 AND (expires_at IS NULL OR expires_at > ?)
300 AND scope_type = ?
301 AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
302 AND workspace = ?",
303 )?;
304
305 let memories: Vec<Memory> = stmt
306 .query_map(
307 params![now, scope_type, scope_id, scope_id, workspace],
308 memory_from_row,
309 )?
310 .filter_map(|r| r.ok())
311 .collect();
312
313 let mut best_match: Option<(Memory, f32)> = None;
314
315 for memory in memories {
316 if let Ok(Some(embedding)) = get_embedding(conn, memory.id) {
317 let similarity = cosine_similarity(query_embedding, &embedding);
318 if similarity >= threshold {
319 match &best_match {
320 None => best_match = Some((memory, similarity)),
321 Some((_, best_score)) if similarity > *best_score => {
322 best_match = Some((memory, similarity));
323 }
324 _ => {}
325 }
326 }
327 }
328 }
329
330 if let Some((mut memory, score)) = best_match {
332 memory.tags = load_tags(conn, memory.id)?;
333 Ok(Some((memory, score)))
334 } else {
335 Ok(None)
336 }
337}
338
339#[derive(Debug, Clone, serde::Serialize)]
341pub struct DuplicatePair {
342 pub memory_a: Memory,
343 pub memory_b: Memory,
344 pub similarity_score: f64,
345 pub match_type: DuplicateMatchType,
346}
347
348#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
350#[serde(rename_all = "snake_case")]
351pub enum DuplicateMatchType {
352 ExactHash,
354 HighSimilarity,
356 EmbeddingSimilarity,
358}
359
360pub fn find_duplicates(conn: &Connection, threshold: f64) -> Result<Vec<DuplicatePair>> {
368 find_duplicates_in_workspace(conn, threshold, None)
369}
370
371pub fn find_duplicates_in_workspace(
373 conn: &Connection,
374 threshold: f64,
375 workspace: Option<&str>,
376) -> Result<Vec<DuplicatePair>> {
377 let now = Utc::now().to_rfc3339();
378 let mut duplicates = Vec::new();
379
380 let (hash_sql, hash_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace
382 {
383 (
384 "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
385 FROM memories
386 WHERE content_hash IS NOT NULL
387 AND valid_to IS NULL
388 AND (expires_at IS NULL OR expires_at > ?)
389 AND workspace = ?
390 GROUP BY content_hash, scope_type, scope_id, workspace
391 HAVING COUNT(*) > 1",
392 vec![Box::new(now.clone()), Box::new(ws.to_string())],
393 )
394 } else {
395 (
396 "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
397 FROM memories
398 WHERE content_hash IS NOT NULL
399 AND valid_to IS NULL
400 AND (expires_at IS NULL OR expires_at > ?)
401 GROUP BY content_hash, scope_type, scope_id, workspace
402 HAVING COUNT(*) > 1",
403 vec![Box::new(now.clone())],
404 )
405 };
406
407 let mut hash_stmt = conn.prepare_cached(hash_sql)?;
408 let hash_rows = hash_stmt.query_map(
409 rusqlite::params_from_iter(hash_params.iter().map(|p| p.as_ref())),
410 |row| {
411 let ids_str: String = row.get(3)?;
412 Ok(ids_str)
413 },
414 )?;
415
416 for ids_result in hash_rows {
417 let ids_str = ids_result?;
418 let ids: Vec<i64> = ids_str
419 .split(',')
420 .filter_map(|s| s.trim().parse().ok())
421 .collect();
422
423 for i in 0..ids.len() {
426 for j in (i + 1)..ids.len() {
427 let memory_a = get_memory_internal(conn, ids[i], false)?;
428 let memory_b = get_memory_internal(conn, ids[j], false)?;
429 duplicates.push(DuplicatePair {
430 memory_a,
431 memory_b,
432 similarity_score: 1.0, match_type: DuplicateMatchType::ExactHash,
434 });
435 }
436 }
437 }
438
439 let (sim_sql, sim_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
441 (
442 "SELECT DISTINCT c.from_id, c.to_id, c.score
443 FROM crossrefs c
444 JOIN memories m1 ON c.from_id = m1.id
445 JOIN memories m2 ON c.to_id = m2.id
446 WHERE c.score >= ?
447 AND m1.valid_to IS NULL
448 AND m2.valid_to IS NULL
449 AND (m1.expires_at IS NULL OR m1.expires_at > ?)
450 AND (m2.expires_at IS NULL OR m2.expires_at > ?)
451 AND c.from_id < c.to_id
452 AND m1.scope_type = m2.scope_type
453 AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
454 AND m1.workspace = ?
455 AND m2.workspace = ?
456 ORDER BY c.score DESC",
457 vec![
458 Box::new(threshold),
459 Box::new(now.clone()),
460 Box::new(now.clone()),
461 Box::new(ws.to_string()),
462 Box::new(ws.to_string()),
463 ],
464 )
465 } else {
466 (
467 "SELECT DISTINCT c.from_id, c.to_id, c.score
468 FROM crossrefs c
469 JOIN memories m1 ON c.from_id = m1.id
470 JOIN memories m2 ON c.to_id = m2.id
471 WHERE c.score >= ?
472 AND m1.valid_to IS NULL
473 AND m2.valid_to IS NULL
474 AND (m1.expires_at IS NULL OR m1.expires_at > ?)
475 AND (m2.expires_at IS NULL OR m2.expires_at > ?)
476 AND c.from_id < c.to_id
477 AND m1.scope_type = m2.scope_type
478 AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
479 AND m1.workspace = m2.workspace
480 ORDER BY c.score DESC",
481 vec![
482 Box::new(threshold),
483 Box::new(now.clone()),
484 Box::new(now.clone()),
485 ],
486 )
487 };
488
489 let mut sim_stmt = conn.prepare_cached(sim_sql)?;
490 let sim_rows = sim_stmt.query_map(
491 rusqlite::params_from_iter(sim_params.iter().map(|p| p.as_ref())),
492 |row| {
493 Ok((
494 row.get::<_, i64>(0)?,
495 row.get::<_, i64>(1)?,
496 row.get::<_, f64>(2)?,
497 ))
498 },
499 )?;
500
501 for row_result in sim_rows {
502 let (from_id, to_id, score) = row_result?;
503
504 let already_found = duplicates.iter().any(|d| {
506 (d.memory_a.id == from_id && d.memory_b.id == to_id)
507 || (d.memory_a.id == to_id && d.memory_b.id == from_id)
508 });
509
510 if !already_found {
511 let memory_a = get_memory_internal(conn, from_id, false)?;
513 let memory_b = get_memory_internal(conn, to_id, false)?;
514 duplicates.push(DuplicatePair {
515 memory_a,
516 memory_b,
517 similarity_score: score,
518 match_type: DuplicateMatchType::HighSimilarity,
519 });
520 }
521 }
522
523 Ok(duplicates)
524}
525
526pub fn find_duplicates_by_embedding(
530 conn: &Connection,
531 threshold: f32,
532 workspace: Option<&str>,
533 limit: usize,
534) -> Result<Vec<DuplicatePair>> {
535 use crate::embedding::{cosine_similarity, get_embedding};
536
537 let now = Utc::now().to_rfc3339();
538
539 let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
541 (
542 "SELECT id FROM memories
543 WHERE has_embedding = 1 AND valid_to IS NULL
544 AND (expires_at IS NULL OR expires_at > ?)
545 AND COALESCE(lifecycle_state, 'active') = 'active'
546 AND workspace = ?
547 ORDER BY id",
548 vec![Box::new(now), Box::new(ws.to_string())],
549 )
550 } else {
551 (
552 "SELECT id FROM memories
553 WHERE has_embedding = 1 AND valid_to IS NULL
554 AND (expires_at IS NULL OR expires_at > ?)
555 AND COALESCE(lifecycle_state, 'active') = 'active'
556 ORDER BY id",
557 vec![Box::new(now)],
558 )
559 };
560
561 let mut stmt = conn.prepare(sql)?;
562 let ids: Vec<i64> = stmt
563 .query_map(
564 rusqlite::params_from_iter(params_vec.iter().map(|p| p.as_ref())),
565 |row| row.get(0),
566 )?
567 .filter_map(|r| r.ok())
568 .collect();
569
570 let mut embeddings: Vec<(i64, Vec<f32>)> = Vec::with_capacity(ids.len());
572 for &id in &ids {
573 if let Ok(Some(emb)) = get_embedding(conn, id) {
574 embeddings.push((id, emb));
575 }
576 }
577
578 let mut duplicates = Vec::new();
579
580 for i in 0..embeddings.len() {
582 if duplicates.len() >= limit {
583 break;
584 }
585 for j in (i + 1)..embeddings.len() {
586 if duplicates.len() >= limit {
587 break;
588 }
589 let sim = cosine_similarity(&embeddings[i].1, &embeddings[j].1);
590 if sim >= threshold {
591 let memory_a = get_memory_internal(conn, embeddings[i].0, false)?;
592 let memory_b = get_memory_internal(conn, embeddings[j].0, false)?;
593 duplicates.push(DuplicatePair {
594 memory_a,
595 memory_b,
596 similarity_score: sim as f64,
597 match_type: DuplicateMatchType::EmbeddingSimilarity,
598 });
599 }
600 }
601 }
602
603 duplicates.sort_by(|a, b| {
605 b.similarity_score
606 .partial_cmp(&a.similarity_score)
607 .unwrap_or(std::cmp::Ordering::Equal)
608 });
609
610 Ok(duplicates)
611}
612
613pub fn create_memory(conn: &Connection, input: &CreateMemoryInput) -> Result<Memory> {
615 let now = Utc::now();
616 let now_str = now.to_rfc3339();
617 let metadata_json = serde_json::to_string(&input.metadata)?;
618 let importance = input.importance.unwrap_or(0.5);
619
620 let content_hash = compute_content_hash(&input.content);
622
623 let workspace = match &input.workspace {
625 Some(ws) => crate::types::normalize_workspace(ws)
626 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?,
627 None => "default".to_string(),
628 };
629
630 if input.dedup_mode != DedupMode::Allow {
632 if let Some(existing) =
633 find_by_content_hash(conn, &content_hash, &input.scope, Some(&workspace))?
634 {
635 match input.dedup_mode {
636 DedupMode::Reject => {
637 return Err(EngramError::Duplicate {
638 existing_id: existing.id,
639 message: format!(
640 "Duplicate memory detected (id={}). Content hash: {}",
641 existing.id, content_hash
642 ),
643 });
644 }
645 DedupMode::Skip => {
646 return Ok(existing);
648 }
649 DedupMode::Merge => {
650 let mut merged_tags = existing.tags.clone();
652 for tag in &input.tags {
653 if !merged_tags.contains(tag) {
654 merged_tags.push(tag.clone());
655 }
656 }
657
658 let mut merged_metadata = existing.metadata.clone();
659 for (key, value) in &input.metadata {
660 merged_metadata.insert(key.clone(), value.clone());
661 }
662
663 let update_input = UpdateMemoryInput {
664 content: None, memory_type: None,
666 tags: Some(merged_tags),
667 metadata: Some(merged_metadata),
668 importance: input.importance, scope: None,
670 ttl_seconds: input.ttl_seconds, event_time: None,
673 trigger_pattern: None,
674 };
675
676 return update_memory(conn, existing.id, &update_input);
677 }
678 DedupMode::Allow => unreachable!(),
679 }
680 }
681 }
682
683 let scope_type = input.scope.scope_type();
685 let scope_id = input.scope.scope_id().map(|s| s.to_string());
686
687 let tier = input.tier;
691
692 let expires_at = match tier {
697 MemoryTier::Permanent => {
698 if input.ttl_seconds.is_some() && input.ttl_seconds != Some(0) {
700 return Err(EngramError::InvalidInput(
701 "Permanent tier memories cannot have a TTL. Use Daily tier for expiring memories.".to_string()
702 ));
703 }
704 None
705 }
706 MemoryTier::Daily => {
707 let ttl = input.ttl_seconds.filter(|&t| t > 0).unwrap_or(86400); Some((now + chrono::Duration::seconds(ttl)).to_rfc3339())
710 }
711 };
712
713 let event_time = input.event_time.map(|dt| dt.to_rfc3339());
714
715 conn.execute(
716 "INSERT INTO memories (content, memory_type, importance, metadata, created_at, updated_at, valid_from, scope_type, scope_id, workspace, tier, expires_at, content_hash, event_time, event_duration_seconds, trigger_pattern, summary_of_id)
717 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
718 params![
719 input.content,
720 input.memory_type.as_str(),
721 importance,
722 metadata_json,
723 now_str,
724 now_str,
725 now_str,
726 scope_type,
727 scope_id,
728 workspace,
729 tier.as_str(),
730 expires_at,
731 content_hash,
732 event_time,
733 input.event_duration_seconds,
734 input.trigger_pattern,
735 input.summary_of_id,
736 ],
737 )?;
738
739 let id = conn.last_insert_rowid();
740
741 for tag in &input.tags {
743 ensure_tag(conn, tag)?;
744 conn.execute(
745 "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
746 SELECT ?, id FROM tags WHERE name = ?",
747 params![id, tag],
748 )?;
749 }
750
751 if !input.defer_embedding {
753 conn.execute(
754 "INSERT INTO embedding_queue (memory_id, status, queued_at)
755 VALUES (?, 'pending', ?)",
756 params![id, now_str],
757 )?;
758 }
759
760 let tags_json = serde_json::to_string(&input.tags)?;
762 conn.execute(
763 "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
764 VALUES (?, 1, ?, ?, ?, ?)",
765 params![id, input.content, tags_json, metadata_json, now_str],
766 )?;
767
768 record_event(
770 conn,
771 MemoryEventType::Created,
772 Some(id),
773 None,
774 serde_json::json!({
775 "workspace": input.workspace.as_deref().unwrap_or("default"),
776 "memory_type": input.memory_type.as_str(),
777 }),
778 )?;
779
780 conn.execute(
782 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
783 [],
784 )?;
785
786 get_memory_internal(conn, id, false)
787}
788
789fn ensure_tag(conn: &Connection, tag: &str) -> Result<i64> {
791 conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", params![tag])?;
792
793 let id: i64 = conn.query_row("SELECT id FROM tags WHERE name = ?", params![tag], |row| {
794 row.get(0)
795 })?;
796
797 Ok(id)
798}
799
800pub fn get_memory(conn: &Connection, id: i64) -> Result<Memory> {
802 get_memory_internal(conn, id, true)
803}
804
805pub fn update_memory(conn: &Connection, id: i64, input: &UpdateMemoryInput) -> Result<Memory> {
807 let current = get_memory_internal(conn, id, false)?;
809 let now = Utc::now().to_rfc3339();
810
811 let mut updates = vec!["updated_at = ?".to_string()];
813 let mut values: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now.clone())];
814
815 if let Some(ref content) = input.content {
816 updates.push("content = ?".to_string());
817 values.push(Box::new(content.clone()));
818 let new_hash = compute_content_hash(content);
820 updates.push("content_hash = ?".to_string());
821 values.push(Box::new(new_hash));
822 }
823
824 if let Some(ref memory_type) = input.memory_type {
825 updates.push("memory_type = ?".to_string());
826 values.push(Box::new(memory_type.as_str().to_string()));
827 }
828
829 if let Some(importance) = input.importance {
830 updates.push("importance = ?".to_string());
831 values.push(Box::new(importance));
832 }
833
834 if let Some(ref metadata) = input.metadata {
835 let metadata_json = serde_json::to_string(metadata)?;
836 updates.push("metadata = ?".to_string());
837 values.push(Box::new(metadata_json));
838 }
839
840 if let Some(ref scope) = input.scope {
841 updates.push("scope_type = ?".to_string());
842 values.push(Box::new(scope.scope_type().to_string()));
843 updates.push("scope_id = ?".to_string());
844 values.push(Box::new(scope.scope_id().map(|s| s.to_string())));
845 }
846
847 if let Some(event_time) = &input.event_time {
849 updates.push("event_time = ?".to_string());
850 let value = event_time.as_ref().map(|dt| dt.to_rfc3339());
851 values.push(Box::new(value));
852 }
853
854 if let Some(trigger_pattern) = &input.trigger_pattern {
856 updates.push("trigger_pattern = ?".to_string());
857 values.push(Box::new(trigger_pattern.clone()));
858 }
859
860 if let Some(ttl) = input.ttl_seconds {
866 if ttl <= 0 {
867 if current.tier == MemoryTier::Daily {
870 return Err(crate::error::EngramError::InvalidInput(
871 "Cannot remove expiration from a Daily tier memory. Use promote_to_permanent first.".to_string()
872 ));
873 }
874 updates.push("expires_at = NULL".to_string());
875 } else {
876 if current.tier == MemoryTier::Permanent {
879 return Err(crate::error::EngramError::InvalidInput(
880 "Cannot set expiration on a Permanent tier memory. Permanent memories cannot expire.".to_string()
881 ));
882 }
883 let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
884 updates.push("expires_at = ?".to_string());
885 values.push(Box::new(expires_at));
886 }
887 }
888
889 updates.push("version = version + 1".to_string());
891
892 let sql = format!("UPDATE memories SET {} WHERE id = ?", updates.join(", "));
894 values.push(Box::new(id));
895
896 let params: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
897 conn.execute(&sql, params.as_slice())?;
898
899 if let Some(ref tags) = input.tags {
901 conn.execute("DELETE FROM memory_tags WHERE memory_id = ?", params![id])?;
902 for tag in tags {
903 ensure_tag(conn, tag)?;
904 conn.execute(
905 "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
906 SELECT ?, id FROM tags WHERE name = ?",
907 params![id, tag],
908 )?;
909 }
910 }
911
912 let new_content = input.content.as_ref().unwrap_or(¤t.content);
914 let new_tags = input.tags.as_ref().unwrap_or(¤t.tags);
915 let new_metadata = input.metadata.as_ref().unwrap_or(¤t.metadata);
916 let tags_json = serde_json::to_string(new_tags)?;
917 let metadata_json = serde_json::to_string(new_metadata)?;
918
919 conn.execute(
920 "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
921 VALUES (?, (SELECT version FROM memories WHERE id = ?), ?, ?, ?, ?)",
922 params![id, id, new_content, tags_json, metadata_json, now],
923 )?;
924
925 if input.content.is_some() {
927 conn.execute(
928 "INSERT OR REPLACE INTO embedding_queue (memory_id, status, queued_at)
929 VALUES (?, 'pending', ?)",
930 params![id, now],
931 )?;
932 conn.execute(
933 "UPDATE memories SET has_embedding = 0 WHERE id = ?",
934 params![id],
935 )?;
936 }
937
938 let mut changed_fields = Vec::new();
940 if input.content.is_some() {
941 changed_fields.push("content");
942 }
943 if input.tags.is_some() {
944 changed_fields.push("tags");
945 }
946 if input.metadata.is_some() {
947 changed_fields.push("metadata");
948 }
949 if input.importance.is_some() {
950 changed_fields.push("importance");
951 }
952 if input.ttl_seconds.is_some() {
953 changed_fields.push("ttl");
954 }
955
956 record_event(
958 conn,
959 MemoryEventType::Updated,
960 Some(id),
961 None,
962 serde_json::json!({
963 "changed_fields": changed_fields,
964 }),
965 )?;
966
967 conn.execute(
969 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
970 [],
971 )?;
972
973 get_memory_internal(conn, id, false)
974}
975
976pub fn promote_to_permanent(conn: &Connection, id: i64) -> Result<Memory> {
987 let memory = get_memory_internal(conn, id, false)?;
988
989 if memory.tier == MemoryTier::Permanent {
990 return Err(EngramError::InvalidInput(format!(
991 "Memory {} is already in the Permanent tier",
992 id
993 )));
994 }
995
996 let now = Utc::now().to_rfc3339();
997
998 conn.execute(
999 "UPDATE memories SET tier = 'permanent', expires_at = NULL, updated_at = ?, version = version + 1 WHERE id = ?",
1000 params![now, id],
1001 )?;
1002
1003 record_event(
1005 conn,
1006 MemoryEventType::Updated,
1007 Some(id),
1008 None,
1009 serde_json::json!({
1010 "changed_fields": ["tier", "expires_at"],
1011 "action": "promote_to_permanent",
1012 }),
1013 )?;
1014
1015 conn.execute(
1017 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1018 [],
1019 )?;
1020
1021 tracing::info!(memory_id = id, "Promoted memory to permanent tier");
1022
1023 get_memory_internal(conn, id, false)
1024}
1025
1026pub fn move_to_workspace(conn: &Connection, id: i64, workspace: &str) -> Result<Memory> {
1036 let _memory = get_memory_internal(conn, id, false)?;
1038
1039 let normalized = crate::types::normalize_workspace(workspace)
1041 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1042
1043 let now = Utc::now().to_rfc3339();
1044
1045 conn.execute(
1046 "UPDATE memories SET workspace = ?, updated_at = ?, version = version + 1 WHERE id = ?",
1047 params![normalized, now, id],
1048 )?;
1049
1050 record_event(
1052 conn,
1053 MemoryEventType::Updated,
1054 Some(id),
1055 None,
1056 serde_json::json!({
1057 "changed_fields": ["workspace"],
1058 "action": "move_to_workspace",
1059 "new_workspace": normalized,
1060 }),
1061 )?;
1062
1063 conn.execute(
1065 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1066 [],
1067 )?;
1068
1069 tracing::info!(memory_id = id, workspace = %normalized, "Moved memory to workspace");
1070
1071 get_memory_internal(conn, id, false)
1072}
1073
1074pub fn list_workspaces(conn: &Connection) -> Result<Vec<WorkspaceStats>> {
1079 let now = Utc::now().to_rfc3339();
1080
1081 let mut stmt = conn.prepare(
1082 r#"
1083 SELECT
1084 workspace,
1085 COUNT(*) as memory_count,
1086 SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1087 SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1088 MIN(created_at) as first_memory_at,
1089 MAX(created_at) as last_memory_at,
1090 AVG(importance) as avg_importance
1091 FROM memories
1092 WHERE valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1093 GROUP BY workspace
1094 ORDER BY memory_count DESC
1095 "#,
1096 )?;
1097
1098 let workspaces: Vec<WorkspaceStats> = stmt
1099 .query_map(params![now], |row| {
1100 let workspace: String = row.get(0)?;
1101 let memory_count: i64 = row.get(1)?;
1102 let permanent_count: i64 = row.get(2)?;
1103 let daily_count: i64 = row.get(3)?;
1104 let first_memory_at: Option<String> = row.get(4)?;
1105 let last_memory_at: Option<String> = row.get(5)?;
1106 let avg_importance: Option<f64> = row.get(6)?;
1107
1108 Ok(WorkspaceStats {
1109 workspace,
1110 memory_count,
1111 permanent_count,
1112 daily_count,
1113 first_memory_at: first_memory_at.and_then(|s| {
1114 DateTime::parse_from_rfc3339(&s)
1115 .map(|dt| dt.with_timezone(&Utc))
1116 .ok()
1117 }),
1118 last_memory_at: last_memory_at.and_then(|s| {
1119 DateTime::parse_from_rfc3339(&s)
1120 .map(|dt| dt.with_timezone(&Utc))
1121 .ok()
1122 }),
1123 top_tags: vec![], avg_importance: avg_importance.map(|v| v as f32),
1125 })
1126 })?
1127 .filter_map(|r| r.ok())
1128 .collect();
1129
1130 Ok(workspaces)
1131}
1132
1133pub fn get_workspace_stats(conn: &Connection, workspace: &str) -> Result<WorkspaceStats> {
1135 let normalized = crate::types::normalize_workspace(workspace)
1136 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1137
1138 let now = Utc::now().to_rfc3339();
1139
1140 let stats = conn
1141 .query_row(
1142 r#"
1143 SELECT
1144 workspace,
1145 COUNT(*) as memory_count,
1146 SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1147 SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1148 MIN(created_at) as first_memory_at,
1149 MAX(created_at) as last_memory_at,
1150 AVG(importance) as avg_importance
1151 FROM memories
1152 WHERE workspace = ? AND valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1153 GROUP BY workspace
1154 "#,
1155 params![normalized, now],
1156 |row| {
1157 let workspace: String = row.get(0)?;
1158 let memory_count: i64 = row.get(1)?;
1159 let permanent_count: i64 = row.get(2)?;
1160 let daily_count: i64 = row.get(3)?;
1161 let first_memory_at: Option<String> = row.get(4)?;
1162 let last_memory_at: Option<String> = row.get(5)?;
1163 let avg_importance: Option<f64> = row.get(6)?;
1164
1165 Ok(WorkspaceStats {
1166 workspace,
1167 memory_count,
1168 permanent_count,
1169 daily_count,
1170 first_memory_at: first_memory_at.and_then(|s| {
1171 DateTime::parse_from_rfc3339(&s)
1172 .map(|dt| dt.with_timezone(&Utc))
1173 .ok()
1174 }),
1175 last_memory_at: last_memory_at.and_then(|s| {
1176 DateTime::parse_from_rfc3339(&s)
1177 .map(|dt| dt.with_timezone(&Utc))
1178 .ok()
1179 }),
1180 top_tags: vec![],
1181 avg_importance: avg_importance.map(|v| v as f32),
1182 })
1183 },
1184 )
1185 .map_err(|e| match e {
1186 rusqlite::Error::QueryReturnedNoRows => {
1187 EngramError::NotFound(0) }
1189 _ => EngramError::Database(e),
1190 })?;
1191
1192 Ok(stats)
1193}
1194
1195pub fn delete_workspace(conn: &Connection, workspace: &str, move_to_default: bool) -> Result<i64> {
1204 let normalized = crate::types::normalize_workspace(workspace)
1205 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1206
1207 if normalized == "default" {
1208 return Err(EngramError::InvalidInput(
1209 "Cannot delete the default workspace".to_string(),
1210 ));
1211 }
1212
1213 let now = Utc::now().to_rfc3339();
1214
1215 let affected_ids: Vec<i64> = {
1217 let mut stmt =
1218 conn.prepare("SELECT id FROM memories WHERE workspace = ? AND valid_to IS NULL")?;
1219 let rows = stmt.query_map(params![&normalized], |row| row.get(0))?;
1220 rows.collect::<std::result::Result<Vec<_>, _>>()?
1221 };
1222
1223 let affected = affected_ids.len() as i64;
1224
1225 if affected > 0 {
1226 if move_to_default {
1227 conn.execute(
1229 "UPDATE memories SET workspace = 'default', updated_at = ?, version = version + 1 WHERE workspace = ? AND valid_to IS NULL",
1230 params![&now, &normalized],
1231 )?;
1232 } else {
1233 conn.execute(
1235 "UPDATE memories SET valid_to = ? WHERE workspace = ? AND valid_to IS NULL",
1236 params![&now, &normalized],
1237 )?;
1238 }
1239
1240 let event_type = if move_to_default {
1242 MemoryEventType::Updated
1243 } else {
1244 MemoryEventType::Deleted
1245 };
1246
1247 for memory_id in &affected_ids {
1248 record_event(
1249 conn,
1250 event_type.clone(),
1251 Some(*memory_id),
1252 None,
1253 serde_json::json!({
1254 "action": "delete_workspace",
1255 "workspace": normalized,
1256 "move_to_default": move_to_default,
1257 }),
1258 )?;
1259 }
1260 }
1261
1262 conn.execute(
1264 "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1265 params![affected],
1266 )?;
1267
1268 tracing::info!(
1269 workspace = %normalized,
1270 move_to_default,
1271 affected,
1272 "Deleted workspace"
1273 );
1274
1275 Ok(affected)
1276}
1277
1278pub fn delete_memory(conn: &Connection, id: i64) -> Result<()> {
1280 let now = Utc::now().to_rfc3339();
1281
1282 let memory_info: Option<(String, String)> = conn
1284 .query_row(
1285 "SELECT workspace, memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1286 params![id],
1287 |row| Ok((row.get(0)?, row.get(1)?)),
1288 )
1289 .ok();
1290
1291 let affected = conn.execute(
1292 "UPDATE memories SET valid_to = ? WHERE id = ? AND valid_to IS NULL",
1293 params![now, id],
1294 )?;
1295
1296 if affected == 0 {
1297 return Err(EngramError::NotFound(id));
1298 }
1299
1300 conn.execute(
1302 "UPDATE crossrefs SET valid_to = ? WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL",
1303 params![now, id, id],
1304 )?;
1305
1306 let (workspace, memory_type) =
1308 memory_info.unwrap_or(("default".to_string(), "unknown".to_string()));
1309 record_event(
1310 conn,
1311 MemoryEventType::Deleted,
1312 Some(id),
1313 None,
1314 serde_json::json!({
1315 "workspace": workspace,
1316 "memory_type": memory_type,
1317 }),
1318 )?;
1319
1320 conn.execute(
1322 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1323 [],
1324 )?;
1325
1326 Ok(())
1327}
1328
1329pub fn list_memories(conn: &Connection, options: &ListOptions) -> Result<Vec<Memory>> {
1331 let now = Utc::now().to_rfc3339();
1332
1333 let mut sql = String::from(
1334 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1335 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1336 m.visibility, m.version, m.has_embedding, m.metadata,
1337 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
1338 FROM memories m",
1339 );
1340
1341 let mut conditions = vec!["m.valid_to IS NULL".to_string()];
1342 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1343
1344 conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
1346 params.push(Box::new(now));
1347
1348 if let Some(ref tags) = options.tags {
1350 if !tags.is_empty() {
1351 sql.push_str(
1352 " JOIN memory_tags mt ON m.id = mt.memory_id
1353 JOIN tags t ON mt.tag_id = t.id",
1354 );
1355 let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
1356 conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1357 for tag in tags {
1358 params.push(Box::new(tag.clone()));
1359 }
1360 }
1361 }
1362
1363 if let Some(ref memory_type) = options.memory_type {
1365 conditions.push("m.memory_type = ?".to_string());
1366 params.push(Box::new(memory_type.as_str().to_string()));
1367 }
1368
1369 if let Some(ref metadata_filter) = options.metadata_filter {
1371 for (key, value) in metadata_filter {
1372 metadata_value_to_param(key, value, &mut conditions, &mut params)?;
1373 }
1374 }
1375
1376 if let Some(ref scope) = options.scope {
1378 conditions.push("m.scope_type = ?".to_string());
1379 params.push(Box::new(scope.scope_type().to_string()));
1380 if let Some(scope_id) = scope.scope_id() {
1381 conditions.push("m.scope_id = ?".to_string());
1382 params.push(Box::new(scope_id.to_string()));
1383 } else {
1384 conditions.push("m.scope_id IS NULL".to_string());
1385 }
1386 }
1387
1388 if let Some(ref workspace) = options.workspace {
1390 conditions.push("m.workspace = ?".to_string());
1391 params.push(Box::new(workspace.clone()));
1392 }
1393
1394 if let Some(ref tier) = options.tier {
1396 conditions.push("m.tier = ?".to_string());
1397 params.push(Box::new(tier.as_str().to_string()));
1398 }
1399
1400 sql.push_str(" WHERE ");
1401 sql.push_str(&conditions.join(" AND "));
1402
1403 let sort_field = match options.sort_by.unwrap_or_default() {
1405 SortField::CreatedAt => "m.created_at",
1406 SortField::UpdatedAt => "m.updated_at",
1407 SortField::LastAccessedAt => "m.last_accessed_at",
1408 SortField::Importance => "m.importance",
1409 SortField::AccessCount => "m.access_count",
1410 };
1411 let sort_order = match options.sort_order.unwrap_or_default() {
1412 SortOrder::Asc => "ASC",
1413 SortOrder::Desc => "DESC",
1414 };
1415 sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
1416
1417 let limit = options.limit.unwrap_or(100);
1419 let offset = options.offset.unwrap_or(0);
1420 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
1421
1422 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1423 let mut stmt = conn.prepare(&sql)?;
1424
1425 let memories: Vec<Memory> = stmt
1426 .query_map(param_refs.as_slice(), memory_from_row)?
1427 .filter_map(|r| r.ok())
1428 .map(|mut m| {
1429 m.tags = load_tags(conn, m.id).unwrap_or_default();
1430 m
1431 })
1432 .collect();
1433
1434 Ok(memories)
1435}
1436
1437pub fn get_episodic_timeline(
1439 conn: &Connection,
1440 start_time: Option<DateTime<Utc>>,
1441 end_time: Option<DateTime<Utc>>,
1442 workspace: Option<&str>,
1443 tags: Option<&[String]>,
1444 limit: i64,
1445) -> Result<Vec<Memory>> {
1446 let now = Utc::now().to_rfc3339();
1447
1448 let mut sql = String::from(
1449 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1450 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1451 m.visibility, m.version, m.has_embedding, m.metadata,
1452 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1453 m.event_time, m.event_duration_seconds, m.trigger_pattern,
1454 m.procedure_success_count, m.procedure_failure_count, m.summary_of_id,
1455 m.lifecycle_state
1456 FROM memories m",
1457 );
1458
1459 let mut conditions = vec![
1460 "m.valid_to IS NULL".to_string(),
1461 "(m.expires_at IS NULL OR m.expires_at > ?)".to_string(),
1462 "m.memory_type = 'episodic'".to_string(),
1463 "m.event_time IS NOT NULL".to_string(),
1464 ];
1465 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1466
1467 if let Some(start) = start_time {
1468 conditions.push("m.event_time >= ?".to_string());
1469 params.push(Box::new(start.to_rfc3339()));
1470 }
1471
1472 if let Some(end) = end_time {
1473 conditions.push("m.event_time <= ?".to_string());
1474 params.push(Box::new(end.to_rfc3339()));
1475 }
1476
1477 if let Some(ws) = workspace {
1478 conditions.push("m.workspace = ?".to_string());
1479 params.push(Box::new(ws.to_string()));
1480 }
1481
1482 if let Some(tag_list) = tags {
1483 if !tag_list.is_empty() {
1484 sql.push_str(
1485 " JOIN memory_tags mt ON m.id = mt.memory_id
1486 JOIN tags t ON mt.tag_id = t.id",
1487 );
1488 let placeholders: Vec<String> = tag_list.iter().map(|_| "?".to_string()).collect();
1489 conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1490 for tag in tag_list {
1491 params.push(Box::new(tag.clone()));
1492 }
1493 }
1494 }
1495
1496 sql.push_str(" WHERE ");
1497 sql.push_str(&conditions.join(" AND "));
1498 sql.push_str(" ORDER BY m.event_time ASC");
1499 sql.push_str(&format!(" LIMIT {}", limit));
1500
1501 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1502 let mut stmt = conn.prepare(&sql)?;
1503
1504 let memories: Vec<Memory> = stmt
1505 .query_map(param_refs.as_slice(), memory_from_row)?
1506 .filter_map(|r| r.ok())
1507 .map(|mut m| {
1508 m.tags = load_tags(conn, m.id).unwrap_or_default();
1509 m
1510 })
1511 .collect();
1512
1513 Ok(memories)
1514}
1515
1516pub fn get_procedural_memories(
1518 conn: &Connection,
1519 trigger_pattern: Option<&str>,
1520 workspace: Option<&str>,
1521 min_success_rate: Option<f32>,
1522 limit: i64,
1523) -> Result<Vec<Memory>> {
1524 let now = Utc::now().to_rfc3339();
1525
1526 let sql_base = "SELECT m.id, m.content, m.memory_type, m.importance, m.access_count,
1527 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1528 m.visibility, m.version, m.has_embedding, m.metadata,
1529 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1530 m.event_time, m.event_duration_seconds, m.trigger_pattern,
1531 m.procedure_success_count, m.procedure_failure_count, m.summary_of_id,
1532 m.lifecycle_state
1533 FROM memories m";
1534
1535 let mut conditions = vec![
1536 "m.valid_to IS NULL".to_string(),
1537 "(m.expires_at IS NULL OR m.expires_at > ?)".to_string(),
1538 "m.memory_type = 'procedural'".to_string(),
1539 ];
1540 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1541
1542 if let Some(pattern) = trigger_pattern {
1543 conditions.push("m.trigger_pattern LIKE ?".to_string());
1544 params.push(Box::new(format!("%{}%", pattern)));
1545 }
1546
1547 if let Some(ws) = workspace {
1548 conditions.push("m.workspace = ?".to_string());
1549 params.push(Box::new(ws.to_string()));
1550 }
1551
1552 if let Some(min_rate) = min_success_rate {
1553 conditions.push("(m.procedure_success_count + m.procedure_failure_count) > 0".to_string());
1556 conditions.push(
1557 "CAST(m.procedure_success_count AS REAL) / (m.procedure_success_count + m.procedure_failure_count) >= ?"
1558 .to_string(),
1559 );
1560 params.push(Box::new(min_rate as f64));
1561 }
1562
1563 let sql = format!(
1564 "{} WHERE {} ORDER BY m.procedure_success_count DESC LIMIT {}",
1565 sql_base,
1566 conditions.join(" AND "),
1567 limit
1568 );
1569
1570 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1571 let mut stmt = conn.prepare(&sql)?;
1572
1573 let memories: Vec<Memory> = stmt
1574 .query_map(param_refs.as_slice(), memory_from_row)?
1575 .filter_map(|r| r.ok())
1576 .map(|mut m| {
1577 m.tags = load_tags(conn, m.id).unwrap_or_default();
1578 m
1579 })
1580 .collect();
1581
1582 Ok(memories)
1583}
1584
1585pub fn record_procedure_outcome(
1587 conn: &Connection,
1588 memory_id: i64,
1589 success: bool,
1590) -> Result<Memory> {
1591 let column = if success {
1592 "procedure_success_count"
1593 } else {
1594 "procedure_failure_count"
1595 };
1596
1597 let now = Utc::now().to_rfc3339();
1598
1599 let memory_type: String = conn
1601 .query_row(
1602 "SELECT memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1603 params![memory_id],
1604 |row| row.get(0),
1605 )
1606 .map_err(|_| EngramError::NotFound(memory_id))?;
1607
1608 if memory_type != "procedural" {
1609 return Err(EngramError::InvalidInput(format!(
1610 "Memory {} is type '{}', not 'procedural'",
1611 memory_id, memory_type
1612 )));
1613 }
1614
1615 conn.execute(
1616 &format!(
1617 "UPDATE memories SET {} = {} + 1, updated_at = ? WHERE id = ?",
1618 column, column
1619 ),
1620 params![now, memory_id],
1621 )?;
1622
1623 get_memory(conn, memory_id)
1624}
1625
1626pub fn create_crossref(conn: &Connection, input: &CreateCrossRefInput) -> Result<CrossReference> {
1628 let now = Utc::now().to_rfc3339();
1629
1630 let _ = get_memory_internal(conn, input.from_id, false)?;
1632 let _ = get_memory_internal(conn, input.to_id, false)?;
1633
1634 let strength = input.strength.unwrap_or(1.0);
1635
1636 conn.execute(
1637 "INSERT INTO crossrefs (from_id, to_id, edge_type, score, strength, source, source_context, pinned, created_at, valid_from)
1638 VALUES (?, ?, ?, 1.0, ?, 'manual', ?, ?, ?, ?)
1639 ON CONFLICT(from_id, to_id, edge_type) DO UPDATE SET
1640 strength = excluded.strength,
1641 source_context = COALESCE(excluded.source_context, crossrefs.source_context),
1642 pinned = excluded.pinned",
1643 params![
1644 input.from_id,
1645 input.to_id,
1646 input.edge_type.as_str(),
1647 strength,
1648 input.source_context,
1649 input.pinned,
1650 now,
1651 now,
1652 ],
1653 )?;
1654
1655 get_crossref(conn, input.from_id, input.to_id, input.edge_type)
1656}
1657
1658pub fn get_crossref(
1660 conn: &Connection,
1661 from_id: i64,
1662 to_id: i64,
1663 edge_type: EdgeType,
1664) -> Result<CrossReference> {
1665 let mut stmt = conn.prepare_cached(
1666 "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1667 source_context, created_at, valid_from, valid_to, pinned, metadata
1668 FROM crossrefs
1669 WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1670 )?;
1671
1672 let crossref = stmt.query_row(params![from_id, to_id, edge_type.as_str()], |row| {
1673 let edge_type_str: String = row.get("edge_type")?;
1674 let source_str: String = row.get("source")?;
1675 let created_at_str: String = row.get("created_at")?;
1676 let valid_from_str: String = row.get("valid_from")?;
1677 let valid_to_str: Option<String> = row.get("valid_to")?;
1678 let metadata_str: String = row.get("metadata")?;
1679
1680 Ok(CrossReference {
1681 from_id: row.get("from_id")?,
1682 to_id: row.get("to_id")?,
1683 edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1684 score: row.get("score")?,
1685 confidence: row.get("confidence")?,
1686 strength: row.get("strength")?,
1687 source: match source_str.as_str() {
1688 "manual" => RelationSource::Manual,
1689 "llm" => RelationSource::Llm,
1690 _ => RelationSource::Auto,
1691 },
1692 source_context: row.get("source_context")?,
1693 created_at: DateTime::parse_from_rfc3339(&created_at_str)
1694 .map(|dt| dt.with_timezone(&Utc))
1695 .unwrap_or_else(|_| Utc::now()),
1696 valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1697 .map(|dt| dt.with_timezone(&Utc))
1698 .unwrap_or_else(|_| Utc::now()),
1699 valid_to: valid_to_str.and_then(|s| {
1700 DateTime::parse_from_rfc3339(&s)
1701 .map(|dt| dt.with_timezone(&Utc))
1702 .ok()
1703 }),
1704 pinned: row.get::<_, i32>("pinned")? != 0,
1705 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1706 })
1707 })?;
1708
1709 Ok(crossref)
1710}
1711
1712pub fn get_related(conn: &Connection, memory_id: i64) -> Result<Vec<CrossReference>> {
1714 let mut stmt = conn.prepare_cached(
1715 "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1716 source_context, created_at, valid_from, valid_to, pinned, metadata
1717 FROM crossrefs
1718 WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL
1719 ORDER BY score DESC",
1720 )?;
1721
1722 let crossrefs: Vec<CrossReference> = stmt
1723 .query_map(params![memory_id, memory_id], |row| {
1724 let edge_type_str: String = row.get("edge_type")?;
1725 let source_str: String = row.get("source")?;
1726 let created_at_str: String = row.get("created_at")?;
1727 let valid_from_str: String = row.get("valid_from")?;
1728 let valid_to_str: Option<String> = row.get("valid_to")?;
1729 let metadata_str: String = row.get("metadata")?;
1730
1731 Ok(CrossReference {
1732 from_id: row.get("from_id")?,
1733 to_id: row.get("to_id")?,
1734 edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1735 score: row.get("score")?,
1736 confidence: row.get("confidence")?,
1737 strength: row.get("strength")?,
1738 source: match source_str.as_str() {
1739 "manual" => RelationSource::Manual,
1740 "llm" => RelationSource::Llm,
1741 _ => RelationSource::Auto,
1742 },
1743 source_context: row.get("source_context")?,
1744 created_at: DateTime::parse_from_rfc3339(&created_at_str)
1745 .map(|dt| dt.with_timezone(&Utc))
1746 .unwrap_or_else(|_| Utc::now()),
1747 valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1748 .map(|dt| dt.with_timezone(&Utc))
1749 .unwrap_or_else(|_| Utc::now()),
1750 valid_to: valid_to_str.and_then(|s| {
1751 DateTime::parse_from_rfc3339(&s)
1752 .map(|dt| dt.with_timezone(&Utc))
1753 .ok()
1754 }),
1755 pinned: row.get::<_, i32>("pinned")? != 0,
1756 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1757 })
1758 })?
1759 .filter_map(|r| r.ok())
1760 .collect();
1761
1762 Ok(crossrefs)
1763}
1764
1765pub fn delete_crossref(
1767 conn: &Connection,
1768 from_id: i64,
1769 to_id: i64,
1770 edge_type: EdgeType,
1771) -> Result<()> {
1772 let now = Utc::now().to_rfc3339();
1773
1774 let affected = conn.execute(
1775 "UPDATE crossrefs SET valid_to = ?
1776 WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1777 params![now, from_id, to_id, edge_type.as_str()],
1778 )?;
1779
1780 if affected == 0 {
1781 return Err(EngramError::NotFound(from_id));
1782 }
1783
1784 Ok(())
1785}
1786
1787pub fn set_memory_expiration(
1794 conn: &Connection,
1795 id: i64,
1796 ttl_seconds: Option<i64>,
1797) -> Result<Memory> {
1798 let _ = get_memory_internal(conn, id, false)?;
1800
1801 match ttl_seconds {
1802 Some(0) => {
1803 conn.execute(
1805 "UPDATE memories SET expires_at = NULL, updated_at = ? WHERE id = ?",
1806 params![Utc::now().to_rfc3339(), id],
1807 )?;
1808 }
1809 Some(ttl) => {
1810 let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
1812 conn.execute(
1813 "UPDATE memories SET expires_at = ?, updated_at = ? WHERE id = ?",
1814 params![expires_at, Utc::now().to_rfc3339(), id],
1815 )?;
1816 }
1817 None => {
1818 return get_memory_internal(conn, id, false);
1820 }
1821 }
1822
1823 record_event(
1825 conn,
1826 MemoryEventType::Updated,
1827 Some(id),
1828 None,
1829 serde_json::json!({
1830 "changed_fields": ["expires_at"],
1831 "action": "set_expiration",
1832 }),
1833 )?;
1834
1835 conn.execute(
1837 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1838 [],
1839 )?;
1840
1841 get_memory_internal(conn, id, false)
1842}
1843
1844pub fn cleanup_expired_memories(conn: &Connection) -> Result<i64> {
1848 let now = Utc::now().to_rfc3339();
1849
1850 let affected = conn.execute(
1852 "UPDATE memories SET valid_to = ?
1853 WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1854 params![now, now],
1855 )?;
1856
1857 if affected > 0 {
1858 conn.execute(
1860 "UPDATE crossrefs SET valid_to = ?
1861 WHERE valid_to IS NULL AND (
1862 from_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1863 OR to_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1864 )",
1865 params![now, now, now],
1866 )?;
1867
1868 conn.execute(
1871 "DELETE FROM memory_entities
1872 WHERE memory_id IN (
1873 SELECT id FROM memories
1874 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1875 )",
1876 params![now],
1877 )?;
1878
1879 conn.execute(
1881 "DELETE FROM memory_tags
1882 WHERE memory_id IN (
1883 SELECT id FROM memories
1884 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1885 )",
1886 params![now],
1887 )?;
1888
1889 record_event(
1891 conn,
1892 MemoryEventType::Deleted,
1893 None, None,
1895 serde_json::json!({
1896 "action": "cleanup_expired",
1897 "affected_count": affected,
1898 }),
1899 )?;
1900
1901 conn.execute(
1903 "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1904 params![affected as i64],
1905 )?;
1906 }
1907
1908 Ok(affected as i64)
1909}
1910
1911pub fn count_expired_memories(conn: &Connection) -> Result<i64> {
1913 let now = Utc::now().to_rfc3339();
1914
1915 let count: i64 = conn.query_row(
1916 "SELECT COUNT(*) FROM memories
1917 WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1918 params![now],
1919 |row| row.get(0),
1920 )?;
1921
1922 Ok(count)
1923}
1924
/// A retention policy row from the `retention_policies` table, keyed by workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
    /// Row id of the policy in `retention_policies`.
    pub id: i64,
    /// Workspace this policy applies to; policies are looked up and upserted by this value.
    pub workspace: String,
    /// Stored and returned as-is. NOTE(review): no code in this part of the file acts on
    /// this value — presumably a maximum memory age in days; confirm against callers.
    pub max_age_days: Option<i64>,
    /// Maximum number of active memories in the workspace; when exceeded,
    /// `apply_retention_policies` archives the excess.
    pub max_memories: Option<i64>,
    /// Age threshold in days handed to `compress_old_memories` when policies are applied.
    pub compress_after_days: Option<i64>,
    /// Importance ceiling for compression candidates. Reads fall back to `0.3` when the
    /// column cannot be decoded.
    pub compress_max_importance: f32,
    /// Access-count bound for compression candidates (candidates must be below it).
    /// Reads fall back to `3` when the column cannot be decoded.
    pub compress_min_access: i32,
    /// Archived memories created more than this many days ago are soft-deleted when
    /// policies are applied.
    pub auto_delete_after_days: Option<i64>,
    /// Memory type names, persisted as a single comma-separated string.
    /// NOTE(review): `apply_retention_policies` does not consult this list — confirm
    /// where it is meant to be enforced.
    pub exclude_types: Vec<String>,
    /// Creation timestamp as stored (written as an RFC 3339 string by `set_retention_policy`).
    pub created_at: String,
    /// Last-update timestamp as stored (written as an RFC 3339 string by `set_retention_policy`).
    pub updated_at: String,
}
1940
1941pub fn get_retention_policy(conn: &Connection, workspace: &str) -> Result<Option<RetentionPolicy>> {
1943 conn.query_row(
1944 "SELECT id, workspace, max_age_days, max_memories, compress_after_days,
1945 compress_max_importance, compress_min_access, auto_delete_after_days,
1946 exclude_types, created_at, updated_at
1947 FROM retention_policies WHERE workspace = ?",
1948 params![workspace],
1949 |row| {
1950 let exclude_str: Option<String> = row.get(8)?;
1951 Ok(RetentionPolicy {
1952 id: row.get(0)?,
1953 workspace: row.get(1)?,
1954 max_age_days: row.get(2)?,
1955 max_memories: row.get(3)?,
1956 compress_after_days: row.get(4)?,
1957 compress_max_importance: row.get::<_, f32>(5).unwrap_or(0.3),
1958 compress_min_access: row.get::<_, i32>(6).unwrap_or(3),
1959 auto_delete_after_days: row.get(7)?,
1960 exclude_types: exclude_str
1961 .map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
1962 .unwrap_or_default(),
1963 created_at: row.get(9)?,
1964 updated_at: row.get(10)?,
1965 })
1966 },
1967 )
1968 .optional()
1969 .map_err(EngramError::from)
1970}
1971
1972pub fn list_retention_policies(conn: &Connection) -> Result<Vec<RetentionPolicy>> {
1974 let mut stmt = conn.prepare(
1975 "SELECT id, workspace, max_age_days, max_memories, compress_after_days,
1976 compress_max_importance, compress_min_access, auto_delete_after_days,
1977 exclude_types, created_at, updated_at
1978 FROM retention_policies ORDER BY workspace",
1979 )?;
1980
1981 let policies = stmt
1982 .query_map([], |row| {
1983 let exclude_str: Option<String> = row.get(8)?;
1984 Ok(RetentionPolicy {
1985 id: row.get(0)?,
1986 workspace: row.get(1)?,
1987 max_age_days: row.get(2)?,
1988 max_memories: row.get(3)?,
1989 compress_after_days: row.get(4)?,
1990 compress_max_importance: row.get::<_, f32>(5).unwrap_or(0.3),
1991 compress_min_access: row.get::<_, i32>(6).unwrap_or(3),
1992 auto_delete_after_days: row.get(7)?,
1993 exclude_types: exclude_str
1994 .map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
1995 .unwrap_or_default(),
1996 created_at: row.get(9)?,
1997 updated_at: row.get(10)?,
1998 })
1999 })?
2000 .filter_map(|r| r.ok())
2001 .collect();
2002
2003 Ok(policies)
2004}
2005
2006pub fn set_retention_policy(
2008 conn: &Connection,
2009 workspace: &str,
2010 max_age_days: Option<i64>,
2011 max_memories: Option<i64>,
2012 compress_after_days: Option<i64>,
2013 compress_max_importance: Option<f32>,
2014 compress_min_access: Option<i32>,
2015 auto_delete_after_days: Option<i64>,
2016 exclude_types: Option<Vec<String>>,
2017) -> Result<RetentionPolicy> {
2018 let now = Utc::now().to_rfc3339();
2019 let exclude_str = exclude_types.map(|v| v.join(","));
2020
2021 conn.execute(
2022 "INSERT INTO retention_policies (workspace, max_age_days, max_memories, compress_after_days,
2023 compress_max_importance, compress_min_access, auto_delete_after_days, exclude_types,
2024 created_at, updated_at)
2025 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)
2026 ON CONFLICT(workspace) DO UPDATE SET
2027 max_age_days = COALESCE(?2, max_age_days),
2028 max_memories = COALESCE(?3, max_memories),
2029 compress_after_days = COALESCE(?4, compress_after_days),
2030 compress_max_importance = COALESCE(?5, compress_max_importance),
2031 compress_min_access = COALESCE(?6, compress_min_access),
2032 auto_delete_after_days = COALESCE(?7, auto_delete_after_days),
2033 exclude_types = COALESCE(?8, exclude_types),
2034 updated_at = ?9",
2035 params![
2036 workspace,
2037 max_age_days,
2038 max_memories,
2039 compress_after_days,
2040 compress_max_importance.unwrap_or(0.3),
2041 compress_min_access.unwrap_or(3),
2042 auto_delete_after_days,
2043 exclude_str,
2044 now,
2045 ],
2046 )?;
2047
2048 get_retention_policy(conn, workspace)?.ok_or_else(|| EngramError::NotFound(0))
2049}
2050
2051pub fn delete_retention_policy(conn: &Connection, workspace: &str) -> Result<bool> {
2053 let affected = conn.execute(
2054 "DELETE FROM retention_policies WHERE workspace = ?",
2055 params![workspace],
2056 )?;
2057 Ok(affected > 0)
2058}
2059
2060pub fn apply_retention_policies(conn: &Connection) -> Result<i64> {
2062 let policies = list_retention_policies(conn)?;
2063 let mut total_affected = 0i64;
2064
2065 for policy in &policies {
2066 if let Some(compress_days) = policy.compress_after_days {
2068 let compressed = compress_old_memories(
2069 conn,
2070 compress_days,
2071 policy.compress_max_importance,
2072 policy.compress_min_access,
2073 100,
2074 )?;
2075 total_affected += compressed;
2076 }
2077
2078 if let Some(max_mem) = policy.max_memories {
2080 let count: i64 = conn
2081 .query_row(
2082 "SELECT COUNT(*) FROM memories WHERE workspace = ? AND valid_to IS NULL
2083 AND COALESCE(lifecycle_state, 'active') = 'active'",
2084 params![policy.workspace],
2085 |row| row.get(0),
2086 )
2087 .unwrap_or(0);
2088
2089 if count > max_mem {
2090 let excess = count - max_mem;
2092 let archived = conn.execute(
2093 "UPDATE memories SET lifecycle_state = 'archived'
2094 WHERE id IN (
2095 SELECT id FROM memories
2096 WHERE workspace = ? AND valid_to IS NULL
2097 AND COALESCE(lifecycle_state, 'active') = 'active'
2098 AND memory_type NOT IN ('summary', 'checkpoint')
2099 ORDER BY importance ASC, access_count ASC, created_at ASC
2100 LIMIT ?
2101 )",
2102 params![policy.workspace, excess],
2103 )?;
2104 total_affected += archived as i64;
2105 }
2106 }
2107
2108 if let Some(delete_days) = policy.auto_delete_after_days {
2110 let cutoff = (Utc::now() - chrono::Duration::days(delete_days)).to_rfc3339();
2111 let now = Utc::now().to_rfc3339();
2112 let deleted = conn.execute(
2113 "UPDATE memories SET valid_to = ?
2114 WHERE workspace = ? AND valid_to IS NULL
2115 AND lifecycle_state = 'archived'
2116 AND created_at < ?",
2117 params![now, policy.workspace, cutoff],
2118 )?;
2119 total_affected += deleted as i64;
2120 }
2121 }
2122
2123 Ok(total_affected)
2124}
2125
2126pub fn compress_old_memories(
2129 conn: &Connection,
2130 max_age_days: i64,
2131 max_importance: f32,
2132 min_access_count: i32,
2133 batch_limit: usize,
2134) -> Result<i64> {
2135 let cutoff = (Utc::now() - chrono::Duration::days(max_age_days)).to_rfc3339();
2136 let now = Utc::now().to_rfc3339();
2137
2138 let mut stmt = conn.prepare(
2140 "SELECT id, content, memory_type, importance, tags, workspace
2141 FROM (
2142 SELECT m.id, m.content, m.memory_type, m.importance, m.access_count, m.workspace,
2143 COALESCE(m.lifecycle_state, 'active') as lifecycle_state,
2144 (SELECT GROUP_CONCAT(t.name, ',') FROM memory_tags mt JOIN tags t ON mt.tag_id = t.id WHERE mt.memory_id = m.id) as tags
2145 FROM memories m
2146 WHERE m.valid_to IS NULL
2147 AND (m.expires_at IS NULL OR m.expires_at > ?1)
2148 AND m.created_at < ?2
2149 AND m.importance <= ?3
2150 AND m.access_count < ?4
2151 AND m.memory_type NOT IN ('summary', 'checkpoint')
2152 AND COALESCE(m.lifecycle_state, 'active') = 'active'
2153 ORDER BY m.created_at ASC
2154 LIMIT ?5
2155 )",
2156 )?;
2157
2158 let candidates: Vec<(i64, String, String, f32, Option<String>, String)> = stmt
2159 .query_map(
2160 params![
2161 now,
2162 cutoff,
2163 max_importance,
2164 min_access_count,
2165 batch_limit as i64
2166 ],
2167 |row| {
2168 Ok((
2169 row.get(0)?,
2170 row.get(1)?,
2171 row.get(2)?,
2172 row.get(3)?,
2173 row.get(4)?,
2174 row.get::<_, String>(5)
2175 .unwrap_or_else(|_| "default".to_string()),
2176 ))
2177 },
2178 )?
2179 .filter_map(|r| r.ok())
2180 .collect();
2181
2182 let mut archived = 0i64;
2183
2184 for (id, content, memory_type, importance, tags_csv, workspace) in &candidates {
2185 let summary_text = if content.len() > 200 {
2187 let head: String = content.chars().take(120).collect();
2188 let tail: String = content
2189 .chars()
2190 .rev()
2191 .take(60)
2192 .collect::<String>()
2193 .chars()
2194 .rev()
2195 .collect();
2196 format!("{}...{}", head, tail)
2197 } else {
2198 content.clone()
2199 };
2200
2201 let tags: Vec<String> = tags_csv
2202 .as_deref()
2203 .unwrap_or("")
2204 .split(',')
2205 .filter(|s| !s.is_empty())
2206 .map(|s| s.to_string())
2207 .collect();
2208
2209 let input = CreateMemoryInput {
2210 content: format!("[Archived {}] {}", memory_type, summary_text),
2211 memory_type: MemoryType::Summary,
2212 importance: Some(*importance),
2213 tags,
2214 workspace: Some(workspace.clone()),
2215 tier: MemoryTier::Permanent,
2216 summary_of_id: Some(*id),
2217 ..Default::default()
2218 };
2219
2220 if create_memory(conn, &input).is_ok()
2221 && conn
2222 .execute(
2223 "UPDATE memories SET lifecycle_state = 'archived' WHERE id = ? AND valid_to IS NULL",
2224 params![id],
2225 )
2226 .is_ok()
2227 {
2228 archived += 1;
2229 }
2230 }
2231
2232 Ok(archived)
2233}
2234
/// A reduced view of a memory, returned by `list_memories_compact` in place of the full
/// `Memory` record.
///
/// NOTE(review): the code that populates this struct is not fully visible from here; the
/// field notes below that go beyond the field's name and type are marked as assumptions.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactMemoryRow {
    /// Id of the memory.
    pub id: i64,
    /// Preview text for the memory — presumably a shortened form of its content; confirm
    /// against `list_memories_compact`.
    pub preview: String,
    /// Presumably `true` when `preview` is shorter than the full content — TODO confirm.
    pub truncated: bool,
    /// Type of the memory.
    pub memory_type: MemoryType,
    /// Tag names attached to the memory.
    pub tags: Vec<String>,
    /// Importance value of the memory.
    pub importance: f32,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp (UTC).
    pub updated_at: DateTime<Utc>,
    /// Workspace the memory belongs to.
    pub workspace: String,
    /// Tier of the memory.
    pub tier: MemoryTier,
    /// Length of the full content — NOTE(review): unit (bytes vs. characters) not visible
    /// from here; confirm.
    pub content_length: usize,
    /// Number of lines in the full content — presumably; confirm how lines are counted.
    pub line_count: usize,
}
2264
2265pub fn list_memories_compact(
2275 conn: &Connection,
2276 options: &ListOptions,
2277 preview_chars: Option<usize>,
2278) -> Result<Vec<CompactMemoryRow>> {
2279 use crate::intelligence::compact_preview;
2280
2281 let now = Utc::now().to_rfc3339();
2282 let max_preview = preview_chars.unwrap_or(100);
2283
2284 let mut sql = String::from(
2285 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance,
2286 m.created_at, m.updated_at, m.workspace, m.tier
2287 FROM memories m",
2288 );
2289
2290 let mut conditions = vec!["m.valid_to IS NULL".to_string()];
2291 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
2292
2293 conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
2295 params.push(Box::new(now));
2296
2297 if let Some(ref tags) = options.tags {
2299 if !tags.is_empty() {
2300 sql.push_str(
2301 " JOIN memory_tags mt ON m.id = mt.memory_id
2302 JOIN tags t ON mt.tag_id = t.id",
2303 );
2304 let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
2305 conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
2306 for tag in tags {
2307 params.push(Box::new(tag.clone()));
2308 }
2309 }
2310 }
2311
2312 if let Some(ref memory_type) = options.memory_type {
2314 conditions.push("m.memory_type = ?".to_string());
2315 params.push(Box::new(memory_type.as_str().to_string()));
2316 }
2317
2318 if let Some(ref metadata_filter) = options.metadata_filter {
2320 for (key, value) in metadata_filter {
2321 metadata_value_to_param(key, value, &mut conditions, &mut params)?;
2322 }
2323 }
2324
2325 if let Some(ref scope) = options.scope {
2327 conditions.push("m.scope_type = ?".to_string());
2328 params.push(Box::new(scope.scope_type().to_string()));
2329 if let Some(scope_id) = scope.scope_id() {
2330 conditions.push("m.scope_id = ?".to_string());
2331 params.push(Box::new(scope_id.to_string()));
2332 } else {
2333 conditions.push("m.scope_id IS NULL".to_string());
2334 }
2335 }
2336
2337 if let Some(ref workspace) = options.workspace {
2339 conditions.push("m.workspace = ?".to_string());
2340 params.push(Box::new(workspace.clone()));
2341 }
2342
2343 if let Some(ref tier) = options.tier {
2345 conditions.push("m.tier = ?".to_string());
2346 params.push(Box::new(tier.as_str().to_string()));
2347 }
2348
2349 sql.push_str(" WHERE ");
2350 sql.push_str(&conditions.join(" AND "));
2351
2352 let sort_field = match options.sort_by.unwrap_or_default() {
2354 SortField::CreatedAt => "m.created_at",
2355 SortField::UpdatedAt => "m.updated_at",
2356 SortField::LastAccessedAt => "m.last_accessed_at",
2357 SortField::Importance => "m.importance",
2358 SortField::AccessCount => "m.access_count",
2359 };
2360 let sort_order = match options.sort_order.unwrap_or_default() {
2361 SortOrder::Asc => "ASC",
2362 SortOrder::Desc => "DESC",
2363 };
2364 sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
2365
2366 let limit = options.limit.unwrap_or(100);
2368 let offset = options.offset.unwrap_or(0);
2369 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
2370
2371 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
2372 let mut stmt = conn.prepare(&sql)?;
2373
2374 let memories: Vec<CompactMemoryRow> = stmt
2375 .query_map(param_refs.as_slice(), |row| {
2376 let id: i64 = row.get("id")?;
2377 let content: String = row.get("content")?;
2378 let memory_type_str: String = row.get("memory_type")?;
2379 let importance: f32 = row.get("importance")?;
2380 let created_at_str: String = row.get("created_at")?;
2381 let updated_at_str: String = row.get("updated_at")?;
2382 let workspace: String = row.get("workspace")?;
2383 let tier_str: String = row.get("tier")?;
2384
2385 let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
2386 let tier = tier_str.parse().unwrap_or_default();
2387
2388 let (preview, truncated) = compact_preview(&content, max_preview);
2390 let content_length = content.len();
2391 let line_count = content.lines().count();
2392
2393 Ok(CompactMemoryRow {
2394 id,
2395 preview,
2396 truncated,
2397 memory_type,
2398 tags: vec![], importance,
2400 created_at: DateTime::parse_from_rfc3339(&created_at_str)
2401 .map(|dt| dt.with_timezone(&Utc))
2402 .unwrap_or_else(|_| Utc::now()),
2403 updated_at: DateTime::parse_from_rfc3339(&updated_at_str)
2404 .map(|dt| dt.with_timezone(&Utc))
2405 .unwrap_or_else(|_| Utc::now()),
2406 workspace,
2407 tier,
2408 content_length,
2409 line_count,
2410 })
2411 })?
2412 .filter_map(|r| r.ok())
2413 .map(|mut m| {
2414 m.tags = load_tags(conn, m.id).unwrap_or_default();
2415 m
2416 })
2417 .collect();
2418
2419 Ok(memories)
2420}
2421
2422pub fn get_stats(conn: &Connection) -> Result<StorageStats> {
2424 let total_memories: i64 = conn.query_row(
2425 "SELECT COUNT(*) FROM memories WHERE valid_to IS NULL",
2426 [],
2427 |row| row.get(0),
2428 )?;
2429
2430 let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2431
2432 let total_crossrefs: i64 = conn.query_row(
2433 "SELECT COUNT(*) FROM crossrefs WHERE valid_to IS NULL",
2434 [],
2435 |row| row.get(0),
2436 )?;
2437
2438 let total_versions: i64 =
2439 conn.query_row("SELECT COUNT(*) FROM memory_versions", [], |row| row.get(0))?;
2440
2441 let _total_identities: i64 =
2442 conn.query_row("SELECT COUNT(*) FROM identities", [], |row| row.get(0))?;
2443
2444 let _total_entities: i64 =
2445 conn.query_row("SELECT COUNT(*) FROM entities", [], |row| row.get(0))?;
2446
2447 let db_size_bytes: i64 = conn.query_row(
2448 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
2449 [],
2450 |row| row.get(0),
2451 )?;
2452
2453 let _schema_version: i32 = conn
2454 .query_row("SELECT MAX(version) FROM schema_version", [], |row| {
2455 row.get(0)
2456 })
2457 .unwrap_or(0);
2458
2459 let mut workspace_stmt = conn.prepare(
2460 "SELECT workspace, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY workspace",
2461 )?;
2462 let workspaces: HashMap<String, i64> = workspace_stmt
2463 .query_map([], |row| {
2464 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2465 })?
2466 .filter_map(|r| r.ok())
2467 .collect();
2468
2469 let mut type_stmt = conn.prepare(
2470 "SELECT memory_type, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY memory_type",
2471 )?;
2472 let type_counts: HashMap<String, i64> = type_stmt
2473 .query_map([], |row| {
2474 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2475 })?
2476 .filter_map(|r| r.ok())
2477 .collect();
2478
2479 let mut tier_stmt = conn.prepare(
2480 "SELECT COALESCE(tier, 'permanent'), COUNT(*) FROM memories GROUP BY COALESCE(tier, 'permanent')",
2481 )?;
2482 let tier_counts: HashMap<String, i64> = tier_stmt
2483 .query_map([], |row| {
2484 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2485 })?
2486 .filter_map(|r| r.ok())
2487 .collect();
2488
2489 let memories_with_embeddings: i64 = conn.query_row(
2490 "SELECT COUNT(*) FROM memories WHERE has_embedding = 1 AND valid_to IS NULL",
2491 [],
2492 |row| row.get(0),
2493 )?;
2494
2495 let memories_pending_embedding: i64 = conn.query_row(
2496 "SELECT COUNT(*) FROM embedding_queue WHERE status = 'pending'",
2497 [],
2498 |row| row.get(0),
2499 )?;
2500
2501 let (last_sync, sync_pending): (Option<String>, i64) = conn.query_row(
2502 "SELECT last_sync, pending_changes FROM sync_state WHERE id = 1",
2503 [],
2504 |row| Ok((row.get(0)?, row.get(1)?)),
2505 )?;
2506
2507 Ok(StorageStats {
2508 total_memories,
2509 total_tags,
2510 total_crossrefs,
2511 total_versions,
2512 total_identities: 0,
2513 total_entities: 0,
2514 db_size_bytes,
2515 memories_with_embeddings,
2516 memories_pending_embedding,
2517 last_sync: last_sync.and_then(|s| {
2518 DateTime::parse_from_rfc3339(&s)
2519 .map(|dt| dt.with_timezone(&Utc))
2520 .ok()
2521 }),
2522 sync_pending: sync_pending > 0,
2523 storage_mode: "sqlite".to_string(),
2524 schema_version: 0,
2525 workspaces,
2526 type_counts,
2527 tier_counts,
2528 })
2529}
2530
2531pub fn get_memory_versions(conn: &Connection, memory_id: i64) -> Result<Vec<MemoryVersion>> {
2533 let mut stmt = conn.prepare_cached(
2534 "SELECT version, content, tags, metadata, created_at, created_by, change_summary
2535 FROM memory_versions WHERE memory_id = ? ORDER BY version DESC",
2536 )?;
2537
2538 let versions: Vec<MemoryVersion> = stmt
2539 .query_map([memory_id], |row| {
2540 let tags_str: String = row.get("tags")?;
2541 let metadata_str: String = row.get("metadata")?;
2542 let created_at_str: String = row.get("created_at")?;
2543
2544 Ok(MemoryVersion {
2545 version: row.get("version")?,
2546 content: row.get("content")?,
2547 tags: serde_json::from_str(&tags_str).unwrap_or_default(),
2548 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
2549 created_at: DateTime::parse_from_rfc3339(&created_at_str)
2550 .map(|dt| dt.with_timezone(&Utc))
2551 .unwrap_or_else(|_| Utc::now()),
2552 created_by: row.get("created_by")?,
2553 change_summary: row.get("change_summary")?,
2554 })
2555 })?
2556 .filter_map(|r| r.ok())
2557 .collect();
2558
2559 Ok(versions)
2560}
2561
/// Outcome of `create_memory_batch`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchCreateResult {
    /// Memories that were created successfully.
    pub created: Vec<Memory>,
    /// One entry per input that failed to be created.
    pub failed: Vec<BatchError>,
    /// Length of `created`.
    pub total_created: usize,
    /// Length of `failed`.
    pub total_failed: usize,
}
2574
/// Outcome of `delete_memory_batch`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchDeleteResult {
    /// Ids that were deleted successfully.
    pub deleted: Vec<i64>,
    /// One entry per id that failed to be deleted.
    pub failed: Vec<BatchError>,
    /// Length of `deleted`.
    pub total_deleted: usize,
    /// Length of `failed`.
    pub total_failed: usize,
}
2583
/// A single failure inside a batch operation.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchError {
    /// Zero-based position of the failing item in the batch input.
    pub index: usize,
    /// Memory id involved, when known (set for deletes, `None` for creates).
    pub id: Option<i64>,
    /// Display text of the underlying error.
    pub error: String,
}
2591
2592pub fn create_memory_batch(
2594 conn: &Connection,
2595 inputs: &[CreateMemoryInput],
2596) -> Result<BatchCreateResult> {
2597 let mut created = Vec::new();
2598 let mut failed = Vec::new();
2599
2600 for (index, input) in inputs.iter().enumerate() {
2601 match create_memory(conn, input) {
2602 Ok(memory) => created.push(memory),
2603 Err(e) => failed.push(BatchError {
2604 index,
2605 id: None,
2606 error: e.to_string(),
2607 }),
2608 }
2609 }
2610
2611 Ok(BatchCreateResult {
2612 total_created: created.len(),
2613 total_failed: failed.len(),
2614 created,
2615 failed,
2616 })
2617}
2618
2619pub fn delete_memory_batch(conn: &Connection, ids: &[i64]) -> Result<BatchDeleteResult> {
2621 let mut deleted = Vec::new();
2622 let mut failed = Vec::new();
2623
2624 for (index, &id) in ids.iter().enumerate() {
2625 match delete_memory(conn, id) {
2626 Ok(()) => deleted.push(id),
2627 Err(e) => failed.push(BatchError {
2628 index,
2629 id: Some(id),
2630 error: e.to_string(),
2631 }),
2632 }
2633 }
2634
2635 Ok(BatchDeleteResult {
2636 total_deleted: deleted.len(),
2637 total_failed: failed.len(),
2638 deleted,
2639 failed,
2640 })
2641}
2642
/// Usage summary for one tag, as reported by `list_tags`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagInfo {
    /// Tag name.
    pub name: String,
    /// Usage count reported for the tag.
    pub count: i64,
    /// Most recent `updated_at` among current memories carrying the tag, if
    /// any could be parsed.
    pub last_used: Option<DateTime<Utc>>,
}
2654
2655pub fn list_tags(conn: &Connection) -> Result<Vec<TagInfo>> {
2657 let mut stmt = conn.prepare(
2658 r#"
2659 SELECT t.name, COUNT(mt.memory_id) as count,
2660 MAX(m.updated_at) as last_used
2661 FROM tags t
2662 LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2663 LEFT JOIN memories m ON mt.memory_id = m.id AND m.valid_to IS NULL
2664 GROUP BY t.id, t.name
2665 ORDER BY count DESC, t.name ASC
2666 "#,
2667 )?;
2668
2669 let tags: Vec<TagInfo> = stmt
2670 .query_map([], |row| {
2671 let name: String = row.get(0)?;
2672 let count: i64 = row.get(1)?;
2673 let last_used: Option<String> = row.get(2)?;
2674
2675 Ok(TagInfo {
2676 name,
2677 count,
2678 last_used: last_used.and_then(|s| {
2679 DateTime::parse_from_rfc3339(&s)
2680 .map(|dt| dt.with_timezone(&Utc))
2681 .ok()
2682 }),
2683 })
2684 })?
2685 .filter_map(|r| r.ok())
2686 .collect();
2687
2688 Ok(tags)
2689}
2690
/// One node in the tag tree returned by `get_tag_hierarchy`, where tag names
/// are split into segments on `/`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagHierarchyNode {
    /// Path segment this node represents.
    pub name: String,
    /// Path from the root of the tree down to this node.
    pub full_path: String,
    /// Usage count accumulated onto this node.
    pub count: i64,
    /// Nodes nested beneath this one.
    pub children: Vec<TagHierarchyNode>,
}
2699
2700pub fn get_tag_hierarchy(conn: &Connection) -> Result<Vec<TagHierarchyNode>> {
2702 let tags = list_tags(conn)?;
2703
2704 let mut root_nodes: HashMap<String, TagHierarchyNode> = HashMap::new();
2706
2707 for tag in tags {
2708 let parts: Vec<&str> = tag.name.split('/').collect();
2709 if parts.is_empty() {
2710 continue;
2711 }
2712
2713 let root_name = parts[0].to_string();
2714 if !root_nodes.contains_key(&root_name) {
2715 root_nodes.insert(
2716 root_name.clone(),
2717 TagHierarchyNode {
2718 name: root_name.clone(),
2719 full_path: root_name.clone(),
2720 count: 0,
2721 children: Vec::new(),
2722 },
2723 );
2724 }
2725
2726 if parts.len() == 1 {
2728 if let Some(node) = root_nodes.get_mut(&root_name) {
2729 node.count += tag.count;
2730 }
2731 } else {
2732 if let Some(node) = root_nodes.get_mut(&root_name) {
2735 node.count += tag.count;
2736 }
2737 }
2738 }
2739
2740 Ok(root_nodes.into_values().collect())
2741}
2742
/// Report produced by `validate_tags`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagValidationResult {
    /// `true` when the validation found no problems.
    pub valid: bool,
    /// Names of tags that are not assigned to any memory.
    pub orphaned_tags: Vec<String>,
    /// Names of tags whose name is empty.
    pub empty_tags: Vec<String>,
    /// `(memory_id, tag_name)` pairs for tags assigned to a memory more than once.
    pub duplicate_assignments: Vec<(i64, String)>,
    /// Number of rows in `tags`.
    pub total_tags: i64,
    /// Number of rows in `memory_tags`.
    pub total_assignments: i64,
}
2753
2754pub fn validate_tags(conn: &Connection) -> Result<TagValidationResult> {
2756 let orphaned: Vec<String> = conn
2758 .prepare(
2759 "SELECT t.name FROM tags t
2760 LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2761 WHERE mt.tag_id IS NULL",
2762 )?
2763 .query_map([], |row| row.get(0))?
2764 .filter_map(|r| r.ok())
2765 .collect();
2766
2767 let empty: Vec<String> = conn
2769 .prepare("SELECT name FROM tags WHERE name = '' OR name IS NULL")?
2770 .query_map([], |row| row.get(0))?
2771 .filter_map(|r| r.ok())
2772 .collect();
2773
2774 let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2776 let total_assignments: i64 =
2777 conn.query_row("SELECT COUNT(*) FROM memory_tags", [], |row| row.get(0))?;
2778
2779 Ok(TagValidationResult {
2780 valid: orphaned.is_empty() && empty.is_empty(),
2781 orphaned_tags: orphaned,
2782 empty_tags: empty,
2783 duplicate_assignments: vec![], total_tags,
2785 total_assignments,
2786 })
2787}
2788
/// Portable, string-typed representation of a memory used by
/// `export_memories` and `import_memories`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportedMemory {
    /// Id of the memory in the exporting database (used in import error messages).
    pub id: i64,
    /// Full memory content.
    pub content: String,
    /// Memory type in its string form.
    pub memory_type: String,
    /// Tag names attached to the memory.
    pub tags: Vec<String>,
    /// Arbitrary JSON metadata.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Importance value.
    pub importance: f32,
    /// Workspace name.
    pub workspace: String,
    /// Storage tier in its string form.
    pub tier: String,
    /// Creation time as an RFC 3339 string.
    pub created_at: String,
    /// Last-update time as an RFC 3339 string.
    pub updated_at: String,
}
2807
/// Envelope for a memory export.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportData {
    /// Export format version (`export_memories` writes `"1.0"`).
    pub version: String,
    /// Time the export was produced, as an RFC 3339 string.
    pub exported_at: String,
    /// Number of entries in `memories`.
    pub memory_count: usize,
    /// The exported memories.
    pub memories: Vec<ExportedMemory>,
}
2816
2817pub fn export_memories(conn: &Connection) -> Result<ExportData> {
2819 let memories = list_memories(
2820 conn,
2821 &ListOptions {
2822 limit: Some(100000),
2823 ..Default::default()
2824 },
2825 )?;
2826
2827 let exported: Vec<ExportedMemory> = memories
2828 .into_iter()
2829 .map(|m| ExportedMemory {
2830 id: m.id,
2831 content: m.content,
2832 memory_type: m.memory_type.as_str().to_string(),
2833 tags: m.tags,
2834 metadata: m.metadata,
2835 importance: m.importance,
2836 workspace: m.workspace,
2837 tier: m.tier.as_str().to_string(),
2838 created_at: m.created_at.to_rfc3339(),
2839 updated_at: m.updated_at.to_rfc3339(),
2840 })
2841 .collect();
2842
2843 Ok(ExportData {
2844 version: "1.0".to_string(),
2845 exported_at: Utc::now().to_rfc3339(),
2846 memory_count: exported.len(),
2847 memories: exported,
2848 })
2849}
2850
/// Summary returned by `import_memories`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ImportResult {
    /// Number of memories created.
    pub imported: usize,
    /// Number of memories skipped because creation reported a duplicate.
    pub skipped: usize,
    /// Number of memories that failed for any other reason.
    pub failed: usize,
    /// One message per failed memory.
    pub errors: Vec<String>,
}
2859
2860pub fn import_memories(
2862 conn: &Connection,
2863 data: &ExportData,
2864 skip_duplicates: bool,
2865) -> Result<ImportResult> {
2866 let mut imported = 0;
2867 let mut skipped = 0;
2868 let mut failed = 0;
2869 let mut errors = Vec::new();
2870
2871 for mem in &data.memories {
2872 let memory_type = mem.memory_type.parse().unwrap_or(MemoryType::Note);
2873 let tier = mem.tier.parse().unwrap_or(MemoryTier::Permanent);
2874
2875 let input = CreateMemoryInput {
2876 content: mem.content.clone(),
2877 memory_type,
2878 tags: mem.tags.clone(),
2879 metadata: mem.metadata.clone(),
2880 importance: Some(mem.importance),
2881 scope: MemoryScope::Global,
2882 workspace: Some(mem.workspace.clone()),
2883 tier,
2884 defer_embedding: false,
2885 ttl_seconds: None,
2886 dedup_mode: if skip_duplicates {
2887 DedupMode::Skip
2888 } else {
2889 DedupMode::Allow
2890 },
2891 dedup_threshold: None,
2892 event_time: None,
2893 event_duration_seconds: None,
2894 trigger_pattern: None,
2895 summary_of_id: None,
2896 };
2897
2898 match create_memory(conn, &input) {
2899 Ok(_) => imported += 1,
2900 Err(EngramError::Duplicate { .. }) => skipped += 1,
2901 Err(e) => {
2902 failed += 1;
2903 errors.push(format!("Failed to import memory {}: {}", mem.id, e));
2904 }
2905 }
2906 }
2907
2908 Ok(ImportResult {
2909 imported,
2910 skipped,
2911 failed,
2912 errors,
2913 })
2914}
2915
2916pub fn rebuild_embeddings(conn: &Connection) -> Result<i64> {
2922 let now = Utc::now().to_rfc3339();
2923
2924 conn.execute("DELETE FROM embedding_queue", [])?;
2926
2927 let count = conn.execute(
2929 "INSERT INTO embedding_queue (memory_id, status, queued_at)
2930 SELECT id, 'pending', ? FROM memories WHERE valid_to IS NULL",
2931 params![now],
2932 )?;
2933
2934 conn.execute(
2936 "UPDATE memories SET has_embedding = 0 WHERE valid_to IS NULL",
2937 [],
2938 )?;
2939
2940 Ok(count as i64)
2941}
2942
2943pub fn rebuild_crossrefs(conn: &Connection) -> Result<i64> {
2945 let now = Utc::now().to_rfc3339();
2946
2947 let deleted = conn.execute(
2949 "UPDATE crossrefs SET valid_to = ? WHERE source = 'auto' AND valid_to IS NULL",
2950 params![now],
2951 )?;
2952
2953 Ok(deleted as i64)
2957}
2958
2959pub fn create_section_memory(
2965 conn: &Connection,
2966 title: &str,
2967 content: &str,
2968 parent_id: Option<i64>,
2969 level: i32,
2970 workspace: Option<&str>,
2971) -> Result<Memory> {
2972 let mut metadata = HashMap::new();
2973 metadata.insert("section_title".to_string(), serde_json::json!(title));
2974 metadata.insert("section_level".to_string(), serde_json::json!(level));
2975 if let Some(pid) = parent_id {
2976 metadata.insert("parent_memory_id".to_string(), serde_json::json!(pid));
2977 }
2978
2979 let input = CreateMemoryInput {
2980 content: format!("# {}\n\n{}", title, content),
2981 memory_type: MemoryType::Context,
2982 tags: vec!["section".to_string()],
2983 metadata,
2984 importance: Some(0.6),
2985 scope: MemoryScope::Global,
2986 workspace: workspace.map(String::from),
2987 tier: MemoryTier::Permanent,
2988 defer_embedding: false,
2989 ttl_seconds: None,
2990 dedup_mode: DedupMode::Skip,
2991 dedup_threshold: None,
2992 event_time: None,
2993 event_duration_seconds: None,
2994 trigger_pattern: None,
2995 summary_of_id: None,
2996 };
2997
2998 create_memory(conn, &input)
2999}
3000
3001pub fn create_checkpoint(
3003 conn: &Connection,
3004 session_id: &str,
3005 summary: &str,
3006 context: &HashMap<String, serde_json::Value>,
3007 workspace: Option<&str>,
3008) -> Result<Memory> {
3009 let mut metadata = context.clone();
3010 metadata.insert(
3011 "checkpoint_session".to_string(),
3012 serde_json::json!(session_id),
3013 );
3014 metadata.insert(
3015 "checkpoint_time".to_string(),
3016 serde_json::json!(Utc::now().to_rfc3339()),
3017 );
3018
3019 let input = CreateMemoryInput {
3020 content: format!("Session Checkpoint: {}\n\n{}", session_id, summary),
3021 memory_type: MemoryType::Context,
3022 tags: vec!["checkpoint".to_string(), format!("session:{}", session_id)],
3023 metadata,
3024 importance: Some(0.7),
3025 scope: MemoryScope::Global,
3026 workspace: workspace.map(String::from),
3027 tier: MemoryTier::Permanent,
3028 defer_embedding: false,
3029 ttl_seconds: None,
3030 dedup_mode: DedupMode::Allow,
3031 dedup_threshold: None,
3032 event_time: None,
3033 event_duration_seconds: None,
3034 trigger_pattern: None,
3035 summary_of_id: None,
3036 };
3037
3038 create_memory(conn, &input)
3039}
3040
3041pub fn boost_memory(
3043 conn: &Connection,
3044 id: i64,
3045 boost_amount: f32,
3046 duration_seconds: Option<i64>,
3047) -> Result<Memory> {
3048 let memory = get_memory(conn, id)?;
3049 let new_importance = (memory.importance + boost_amount).min(1.0);
3050 let now = Utc::now();
3051
3052 conn.execute(
3054 "UPDATE memories SET importance = ?, updated_at = ? WHERE id = ?",
3055 params![new_importance, now.to_rfc3339(), id],
3056 )?;
3057
3058 if let Some(duration) = duration_seconds {
3060 let expires = now + chrono::Duration::seconds(duration);
3061 let mut metadata = memory.metadata.clone();
3062 metadata.insert(
3063 "boost_expires".to_string(),
3064 serde_json::json!(expires.to_rfc3339()),
3065 );
3066 metadata.insert(
3067 "boost_original_importance".to_string(),
3068 serde_json::json!(memory.importance),
3069 );
3070
3071 let metadata_json = serde_json::to_string(&metadata)?;
3072 conn.execute(
3073 "UPDATE memories SET metadata = ? WHERE id = ?",
3074 params![metadata_json, id],
3075 )?;
3076 }
3077
3078 get_memory(conn, id)
3079}
3080
/// Kinds of events written to the `memory_events` table by `record_event`.
///
/// The `Display` and `FromStr` impls map each variant to and from its
/// lowercase name (`"created"`, `"updated"`, ...).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MemoryEventType {
    /// Stored as `"created"`.
    Created,
    /// Stored as `"updated"`.
    Updated,
    /// Stored as `"deleted"`.
    Deleted,
    /// Stored as `"linked"`.
    Linked,
    /// Stored as `"unlinked"`.
    Unlinked,
    /// Stored as `"shared"`; recorded by `share_memory`.
    Shared,
    /// Stored as `"synced"`.
    Synced,
}
3096
3097impl std::fmt::Display for MemoryEventType {
3098 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3099 match self {
3100 MemoryEventType::Created => write!(f, "created"),
3101 MemoryEventType::Updated => write!(f, "updated"),
3102 MemoryEventType::Deleted => write!(f, "deleted"),
3103 MemoryEventType::Linked => write!(f, "linked"),
3104 MemoryEventType::Unlinked => write!(f, "unlinked"),
3105 MemoryEventType::Shared => write!(f, "shared"),
3106 MemoryEventType::Synced => write!(f, "synced"),
3107 }
3108 }
3109}
3110
3111impl std::str::FromStr for MemoryEventType {
3112 type Err = EngramError;
3113 fn from_str(s: &str) -> Result<Self> {
3114 match s {
3115 "created" => Ok(MemoryEventType::Created),
3116 "updated" => Ok(MemoryEventType::Updated),
3117 "deleted" => Ok(MemoryEventType::Deleted),
3118 "linked" => Ok(MemoryEventType::Linked),
3119 "unlinked" => Ok(MemoryEventType::Unlinked),
3120 "shared" => Ok(MemoryEventType::Shared),
3121 "synced" => Ok(MemoryEventType::Synced),
3122 _ => Err(EngramError::InvalidInput(format!(
3123 "Invalid event type: {}",
3124 s
3125 ))),
3126 }
3127 }
3128}
3129
/// One row of the `memory_events` table, as returned by `poll_events`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryEvent {
    /// Event row id; events are polled in terms of this id.
    pub id: i64,
    /// Event type as stored (the lowercase name of a `MemoryEventType`).
    pub event_type: String,
    /// Memory the event refers to, if any.
    pub memory_id: Option<i64>,
    /// Agent associated with the event, if any.
    pub agent_id: Option<String>,
    /// JSON payload; `poll_events` substitutes an empty object when the stored
    /// text is not valid JSON.
    pub data: serde_json::Value,
    /// Time the event was recorded.
    pub created_at: DateTime<Utc>,
}
3140
3141pub fn record_event(
3143 conn: &Connection,
3144 event_type: MemoryEventType,
3145 memory_id: Option<i64>,
3146 agent_id: Option<&str>,
3147 data: serde_json::Value,
3148) -> Result<i64> {
3149 let now = Utc::now();
3150 let data_json = serde_json::to_string(&data)?;
3151
3152 conn.execute(
3153 "INSERT INTO memory_events (event_type, memory_id, agent_id, data, created_at)
3154 VALUES (?, ?, ?, ?, ?)",
3155 params![
3156 event_type.to_string(),
3157 memory_id,
3158 agent_id,
3159 data_json,
3160 now.to_rfc3339()
3161 ],
3162 )?;
3163
3164 Ok(conn.last_insert_rowid())
3165}
3166
3167pub fn poll_events(
3169 conn: &Connection,
3170 since_id: Option<i64>,
3171 since_time: Option<DateTime<Utc>>,
3172 agent_id: Option<&str>,
3173 limit: Option<usize>,
3174) -> Result<Vec<MemoryEvent>> {
3175 let limit = limit.unwrap_or(100);
3176
3177 let (query, params): (&str, Vec<Box<dyn rusqlite::ToSql>>) =
3178 match (since_id, since_time, agent_id) {
3179 (Some(id), _, Some(agent)) => (
3180 "SELECT id, event_type, memory_id, agent_id, data, created_at
3181 FROM memory_events WHERE id > ? AND (agent_id = ? OR agent_id IS NULL)
3182 ORDER BY id ASC LIMIT ?",
3183 vec![
3184 Box::new(id),
3185 Box::new(agent.to_string()),
3186 Box::new(limit as i64),
3187 ],
3188 ),
3189 (Some(id), _, None) => (
3190 "SELECT id, event_type, memory_id, agent_id, data, created_at
3191 FROM memory_events WHERE id > ?
3192 ORDER BY id ASC LIMIT ?",
3193 vec![Box::new(id), Box::new(limit as i64)],
3194 ),
3195 (None, Some(time), Some(agent)) => (
3196 "SELECT id, event_type, memory_id, agent_id, data, created_at
3197 FROM memory_events WHERE created_at > ? AND (agent_id = ? OR agent_id IS NULL)
3198 ORDER BY id ASC LIMIT ?",
3199 vec![
3200 Box::new(time.to_rfc3339()),
3201 Box::new(agent.to_string()),
3202 Box::new(limit as i64),
3203 ],
3204 ),
3205 (None, Some(time), None) => (
3206 "SELECT id, event_type, memory_id, agent_id, data, created_at
3207 FROM memory_events WHERE created_at > ?
3208 ORDER BY id ASC LIMIT ?",
3209 vec![Box::new(time.to_rfc3339()), Box::new(limit as i64)],
3210 ),
3211 (None, None, Some(agent)) => (
3212 "SELECT id, event_type, memory_id, agent_id, data, created_at
3213 FROM memory_events WHERE agent_id = ? OR agent_id IS NULL
3214 ORDER BY id DESC LIMIT ?",
3215 vec![Box::new(agent.to_string()), Box::new(limit as i64)],
3216 ),
3217 (None, None, None) => (
3218 "SELECT id, event_type, memory_id, agent_id, data, created_at
3219 FROM memory_events ORDER BY id DESC LIMIT ?",
3220 vec![Box::new(limit as i64)],
3221 ),
3222 };
3223
3224 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3225 let mut stmt = conn.prepare(query)?;
3226 let events = stmt
3227 .query_map(params_refs.as_slice(), |row| {
3228 let data_str: String = row.get(4)?;
3229 let created_str: String = row.get(5)?;
3230 Ok(MemoryEvent {
3231 id: row.get(0)?,
3232 event_type: row.get(1)?,
3233 memory_id: row.get(2)?,
3234 agent_id: row.get(3)?,
3235 data: serde_json::from_str(&data_str).unwrap_or(serde_json::json!({})),
3236 created_at: DateTime::parse_from_rfc3339(&created_str)
3237 .map(|dt| dt.with_timezone(&Utc))
3238 .unwrap_or_else(|_| Utc::now()),
3239 })
3240 })?
3241 .collect::<std::result::Result<Vec<_>, _>>()?;
3242
3243 Ok(events)
3244}
3245
3246pub fn clear_events(
3248 conn: &Connection,
3249 before_id: Option<i64>,
3250 before_time: Option<DateTime<Utc>>,
3251 keep_recent: Option<usize>,
3252) -> Result<i64> {
3253 let deleted = if let Some(id) = before_id {
3254 conn.execute("DELETE FROM memory_events WHERE id < ?", params![id])?
3255 } else if let Some(time) = before_time {
3256 conn.execute(
3257 "DELETE FROM memory_events WHERE created_at < ?",
3258 params![time.to_rfc3339()],
3259 )?
3260 } else if let Some(keep) = keep_recent {
3261 conn.execute(
3263 "DELETE FROM memory_events WHERE id NOT IN (
3264 SELECT id FROM memory_events ORDER BY id DESC LIMIT ?
3265 )",
3266 params![keep as i64],
3267 )?
3268 } else {
3269 conn.execute("DELETE FROM memory_events", [])?
3271 };
3272
3273 Ok(deleted as i64)
3274}
3275
/// Snapshot of the store's sync position, as computed by `get_sync_version`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncVersion {
    /// Highest `version` found in `sync_state` (0 when unavailable).
    pub version: i64,
    /// Latest `updated_at` across memories; the current time when none is available.
    pub last_modified: DateTime<Utc>,
    /// Number of rows in `memories`.
    pub memory_count: i64,
    /// Change fingerprint of the form `<memory_count>-<version>-<last_modified or "none">`.
    pub checksum: String,
}
3288
/// A row of the `sync_tasks` table, keyed by `task_id`.
///
/// Written by `upsert_sync_task` and read by `get_sync_task`; the values are
/// stored as given, without validation in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncTask {
    /// Unique task identifier (the upsert conflict key).
    pub task_id: String,
    /// Kind of task.
    pub task_type: String,
    /// Current task status.
    pub status: String,
    /// Progress figure for the task.
    pub progress_percent: i32,
    /// Number of traces processed so far.
    pub traces_processed: i64,
    /// Number of memories created so far.
    pub memories_created: i64,
    /// Error message, if one was recorded.
    pub error_message: Option<String>,
    /// Start time, stored as text.
    pub started_at: String,
    /// Completion time, stored as text, if set.
    pub completed_at: Option<String>,
}
3302
3303pub fn get_sync_version(conn: &Connection) -> Result<SyncVersion> {
3305 let memory_count: i64 =
3306 conn.query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))?;
3307
3308 let last_modified: Option<String> = conn
3309 .query_row("SELECT MAX(updated_at) FROM memories", [], |row| row.get(0))
3310 .ok();
3311
3312 let version: i64 = conn
3313 .query_row("SELECT MAX(version) FROM sync_state", [], |row| row.get(0))
3314 .unwrap_or(0);
3315
3316 let checksum = format!(
3318 "{}-{}-{}",
3319 memory_count,
3320 version,
3321 last_modified.as_deref().unwrap_or("none")
3322 );
3323
3324 Ok(SyncVersion {
3325 version,
3326 last_modified: last_modified
3327 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
3328 .map(|dt| dt.with_timezone(&Utc))
3329 .unwrap_or_else(Utc::now),
3330 memory_count,
3331 checksum,
3332 })
3333}
3334
3335pub fn upsert_sync_task(conn: &Connection, task: &SyncTask) -> Result<()> {
3337 conn.execute(
3338 r#"
3339 INSERT INTO sync_tasks (
3340 task_id, task_type, status, progress_percent, traces_processed, memories_created,
3341 error_message, started_at, completed_at
3342 )
3343 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
3344 ON CONFLICT(task_id) DO UPDATE SET
3345 task_type = excluded.task_type,
3346 status = excluded.status,
3347 progress_percent = excluded.progress_percent,
3348 traces_processed = excluded.traces_processed,
3349 memories_created = excluded.memories_created,
3350 error_message = excluded.error_message,
3351 started_at = excluded.started_at,
3352 completed_at = excluded.completed_at
3353 "#,
3354 params![
3355 task.task_id,
3356 task.task_type,
3357 task.status,
3358 task.progress_percent,
3359 task.traces_processed,
3360 task.memories_created,
3361 task.error_message,
3362 task.started_at,
3363 task.completed_at
3364 ],
3365 )?;
3366
3367 Ok(())
3368}
3369
3370pub fn get_sync_task(conn: &Connection, task_id: &str) -> Result<Option<SyncTask>> {
3372 let mut stmt = conn.prepare(
3373 r#"
3374 SELECT task_id, task_type, status, progress_percent, traces_processed, memories_created,
3375 error_message, started_at, completed_at
3376 FROM sync_tasks
3377 WHERE task_id = ?
3378 "#,
3379 )?;
3380
3381 let mut rows = stmt.query(params![task_id])?;
3382 if let Some(row) = rows.next()? {
3383 Ok(Some(SyncTask {
3384 task_id: row.get("task_id")?,
3385 task_type: row.get("task_type")?,
3386 status: row.get("status")?,
3387 progress_percent: row.get("progress_percent")?,
3388 traces_processed: row.get("traces_processed")?,
3389 memories_created: row.get("memories_created")?,
3390 error_message: row.get("error_message")?,
3391 started_at: row.get("started_at")?,
3392 completed_at: row.get("completed_at")?,
3393 }))
3394 } else {
3395 Ok(None)
3396 }
3397}
3398
/// Net changes between two sync positions, as computed by `get_sync_delta`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncDelta {
    /// Memories created in the window that still load successfully.
    pub created: Vec<Memory>,
    /// Memories updated in the window (excluding ones also created in it) that
    /// still load successfully.
    pub updated: Vec<Memory>,
    /// Ids of memories deleted in the window.
    pub deleted: Vec<i64>,
    /// The position the delta was requested from.
    pub from_version: i64,
    /// The store's sync version at the time the delta was computed.
    pub to_version: i64,
}
3408
3409pub fn get_sync_delta(conn: &Connection, since_version: i64) -> Result<SyncDelta> {
3411 let current_version = get_sync_version(conn)?.version;
3412
3413 let events = poll_events(conn, Some(since_version), None, None, Some(10000))?;
3415
3416 let mut created_ids = std::collections::HashSet::new();
3417 let mut updated_ids = std::collections::HashSet::new();
3418 let mut deleted_ids = std::collections::HashSet::new();
3419
3420 for event in events {
3421 if let Some(memory_id) = event.memory_id {
3422 match event.event_type.as_str() {
3423 "created" => {
3424 created_ids.insert(memory_id);
3425 }
3426 "updated" => {
3427 if !created_ids.contains(&memory_id) {
3428 updated_ids.insert(memory_id);
3429 }
3430 }
3431 "deleted" => {
3432 created_ids.remove(&memory_id);
3433 updated_ids.remove(&memory_id);
3434 deleted_ids.insert(memory_id);
3435 }
3436 _ => {}
3437 }
3438 }
3439 }
3440
3441 let created: Vec<Memory> = created_ids
3442 .iter()
3443 .filter_map(|id| get_memory(conn, *id).ok())
3444 .collect();
3445
3446 let updated: Vec<Memory> = updated_ids
3447 .iter()
3448 .filter_map(|id| get_memory(conn, *id).ok())
3449 .collect();
3450
3451 Ok(SyncDelta {
3452 created,
3453 updated,
3454 deleted: deleted_ids.into_iter().collect(),
3455 from_version: since_version,
3456 to_version: current_version,
3457 })
3458}
3459
/// Sync progress of a single agent, as returned by `get_agent_sync_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSyncState {
    /// Agent the state belongs to.
    pub agent_id: String,
    /// Sync version the agent last recorded (0 when the agent has no record).
    pub last_sync_version: i64,
    /// Time of the agent's last recorded sync; the current time when unknown.
    pub last_sync_time: DateTime<Utc>,
    /// Difference between the store's current sync version and
    /// `last_sync_version`.
    pub pending_changes: i64,
}
3468
3469pub fn get_agent_sync_state(conn: &Connection, agent_id: &str) -> Result<AgentSyncState> {
3471 let result: std::result::Result<(i64, String), rusqlite::Error> = conn.query_row(
3472 "SELECT last_sync_version, last_sync_time FROM agent_sync_state WHERE agent_id = ?",
3473 params![agent_id],
3474 |row| Ok((row.get(0)?, row.get(1)?)),
3475 );
3476
3477 match result {
3478 Ok((version, time_str)) => {
3479 let current_version = get_sync_version(conn)?.version;
3480 let pending = (current_version - version).max(0);
3481
3482 Ok(AgentSyncState {
3483 agent_id: agent_id.to_string(),
3484 last_sync_version: version,
3485 last_sync_time: DateTime::parse_from_rfc3339(&time_str)
3486 .map(|dt| dt.with_timezone(&Utc))
3487 .unwrap_or_else(|_| Utc::now()),
3488 pending_changes: pending,
3489 })
3490 }
3491 Err(_) => {
3492 Ok(AgentSyncState {
3494 agent_id: agent_id.to_string(),
3495 last_sync_version: 0,
3496 last_sync_time: Utc::now(),
3497 pending_changes: get_sync_version(conn)?.version,
3498 })
3499 }
3500 }
3501}
3502
3503pub fn update_agent_sync_state(conn: &Connection, agent_id: &str, version: i64) -> Result<()> {
3505 let now = Utc::now();
3506 conn.execute(
3507 "INSERT INTO agent_sync_state (agent_id, last_sync_version, last_sync_time)
3508 VALUES (?, ?, ?)
3509 ON CONFLICT(agent_id) DO UPDATE SET
3510 last_sync_version = excluded.last_sync_version,
3511 last_sync_time = excluded.last_sync_time",
3512 params![agent_id, version, now.to_rfc3339()],
3513 )?;
3514 Ok(())
3515}
3516
3517pub fn cleanup_sync_data(conn: &Connection, older_than_days: i64) -> Result<i64> {
3519 let cutoff = Utc::now() - chrono::Duration::days(older_than_days);
3520 let deleted = conn.execute(
3521 "DELETE FROM memory_events WHERE created_at < ?",
3522 params![cutoff.to_rfc3339()],
3523 )?;
3524 Ok(deleted as i64)
3525}
3526
/// One row of `shared_memories`: a memory that one agent shared with another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedMemory {
    /// Row id of the share record (the value `share_memory` returns).
    pub id: i64,
    /// Id of the memory being shared.
    pub memory_id: i64,
    /// Agent that created the share.
    pub from_agent: String,
    /// Agent the memory was shared with.
    pub to_agent: String,
    /// Optional note attached by the sharing agent.
    pub message: Option<String>,
    /// Whether the recipient has acknowledged the share.
    pub acknowledged: bool,
    /// When the share was acknowledged; `None` if the stored value is NULL or
    /// is not a valid RFC 3339 timestamp.
    pub acknowledged_at: Option<DateTime<Utc>>,
    /// When the share was created.
    pub created_at: DateTime<Utc>,
}
3543
3544pub fn share_memory(
3546 conn: &Connection,
3547 memory_id: i64,
3548 from_agent: &str,
3549 to_agent: &str,
3550 message: Option<&str>,
3551) -> Result<i64> {
3552 let now = Utc::now();
3553
3554 let _ = get_memory(conn, memory_id)?;
3556
3557 conn.execute(
3558 "INSERT INTO shared_memories (memory_id, from_agent, to_agent, message, acknowledged, created_at)
3559 VALUES (?, ?, ?, ?, 0, ?)",
3560 params![memory_id, from_agent, to_agent, message, now.to_rfc3339()],
3561 )?;
3562
3563 let share_id = conn.last_insert_rowid();
3564
3565 record_event(
3567 conn,
3568 MemoryEventType::Shared,
3569 Some(memory_id),
3570 Some(from_agent),
3571 serde_json::json!({
3572 "to_agent": to_agent,
3573 "share_id": share_id,
3574 "message": message
3575 }),
3576 )?;
3577
3578 Ok(share_id)
3579}
3580
3581pub fn poll_shared_memories(
3583 conn: &Connection,
3584 to_agent: &str,
3585 include_acknowledged: bool,
3586) -> Result<Vec<SharedMemory>> {
3587 let query = if include_acknowledged {
3588 "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3589 FROM shared_memories WHERE to_agent = ?
3590 ORDER BY created_at DESC"
3591 } else {
3592 "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3593 FROM shared_memories WHERE to_agent = ? AND acknowledged = 0
3594 ORDER BY created_at DESC"
3595 };
3596
3597 let mut stmt = conn.prepare(query)?;
3598 let shares = stmt
3599 .query_map(params![to_agent], |row| {
3600 let created_str: String = row.get(7)?;
3601 let ack_str: Option<String> = row.get(6)?;
3602 Ok(SharedMemory {
3603 id: row.get(0)?,
3604 memory_id: row.get(1)?,
3605 from_agent: row.get(2)?,
3606 to_agent: row.get(3)?,
3607 message: row.get(4)?,
3608 acknowledged: row.get(5)?,
3609 acknowledged_at: ack_str.and_then(|s| {
3610 DateTime::parse_from_rfc3339(&s)
3611 .ok()
3612 .map(|dt| dt.with_timezone(&Utc))
3613 }),
3614 created_at: DateTime::parse_from_rfc3339(&created_str)
3615 .map(|dt| dt.with_timezone(&Utc))
3616 .unwrap_or_else(|_| Utc::now()),
3617 })
3618 })?
3619 .collect::<std::result::Result<Vec<_>, _>>()?;
3620
3621 Ok(shares)
3622}
3623
3624pub fn acknowledge_share(conn: &Connection, share_id: i64, agent_id: &str) -> Result<()> {
3626 let now = Utc::now();
3627
3628 let affected = conn.execute(
3629 "UPDATE shared_memories SET acknowledged = 1, acknowledged_at = ?
3630 WHERE id = ? AND to_agent = ?",
3631 params![now.to_rfc3339(), share_id, agent_id],
3632 )?;
3633
3634 if affected == 0 {
3635 return Err(EngramError::NotFound(share_id));
3636 }
3637
3638 Ok(())
3639}
3640
3641pub fn search_by_identity(
3647 conn: &Connection,
3648 identity: &str,
3649 workspace: Option<&str>,
3650 limit: Option<usize>,
3651) -> Result<Vec<Memory>> {
3652 let limit = limit.unwrap_or(50);
3653 let now = Utc::now().to_rfc3339();
3654
3655 let pattern = format!("%{}%", identity);
3658
3659 let query = if workspace.is_some() {
3660 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3661 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3662 m.visibility, m.version, m.has_embedding, m.metadata,
3663 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3664 FROM memories m
3665 LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3666 LEFT JOIN tags t ON mt.tag_id = t.id
3667 WHERE m.workspace = ? AND (m.content LIKE ? OR t.name LIKE ?)
3668 AND m.valid_to IS NULL
3669 AND (m.expires_at IS NULL OR m.expires_at > ?)
3670 ORDER BY m.importance DESC, m.created_at DESC
3671 LIMIT ?"
3672 } else {
3673 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3674 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3675 m.visibility, m.version, m.has_embedding, m.metadata,
3676 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3677 FROM memories m
3678 LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3679 LEFT JOIN tags t ON mt.tag_id = t.id
3680 WHERE (m.content LIKE ? OR t.name LIKE ?)
3681 AND m.valid_to IS NULL
3682 AND (m.expires_at IS NULL OR m.expires_at > ?)
3683 ORDER BY m.importance DESC, m.created_at DESC
3684 LIMIT ?"
3685 };
3686
3687 let mut stmt = conn.prepare(query)?;
3688
3689 let memories = if let Some(ws) = workspace {
3690 stmt.query_map(
3691 params![ws, &pattern, &pattern, &now, limit as i64],
3692 memory_from_row,
3693 )?
3694 .collect::<std::result::Result<Vec<_>, _>>()?
3695 } else {
3696 stmt.query_map(
3697 params![&pattern, &pattern, &now, limit as i64],
3698 memory_from_row,
3699 )?
3700 .collect::<std::result::Result<Vec<_>, _>>()?
3701 };
3702
3703 Ok(memories)
3704}
3705
3706pub fn search_sessions(
3708 conn: &Connection,
3709 query_text: &str,
3710 session_id: Option<&str>,
3711 workspace: Option<&str>,
3712 limit: Option<usize>,
3713) -> Result<Vec<Memory>> {
3714 let limit = limit.unwrap_or(20);
3715 let now = Utc::now().to_rfc3339();
3716 let pattern = format!("%{}%", query_text);
3717
3718 let mut conditions = vec![
3721 "m.memory_type = 'transcript_chunk'",
3722 "m.valid_to IS NULL",
3723 "(m.expires_at IS NULL OR m.expires_at > ?)",
3724 ];
3725 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3726
3727 let use_tag_join = session_id.is_some();
3729 if let Some(sid) = session_id {
3730 let tag_name = format!("session:{}", sid);
3731 conditions.push("t.name = ?");
3732 params_vec.push(Box::new(tag_name));
3733 }
3734
3735 if let Some(ws) = workspace {
3737 conditions.push("m.workspace = ?");
3738 params_vec.push(Box::new(ws.to_string()));
3739 }
3740
3741 conditions.push("m.content LIKE ?");
3743 params_vec.push(Box::new(pattern));
3744
3745 params_vec.push(Box::new(limit as i64));
3747
3748 let join_clause = if use_tag_join {
3750 "JOIN memory_tags mt ON m.id = mt.memory_id JOIN tags t ON mt.tag_id = t.id"
3751 } else {
3752 ""
3753 };
3754
3755 let query = format!(
3756 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3757 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3758 m.visibility, m.version, m.has_embedding, m.metadata,
3759 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3760 FROM memories m {} WHERE {} ORDER BY m.created_at DESC LIMIT ?",
3761 join_clause,
3762 conditions.join(" AND ")
3763 );
3764
3765 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
3766 let mut stmt = conn.prepare(&query)?;
3767 let memories = stmt
3768 .query_map(params_refs.as_slice(), memory_from_row)?
3769 .collect::<std::result::Result<Vec<_>, _>>()?;
3770
3771 Ok(memories)
3772}
3773
3774#[cfg(test)]
3775mod tests {
3776 use super::*;
3777 use crate::storage::Storage;
3778 use serde_json::json;
3779 use std::collections::HashMap;
3780
3781 #[test]
3782 fn test_list_memories_metadata_filter_types() {
3783 let storage = Storage::open_in_memory().unwrap();
3784
3785 storage
3786 .with_connection(|conn| {
3787 let mut metadata1 = HashMap::new();
3788 metadata1.insert("status".to_string(), json!("active"));
3789 metadata1.insert("count".to_string(), json!(3));
3790 metadata1.insert("flag".to_string(), json!(true));
3791
3792 let mut metadata2 = HashMap::new();
3793 metadata2.insert("status".to_string(), json!("inactive"));
3794 metadata2.insert("count".to_string(), json!(5));
3795 metadata2.insert("flag".to_string(), json!(false));
3796 metadata2.insert("optional".to_string(), json!("set"));
3797
3798 let memory1 = create_memory(
3799 conn,
3800 &CreateMemoryInput {
3801 content: "First".to_string(),
3802 memory_type: MemoryType::Note,
3803 tags: vec![],
3804 metadata: metadata1,
3805 importance: None,
3806 scope: Default::default(),
3807 workspace: None,
3808 tier: Default::default(),
3809 defer_embedding: true,
3810 ttl_seconds: None,
3811 dedup_mode: Default::default(),
3812 dedup_threshold: None,
3813 event_time: None,
3814 event_duration_seconds: None,
3815 trigger_pattern: None,
3816 summary_of_id: None,
3817 },
3818 )?;
3819 let memory2 = create_memory(
3820 conn,
3821 &CreateMemoryInput {
3822 content: "Second".to_string(),
3823 memory_type: MemoryType::Note,
3824 tags: vec![],
3825 metadata: metadata2,
3826 importance: None,
3827 scope: Default::default(),
3828 workspace: None,
3829 tier: Default::default(),
3830 defer_embedding: true,
3831 ttl_seconds: None,
3832 dedup_mode: Default::default(),
3833 dedup_threshold: None,
3834 event_time: None,
3835 event_duration_seconds: None,
3836 trigger_pattern: None,
3837 summary_of_id: None,
3838 },
3839 )?;
3840
3841 let mut filter = HashMap::new();
3842 filter.insert("status".to_string(), json!("active"));
3843 let results = list_memories(
3844 conn,
3845 &ListOptions {
3846 metadata_filter: Some(filter),
3847 ..Default::default()
3848 },
3849 )?;
3850 assert_eq!(results.len(), 1);
3851 assert_eq!(results[0].id, memory1.id);
3852
3853 let mut filter = HashMap::new();
3854 filter.insert("count".to_string(), json!(5));
3855 let results = list_memories(
3856 conn,
3857 &ListOptions {
3858 metadata_filter: Some(filter),
3859 ..Default::default()
3860 },
3861 )?;
3862 assert_eq!(results.len(), 1);
3863 assert_eq!(results[0].id, memory2.id);
3864
3865 let mut filter = HashMap::new();
3866 filter.insert("flag".to_string(), json!(true));
3867 let results = list_memories(
3868 conn,
3869 &ListOptions {
3870 metadata_filter: Some(filter),
3871 ..Default::default()
3872 },
3873 )?;
3874 assert_eq!(results.len(), 1);
3875 assert_eq!(results[0].id, memory1.id);
3876
3877 let mut filter = HashMap::new();
3878 filter.insert("optional".to_string(), serde_json::Value::Null);
3879 let results = list_memories(
3880 conn,
3881 &ListOptions {
3882 metadata_filter: Some(filter),
3883 ..Default::default()
3884 },
3885 )?;
3886 assert_eq!(results.len(), 1);
3887 assert_eq!(results[0].id, memory1.id);
3888
3889 Ok(())
3890 })
3891 .unwrap();
3892 }
3893
3894 #[test]
3895 fn test_memory_scope_isolation() {
3896 use crate::types::MemoryScope;
3897
3898 let storage = Storage::open_in_memory().unwrap();
3899
3900 storage
3901 .with_connection(|conn| {
3902 let user1_memory = create_memory(
3904 conn,
3905 &CreateMemoryInput {
3906 content: "User 1 memory".to_string(),
3907 memory_type: MemoryType::Note,
3908 tags: vec!["test".to_string()],
3909 metadata: HashMap::new(),
3910 importance: None,
3911 scope: MemoryScope::user("user-1"),
3912 workspace: None,
3913 tier: Default::default(),
3914 defer_embedding: true,
3915 ttl_seconds: None,
3916 dedup_mode: Default::default(),
3917 dedup_threshold: None,
3918 event_time: None,
3919 event_duration_seconds: None,
3920 trigger_pattern: None,
3921 summary_of_id: None,
3922 },
3923 )?;
3924
3925 let user2_memory = create_memory(
3927 conn,
3928 &CreateMemoryInput {
3929 content: "User 2 memory".to_string(),
3930 memory_type: MemoryType::Note,
3931 tags: vec!["test".to_string()],
3932 metadata: HashMap::new(),
3933 importance: None,
3934 scope: MemoryScope::user("user-2"),
3935 workspace: None,
3936 tier: Default::default(),
3937 defer_embedding: true,
3938 ttl_seconds: None,
3939 dedup_mode: Default::default(),
3940 dedup_threshold: None,
3941 event_time: None,
3942 event_duration_seconds: None,
3943 trigger_pattern: None,
3944 summary_of_id: None,
3945 },
3946 )?;
3947
3948 let session_memory = create_memory(
3950 conn,
3951 &CreateMemoryInput {
3952 content: "Session memory".to_string(),
3953 memory_type: MemoryType::Note,
3954 tags: vec!["test".to_string()],
3955 metadata: HashMap::new(),
3956 importance: None,
3957 scope: MemoryScope::session("session-abc"),
3958 workspace: None,
3959 tier: Default::default(),
3960 defer_embedding: true,
3961 ttl_seconds: None,
3962 dedup_mode: Default::default(),
3963 dedup_threshold: None,
3964 event_time: None,
3965 event_duration_seconds: None,
3966 trigger_pattern: None,
3967 summary_of_id: None,
3968 },
3969 )?;
3970
3971 let global_memory = create_memory(
3973 conn,
3974 &CreateMemoryInput {
3975 content: "Global memory".to_string(),
3976 memory_type: MemoryType::Note,
3977 tags: vec!["test".to_string()],
3978 metadata: HashMap::new(),
3979 importance: None,
3980 scope: MemoryScope::Global,
3981 workspace: None,
3982 tier: Default::default(),
3983 defer_embedding: true,
3984 ttl_seconds: None,
3985 dedup_mode: Default::default(),
3986 dedup_threshold: None,
3987 event_time: None,
3988 event_duration_seconds: None,
3989 trigger_pattern: None,
3990 summary_of_id: None,
3991 },
3992 )?;
3993
3994 let all_results = list_memories(conn, &ListOptions::default())?;
3996 assert_eq!(all_results.len(), 4);
3997
3998 let user1_results = list_memories(
4000 conn,
4001 &ListOptions {
4002 scope: Some(MemoryScope::user("user-1")),
4003 ..Default::default()
4004 },
4005 )?;
4006 assert_eq!(user1_results.len(), 1);
4007 assert_eq!(user1_results[0].id, user1_memory.id);
4008 assert_eq!(user1_results[0].scope, MemoryScope::user("user-1"));
4009
4010 let user2_results = list_memories(
4012 conn,
4013 &ListOptions {
4014 scope: Some(MemoryScope::user("user-2")),
4015 ..Default::default()
4016 },
4017 )?;
4018 assert_eq!(user2_results.len(), 1);
4019 assert_eq!(user2_results[0].id, user2_memory.id);
4020
4021 let session_results = list_memories(
4023 conn,
4024 &ListOptions {
4025 scope: Some(MemoryScope::session("session-abc")),
4026 ..Default::default()
4027 },
4028 )?;
4029 assert_eq!(session_results.len(), 1);
4030 assert_eq!(session_results[0].id, session_memory.id);
4031
4032 let global_results = list_memories(
4034 conn,
4035 &ListOptions {
4036 scope: Some(MemoryScope::Global),
4037 ..Default::default()
4038 },
4039 )?;
4040 assert_eq!(global_results.len(), 1);
4041 assert_eq!(global_results[0].id, global_memory.id);
4042
4043 let retrieved = get_memory(conn, user1_memory.id)?;
4045 assert_eq!(retrieved.scope, MemoryScope::user("user-1"));
4046
4047 Ok(())
4048 })
4049 .unwrap();
4050 }
4051
4052 #[test]
4053 fn test_memory_scope_can_access() {
4054 use crate::types::MemoryScope;
4055
4056 assert!(MemoryScope::Global.can_access(&MemoryScope::user("user-1")));
4058 assert!(MemoryScope::Global.can_access(&MemoryScope::session("session-1")));
4059 assert!(MemoryScope::Global.can_access(&MemoryScope::agent("agent-1")));
4060 assert!(MemoryScope::Global.can_access(&MemoryScope::Global));
4061
4062 assert!(MemoryScope::user("user-1").can_access(&MemoryScope::user("user-1")));
4064 assert!(MemoryScope::session("s1").can_access(&MemoryScope::session("s1")));
4065 assert!(MemoryScope::agent("a1").can_access(&MemoryScope::agent("a1")));
4066
4067 assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::user("user-2")));
4069 assert!(!MemoryScope::session("s1").can_access(&MemoryScope::session("s2")));
4070 assert!(!MemoryScope::agent("a1").can_access(&MemoryScope::agent("a2")));
4071
4072 assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::session("s1")));
4074 assert!(!MemoryScope::session("s1").can_access(&MemoryScope::agent("a1")));
4075
4076 assert!(MemoryScope::user("user-1").can_access(&MemoryScope::Global));
4078 assert!(MemoryScope::session("s1").can_access(&MemoryScope::Global));
4079 assert!(MemoryScope::agent("a1").can_access(&MemoryScope::Global));
4080 }
4081
4082 #[test]
4083 fn test_memory_ttl_creation() {
4084 let storage = Storage::open_in_memory().unwrap();
4085
4086 storage
4087 .with_transaction(|conn| {
4088 let memory = create_memory(
4090 conn,
4091 &CreateMemoryInput {
4092 content: "Temporary memory".to_string(),
4093 memory_type: MemoryType::Note,
4094 tags: vec![],
4095 metadata: HashMap::new(),
4096 importance: None,
4097 scope: Default::default(),
4098 workspace: None,
4099 tier: MemoryTier::Daily, defer_embedding: true,
4101 ttl_seconds: Some(3600), dedup_mode: Default::default(),
4103 dedup_threshold: None,
4104 event_time: None,
4105 event_duration_seconds: None,
4106 trigger_pattern: None,
4107 summary_of_id: None,
4108 },
4109 )?;
4110
4111 assert!(memory.expires_at.is_some());
4113 assert_eq!(memory.tier, MemoryTier::Daily);
4114 let expires_at = memory.expires_at.unwrap();
4115 let now = Utc::now();
4116
4117 let diff = (expires_at - now).num_seconds();
4119 assert!(
4120 (3595..=3605).contains(&diff),
4121 "Expected ~3600 seconds, got {}",
4122 diff
4123 );
4124
4125 let permanent = create_memory(
4127 conn,
4128 &CreateMemoryInput {
4129 content: "Permanent memory".to_string(),
4130 memory_type: MemoryType::Note,
4131 tags: vec![],
4132 metadata: HashMap::new(),
4133 importance: None,
4134 scope: Default::default(),
4135 workspace: None,
4136 tier: Default::default(),
4137 defer_embedding: true,
4138 ttl_seconds: None,
4139 dedup_mode: Default::default(),
4140 dedup_threshold: None,
4141 event_time: None,
4142 event_duration_seconds: None,
4143 trigger_pattern: None,
4144 summary_of_id: None,
4145 },
4146 )?;
4147
4148 assert!(permanent.expires_at.is_none());
4150
4151 Ok(())
4152 })
4153 .unwrap();
4154 }
4155
4156 #[test]
4157 fn test_expired_memories_excluded_from_queries() {
4158 let storage = Storage::open_in_memory().unwrap();
4159
4160 storage
4161 .with_transaction(|conn| {
4162 let memory1 = create_memory(
4164 conn,
4165 &CreateMemoryInput {
4166 content: "Memory to expire".to_string(),
4167 memory_type: MemoryType::Note,
4168 tags: vec!["test".to_string()],
4169 metadata: HashMap::new(),
4170 importance: None,
4171 scope: Default::default(),
4172 workspace: None,
4173 tier: MemoryTier::Daily, defer_embedding: true,
4175 ttl_seconds: Some(3600), dedup_mode: Default::default(),
4177 dedup_threshold: None,
4178 event_time: None,
4179 event_duration_seconds: None,
4180 trigger_pattern: None,
4181 summary_of_id: None,
4182 },
4183 )?;
4184
4185 let active = create_memory(
4187 conn,
4188 &CreateMemoryInput {
4189 content: "Active memory".to_string(),
4190 memory_type: MemoryType::Note,
4191 tags: vec!["test".to_string()],
4192 metadata: HashMap::new(),
4193 importance: None,
4194 scope: Default::default(),
4195 workspace: None,
4196 tier: Default::default(),
4197 defer_embedding: true,
4198 ttl_seconds: None,
4199 dedup_mode: Default::default(),
4200 dedup_threshold: None,
4201 event_time: None,
4202 event_duration_seconds: None,
4203 trigger_pattern: None,
4204 summary_of_id: None,
4205 },
4206 )?;
4207
4208 let results = list_memories(conn, &ListOptions::default())?;
4210 assert_eq!(results.len(), 2);
4211
4212 let past = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
4214 conn.execute(
4215 "UPDATE memories SET expires_at = ? WHERE id = ?",
4216 params![past, memory1.id],
4217 )?;
4218
4219 let results = list_memories(conn, &ListOptions::default())?;
4221 assert_eq!(results.len(), 1);
4222 assert_eq!(results[0].id, active.id);
4223
4224 let get_result = get_memory(conn, memory1.id);
4226 assert!(get_result.is_err());
4227
4228 let get_result = get_memory(conn, active.id);
4230 assert!(get_result.is_ok());
4231
4232 Ok(())
4233 })
4234 .unwrap();
4235 }
4236
4237 #[test]
4238 fn test_set_memory_expiration() {
4239 let storage = Storage::open_in_memory().unwrap();
4240
4241 storage
4242 .with_transaction(|conn| {
4243 let memory = create_memory(
4245 conn,
4246 &CreateMemoryInput {
4247 content: "Initially permanent".to_string(),
4248 memory_type: MemoryType::Note,
4249 tags: vec![],
4250 metadata: HashMap::new(),
4251 importance: None,
4252 scope: Default::default(),
4253 workspace: None,
4254 tier: Default::default(),
4255 defer_embedding: true,
4256 ttl_seconds: None,
4257 dedup_mode: Default::default(),
4258 dedup_threshold: None,
4259 event_time: None,
4260 event_duration_seconds: None,
4261 trigger_pattern: None,
4262 summary_of_id: None,
4263 },
4264 )?;
4265
4266 assert!(memory.expires_at.is_none());
4267
4268 let updated = set_memory_expiration(conn, memory.id, Some(1800))?;
4270 assert!(updated.expires_at.is_some());
4271
4272 let permanent_again = set_memory_expiration(conn, memory.id, Some(0))?;
4274 assert!(permanent_again.expires_at.is_none());
4275
4276 Ok(())
4277 })
4278 .unwrap();
4279 }
4280
4281 #[test]
4282 fn test_cleanup_expired_memories() {
4283 let storage = Storage::open_in_memory().unwrap();
4284
4285 storage
4286 .with_transaction(|conn| {
4287 let mut expired_ids = vec![];
4289 for i in 0..3 {
4290 let mem = create_memory(
4291 conn,
4292 &CreateMemoryInput {
4293 content: format!("To expire {}", i),
4294 memory_type: MemoryType::Note,
4295 tags: vec![],
4296 metadata: HashMap::new(),
4297 importance: None,
4298 scope: Default::default(),
4299 workspace: None,
4300 tier: MemoryTier::Daily, defer_embedding: true,
4302 ttl_seconds: Some(3600), dedup_mode: Default::default(),
4304 dedup_threshold: None,
4305 event_time: None,
4306 event_duration_seconds: None,
4307 trigger_pattern: None,
4308 summary_of_id: None,
4309 },
4310 )?;
4311 expired_ids.push(mem.id);
4312 }
4313
4314 for i in 0..2 {
4316 create_memory(
4317 conn,
4318 &CreateMemoryInput {
4319 content: format!("Active {}", i),
4320 memory_type: MemoryType::Note,
4321 tags: vec![],
4322 metadata: HashMap::new(),
4323 importance: None,
4324 scope: Default::default(),
4325 workspace: None,
4326 tier: Default::default(),
4327 defer_embedding: true,
4328 ttl_seconds: None,
4329 dedup_mode: Default::default(),
4330 dedup_threshold: None,
4331 event_time: None,
4332 event_duration_seconds: None,
4333 trigger_pattern: None,
4334 summary_of_id: None,
4335 },
4336 )?;
4337 }
4338
4339 let results = list_memories(conn, &ListOptions::default())?;
4341 assert_eq!(results.len(), 5);
4342
4343 let past = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
4345 for id in &expired_ids {
4346 conn.execute(
4347 "UPDATE memories SET expires_at = ? WHERE id = ?",
4348 params![past, id],
4349 )?;
4350 }
4351
4352 let expired_count = count_expired_memories(conn)?;
4354 assert_eq!(expired_count, 3);
4355
4356 let deleted = cleanup_expired_memories(conn)?;
4358 assert_eq!(deleted, 3);
4359
4360 let remaining = list_memories(conn, &ListOptions::default())?;
4362 assert_eq!(remaining.len(), 2);
4363
4364 let expired_count = count_expired_memories(conn)?;
4366 assert_eq!(expired_count, 0);
4367
4368 Ok(())
4369 })
4370 .unwrap();
4371 }
4372
4373 #[test]
4376 fn test_content_hash_computation() {
4377 let hash1 = compute_content_hash("Hello World");
4379 let hash2 = compute_content_hash("hello world"); let hash3 = compute_content_hash(" hello world "); let hash4 = compute_content_hash("Hello World!"); assert_eq!(hash1, hash2);
4385 assert_eq!(hash2, hash3);
4386
4387 assert_ne!(hash1, hash4);
4389
4390 assert!(hash1.starts_with("sha256:"));
4392 }
4393
4394 #[test]
4395 fn test_dedup_mode_reject() {
4396 use crate::types::DedupMode;
4397
4398 let storage = Storage::open_in_memory().unwrap();
4399
4400 storage
4401 .with_transaction(|conn| {
4402 let _memory1 = create_memory(
4404 conn,
4405 &CreateMemoryInput {
4406 content: "Unique content for testing".to_string(),
4407 memory_type: MemoryType::Note,
4408 tags: vec![],
4409 metadata: HashMap::new(),
4410 importance: None,
4411 scope: Default::default(),
4412 workspace: None,
4413 tier: Default::default(),
4414 defer_embedding: true,
4415 ttl_seconds: None,
4416 dedup_mode: DedupMode::Allow, dedup_threshold: None,
4418 event_time: None,
4419 event_duration_seconds: None,
4420 trigger_pattern: None,
4421 summary_of_id: None,
4422 },
4423 )?;
4424
4425 let result = create_memory(
4427 conn,
4428 &CreateMemoryInput {
4429 content: "Unique content for testing".to_string(), memory_type: MemoryType::Note,
4431 tags: vec!["new-tag".to_string()],
4432 metadata: HashMap::new(),
4433 importance: None,
4434 scope: Default::default(),
4435 workspace: None,
4436 tier: Default::default(),
4437 defer_embedding: true,
4438 ttl_seconds: None,
4439 dedup_mode: DedupMode::Reject,
4440 dedup_threshold: None,
4441 event_time: None,
4442 event_duration_seconds: None,
4443 trigger_pattern: None,
4444 summary_of_id: None,
4445 },
4446 );
4447
4448 assert!(result.is_err());
4450 let err = result.unwrap_err();
4451 assert!(matches!(err, crate::error::EngramError::Duplicate { .. }));
4452
4453 Ok(())
4454 })
4455 .unwrap();
4456 }
4457
4458 #[test]
4459 fn test_dedup_mode_skip() {
4460 use crate::types::DedupMode;
4461
4462 let storage = Storage::open_in_memory().unwrap();
4463
4464 storage
4465 .with_transaction(|conn| {
4466 let memory1 = create_memory(
4468 conn,
4469 &CreateMemoryInput {
4470 content: "Skip test content".to_string(),
4471 memory_type: MemoryType::Note,
4472 tags: vec!["original".to_string()],
4473 metadata: HashMap::new(),
4474 importance: Some(0.5),
4475 scope: Default::default(),
4476 workspace: None,
4477 tier: Default::default(),
4478 defer_embedding: true,
4479 ttl_seconds: None,
4480 dedup_mode: DedupMode::Allow,
4481 dedup_threshold: None,
4482 event_time: None,
4483 event_duration_seconds: None,
4484 trigger_pattern: None,
4485 summary_of_id: None,
4486 },
4487 )?;
4488
4489 let memory2 = create_memory(
4491 conn,
4492 &CreateMemoryInput {
4493 content: "Skip test content".to_string(), memory_type: MemoryType::Note,
4495 tags: vec!["new-tag".to_string()], metadata: HashMap::new(),
4497 importance: Some(0.9), scope: Default::default(),
4499 workspace: None,
4500 tier: Default::default(),
4501 defer_embedding: true,
4502 ttl_seconds: None,
4503 dedup_mode: DedupMode::Skip,
4504 dedup_threshold: None,
4505 event_time: None,
4506 event_duration_seconds: None,
4507 trigger_pattern: None,
4508 summary_of_id: None,
4509 },
4510 )?;
4511
4512 assert_eq!(memory1.id, memory2.id);
4514 assert_eq!(memory2.tags, vec!["original".to_string()]); assert!((memory2.importance - 0.5).abs() < 0.01); let all = list_memories(conn, &ListOptions::default())?;
4519 assert_eq!(all.len(), 1);
4520
4521 Ok(())
4522 })
4523 .unwrap();
4524 }
4525
4526 #[test]
4527 fn test_dedup_mode_merge() {
4528 use crate::types::DedupMode;
4529
4530 let storage = Storage::open_in_memory().unwrap();
4531
4532 storage
4533 .with_transaction(|conn| {
4534 let memory1 = create_memory(
4536 conn,
4537 &CreateMemoryInput {
4538 content: "Merge test content".to_string(),
4539 memory_type: MemoryType::Note,
4540 tags: vec!["tag1".to_string(), "tag2".to_string()],
4541 metadata: {
4542 let mut m = HashMap::new();
4543 m.insert("key1".to_string(), serde_json::json!("value1"));
4544 m
4545 },
4546 importance: Some(0.5),
4547 scope: Default::default(),
4548 workspace: None,
4549 tier: Default::default(),
4550 defer_embedding: true,
4551 ttl_seconds: None,
4552 dedup_mode: DedupMode::Allow,
4553 dedup_threshold: None,
4554 event_time: None,
4555 event_duration_seconds: None,
4556 trigger_pattern: None,
4557 summary_of_id: None,
4558 },
4559 )?;
4560
4561 let memory2 = create_memory(
4563 conn,
4564 &CreateMemoryInput {
4565 content: "Merge test content".to_string(), memory_type: MemoryType::Note,
4567 tags: vec!["tag2".to_string(), "tag3".to_string()], metadata: {
4569 let mut m = HashMap::new();
4570 m.insert("key2".to_string(), serde_json::json!("value2"));
4571 m
4572 },
4573 importance: Some(0.8), scope: Default::default(),
4575 workspace: None,
4576 tier: Default::default(),
4577 defer_embedding: true,
4578 ttl_seconds: None,
4579 dedup_mode: DedupMode::Merge,
4580 dedup_threshold: None,
4581 event_time: None,
4582 event_duration_seconds: None,
4583 trigger_pattern: None,
4584 summary_of_id: None,
4585 },
4586 )?;
4587
4588 assert_eq!(memory1.id, memory2.id);
4590
4591 assert!(memory2.tags.contains(&"tag1".to_string()));
4593 assert!(memory2.tags.contains(&"tag2".to_string()));
4594 assert!(memory2.tags.contains(&"tag3".to_string()));
4595 assert_eq!(memory2.tags.len(), 3);
4596
4597 assert!(memory2.metadata.contains_key("key1"));
4599 assert!(memory2.metadata.contains_key("key2"));
4600
4601 let all = list_memories(conn, &ListOptions::default())?;
4603 assert_eq!(all.len(), 1);
4604
4605 Ok(())
4606 })
4607 .unwrap();
4608 }
4609
4610 #[test]
4611 fn test_dedup_mode_allow() {
4612 use crate::types::DedupMode;
4613
4614 let storage = Storage::open_in_memory().unwrap();
4615
4616 storage
4617 .with_transaction(|conn| {
4618 let memory1 = create_memory(
4620 conn,
4621 &CreateMemoryInput {
4622 content: "Allow duplicates content".to_string(),
4623 memory_type: MemoryType::Note,
4624 tags: vec![],
4625 metadata: HashMap::new(),
4626 importance: None,
4627 scope: Default::default(),
4628 workspace: None,
4629 tier: Default::default(),
4630 defer_embedding: true,
4631 ttl_seconds: None,
4632 dedup_mode: DedupMode::Allow,
4633 dedup_threshold: None,
4634 event_time: None,
4635 event_duration_seconds: None,
4636 trigger_pattern: None,
4637 summary_of_id: None,
4638 },
4639 )?;
4640
4641 let memory2 = create_memory(
4643 conn,
4644 &CreateMemoryInput {
4645 content: "Allow duplicates content".to_string(), memory_type: MemoryType::Note,
4647 tags: vec![],
4648 metadata: HashMap::new(),
4649 importance: None,
4650 scope: Default::default(),
4651 workspace: None,
4652 tier: Default::default(),
4653 defer_embedding: true,
4654 ttl_seconds: None,
4655 dedup_mode: DedupMode::Allow,
4656 dedup_threshold: None,
4657 event_time: None,
4658 event_duration_seconds: None,
4659 trigger_pattern: None,
4660 summary_of_id: None,
4661 },
4662 )?;
4663
4664 assert_ne!(memory1.id, memory2.id);
4666
4667 let all = list_memories(conn, &ListOptions::default())?;
4669 assert_eq!(all.len(), 2);
4670
4671 assert_eq!(memory1.content_hash, memory2.content_hash);
4673
4674 Ok(())
4675 })
4676 .unwrap();
4677 }
4678
/// Checks that `find_duplicates` reports two memories with identical content as exactly one
/// pair, classified as `DuplicateMatchType::ExactHash` with a similarity score of about 1.0.
#[test]
fn test_find_duplicates_exact_hash() {
    use crate::types::DedupMode;

    /// Builds a note input with `DedupMode::Allow` and a deferred embedding, varying only the
    /// content and tags.
    fn note_input(content: &str, tags: &[&str]) -> CreateMemoryInput {
        CreateMemoryInput {
            content: content.to_string(),
            memory_type: MemoryType::Note,
            tags: tags.iter().map(|tag| tag.to_string()).collect(),
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: DedupMode::Allow,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    let outcome = storage.with_transaction(|conn| {
        // Two memories share the same content (only the tags differ); the third is distinct.
        let _first_copy = create_memory(conn, &note_input("Duplicate content", &["first"]))?;
        let _second_copy = create_memory(conn, &note_input("Duplicate content", &["second"]))?;
        let _unrelated = create_memory(conn, &note_input("Unique content", &[]))?;

        // 0.9 is passed through as-is; presumably a similarity threshold -- confirm against
        // `find_duplicates`.
        let duplicates = find_duplicates(conn, 0.9)?;

        // Only the identical-content pair should be reported.
        assert_eq!(duplicates.len(), 1);

        let pair = &duplicates[0];
        assert_eq!(pair.match_type, DuplicateMatchType::ExactHash);
        assert!((pair.similarity_score - 1.0).abs() < 0.01);

        Ok(())
    });

    outcome.unwrap();
}
4769
/// Checks that `create_memory` returns a memory carrying a `sha256:`-prefixed content hash and
/// that `get_memory` returns the same hash for that id.
#[test]
fn test_content_hash_stored_on_create() {
    let storage = Storage::open_in_memory().unwrap();

    let outcome = storage.with_transaction(|conn| {
        let input = CreateMemoryInput {
            content: "Test content for hash".to_string(),
            memory_type: MemoryType::Note,
            tags: vec![],
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        };
        let created = create_memory(conn, &input)?;

        // The returned memory must already carry a hash with the expected prefix.
        assert!(created.content_hash.is_some());
        let hash = created.content_hash.as_deref().unwrap();
        assert!(hash.starts_with("sha256:"));

        // Reading the memory back by id must yield the same hash.
        let reloaded = get_memory(conn, created.id)?;
        assert_eq!(reloaded.content_hash, created.content_hash);

        Ok(())
    });

    outcome.unwrap();
}
4811
/// Checks that changing a memory's content through `update_memory` replaces its content hash
/// with `compute_content_hash` of the new content.
#[test]
fn test_update_memory_recalculates_hash() {
    // Content written by the update and used to derive the expected hash.
    const UPDATED_CONTENT: &str = "Updated content";

    let storage = Storage::open_in_memory().unwrap();

    let outcome = storage.with_transaction(|conn| {
        let initial_input = CreateMemoryInput {
            content: "Original content".to_string(),
            memory_type: MemoryType::Note,
            tags: vec![],
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        };
        let created = create_memory(conn, &initial_input)?;
        let hash_before = created.content_hash.clone();

        // Only the content changes; every other updatable field is left as `None`.
        let content_only_change = UpdateMemoryInput {
            content: Some(UPDATED_CONTENT.to_string()),
            memory_type: None,
            tags: None,
            metadata: None,
            importance: None,
            scope: None,
            ttl_seconds: None,
            event_time: None,
            trigger_pattern: None,
        };
        let updated = update_memory(conn, created.id, &content_only_change)?;

        // The hash must differ from the pre-update value and still be present.
        assert_ne!(updated.content_hash, hash_before);
        assert!(updated.content_hash.is_some());

        // It must equal the hash computed directly from the new content.
        let expected_hash = compute_content_hash(UPDATED_CONTENT);
        assert_eq!(updated.content_hash.as_ref().unwrap(), &expected_hash);

        Ok(())
    });

    outcome.unwrap();
}
4872
/// Checks that `DedupMode::Reject` only rejects content that already exists in the same scope:
/// identical content under a different user scope is accepted, while repeating it within one
/// user scope fails with `EngramError::Duplicate`.
#[test]
fn test_dedup_scope_isolation() {
    use crate::types::{DedupMode, MemoryScope};

    /// Builds a "Shared content" note input, varying only the scope, tags and dedup mode.
    fn shared_content_input(
        scope: MemoryScope,
        tags: &[&str],
        dedup_mode: DedupMode,
    ) -> CreateMemoryInput {
        CreateMemoryInput {
            content: "Shared content".to_string(),
            memory_type: MemoryType::Note,
            tags: tags.iter().map(|tag| tag.to_string()).collect(),
            metadata: HashMap::new(),
            importance: None,
            scope,
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    let outcome = storage.with_transaction(|conn| {
        // Seed the content under user-1.
        let seed_input =
            shared_content_input(MemoryScope::user("user-1"), &["user1"], DedupMode::Allow);
        let _user1_memory = create_memory(conn, &seed_input)?;

        // Same content under user-2 with Reject: expected to succeed, since the only existing
        // copy lives in user-1's scope.
        let other_scope_input =
            shared_content_input(MemoryScope::user("user-2"), &["user2"], DedupMode::Reject);
        let user2_result = create_memory(conn, &other_scope_input);
        assert!(user2_result.is_ok());
        let _user2_memory = user2_result.unwrap();

        // Same content again under user-2 with Reject: now a copy exists in that scope, so the
        // insert must fail with a Duplicate error.
        let same_scope_input =
            shared_content_input(MemoryScope::user("user-2"), &[], DedupMode::Reject);
        let duplicate_result = create_memory(conn, &same_scope_input);
        assert!(duplicate_result.is_err());
        assert!(matches!(
            duplicate_result.unwrap_err(),
            crate::error::EngramError::Duplicate { .. }
        ));

        // The rejected insert must not have added a row: one memory per user remains.
        let stored_count = list_memories(conn, &ListOptions::default())?.len();
        assert_eq!(stored_count, 2);

        Ok(())
    });

    outcome.unwrap();
}
4970
/// Exercises `find_similar_by_embedding` against two global-scope memories with hand-written
/// 4-dimensional embeddings: a close query must return the first memory above the threshold,
/// a dissimilar query must return nothing at a strict threshold, and a query in a different
/// user scope must return nothing.
#[test]
fn test_find_similar_by_embedding() {
    /// Writes `embedding` for `memory_id` straight into the `embeddings` table (as
    /// little-endian `f32` bytes) and sets `has_embedding` on the memory row.
    fn store_test_embedding(
        conn: &Connection,
        memory_id: i64,
        embedding: &[f32],
    ) -> crate::error::Result<()> {
        // Serialize each component as 4 little-endian bytes, in order.
        let mut bytes: Vec<u8> = Vec::with_capacity(embedding.len() * std::mem::size_of::<f32>());
        for component in embedding {
            bytes.extend_from_slice(&component.to_le_bytes());
        }

        conn.execute(
            "INSERT INTO embeddings (memory_id, embedding, model, dimensions, created_at)
                 VALUES (?, ?, ?, ?, datetime('now'))",
            params![memory_id, bytes, "test", embedding.len() as i32],
        )?;
        conn.execute(
            "UPDATE memories SET has_embedding = 1 WHERE id = ?",
            params![memory_id],
        )?;
        Ok(())
    }

    /// Builds a global-scope note input with `DedupMode::Allow` and `defer_embedding: false`,
    /// varying only the content and its single tag.
    fn global_note(content: &str, tag: &str) -> CreateMemoryInput {
        CreateMemoryInput {
            content: content.to_string(),
            memory_type: MemoryType::Note,
            tags: vec![tag.to_string()],
            metadata: std::collections::HashMap::new(),
            importance: None,
            scope: MemoryScope::Global,
            workspace: None,
            tier: Default::default(),
            defer_embedding: false,
            ttl_seconds: None,
            dedup_mode: DedupMode::Allow,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    let outcome = storage.with_transaction(|conn| {
        // Two memories whose embeddings put their weight on different components.
        let rust_memory = create_memory(
            conn,
            &global_note("Rust is a systems programming language", "rust"),
        )?;
        let rust_embedding = vec![0.8, 0.4, 0.2, 0.1];
        store_test_embedding(conn, rust_memory.id, &rust_embedding)?;

        let python_memory =
            create_memory(conn, &global_note("Python is a scripting language", "python"))?;
        let python_embedding = vec![0.1, 0.2, 0.8, 0.4];
        store_test_embedding(conn, python_memory.id, &python_embedding)?;

        // A query nearly identical to the first embedding must find the first memory.
        let query_near_rust = vec![0.79, 0.41, 0.21, 0.11];
        let strict_hit = find_similar_by_embedding(
            conn,
            &query_near_rust,
            &MemoryScope::Global,
            None,
            0.95,
        )?;
        assert!(strict_hit.is_some());
        let (best_match, score) = strict_hit.unwrap();
        assert_eq!(best_match.id, rust_memory.id);
        assert!(score > 0.95);

        // Lowering the threshold must still produce a match.
        let lenient_hit = find_similar_by_embedding(
            conn,
            &query_near_rust,
            &MemoryScope::Global,
            None,
            0.5,
        )?;
        assert!(lenient_hit.is_some());

        // This query is far from both stored vectors, so a 0.99 threshold should exclude them
        // (assuming a cosine-like metric -- confirm against `find_similar_by_embedding`).
        let query_dissimilar = vec![0.0, 0.0, 0.0, 1.0];
        let strict_miss = find_similar_by_embedding(
            conn,
            &query_dissimilar,
            &MemoryScope::Global,
            None,
            0.99,
        )?;
        assert!(strict_miss.is_none());

        // Both memories are global, so searching within another user's scope finds nothing
        // even with a lenient threshold.
        let other_user_scope = MemoryScope::User {
            user_id: "other-user".to_string(),
        };
        let scoped_miss =
            find_similar_by_embedding(conn, &query_near_rust, &other_user_scope, None, 0.5)?;
        assert!(scoped_miss.is_none());

        Ok(())
    });

    outcome.unwrap();
}
5101}