1use chrono::{DateTime, Utc};
4use rusqlite::{params, Connection, Row};
5use serde::{Deserialize, Serialize};
6use sha2::{Digest, Sha256};
7use std::collections::HashMap;
8
9use crate::error::{EngramError, Result};
10use crate::types::*;
11
12pub fn memory_from_row(row: &Row) -> rusqlite::Result<Memory> {
14 let id: i64 = row.get("id")?;
15 let content: String = row.get("content")?;
16 let memory_type_str: String = row.get("memory_type")?;
17 let importance: f32 = row.get("importance")?;
18 let access_count: i32 = row.get("access_count")?;
19 let created_at: String = row.get("created_at")?;
20 let updated_at: String = row.get("updated_at")?;
21 let last_accessed_at: Option<String> = row.get("last_accessed_at")?;
22 let owner_id: Option<String> = row.get("owner_id")?;
23 let visibility_str: String = row.get("visibility")?;
24 let version: i32 = row.get("version")?;
25 let has_embedding: i32 = row.get("has_embedding")?;
26 let metadata_str: String = row.get("metadata")?;
27
28 let scope_type: String = row
30 .get("scope_type")
31 .unwrap_or_else(|_| "global".to_string());
32 let scope_id: Option<String> = row.get("scope_id").unwrap_or(None);
33
34 let expires_at: Option<String> = row.get("expires_at").unwrap_or(None);
36
37 let content_hash: Option<String> = row.get("content_hash").unwrap_or(None);
39
40 let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
41 let visibility = match visibility_str.as_str() {
42 "shared" => Visibility::Shared,
43 "public" => Visibility::Public,
44 _ => Visibility::Private,
45 };
46
47 let scope = match (scope_type.as_str(), scope_id) {
49 ("user", Some(id)) => MemoryScope::User { user_id: id },
50 ("session", Some(id)) => MemoryScope::Session { session_id: id },
51 ("agent", Some(id)) => MemoryScope::Agent { agent_id: id },
52 _ => MemoryScope::Global,
53 };
54
55 let metadata: HashMap<String, serde_json::Value> =
56 serde_json::from_str(&metadata_str).unwrap_or_default();
57
58 let workspace: String = row
60 .get("workspace")
61 .unwrap_or_else(|_| "default".to_string());
62
63 let tier_str: String = row.get("tier").unwrap_or_else(|_| "permanent".to_string());
65 let tier = tier_str.parse().unwrap_or_default();
66
67 let event_time: Option<String> = row.get("event_time").unwrap_or(None);
68 let event_duration_seconds: Option<i64> = row.get("event_duration_seconds").unwrap_or(None);
69 let trigger_pattern: Option<String> = row.get("trigger_pattern").unwrap_or(None);
70 let procedure_success_count: i32 = row.get("procedure_success_count").unwrap_or(0);
71 let procedure_failure_count: i32 = row.get("procedure_failure_count").unwrap_or(0);
72 let summary_of_id: Option<i64> = row.get("summary_of_id").unwrap_or(None);
73 let lifecycle_state_str: Option<String> = row.get("lifecycle_state").unwrap_or(None);
74
75 let lifecycle_state = lifecycle_state_str
76 .and_then(|s| s.parse().ok())
77 .unwrap_or(crate::types::LifecycleState::Active);
78
79 Ok(Memory {
80 id,
81 content,
82 memory_type,
83 tags: vec![], metadata,
85 importance,
86 access_count,
87 created_at: DateTime::parse_from_rfc3339(&created_at)
88 .map(|dt| dt.with_timezone(&Utc))
89 .unwrap_or_else(|_| Utc::now()),
90 updated_at: DateTime::parse_from_rfc3339(&updated_at)
91 .map(|dt| dt.with_timezone(&Utc))
92 .unwrap_or_else(|_| Utc::now()),
93 last_accessed_at: last_accessed_at.and_then(|s| {
94 DateTime::parse_from_rfc3339(&s)
95 .map(|dt| dt.with_timezone(&Utc))
96 .ok()
97 }),
98 owner_id,
99 visibility,
100 scope,
101 workspace,
102 tier,
103 version,
104 has_embedding: has_embedding != 0,
105 expires_at: expires_at.and_then(|s| {
106 DateTime::parse_from_rfc3339(&s)
107 .map(|dt| dt.with_timezone(&Utc))
108 .ok()
109 }),
110 content_hash,
111 event_time: event_time.and_then(|s| {
112 DateTime::parse_from_rfc3339(&s)
113 .map(|dt| dt.with_timezone(&Utc))
114 .ok()
115 }),
116 event_duration_seconds,
117 trigger_pattern,
118 procedure_success_count,
119 procedure_failure_count,
120 summary_of_id,
121 lifecycle_state,
122 })
123}
124
125pub(crate) fn metadata_value_to_param(
126 key: &str,
127 value: &serde_json::Value,
128 conditions: &mut Vec<String>,
129 params: &mut Vec<Box<dyn rusqlite::ToSql>>,
130) -> Result<()> {
131 match value {
132 serde_json::Value::String(s) => {
133 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
134 params.push(Box::new(s.clone()));
135 }
136 serde_json::Value::Number(n) => {
137 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
138 if let Some(i) = n.as_i64() {
139 params.push(Box::new(i));
140 } else if let Some(f) = n.as_f64() {
141 params.push(Box::new(f));
142 } else {
143 return Err(EngramError::InvalidInput("Invalid number".to_string()));
144 }
145 }
146 serde_json::Value::Bool(b) => {
147 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
148 params.push(Box::new(*b));
149 }
150 serde_json::Value::Null => {
151 conditions.push(format!("json_extract(m.metadata, '$.{}') IS NULL", key));
152 }
153 _ => {
154 return Err(EngramError::InvalidInput(format!(
155 "Unsupported metadata filter value for key: {}",
156 key
157 )));
158 }
159 }
160
161 Ok(())
162}
163
164fn get_memory_internal(conn: &Connection, id: i64, track_access: bool) -> Result<Memory> {
165 let now = Utc::now().to_rfc3339();
166
167 let mut stmt = conn.prepare_cached(
168 "SELECT id, content, memory_type, importance, access_count,
169 created_at, updated_at, last_accessed_at, owner_id,
170 visibility, version, has_embedding, metadata,
171 scope_type, scope_id, workspace, tier, expires_at, content_hash
172 FROM memories
173 WHERE id = ? AND valid_to IS NULL
174 AND (expires_at IS NULL OR expires_at > ?)",
175 )?;
176
177 let mut memory = stmt
178 .query_row(params![id, now], memory_from_row)
179 .map_err(|_| EngramError::NotFound(id))?;
180
181 memory.tags = load_tags(conn, id)?;
182
183 if track_access {
184 let now = Utc::now().to_rfc3339();
186 conn.execute(
187 "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?
188 WHERE id = ?",
189 params![now, id],
190 )?;
191 }
192
193 Ok(memory)
194}
195
196pub fn load_tags(conn: &Connection, memory_id: i64) -> Result<Vec<String>> {
198 let mut stmt = conn.prepare_cached(
199 "SELECT t.name FROM tags t
200 JOIN memory_tags mt ON t.id = mt.tag_id
201 WHERE mt.memory_id = ?",
202 )?;
203
204 let tags: Vec<String> = stmt
205 .query_map([memory_id], |row| row.get(0))?
206 .filter_map(|r| r.ok())
207 .collect();
208
209 Ok(tags)
210}
211
212pub fn compute_content_hash(content: &str) -> String {
214 let normalized = content
216 .to_lowercase()
217 .split_whitespace()
218 .collect::<Vec<_>>()
219 .join(" ");
220
221 let mut hasher = Sha256::new();
222 hasher.update(normalized.as_bytes());
223 format!("sha256:{}", hex::encode(hasher.finalize()))
224}
225
226pub fn find_by_content_hash(
234 conn: &Connection,
235 content_hash: &str,
236 scope: &MemoryScope,
237 workspace: Option<&str>,
238) -> Result<Option<Memory>> {
239 let now = Utc::now().to_rfc3339();
240 let scope_type = scope.scope_type();
241 let scope_id = scope.scope_id().map(|s| s.to_string());
242 let workspace = workspace.unwrap_or("default");
243
244 let mut stmt = conn.prepare_cached(
245 "SELECT id, content, memory_type, importance, access_count,
246 created_at, updated_at, last_accessed_at, owner_id,
247 visibility, version, has_embedding, metadata,
248 scope_type, scope_id, workspace, tier, expires_at, content_hash
249 FROM memories
250 WHERE content_hash = ? AND valid_to IS NULL
251 AND (expires_at IS NULL OR expires_at > ?)
252 AND scope_type = ?
253 AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
254 AND workspace = ?
255 LIMIT 1",
256 )?;
257
258 let result = stmt
259 .query_row(
260 params![content_hash, now, scope_type, scope_id, scope_id, workspace],
261 memory_from_row,
262 )
263 .ok();
264
265 if let Some(mut memory) = result {
266 memory.tags = load_tags(conn, memory.id)?;
267 Ok(Some(memory))
268 } else {
269 Ok(None)
270 }
271}
272
273pub fn find_similar_by_embedding(
278 conn: &Connection,
279 query_embedding: &[f32],
280 scope: &MemoryScope,
281 workspace: Option<&str>,
282 threshold: f32,
283) -> Result<Option<(Memory, f32)>> {
284 use crate::embedding::{cosine_similarity, get_embedding};
285
286 let now = Utc::now().to_rfc3339();
287 let scope_type = scope.scope_type();
288 let scope_id = scope.scope_id().map(|s| s.to_string());
289 let workspace = workspace.unwrap_or("default");
290
291 let mut stmt = conn.prepare_cached(
293 "SELECT id, content, memory_type, importance, access_count,
294 created_at, updated_at, last_accessed_at, owner_id,
295 visibility, version, has_embedding, metadata,
296 scope_type, scope_id, workspace, tier, expires_at, content_hash
297 FROM memories
298 WHERE has_embedding = 1 AND valid_to IS NULL
299 AND (expires_at IS NULL OR expires_at > ?)
300 AND scope_type = ?
301 AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
302 AND workspace = ?",
303 )?;
304
305 let memories: Vec<Memory> = stmt
306 .query_map(
307 params![now, scope_type, scope_id, scope_id, workspace],
308 memory_from_row,
309 )?
310 .filter_map(|r| r.ok())
311 .collect();
312
313 let mut best_match: Option<(Memory, f32)> = None;
314
315 for memory in memories {
316 if let Ok(Some(embedding)) = get_embedding(conn, memory.id) {
317 let similarity = cosine_similarity(query_embedding, &embedding);
318 if similarity >= threshold {
319 match &best_match {
320 None => best_match = Some((memory, similarity)),
321 Some((_, best_score)) if similarity > *best_score => {
322 best_match = Some((memory, similarity));
323 }
324 _ => {}
325 }
326 }
327 }
328 }
329
330 if let Some((mut memory, score)) = best_match {
332 memory.tags = load_tags(conn, memory.id)?;
333 Ok(Some((memory, score)))
334 } else {
335 Ok(None)
336 }
337}
338
/// Two memories that were detected as duplicates of each other.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DuplicatePair {
    /// First memory of the pair.
    pub memory_a: Memory,
    /// Second memory of the pair.
    pub memory_b: Memory,
    /// Similarity of the two memories: `1.0` for exact content-hash matches,
    /// otherwise the score stored on the crossref linking them.
    pub similarity_score: f64,
    /// How the pair was detected.
    pub match_type: DuplicateMatchType,
}
347
/// The way a [`DuplicatePair`] was detected.
///
/// Serialized in snake case (`exact_hash`, `high_similarity`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DuplicateMatchType {
    /// Both memories have the same content hash.
    ExactHash,
    /// The memories are linked by a crossref whose score reaches the requested threshold.
    HighSimilarity,
}
357
/// Finds duplicate memory pairs without restricting the search to one workspace.
///
/// Delegates to [`find_duplicates_in_workspace`] with no workspace filter.
/// `threshold` is the minimum crossref score for a pair to be reported as a
/// similarity match.
pub fn find_duplicates(conn: &Connection, threshold: f64) -> Result<Vec<DuplicatePair>> {
    find_duplicates_in_workspace(conn, threshold, None)
}
368
369pub fn find_duplicates_in_workspace(
371 conn: &Connection,
372 threshold: f64,
373 workspace: Option<&str>,
374) -> Result<Vec<DuplicatePair>> {
375 let now = Utc::now().to_rfc3339();
376 let mut duplicates = Vec::new();
377
378 let (hash_sql, hash_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace
380 {
381 (
382 "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
383 FROM memories
384 WHERE content_hash IS NOT NULL
385 AND valid_to IS NULL
386 AND (expires_at IS NULL OR expires_at > ?)
387 AND workspace = ?
388 GROUP BY content_hash, scope_type, scope_id, workspace
389 HAVING COUNT(*) > 1",
390 vec![Box::new(now.clone()), Box::new(ws.to_string())],
391 )
392 } else {
393 (
394 "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
395 FROM memories
396 WHERE content_hash IS NOT NULL
397 AND valid_to IS NULL
398 AND (expires_at IS NULL OR expires_at > ?)
399 GROUP BY content_hash, scope_type, scope_id, workspace
400 HAVING COUNT(*) > 1",
401 vec![Box::new(now.clone())],
402 )
403 };
404
405 let mut hash_stmt = conn.prepare_cached(hash_sql)?;
406 let hash_rows = hash_stmt.query_map(
407 rusqlite::params_from_iter(hash_params.iter().map(|p| p.as_ref())),
408 |row| {
409 let ids_str: String = row.get(3)?;
410 Ok(ids_str)
411 },
412 )?;
413
414 for ids_result in hash_rows {
415 let ids_str = ids_result?;
416 let ids: Vec<i64> = ids_str
417 .split(',')
418 .filter_map(|s| s.trim().parse().ok())
419 .collect();
420
421 for i in 0..ids.len() {
424 for j in (i + 1)..ids.len() {
425 let memory_a = get_memory_internal(conn, ids[i], false)?;
426 let memory_b = get_memory_internal(conn, ids[j], false)?;
427 duplicates.push(DuplicatePair {
428 memory_a,
429 memory_b,
430 similarity_score: 1.0, match_type: DuplicateMatchType::ExactHash,
432 });
433 }
434 }
435 }
436
437 let (sim_sql, sim_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
439 (
440 "SELECT DISTINCT c.from_id, c.to_id, c.score
441 FROM crossrefs c
442 JOIN memories m1 ON c.from_id = m1.id
443 JOIN memories m2 ON c.to_id = m2.id
444 WHERE c.score >= ?
445 AND m1.valid_to IS NULL
446 AND m2.valid_to IS NULL
447 AND (m1.expires_at IS NULL OR m1.expires_at > ?)
448 AND (m2.expires_at IS NULL OR m2.expires_at > ?)
449 AND c.from_id < c.to_id
450 AND m1.scope_type = m2.scope_type
451 AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
452 AND m1.workspace = ?
453 AND m2.workspace = ?
454 ORDER BY c.score DESC",
455 vec![
456 Box::new(threshold),
457 Box::new(now.clone()),
458 Box::new(now.clone()),
459 Box::new(ws.to_string()),
460 Box::new(ws.to_string()),
461 ],
462 )
463 } else {
464 (
465 "SELECT DISTINCT c.from_id, c.to_id, c.score
466 FROM crossrefs c
467 JOIN memories m1 ON c.from_id = m1.id
468 JOIN memories m2 ON c.to_id = m2.id
469 WHERE c.score >= ?
470 AND m1.valid_to IS NULL
471 AND m2.valid_to IS NULL
472 AND (m1.expires_at IS NULL OR m1.expires_at > ?)
473 AND (m2.expires_at IS NULL OR m2.expires_at > ?)
474 AND c.from_id < c.to_id
475 AND m1.scope_type = m2.scope_type
476 AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
477 AND m1.workspace = m2.workspace
478 ORDER BY c.score DESC",
479 vec![
480 Box::new(threshold),
481 Box::new(now.clone()),
482 Box::new(now.clone()),
483 ],
484 )
485 };
486
487 let mut sim_stmt = conn.prepare_cached(sim_sql)?;
488 let sim_rows = sim_stmt.query_map(
489 rusqlite::params_from_iter(sim_params.iter().map(|p| p.as_ref())),
490 |row| {
491 Ok((
492 row.get::<_, i64>(0)?,
493 row.get::<_, i64>(1)?,
494 row.get::<_, f64>(2)?,
495 ))
496 },
497 )?;
498
499 for row_result in sim_rows {
500 let (from_id, to_id, score) = row_result?;
501
502 let already_found = duplicates.iter().any(|d| {
504 (d.memory_a.id == from_id && d.memory_b.id == to_id)
505 || (d.memory_a.id == to_id && d.memory_b.id == from_id)
506 });
507
508 if !already_found {
509 let memory_a = get_memory_internal(conn, from_id, false)?;
511 let memory_b = get_memory_internal(conn, to_id, false)?;
512 duplicates.push(DuplicatePair {
513 memory_a,
514 memory_b,
515 similarity_score: score,
516 match_type: DuplicateMatchType::HighSimilarity,
517 });
518 }
519 }
520
521 Ok(duplicates)
522}
523
524pub fn create_memory(conn: &Connection, input: &CreateMemoryInput) -> Result<Memory> {
526 let now = Utc::now();
527 let now_str = now.to_rfc3339();
528 let metadata_json = serde_json::to_string(&input.metadata)?;
529 let importance = input.importance.unwrap_or(0.5);
530
531 let content_hash = compute_content_hash(&input.content);
533
534 let workspace = match &input.workspace {
536 Some(ws) => crate::types::normalize_workspace(ws)
537 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?,
538 None => "default".to_string(),
539 };
540
541 if input.dedup_mode != DedupMode::Allow {
543 if let Some(existing) =
544 find_by_content_hash(conn, &content_hash, &input.scope, Some(&workspace))?
545 {
546 match input.dedup_mode {
547 DedupMode::Reject => {
548 return Err(EngramError::Duplicate {
549 existing_id: existing.id,
550 message: format!(
551 "Duplicate memory detected (id={}). Content hash: {}",
552 existing.id, content_hash
553 ),
554 });
555 }
556 DedupMode::Skip => {
557 return Ok(existing);
559 }
560 DedupMode::Merge => {
561 let mut merged_tags = existing.tags.clone();
563 for tag in &input.tags {
564 if !merged_tags.contains(tag) {
565 merged_tags.push(tag.clone());
566 }
567 }
568
569 let mut merged_metadata = existing.metadata.clone();
570 for (key, value) in &input.metadata {
571 merged_metadata.insert(key.clone(), value.clone());
572 }
573
574 let update_input = UpdateMemoryInput {
575 content: None, memory_type: None,
577 tags: Some(merged_tags),
578 metadata: Some(merged_metadata),
579 importance: input.importance, scope: None,
581 ttl_seconds: input.ttl_seconds, event_time: None,
584 trigger_pattern: None,
585 };
586
587 return update_memory(conn, existing.id, &update_input);
588 }
589 DedupMode::Allow => unreachable!(),
590 }
591 }
592 }
593
594 let scope_type = input.scope.scope_type();
596 let scope_id = input.scope.scope_id().map(|s| s.to_string());
597
598 let tier = input.tier;
602
603 let expires_at = match tier {
608 MemoryTier::Permanent => {
609 if input.ttl_seconds.is_some() && input.ttl_seconds != Some(0) {
611 return Err(EngramError::InvalidInput(
612 "Permanent tier memories cannot have a TTL. Use Daily tier for expiring memories.".to_string()
613 ));
614 }
615 None
616 }
617 MemoryTier::Daily => {
618 let ttl = input.ttl_seconds.filter(|&t| t > 0).unwrap_or(86400); Some((now + chrono::Duration::seconds(ttl)).to_rfc3339())
621 }
622 };
623
624 let event_time = input.event_time.map(|dt| dt.to_rfc3339());
625
626 conn.execute(
627 "INSERT INTO memories (content, memory_type, importance, metadata, created_at, updated_at, valid_from, scope_type, scope_id, workspace, tier, expires_at, content_hash, event_time, event_duration_seconds, trigger_pattern, summary_of_id)
628 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
629 params![
630 input.content,
631 input.memory_type.as_str(),
632 importance,
633 metadata_json,
634 now_str,
635 now_str,
636 now_str,
637 scope_type,
638 scope_id,
639 workspace,
640 tier.as_str(),
641 expires_at,
642 content_hash,
643 event_time,
644 input.event_duration_seconds,
645 input.trigger_pattern,
646 input.summary_of_id,
647 ],
648 )?;
649
650 let id = conn.last_insert_rowid();
651
652 for tag in &input.tags {
654 ensure_tag(conn, tag)?;
655 conn.execute(
656 "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
657 SELECT ?, id FROM tags WHERE name = ?",
658 params![id, tag],
659 )?;
660 }
661
662 if !input.defer_embedding {
664 conn.execute(
665 "INSERT INTO embedding_queue (memory_id, status, queued_at)
666 VALUES (?, 'pending', ?)",
667 params![id, now_str],
668 )?;
669 }
670
671 let tags_json = serde_json::to_string(&input.tags)?;
673 conn.execute(
674 "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
675 VALUES (?, 1, ?, ?, ?, ?)",
676 params![id, input.content, tags_json, metadata_json, now_str],
677 )?;
678
679 record_event(
681 conn,
682 MemoryEventType::Created,
683 Some(id),
684 None,
685 serde_json::json!({
686 "workspace": input.workspace.as_deref().unwrap_or("default"),
687 "memory_type": input.memory_type.as_str(),
688 }),
689 )?;
690
691 conn.execute(
693 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
694 [],
695 )?;
696
697 get_memory_internal(conn, id, false)
698}
699
700fn ensure_tag(conn: &Connection, tag: &str) -> Result<i64> {
702 conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", params![tag])?;
703
704 let id: i64 = conn.query_row("SELECT id FROM tags WHERE name = ?", params![tag], |row| {
705 row.get(0)
706 })?;
707
708 Ok(id)
709}
710
/// Loads the live memory with the given id and records the access.
///
/// Delegates to `get_memory_internal` with access tracking enabled, which
/// increments the row's `access_count` and sets `last_accessed_at` after reading it.
pub fn get_memory(conn: &Connection, id: i64) -> Result<Memory> {
    get_memory_internal(conn, id, true)
}
715
716pub fn update_memory(conn: &Connection, id: i64, input: &UpdateMemoryInput) -> Result<Memory> {
718 let current = get_memory_internal(conn, id, false)?;
720 let now = Utc::now().to_rfc3339();
721
722 let mut updates = vec!["updated_at = ?".to_string()];
724 let mut values: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now.clone())];
725
726 if let Some(ref content) = input.content {
727 updates.push("content = ?".to_string());
728 values.push(Box::new(content.clone()));
729 let new_hash = compute_content_hash(content);
731 updates.push("content_hash = ?".to_string());
732 values.push(Box::new(new_hash));
733 }
734
735 if let Some(ref memory_type) = input.memory_type {
736 updates.push("memory_type = ?".to_string());
737 values.push(Box::new(memory_type.as_str().to_string()));
738 }
739
740 if let Some(importance) = input.importance {
741 updates.push("importance = ?".to_string());
742 values.push(Box::new(importance));
743 }
744
745 if let Some(ref metadata) = input.metadata {
746 let metadata_json = serde_json::to_string(metadata)?;
747 updates.push("metadata = ?".to_string());
748 values.push(Box::new(metadata_json));
749 }
750
751 if let Some(ref scope) = input.scope {
752 updates.push("scope_type = ?".to_string());
753 values.push(Box::new(scope.scope_type().to_string()));
754 updates.push("scope_id = ?".to_string());
755 values.push(Box::new(scope.scope_id().map(|s| s.to_string())));
756 }
757
758 if let Some(event_time) = &input.event_time {
760 updates.push("event_time = ?".to_string());
761 let value = event_time.as_ref().map(|dt| dt.to_rfc3339());
762 values.push(Box::new(value));
763 }
764
765 if let Some(trigger_pattern) = &input.trigger_pattern {
767 updates.push("trigger_pattern = ?".to_string());
768 values.push(Box::new(trigger_pattern.clone()));
769 }
770
771 if let Some(ttl) = input.ttl_seconds {
777 if ttl <= 0 {
778 if current.tier == MemoryTier::Daily {
781 return Err(crate::error::EngramError::InvalidInput(
782 "Cannot remove expiration from a Daily tier memory. Use promote_to_permanent first.".to_string()
783 ));
784 }
785 updates.push("expires_at = NULL".to_string());
786 } else {
787 if current.tier == MemoryTier::Permanent {
790 return Err(crate::error::EngramError::InvalidInput(
791 "Cannot set expiration on a Permanent tier memory. Permanent memories cannot expire.".to_string()
792 ));
793 }
794 let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
795 updates.push("expires_at = ?".to_string());
796 values.push(Box::new(expires_at));
797 }
798 }
799
800 updates.push("version = version + 1".to_string());
802
803 let sql = format!("UPDATE memories SET {} WHERE id = ?", updates.join(", "));
805 values.push(Box::new(id));
806
807 let params: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
808 conn.execute(&sql, params.as_slice())?;
809
810 if let Some(ref tags) = input.tags {
812 conn.execute("DELETE FROM memory_tags WHERE memory_id = ?", params![id])?;
813 for tag in tags {
814 ensure_tag(conn, tag)?;
815 conn.execute(
816 "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
817 SELECT ?, id FROM tags WHERE name = ?",
818 params![id, tag],
819 )?;
820 }
821 }
822
823 let new_content = input.content.as_ref().unwrap_or(¤t.content);
825 let new_tags = input.tags.as_ref().unwrap_or(¤t.tags);
826 let new_metadata = input.metadata.as_ref().unwrap_or(¤t.metadata);
827 let tags_json = serde_json::to_string(new_tags)?;
828 let metadata_json = serde_json::to_string(new_metadata)?;
829
830 conn.execute(
831 "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
832 VALUES (?, (SELECT version FROM memories WHERE id = ?), ?, ?, ?, ?)",
833 params![id, id, new_content, tags_json, metadata_json, now],
834 )?;
835
836 if input.content.is_some() {
838 conn.execute(
839 "INSERT OR REPLACE INTO embedding_queue (memory_id, status, queued_at)
840 VALUES (?, 'pending', ?)",
841 params![id, now],
842 )?;
843 conn.execute(
844 "UPDATE memories SET has_embedding = 0 WHERE id = ?",
845 params![id],
846 )?;
847 }
848
849 let mut changed_fields = Vec::new();
851 if input.content.is_some() {
852 changed_fields.push("content");
853 }
854 if input.tags.is_some() {
855 changed_fields.push("tags");
856 }
857 if input.metadata.is_some() {
858 changed_fields.push("metadata");
859 }
860 if input.importance.is_some() {
861 changed_fields.push("importance");
862 }
863 if input.ttl_seconds.is_some() {
864 changed_fields.push("ttl");
865 }
866
867 record_event(
869 conn,
870 MemoryEventType::Updated,
871 Some(id),
872 None,
873 serde_json::json!({
874 "changed_fields": changed_fields,
875 }),
876 )?;
877
878 conn.execute(
880 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
881 [],
882 )?;
883
884 get_memory_internal(conn, id, false)
885}
886
887pub fn promote_to_permanent(conn: &Connection, id: i64) -> Result<Memory> {
898 let memory = get_memory_internal(conn, id, false)?;
899
900 if memory.tier == MemoryTier::Permanent {
901 return Err(EngramError::InvalidInput(format!(
902 "Memory {} is already in the Permanent tier",
903 id
904 )));
905 }
906
907 let now = Utc::now().to_rfc3339();
908
909 conn.execute(
910 "UPDATE memories SET tier = 'permanent', expires_at = NULL, updated_at = ?, version = version + 1 WHERE id = ?",
911 params![now, id],
912 )?;
913
914 record_event(
916 conn,
917 MemoryEventType::Updated,
918 Some(id),
919 None,
920 serde_json::json!({
921 "changed_fields": ["tier", "expires_at"],
922 "action": "promote_to_permanent",
923 }),
924 )?;
925
926 conn.execute(
928 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
929 [],
930 )?;
931
932 tracing::info!(memory_id = id, "Promoted memory to permanent tier");
933
934 get_memory_internal(conn, id, false)
935}
936
937pub fn move_to_workspace(conn: &Connection, id: i64, workspace: &str) -> Result<Memory> {
947 let _memory = get_memory_internal(conn, id, false)?;
949
950 let normalized = crate::types::normalize_workspace(workspace)
952 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
953
954 let now = Utc::now().to_rfc3339();
955
956 conn.execute(
957 "UPDATE memories SET workspace = ?, updated_at = ?, version = version + 1 WHERE id = ?",
958 params![normalized, now, id],
959 )?;
960
961 record_event(
963 conn,
964 MemoryEventType::Updated,
965 Some(id),
966 None,
967 serde_json::json!({
968 "changed_fields": ["workspace"],
969 "action": "move_to_workspace",
970 "new_workspace": normalized,
971 }),
972 )?;
973
974 conn.execute(
976 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
977 [],
978 )?;
979
980 tracing::info!(memory_id = id, workspace = %normalized, "Moved memory to workspace");
981
982 get_memory_internal(conn, id, false)
983}
984
985pub fn list_workspaces(conn: &Connection) -> Result<Vec<WorkspaceStats>> {
990 let now = Utc::now().to_rfc3339();
991
992 let mut stmt = conn.prepare(
993 r#"
994 SELECT
995 workspace,
996 COUNT(*) as memory_count,
997 SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
998 SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
999 MIN(created_at) as first_memory_at,
1000 MAX(created_at) as last_memory_at,
1001 AVG(importance) as avg_importance
1002 FROM memories
1003 WHERE valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1004 GROUP BY workspace
1005 ORDER BY memory_count DESC
1006 "#,
1007 )?;
1008
1009 let workspaces: Vec<WorkspaceStats> = stmt
1010 .query_map(params![now], |row| {
1011 let workspace: String = row.get(0)?;
1012 let memory_count: i64 = row.get(1)?;
1013 let permanent_count: i64 = row.get(2)?;
1014 let daily_count: i64 = row.get(3)?;
1015 let first_memory_at: Option<String> = row.get(4)?;
1016 let last_memory_at: Option<String> = row.get(5)?;
1017 let avg_importance: Option<f64> = row.get(6)?;
1018
1019 Ok(WorkspaceStats {
1020 workspace,
1021 memory_count,
1022 permanent_count,
1023 daily_count,
1024 first_memory_at: first_memory_at.and_then(|s| {
1025 DateTime::parse_from_rfc3339(&s)
1026 .map(|dt| dt.with_timezone(&Utc))
1027 .ok()
1028 }),
1029 last_memory_at: last_memory_at.and_then(|s| {
1030 DateTime::parse_from_rfc3339(&s)
1031 .map(|dt| dt.with_timezone(&Utc))
1032 .ok()
1033 }),
1034 top_tags: vec![], avg_importance: avg_importance.map(|v| v as f32),
1036 })
1037 })?
1038 .filter_map(|r| r.ok())
1039 .collect();
1040
1041 Ok(workspaces)
1042}
1043
1044pub fn get_workspace_stats(conn: &Connection, workspace: &str) -> Result<WorkspaceStats> {
1046 let normalized = crate::types::normalize_workspace(workspace)
1047 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1048
1049 let now = Utc::now().to_rfc3339();
1050
1051 let stats = conn
1052 .query_row(
1053 r#"
1054 SELECT
1055 workspace,
1056 COUNT(*) as memory_count,
1057 SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1058 SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1059 MIN(created_at) as first_memory_at,
1060 MAX(created_at) as last_memory_at,
1061 AVG(importance) as avg_importance
1062 FROM memories
1063 WHERE workspace = ? AND valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1064 GROUP BY workspace
1065 "#,
1066 params![normalized, now],
1067 |row| {
1068 let workspace: String = row.get(0)?;
1069 let memory_count: i64 = row.get(1)?;
1070 let permanent_count: i64 = row.get(2)?;
1071 let daily_count: i64 = row.get(3)?;
1072 let first_memory_at: Option<String> = row.get(4)?;
1073 let last_memory_at: Option<String> = row.get(5)?;
1074 let avg_importance: Option<f64> = row.get(6)?;
1075
1076 Ok(WorkspaceStats {
1077 workspace,
1078 memory_count,
1079 permanent_count,
1080 daily_count,
1081 first_memory_at: first_memory_at.and_then(|s| {
1082 DateTime::parse_from_rfc3339(&s)
1083 .map(|dt| dt.with_timezone(&Utc))
1084 .ok()
1085 }),
1086 last_memory_at: last_memory_at.and_then(|s| {
1087 DateTime::parse_from_rfc3339(&s)
1088 .map(|dt| dt.with_timezone(&Utc))
1089 .ok()
1090 }),
1091 top_tags: vec![],
1092 avg_importance: avg_importance.map(|v| v as f32),
1093 })
1094 },
1095 )
1096 .map_err(|e| match e {
1097 rusqlite::Error::QueryReturnedNoRows => {
1098 EngramError::NotFound(0) }
1100 _ => EngramError::Database(e),
1101 })?;
1102
1103 Ok(stats)
1104}
1105
1106pub fn delete_workspace(conn: &Connection, workspace: &str, move_to_default: bool) -> Result<i64> {
1115 let normalized = crate::types::normalize_workspace(workspace)
1116 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1117
1118 if normalized == "default" {
1119 return Err(EngramError::InvalidInput(
1120 "Cannot delete the default workspace".to_string(),
1121 ));
1122 }
1123
1124 let now = Utc::now().to_rfc3339();
1125
1126 let affected_ids: Vec<i64> = {
1128 let mut stmt =
1129 conn.prepare("SELECT id FROM memories WHERE workspace = ? AND valid_to IS NULL")?;
1130 let rows = stmt.query_map(params![&normalized], |row| row.get(0))?;
1131 rows.collect::<std::result::Result<Vec<_>, _>>()?
1132 };
1133
1134 let affected = affected_ids.len() as i64;
1135
1136 if affected > 0 {
1137 if move_to_default {
1138 conn.execute(
1140 "UPDATE memories SET workspace = 'default', updated_at = ?, version = version + 1 WHERE workspace = ? AND valid_to IS NULL",
1141 params![&now, &normalized],
1142 )?;
1143 } else {
1144 conn.execute(
1146 "UPDATE memories SET valid_to = ? WHERE workspace = ? AND valid_to IS NULL",
1147 params![&now, &normalized],
1148 )?;
1149 }
1150
1151 let event_type = if move_to_default {
1153 MemoryEventType::Updated
1154 } else {
1155 MemoryEventType::Deleted
1156 };
1157
1158 for memory_id in &affected_ids {
1159 record_event(
1160 conn,
1161 event_type.clone(),
1162 Some(*memory_id),
1163 None,
1164 serde_json::json!({
1165 "action": "delete_workspace",
1166 "workspace": normalized,
1167 "move_to_default": move_to_default,
1168 }),
1169 )?;
1170 }
1171 }
1172
1173 conn.execute(
1175 "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1176 params![affected],
1177 )?;
1178
1179 tracing::info!(
1180 workspace = %normalized,
1181 move_to_default,
1182 affected,
1183 "Deleted workspace"
1184 );
1185
1186 Ok(affected)
1187}
1188
1189pub fn delete_memory(conn: &Connection, id: i64) -> Result<()> {
1191 let now = Utc::now().to_rfc3339();
1192
1193 let memory_info: Option<(String, String)> = conn
1195 .query_row(
1196 "SELECT workspace, memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1197 params![id],
1198 |row| Ok((row.get(0)?, row.get(1)?)),
1199 )
1200 .ok();
1201
1202 let affected = conn.execute(
1203 "UPDATE memories SET valid_to = ? WHERE id = ? AND valid_to IS NULL",
1204 params![now, id],
1205 )?;
1206
1207 if affected == 0 {
1208 return Err(EngramError::NotFound(id));
1209 }
1210
1211 conn.execute(
1213 "UPDATE crossrefs SET valid_to = ? WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL",
1214 params![now, id, id],
1215 )?;
1216
1217 let (workspace, memory_type) =
1219 memory_info.unwrap_or(("default".to_string(), "unknown".to_string()));
1220 record_event(
1221 conn,
1222 MemoryEventType::Deleted,
1223 Some(id),
1224 None,
1225 serde_json::json!({
1226 "workspace": workspace,
1227 "memory_type": memory_type,
1228 }),
1229 )?;
1230
1231 conn.execute(
1233 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1234 [],
1235 )?;
1236
1237 Ok(())
1238}
1239
1240pub fn list_memories(conn: &Connection, options: &ListOptions) -> Result<Vec<Memory>> {
1242 let now = Utc::now().to_rfc3339();
1243
1244 let mut sql = String::from(
1245 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1246 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1247 m.visibility, m.version, m.has_embedding, m.metadata,
1248 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
1249 FROM memories m",
1250 );
1251
1252 let mut conditions = vec!["m.valid_to IS NULL".to_string()];
1253 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1254
1255 conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
1257 params.push(Box::new(now));
1258
1259 if let Some(ref tags) = options.tags {
1261 if !tags.is_empty() {
1262 sql.push_str(
1263 " JOIN memory_tags mt ON m.id = mt.memory_id
1264 JOIN tags t ON mt.tag_id = t.id",
1265 );
1266 let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
1267 conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1268 for tag in tags {
1269 params.push(Box::new(tag.clone()));
1270 }
1271 }
1272 }
1273
1274 if let Some(ref memory_type) = options.memory_type {
1276 conditions.push("m.memory_type = ?".to_string());
1277 params.push(Box::new(memory_type.as_str().to_string()));
1278 }
1279
1280 if let Some(ref metadata_filter) = options.metadata_filter {
1282 for (key, value) in metadata_filter {
1283 metadata_value_to_param(key, value, &mut conditions, &mut params)?;
1284 }
1285 }
1286
1287 if let Some(ref scope) = options.scope {
1289 conditions.push("m.scope_type = ?".to_string());
1290 params.push(Box::new(scope.scope_type().to_string()));
1291 if let Some(scope_id) = scope.scope_id() {
1292 conditions.push("m.scope_id = ?".to_string());
1293 params.push(Box::new(scope_id.to_string()));
1294 } else {
1295 conditions.push("m.scope_id IS NULL".to_string());
1296 }
1297 }
1298
1299 if let Some(ref workspace) = options.workspace {
1301 conditions.push("m.workspace = ?".to_string());
1302 params.push(Box::new(workspace.clone()));
1303 }
1304
1305 if let Some(ref tier) = options.tier {
1307 conditions.push("m.tier = ?".to_string());
1308 params.push(Box::new(tier.as_str().to_string()));
1309 }
1310
1311 sql.push_str(" WHERE ");
1312 sql.push_str(&conditions.join(" AND "));
1313
1314 let sort_field = match options.sort_by.unwrap_or_default() {
1316 SortField::CreatedAt => "m.created_at",
1317 SortField::UpdatedAt => "m.updated_at",
1318 SortField::LastAccessedAt => "m.last_accessed_at",
1319 SortField::Importance => "m.importance",
1320 SortField::AccessCount => "m.access_count",
1321 };
1322 let sort_order = match options.sort_order.unwrap_or_default() {
1323 SortOrder::Asc => "ASC",
1324 SortOrder::Desc => "DESC",
1325 };
1326 sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
1327
1328 let limit = options.limit.unwrap_or(100);
1330 let offset = options.offset.unwrap_or(0);
1331 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
1332
1333 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1334 let mut stmt = conn.prepare(&sql)?;
1335
1336 let memories: Vec<Memory> = stmt
1337 .query_map(param_refs.as_slice(), memory_from_row)?
1338 .filter_map(|r| r.ok())
1339 .map(|mut m| {
1340 m.tags = load_tags(conn, m.id).unwrap_or_default();
1341 m
1342 })
1343 .collect();
1344
1345 Ok(memories)
1346}
1347
1348pub fn create_crossref(conn: &Connection, input: &CreateCrossRefInput) -> Result<CrossReference> {
1350 let now = Utc::now().to_rfc3339();
1351
1352 let _ = get_memory_internal(conn, input.from_id, false)?;
1354 let _ = get_memory_internal(conn, input.to_id, false)?;
1355
1356 let strength = input.strength.unwrap_or(1.0);
1357
1358 conn.execute(
1359 "INSERT INTO crossrefs (from_id, to_id, edge_type, score, strength, source, source_context, pinned, created_at, valid_from)
1360 VALUES (?, ?, ?, 1.0, ?, 'manual', ?, ?, ?, ?)
1361 ON CONFLICT(from_id, to_id, edge_type) DO UPDATE SET
1362 strength = excluded.strength,
1363 source_context = COALESCE(excluded.source_context, crossrefs.source_context),
1364 pinned = excluded.pinned",
1365 params![
1366 input.from_id,
1367 input.to_id,
1368 input.edge_type.as_str(),
1369 strength,
1370 input.source_context,
1371 input.pinned,
1372 now,
1373 now,
1374 ],
1375 )?;
1376
1377 get_crossref(conn, input.from_id, input.to_id, input.edge_type)
1378}
1379
1380pub fn get_crossref(
1382 conn: &Connection,
1383 from_id: i64,
1384 to_id: i64,
1385 edge_type: EdgeType,
1386) -> Result<CrossReference> {
1387 let mut stmt = conn.prepare_cached(
1388 "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1389 source_context, created_at, valid_from, valid_to, pinned, metadata
1390 FROM crossrefs
1391 WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1392 )?;
1393
1394 let crossref = stmt.query_row(params![from_id, to_id, edge_type.as_str()], |row| {
1395 let edge_type_str: String = row.get("edge_type")?;
1396 let source_str: String = row.get("source")?;
1397 let created_at_str: String = row.get("created_at")?;
1398 let valid_from_str: String = row.get("valid_from")?;
1399 let valid_to_str: Option<String> = row.get("valid_to")?;
1400 let metadata_str: String = row.get("metadata")?;
1401
1402 Ok(CrossReference {
1403 from_id: row.get("from_id")?,
1404 to_id: row.get("to_id")?,
1405 edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1406 score: row.get("score")?,
1407 confidence: row.get("confidence")?,
1408 strength: row.get("strength")?,
1409 source: match source_str.as_str() {
1410 "manual" => RelationSource::Manual,
1411 "llm" => RelationSource::Llm,
1412 _ => RelationSource::Auto,
1413 },
1414 source_context: row.get("source_context")?,
1415 created_at: DateTime::parse_from_rfc3339(&created_at_str)
1416 .map(|dt| dt.with_timezone(&Utc))
1417 .unwrap_or_else(|_| Utc::now()),
1418 valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1419 .map(|dt| dt.with_timezone(&Utc))
1420 .unwrap_or_else(|_| Utc::now()),
1421 valid_to: valid_to_str.and_then(|s| {
1422 DateTime::parse_from_rfc3339(&s)
1423 .map(|dt| dt.with_timezone(&Utc))
1424 .ok()
1425 }),
1426 pinned: row.get::<_, i32>("pinned")? != 0,
1427 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1428 })
1429 })?;
1430
1431 Ok(crossref)
1432}
1433
1434pub fn get_related(conn: &Connection, memory_id: i64) -> Result<Vec<CrossReference>> {
1436 let mut stmt = conn.prepare_cached(
1437 "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1438 source_context, created_at, valid_from, valid_to, pinned, metadata
1439 FROM crossrefs
1440 WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL
1441 ORDER BY score DESC",
1442 )?;
1443
1444 let crossrefs: Vec<CrossReference> = stmt
1445 .query_map(params![memory_id, memory_id], |row| {
1446 let edge_type_str: String = row.get("edge_type")?;
1447 let source_str: String = row.get("source")?;
1448 let created_at_str: String = row.get("created_at")?;
1449 let valid_from_str: String = row.get("valid_from")?;
1450 let valid_to_str: Option<String> = row.get("valid_to")?;
1451 let metadata_str: String = row.get("metadata")?;
1452
1453 Ok(CrossReference {
1454 from_id: row.get("from_id")?,
1455 to_id: row.get("to_id")?,
1456 edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1457 score: row.get("score")?,
1458 confidence: row.get("confidence")?,
1459 strength: row.get("strength")?,
1460 source: match source_str.as_str() {
1461 "manual" => RelationSource::Manual,
1462 "llm" => RelationSource::Llm,
1463 _ => RelationSource::Auto,
1464 },
1465 source_context: row.get("source_context")?,
1466 created_at: DateTime::parse_from_rfc3339(&created_at_str)
1467 .map(|dt| dt.with_timezone(&Utc))
1468 .unwrap_or_else(|_| Utc::now()),
1469 valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1470 .map(|dt| dt.with_timezone(&Utc))
1471 .unwrap_or_else(|_| Utc::now()),
1472 valid_to: valid_to_str.and_then(|s| {
1473 DateTime::parse_from_rfc3339(&s)
1474 .map(|dt| dt.with_timezone(&Utc))
1475 .ok()
1476 }),
1477 pinned: row.get::<_, i32>("pinned")? != 0,
1478 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1479 })
1480 })?
1481 .filter_map(|r| r.ok())
1482 .collect();
1483
1484 Ok(crossrefs)
1485}
1486
1487pub fn delete_crossref(
1489 conn: &Connection,
1490 from_id: i64,
1491 to_id: i64,
1492 edge_type: EdgeType,
1493) -> Result<()> {
1494 let now = Utc::now().to_rfc3339();
1495
1496 let affected = conn.execute(
1497 "UPDATE crossrefs SET valid_to = ?
1498 WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1499 params![now, from_id, to_id, edge_type.as_str()],
1500 )?;
1501
1502 if affected == 0 {
1503 return Err(EngramError::NotFound(from_id));
1504 }
1505
1506 Ok(())
1507}
1508
1509pub fn set_memory_expiration(
1516 conn: &Connection,
1517 id: i64,
1518 ttl_seconds: Option<i64>,
1519) -> Result<Memory> {
1520 let _ = get_memory_internal(conn, id, false)?;
1522
1523 match ttl_seconds {
1524 Some(0) => {
1525 conn.execute(
1527 "UPDATE memories SET expires_at = NULL, updated_at = ? WHERE id = ?",
1528 params![Utc::now().to_rfc3339(), id],
1529 )?;
1530 }
1531 Some(ttl) => {
1532 let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
1534 conn.execute(
1535 "UPDATE memories SET expires_at = ?, updated_at = ? WHERE id = ?",
1536 params![expires_at, Utc::now().to_rfc3339(), id],
1537 )?;
1538 }
1539 None => {
1540 return get_memory_internal(conn, id, false);
1542 }
1543 }
1544
1545 record_event(
1547 conn,
1548 MemoryEventType::Updated,
1549 Some(id),
1550 None,
1551 serde_json::json!({
1552 "changed_fields": ["expires_at"],
1553 "action": "set_expiration",
1554 }),
1555 )?;
1556
1557 conn.execute(
1559 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1560 [],
1561 )?;
1562
1563 get_memory_internal(conn, id, false)
1564}
1565
1566pub fn cleanup_expired_memories(conn: &Connection) -> Result<i64> {
1570 let now = Utc::now().to_rfc3339();
1571
1572 let affected = conn.execute(
1574 "UPDATE memories SET valid_to = ?
1575 WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1576 params![now, now],
1577 )?;
1578
1579 if affected > 0 {
1580 conn.execute(
1582 "UPDATE crossrefs SET valid_to = ?
1583 WHERE valid_to IS NULL AND (
1584 from_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1585 OR to_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1586 )",
1587 params![now, now, now],
1588 )?;
1589
1590 conn.execute(
1593 "DELETE FROM memory_entities
1594 WHERE memory_id IN (
1595 SELECT id FROM memories
1596 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1597 )",
1598 params![now],
1599 )?;
1600
1601 conn.execute(
1603 "DELETE FROM memory_tags
1604 WHERE memory_id IN (
1605 SELECT id FROM memories
1606 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1607 )",
1608 params![now],
1609 )?;
1610
1611 record_event(
1613 conn,
1614 MemoryEventType::Deleted,
1615 None, None,
1617 serde_json::json!({
1618 "action": "cleanup_expired",
1619 "affected_count": affected,
1620 }),
1621 )?;
1622
1623 conn.execute(
1625 "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1626 params![affected as i64],
1627 )?;
1628 }
1629
1630 Ok(affected as i64)
1631}
1632
1633pub fn count_expired_memories(conn: &Connection) -> Result<i64> {
1635 let now = Utc::now().to_rfc3339();
1636
1637 let count: i64 = conn.query_row(
1638 "SELECT COUNT(*) FROM memories
1639 WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1640 params![now],
1641 |row| row.get(0),
1642 )?;
1643
1644 Ok(count)
1645}
1646
/// Lightweight listing row produced by `list_memories_compact`: a content
/// preview plus summary fields instead of the full memory.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactMemoryRow {
    /// Database id of the memory.
    pub id: i64,
    /// Preview text returned by `compact_preview` for the memory's content.
    pub preview: String,
    /// Truncation flag returned by `compact_preview` alongside the preview.
    pub truncated: bool,
    /// Parsed memory type; unknown stored values fall back to `MemoryType::Note`.
    pub memory_type: MemoryType,
    /// Tags attached to the memory (empty when they could not be loaded).
    pub tags: Vec<String>,
    /// Stored importance value.
    pub importance: f32,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp.
    pub updated_at: DateTime<Utc>,
    /// Workspace the memory belongs to.
    pub workspace: String,
    /// Parsed tier; unknown stored values fall back to the tier's default.
    pub tier: MemoryTier,
    /// Length of the full content in bytes (`String::len`).
    pub content_length: usize,
    /// Number of lines in the full content (`str::lines().count()`).
    pub line_count: usize,
}
1676
1677pub fn list_memories_compact(
1687 conn: &Connection,
1688 options: &ListOptions,
1689 preview_chars: Option<usize>,
1690) -> Result<Vec<CompactMemoryRow>> {
1691 use crate::intelligence::compact_preview;
1692
1693 let now = Utc::now().to_rfc3339();
1694 let max_preview = preview_chars.unwrap_or(100);
1695
1696 let mut sql = String::from(
1697 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance,
1698 m.created_at, m.updated_at, m.workspace, m.tier
1699 FROM memories m",
1700 );
1701
1702 let mut conditions = vec!["m.valid_to IS NULL".to_string()];
1703 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1704
1705 conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
1707 params.push(Box::new(now));
1708
1709 if let Some(ref tags) = options.tags {
1711 if !tags.is_empty() {
1712 sql.push_str(
1713 " JOIN memory_tags mt ON m.id = mt.memory_id
1714 JOIN tags t ON mt.tag_id = t.id",
1715 );
1716 let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
1717 conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1718 for tag in tags {
1719 params.push(Box::new(tag.clone()));
1720 }
1721 }
1722 }
1723
1724 if let Some(ref memory_type) = options.memory_type {
1726 conditions.push("m.memory_type = ?".to_string());
1727 params.push(Box::new(memory_type.as_str().to_string()));
1728 }
1729
1730 if let Some(ref metadata_filter) = options.metadata_filter {
1732 for (key, value) in metadata_filter {
1733 metadata_value_to_param(key, value, &mut conditions, &mut params)?;
1734 }
1735 }
1736
1737 if let Some(ref scope) = options.scope {
1739 conditions.push("m.scope_type = ?".to_string());
1740 params.push(Box::new(scope.scope_type().to_string()));
1741 if let Some(scope_id) = scope.scope_id() {
1742 conditions.push("m.scope_id = ?".to_string());
1743 params.push(Box::new(scope_id.to_string()));
1744 } else {
1745 conditions.push("m.scope_id IS NULL".to_string());
1746 }
1747 }
1748
1749 if let Some(ref workspace) = options.workspace {
1751 conditions.push("m.workspace = ?".to_string());
1752 params.push(Box::new(workspace.clone()));
1753 }
1754
1755 if let Some(ref tier) = options.tier {
1757 conditions.push("m.tier = ?".to_string());
1758 params.push(Box::new(tier.as_str().to_string()));
1759 }
1760
1761 sql.push_str(" WHERE ");
1762 sql.push_str(&conditions.join(" AND "));
1763
1764 let sort_field = match options.sort_by.unwrap_or_default() {
1766 SortField::CreatedAt => "m.created_at",
1767 SortField::UpdatedAt => "m.updated_at",
1768 SortField::LastAccessedAt => "m.last_accessed_at",
1769 SortField::Importance => "m.importance",
1770 SortField::AccessCount => "m.access_count",
1771 };
1772 let sort_order = match options.sort_order.unwrap_or_default() {
1773 SortOrder::Asc => "ASC",
1774 SortOrder::Desc => "DESC",
1775 };
1776 sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
1777
1778 let limit = options.limit.unwrap_or(100);
1780 let offset = options.offset.unwrap_or(0);
1781 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
1782
1783 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1784 let mut stmt = conn.prepare(&sql)?;
1785
1786 let memories: Vec<CompactMemoryRow> = stmt
1787 .query_map(param_refs.as_slice(), |row| {
1788 let id: i64 = row.get("id")?;
1789 let content: String = row.get("content")?;
1790 let memory_type_str: String = row.get("memory_type")?;
1791 let importance: f32 = row.get("importance")?;
1792 let created_at_str: String = row.get("created_at")?;
1793 let updated_at_str: String = row.get("updated_at")?;
1794 let workspace: String = row.get("workspace")?;
1795 let tier_str: String = row.get("tier")?;
1796
1797 let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
1798 let tier = tier_str.parse().unwrap_or_default();
1799
1800 let (preview, truncated) = compact_preview(&content, max_preview);
1802 let content_length = content.len();
1803 let line_count = content.lines().count();
1804
1805 Ok(CompactMemoryRow {
1806 id,
1807 preview,
1808 truncated,
1809 memory_type,
1810 tags: vec![], importance,
1812 created_at: DateTime::parse_from_rfc3339(&created_at_str)
1813 .map(|dt| dt.with_timezone(&Utc))
1814 .unwrap_or_else(|_| Utc::now()),
1815 updated_at: DateTime::parse_from_rfc3339(&updated_at_str)
1816 .map(|dt| dt.with_timezone(&Utc))
1817 .unwrap_or_else(|_| Utc::now()),
1818 workspace,
1819 tier,
1820 content_length,
1821 line_count,
1822 })
1823 })?
1824 .filter_map(|r| r.ok())
1825 .map(|mut m| {
1826 m.tags = load_tags(conn, m.id).unwrap_or_default();
1827 m
1828 })
1829 .collect();
1830
1831 Ok(memories)
1832}
1833
1834pub fn get_stats(conn: &Connection) -> Result<StorageStats> {
1836 let total_memories: i64 = conn.query_row(
1837 "SELECT COUNT(*) FROM memories WHERE valid_to IS NULL",
1838 [],
1839 |row| row.get(0),
1840 )?;
1841
1842 let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
1843
1844 let total_crossrefs: i64 = conn.query_row(
1845 "SELECT COUNT(*) FROM crossrefs WHERE valid_to IS NULL",
1846 [],
1847 |row| row.get(0),
1848 )?;
1849
1850 let total_versions: i64 =
1851 conn.query_row("SELECT COUNT(*) FROM memory_versions", [], |row| row.get(0))?;
1852
1853 let _total_identities: i64 =
1854 conn.query_row("SELECT COUNT(*) FROM identities", [], |row| row.get(0))?;
1855
1856 let _total_entities: i64 =
1857 conn.query_row("SELECT COUNT(*) FROM entities", [], |row| row.get(0))?;
1858
1859 let db_size_bytes: i64 = conn.query_row(
1860 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1861 [],
1862 |row| row.get(0),
1863 )?;
1864
1865 let _schema_version: i32 = conn
1866 .query_row("SELECT MAX(version) FROM schema_version", [], |row| {
1867 row.get(0)
1868 })
1869 .unwrap_or(0);
1870
1871 let mut workspace_stmt = conn.prepare(
1872 "SELECT workspace, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY workspace",
1873 )?;
1874 let workspaces: HashMap<String, i64> = workspace_stmt
1875 .query_map([], |row| {
1876 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1877 })?
1878 .filter_map(|r| r.ok())
1879 .collect();
1880
1881 let mut type_stmt = conn.prepare(
1882 "SELECT memory_type, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY memory_type",
1883 )?;
1884 let type_counts: HashMap<String, i64> = type_stmt
1885 .query_map([], |row| {
1886 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1887 })?
1888 .filter_map(|r| r.ok())
1889 .collect();
1890
1891 let mut tier_stmt = conn.prepare(
1892 "SELECT COALESCE(tier, 'permanent'), COUNT(*) FROM memories GROUP BY COALESCE(tier, 'permanent')",
1893 )?;
1894 let tier_counts: HashMap<String, i64> = tier_stmt
1895 .query_map([], |row| {
1896 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1897 })?
1898 .filter_map(|r| r.ok())
1899 .collect();
1900
1901 let memories_with_embeddings: i64 = conn.query_row(
1902 "SELECT COUNT(*) FROM memories WHERE has_embedding = 1 AND valid_to IS NULL",
1903 [],
1904 |row| row.get(0),
1905 )?;
1906
1907 let memories_pending_embedding: i64 = conn.query_row(
1908 "SELECT COUNT(*) FROM embedding_queue WHERE status = 'pending'",
1909 [],
1910 |row| row.get(0),
1911 )?;
1912
1913 let (last_sync, sync_pending): (Option<String>, i64) = conn.query_row(
1914 "SELECT last_sync, pending_changes FROM sync_state WHERE id = 1",
1915 [],
1916 |row| Ok((row.get(0)?, row.get(1)?)),
1917 )?;
1918
1919 Ok(StorageStats {
1920 total_memories,
1921 total_tags,
1922 total_crossrefs,
1923 total_versions,
1924 total_identities: 0,
1925 total_entities: 0,
1926 db_size_bytes,
1927 memories_with_embeddings,
1928 memories_pending_embedding,
1929 last_sync: last_sync.and_then(|s| {
1930 DateTime::parse_from_rfc3339(&s)
1931 .map(|dt| dt.with_timezone(&Utc))
1932 .ok()
1933 }),
1934 sync_pending: sync_pending > 0,
1935 storage_mode: "sqlite".to_string(),
1936 schema_version: 0,
1937 workspaces,
1938 type_counts,
1939 tier_counts,
1940 })
1941}
1942
1943pub fn get_memory_versions(conn: &Connection, memory_id: i64) -> Result<Vec<MemoryVersion>> {
1945 let mut stmt = conn.prepare_cached(
1946 "SELECT version, content, tags, metadata, created_at, created_by, change_summary
1947 FROM memory_versions WHERE memory_id = ? ORDER BY version DESC",
1948 )?;
1949
1950 let versions: Vec<MemoryVersion> = stmt
1951 .query_map([memory_id], |row| {
1952 let tags_str: String = row.get("tags")?;
1953 let metadata_str: String = row.get("metadata")?;
1954 let created_at_str: String = row.get("created_at")?;
1955
1956 Ok(MemoryVersion {
1957 version: row.get("version")?,
1958 content: row.get("content")?,
1959 tags: serde_json::from_str(&tags_str).unwrap_or_default(),
1960 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1961 created_at: DateTime::parse_from_rfc3339(&created_at_str)
1962 .map(|dt| dt.with_timezone(&Utc))
1963 .unwrap_or_else(|_| Utc::now()),
1964 created_by: row.get("created_by")?,
1965 change_summary: row.get("change_summary")?,
1966 })
1967 })?
1968 .filter_map(|r| r.ok())
1969 .collect();
1970
1971 Ok(versions)
1972}
1973
/// Outcome of `create_memory_batch`: what was created and what failed.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchCreateResult {
    /// Memories that were created successfully, in input order.
    pub created: Vec<Memory>,
    /// One entry per input that could not be created.
    pub failed: Vec<BatchError>,
    /// Number of entries in `created`.
    pub total_created: usize,
    /// Number of entries in `failed`.
    pub total_failed: usize,
}
1986
/// Outcome of `delete_memory_batch`: which ids were deleted and which failed.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchDeleteResult {
    /// Ids that were deleted successfully, in input order.
    pub deleted: Vec<i64>,
    /// One entry per id that could not be deleted.
    pub failed: Vec<BatchError>,
    /// Number of entries in `deleted`.
    pub total_deleted: usize,
    /// Number of entries in `failed`.
    pub total_failed: usize,
}
1995
/// A single failure inside a batch operation.
#[derive(Debug, Clone, serde::Serialize)]
pub struct BatchError {
    /// Zero-based position of the failing item in the batch input.
    pub index: usize,
    /// Memory id the failure refers to; set by batch deletes, `None` for
    /// batch creates.
    pub id: Option<i64>,
    /// Display text of the underlying error.
    pub error: String,
}
2003
2004pub fn create_memory_batch(
2006 conn: &Connection,
2007 inputs: &[CreateMemoryInput],
2008) -> Result<BatchCreateResult> {
2009 let mut created = Vec::new();
2010 let mut failed = Vec::new();
2011
2012 for (index, input) in inputs.iter().enumerate() {
2013 match create_memory(conn, input) {
2014 Ok(memory) => created.push(memory),
2015 Err(e) => failed.push(BatchError {
2016 index,
2017 id: None,
2018 error: e.to_string(),
2019 }),
2020 }
2021 }
2022
2023 Ok(BatchCreateResult {
2024 total_created: created.len(),
2025 total_failed: failed.len(),
2026 created,
2027 failed,
2028 })
2029}
2030
2031pub fn delete_memory_batch(conn: &Connection, ids: &[i64]) -> Result<BatchDeleteResult> {
2033 let mut deleted = Vec::new();
2034 let mut failed = Vec::new();
2035
2036 for (index, &id) in ids.iter().enumerate() {
2037 match delete_memory(conn, id) {
2038 Ok(()) => deleted.push(id),
2039 Err(e) => failed.push(BatchError {
2040 index,
2041 id: Some(id),
2042 error: e.to_string(),
2043 }),
2044 }
2045 }
2046
2047 Ok(BatchDeleteResult {
2048 total_deleted: deleted.len(),
2049 total_failed: failed.len(),
2050 deleted,
2051 failed,
2052 })
2053}
2054
/// Usage summary for one tag, as returned by `list_tags`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagInfo {
    /// Tag name.
    pub name: String,
    /// Usage count computed by the `list_tags` query.
    pub count: i64,
    /// Most recent `updated_at` among the active memories carrying the tag;
    /// `None` when there is none or the stored value cannot be parsed.
    pub last_used: Option<DateTime<Utc>>,
}
2066
2067pub fn list_tags(conn: &Connection) -> Result<Vec<TagInfo>> {
2069 let mut stmt = conn.prepare(
2070 r#"
2071 SELECT t.name, COUNT(mt.memory_id) as count,
2072 MAX(m.updated_at) as last_used
2073 FROM tags t
2074 LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2075 LEFT JOIN memories m ON mt.memory_id = m.id AND m.valid_to IS NULL
2076 GROUP BY t.id, t.name
2077 ORDER BY count DESC, t.name ASC
2078 "#,
2079 )?;
2080
2081 let tags: Vec<TagInfo> = stmt
2082 .query_map([], |row| {
2083 let name: String = row.get(0)?;
2084 let count: i64 = row.get(1)?;
2085 let last_used: Option<String> = row.get(2)?;
2086
2087 Ok(TagInfo {
2088 name,
2089 count,
2090 last_used: last_used.and_then(|s| {
2091 DateTime::parse_from_rfc3339(&s)
2092 .map(|dt| dt.with_timezone(&Utc))
2093 .ok()
2094 }),
2095 })
2096 })?
2097 .filter_map(|r| r.ok())
2098 .collect();
2099
2100 Ok(tags)
2101}
2102
/// One node of the tag tree returned by `get_tag_hierarchy`, where tag names
/// are split on `/`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagHierarchyNode {
    /// This node's own path segment.
    pub name: String,
    /// Path of this node from the root of the tree.
    pub full_path: String,
    /// Aggregated usage count of the tags grouped under this node.
    pub count: i64,
    /// Nodes nested beneath this one.
    pub children: Vec<TagHierarchyNode>,
}
2111
2112pub fn get_tag_hierarchy(conn: &Connection) -> Result<Vec<TagHierarchyNode>> {
2114 let tags = list_tags(conn)?;
2115
2116 let mut root_nodes: HashMap<String, TagHierarchyNode> = HashMap::new();
2118
2119 for tag in tags {
2120 let parts: Vec<&str> = tag.name.split('/').collect();
2121 if parts.is_empty() {
2122 continue;
2123 }
2124
2125 let root_name = parts[0].to_string();
2126 if !root_nodes.contains_key(&root_name) {
2127 root_nodes.insert(
2128 root_name.clone(),
2129 TagHierarchyNode {
2130 name: root_name.clone(),
2131 full_path: root_name.clone(),
2132 count: 0,
2133 children: Vec::new(),
2134 },
2135 );
2136 }
2137
2138 if parts.len() == 1 {
2140 if let Some(node) = root_nodes.get_mut(&root_name) {
2141 node.count += tag.count;
2142 }
2143 } else {
2144 if let Some(node) = root_nodes.get_mut(&root_name) {
2147 node.count += tag.count;
2148 }
2149 }
2150 }
2151
2152 Ok(root_nodes.into_values().collect())
2153}
2154
/// Report produced by `validate_tags`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TagValidationResult {
    /// True when no orphaned and no empty tags were found.
    pub valid: bool,
    /// Names of tags that have no row in `memory_tags`.
    pub orphaned_tags: Vec<String>,
    /// Names of tags matched by the empty-name check.
    pub empty_tags: Vec<String>,
    /// `(memory id, tag name)` pairs assigned more than once.
    /// NOTE(review): `validate_tags` currently always leaves this empty.
    pub duplicate_assignments: Vec<(i64, String)>,
    /// Number of rows in `tags`.
    pub total_tags: i64,
    /// Number of rows in `memory_tags`.
    pub total_assignments: i64,
}
2165
2166pub fn validate_tags(conn: &Connection) -> Result<TagValidationResult> {
2168 let orphaned: Vec<String> = conn
2170 .prepare(
2171 "SELECT t.name FROM tags t
2172 LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2173 WHERE mt.tag_id IS NULL",
2174 )?
2175 .query_map([], |row| row.get(0))?
2176 .filter_map(|r| r.ok())
2177 .collect();
2178
2179 let empty: Vec<String> = conn
2181 .prepare("SELECT name FROM tags WHERE name = '' OR name IS NULL")?
2182 .query_map([], |row| row.get(0))?
2183 .filter_map(|r| r.ok())
2184 .collect();
2185
2186 let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2188 let total_assignments: i64 =
2189 conn.query_row("SELECT COUNT(*) FROM memory_tags", [], |row| row.get(0))?;
2190
2191 Ok(TagValidationResult {
2192 valid: orphaned.is_empty() && empty.is_empty(),
2193 orphaned_tags: orphaned,
2194 empty_tags: empty,
2195 duplicate_assignments: vec![], total_tags,
2197 total_assignments,
2198 })
2199}
2200
/// Serializable snapshot of one memory, used by `export_memories` and
/// `import_memories`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportedMemory {
    /// Id the memory had in the exporting database.
    pub id: i64,
    /// Full memory content.
    pub content: String,
    /// Memory type as its string form (`MemoryType::as_str`).
    pub memory_type: String,
    /// Tags attached to the memory.
    pub tags: Vec<String>,
    /// Arbitrary JSON metadata.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Importance value.
    pub importance: f32,
    /// Workspace the memory belongs to.
    pub workspace: String,
    /// Tier as its string form (`MemoryTier::as_str`).
    pub tier: String,
    /// Creation time as an RFC 3339 string.
    pub created_at: String,
    /// Last-update time as an RFC 3339 string.
    pub updated_at: String,
}
2219
/// Container for a full export produced by `export_memories`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportData {
    /// Export format version (`export_memories` writes `"1.0"`).
    pub version: String,
    /// Time the export was produced, as an RFC 3339 string.
    pub exported_at: String,
    /// Number of entries in `memories`.
    pub memory_count: usize,
    /// The exported memories.
    pub memories: Vec<ExportedMemory>,
}
2228
2229pub fn export_memories(conn: &Connection) -> Result<ExportData> {
2231 let memories = list_memories(
2232 conn,
2233 &ListOptions {
2234 limit: Some(100000),
2235 ..Default::default()
2236 },
2237 )?;
2238
2239 let exported: Vec<ExportedMemory> = memories
2240 .into_iter()
2241 .map(|m| ExportedMemory {
2242 id: m.id,
2243 content: m.content,
2244 memory_type: m.memory_type.as_str().to_string(),
2245 tags: m.tags,
2246 metadata: m.metadata,
2247 importance: m.importance,
2248 workspace: m.workspace,
2249 tier: m.tier.as_str().to_string(),
2250 created_at: m.created_at.to_rfc3339(),
2251 updated_at: m.updated_at.to_rfc3339(),
2252 })
2253 .collect();
2254
2255 Ok(ExportData {
2256 version: "1.0".to_string(),
2257 exported_at: Utc::now().to_rfc3339(),
2258 memory_count: exported.len(),
2259 memories: exported,
2260 })
2261}
2262
/// Tally returned by `import_memories`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ImportResult {
    /// Number of memories for which creation succeeded.
    pub imported: usize,
    /// Number of memories rejected as duplicates.
    pub skipped: usize,
    /// Number of memories that failed for any other reason.
    pub failed: usize,
    /// One message per failed memory.
    pub errors: Vec<String>,
}
2271
2272pub fn import_memories(
2274 conn: &Connection,
2275 data: &ExportData,
2276 skip_duplicates: bool,
2277) -> Result<ImportResult> {
2278 let mut imported = 0;
2279 let mut skipped = 0;
2280 let mut failed = 0;
2281 let mut errors = Vec::new();
2282
2283 for mem in &data.memories {
2284 let memory_type = mem.memory_type.parse().unwrap_or(MemoryType::Note);
2285 let tier = mem.tier.parse().unwrap_or(MemoryTier::Permanent);
2286
2287 let input = CreateMemoryInput {
2288 content: mem.content.clone(),
2289 memory_type,
2290 tags: mem.tags.clone(),
2291 metadata: mem.metadata.clone(),
2292 importance: Some(mem.importance),
2293 scope: MemoryScope::Global,
2294 workspace: Some(mem.workspace.clone()),
2295 tier,
2296 defer_embedding: false,
2297 ttl_seconds: None,
2298 dedup_mode: if skip_duplicates {
2299 DedupMode::Skip
2300 } else {
2301 DedupMode::Allow
2302 },
2303 dedup_threshold: None,
2304 event_time: None,
2305 event_duration_seconds: None,
2306 trigger_pattern: None,
2307 summary_of_id: None,
2308 };
2309
2310 match create_memory(conn, &input) {
2311 Ok(_) => imported += 1,
2312 Err(EngramError::Duplicate { .. }) => skipped += 1,
2313 Err(e) => {
2314 failed += 1;
2315 errors.push(format!("Failed to import memory {}: {}", mem.id, e));
2316 }
2317 }
2318 }
2319
2320 Ok(ImportResult {
2321 imported,
2322 skipped,
2323 failed,
2324 errors,
2325 })
2326}
2327
2328pub fn rebuild_embeddings(conn: &Connection) -> Result<i64> {
2334 let now = Utc::now().to_rfc3339();
2335
2336 conn.execute("DELETE FROM embedding_queue", [])?;
2338
2339 let count = conn.execute(
2341 "INSERT INTO embedding_queue (memory_id, status, queued_at)
2342 SELECT id, 'pending', ? FROM memories WHERE valid_to IS NULL",
2343 params![now],
2344 )?;
2345
2346 conn.execute(
2348 "UPDATE memories SET has_embedding = 0 WHERE valid_to IS NULL",
2349 [],
2350 )?;
2351
2352 Ok(count as i64)
2353}
2354
2355pub fn rebuild_crossrefs(conn: &Connection) -> Result<i64> {
2357 let now = Utc::now().to_rfc3339();
2358
2359 let deleted = conn.execute(
2361 "UPDATE crossrefs SET valid_to = ? WHERE source = 'auto' AND valid_to IS NULL",
2362 params![now],
2363 )?;
2364
2365 Ok(deleted as i64)
2369}
2370
2371pub fn create_section_memory(
2377 conn: &Connection,
2378 title: &str,
2379 content: &str,
2380 parent_id: Option<i64>,
2381 level: i32,
2382 workspace: Option<&str>,
2383) -> Result<Memory> {
2384 let mut metadata = HashMap::new();
2385 metadata.insert("section_title".to_string(), serde_json::json!(title));
2386 metadata.insert("section_level".to_string(), serde_json::json!(level));
2387 if let Some(pid) = parent_id {
2388 metadata.insert("parent_memory_id".to_string(), serde_json::json!(pid));
2389 }
2390
2391 let input = CreateMemoryInput {
2392 content: format!("# {}\n\n{}", title, content),
2393 memory_type: MemoryType::Context,
2394 tags: vec!["section".to_string()],
2395 metadata,
2396 importance: Some(0.6),
2397 scope: MemoryScope::Global,
2398 workspace: workspace.map(String::from),
2399 tier: MemoryTier::Permanent,
2400 defer_embedding: false,
2401 ttl_seconds: None,
2402 dedup_mode: DedupMode::Skip,
2403 dedup_threshold: None,
2404 event_time: None,
2405 event_duration_seconds: None,
2406 trigger_pattern: None,
2407 summary_of_id: None,
2408 };
2409
2410 create_memory(conn, &input)
2411}
2412
2413pub fn create_checkpoint(
2415 conn: &Connection,
2416 session_id: &str,
2417 summary: &str,
2418 context: &HashMap<String, serde_json::Value>,
2419 workspace: Option<&str>,
2420) -> Result<Memory> {
2421 let mut metadata = context.clone();
2422 metadata.insert(
2423 "checkpoint_session".to_string(),
2424 serde_json::json!(session_id),
2425 );
2426 metadata.insert(
2427 "checkpoint_time".to_string(),
2428 serde_json::json!(Utc::now().to_rfc3339()),
2429 );
2430
2431 let input = CreateMemoryInput {
2432 content: format!("Session Checkpoint: {}\n\n{}", session_id, summary),
2433 memory_type: MemoryType::Context,
2434 tags: vec!["checkpoint".to_string(), format!("session:{}", session_id)],
2435 metadata,
2436 importance: Some(0.7),
2437 scope: MemoryScope::Global,
2438 workspace: workspace.map(String::from),
2439 tier: MemoryTier::Permanent,
2440 defer_embedding: false,
2441 ttl_seconds: None,
2442 dedup_mode: DedupMode::Allow,
2443 dedup_threshold: None,
2444 event_time: None,
2445 event_duration_seconds: None,
2446 trigger_pattern: None,
2447 summary_of_id: None,
2448 };
2449
2450 create_memory(conn, &input)
2451}
2452
2453pub fn boost_memory(
2455 conn: &Connection,
2456 id: i64,
2457 boost_amount: f32,
2458 duration_seconds: Option<i64>,
2459) -> Result<Memory> {
2460 let memory = get_memory(conn, id)?;
2461 let new_importance = (memory.importance + boost_amount).min(1.0);
2462 let now = Utc::now();
2463
2464 conn.execute(
2466 "UPDATE memories SET importance = ?, updated_at = ? WHERE id = ?",
2467 params![new_importance, now.to_rfc3339(), id],
2468 )?;
2469
2470 if let Some(duration) = duration_seconds {
2472 let expires = now + chrono::Duration::seconds(duration);
2473 let mut metadata = memory.metadata.clone();
2474 metadata.insert(
2475 "boost_expires".to_string(),
2476 serde_json::json!(expires.to_rfc3339()),
2477 );
2478 metadata.insert(
2479 "boost_original_importance".to_string(),
2480 serde_json::json!(memory.importance),
2481 );
2482
2483 let metadata_json = serde_json::to_string(&metadata)?;
2484 conn.execute(
2485 "UPDATE memories SET metadata = ? WHERE id = ?",
2486 params![metadata_json, id],
2487 )?;
2488 }
2489
2490 get_memory(conn, id)
2491}
2492
2493#[derive(Debug, Clone, Serialize, Deserialize)]
2499pub enum MemoryEventType {
2500 Created,
2501 Updated,
2502 Deleted,
2503 Linked,
2504 Unlinked,
2505 Shared,
2506 Synced,
2507}
2508
2509impl std::fmt::Display for MemoryEventType {
2510 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2511 match self {
2512 MemoryEventType::Created => write!(f, "created"),
2513 MemoryEventType::Updated => write!(f, "updated"),
2514 MemoryEventType::Deleted => write!(f, "deleted"),
2515 MemoryEventType::Linked => write!(f, "linked"),
2516 MemoryEventType::Unlinked => write!(f, "unlinked"),
2517 MemoryEventType::Shared => write!(f, "shared"),
2518 MemoryEventType::Synced => write!(f, "synced"),
2519 }
2520 }
2521}
2522
2523impl std::str::FromStr for MemoryEventType {
2524 type Err = EngramError;
2525 fn from_str(s: &str) -> Result<Self> {
2526 match s {
2527 "created" => Ok(MemoryEventType::Created),
2528 "updated" => Ok(MemoryEventType::Updated),
2529 "deleted" => Ok(MemoryEventType::Deleted),
2530 "linked" => Ok(MemoryEventType::Linked),
2531 "unlinked" => Ok(MemoryEventType::Unlinked),
2532 "shared" => Ok(MemoryEventType::Shared),
2533 "synced" => Ok(MemoryEventType::Synced),
2534 _ => Err(EngramError::InvalidInput(format!(
2535 "Invalid event type: {}",
2536 s
2537 ))),
2538 }
2539 }
2540}
2541
2542#[derive(Debug, Clone, Serialize, Deserialize)]
2544pub struct MemoryEvent {
2545 pub id: i64,
2546 pub event_type: String,
2547 pub memory_id: Option<i64>,
2548 pub agent_id: Option<String>,
2549 pub data: serde_json::Value,
2550 pub created_at: DateTime<Utc>,
2551}
2552
2553pub fn record_event(
2555 conn: &Connection,
2556 event_type: MemoryEventType,
2557 memory_id: Option<i64>,
2558 agent_id: Option<&str>,
2559 data: serde_json::Value,
2560) -> Result<i64> {
2561 let now = Utc::now();
2562 let data_json = serde_json::to_string(&data)?;
2563
2564 conn.execute(
2565 "INSERT INTO memory_events (event_type, memory_id, agent_id, data, created_at)
2566 VALUES (?, ?, ?, ?, ?)",
2567 params![
2568 event_type.to_string(),
2569 memory_id,
2570 agent_id,
2571 data_json,
2572 now.to_rfc3339()
2573 ],
2574 )?;
2575
2576 Ok(conn.last_insert_rowid())
2577}
2578
2579pub fn poll_events(
2581 conn: &Connection,
2582 since_id: Option<i64>,
2583 since_time: Option<DateTime<Utc>>,
2584 agent_id: Option<&str>,
2585 limit: Option<usize>,
2586) -> Result<Vec<MemoryEvent>> {
2587 let limit = limit.unwrap_or(100);
2588
2589 let (query, params): (&str, Vec<Box<dyn rusqlite::ToSql>>) =
2590 match (since_id, since_time, agent_id) {
2591 (Some(id), _, Some(agent)) => (
2592 "SELECT id, event_type, memory_id, agent_id, data, created_at
2593 FROM memory_events WHERE id > ? AND (agent_id = ? OR agent_id IS NULL)
2594 ORDER BY id ASC LIMIT ?",
2595 vec![
2596 Box::new(id),
2597 Box::new(agent.to_string()),
2598 Box::new(limit as i64),
2599 ],
2600 ),
2601 (Some(id), _, None) => (
2602 "SELECT id, event_type, memory_id, agent_id, data, created_at
2603 FROM memory_events WHERE id > ?
2604 ORDER BY id ASC LIMIT ?",
2605 vec![Box::new(id), Box::new(limit as i64)],
2606 ),
2607 (None, Some(time), Some(agent)) => (
2608 "SELECT id, event_type, memory_id, agent_id, data, created_at
2609 FROM memory_events WHERE created_at > ? AND (agent_id = ? OR agent_id IS NULL)
2610 ORDER BY id ASC LIMIT ?",
2611 vec![
2612 Box::new(time.to_rfc3339()),
2613 Box::new(agent.to_string()),
2614 Box::new(limit as i64),
2615 ],
2616 ),
2617 (None, Some(time), None) => (
2618 "SELECT id, event_type, memory_id, agent_id, data, created_at
2619 FROM memory_events WHERE created_at > ?
2620 ORDER BY id ASC LIMIT ?",
2621 vec![Box::new(time.to_rfc3339()), Box::new(limit as i64)],
2622 ),
2623 (None, None, Some(agent)) => (
2624 "SELECT id, event_type, memory_id, agent_id, data, created_at
2625 FROM memory_events WHERE agent_id = ? OR agent_id IS NULL
2626 ORDER BY id DESC LIMIT ?",
2627 vec![Box::new(agent.to_string()), Box::new(limit as i64)],
2628 ),
2629 (None, None, None) => (
2630 "SELECT id, event_type, memory_id, agent_id, data, created_at
2631 FROM memory_events ORDER BY id DESC LIMIT ?",
2632 vec![Box::new(limit as i64)],
2633 ),
2634 };
2635
2636 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
2637 let mut stmt = conn.prepare(query)?;
2638 let events = stmt
2639 .query_map(params_refs.as_slice(), |row| {
2640 let data_str: String = row.get(4)?;
2641 let created_str: String = row.get(5)?;
2642 Ok(MemoryEvent {
2643 id: row.get(0)?,
2644 event_type: row.get(1)?,
2645 memory_id: row.get(2)?,
2646 agent_id: row.get(3)?,
2647 data: serde_json::from_str(&data_str).unwrap_or(serde_json::json!({})),
2648 created_at: DateTime::parse_from_rfc3339(&created_str)
2649 .map(|dt| dt.with_timezone(&Utc))
2650 .unwrap_or_else(|_| Utc::now()),
2651 })
2652 })?
2653 .collect::<std::result::Result<Vec<_>, _>>()?;
2654
2655 Ok(events)
2656}
2657
2658pub fn clear_events(
2660 conn: &Connection,
2661 before_id: Option<i64>,
2662 before_time: Option<DateTime<Utc>>,
2663 keep_recent: Option<usize>,
2664) -> Result<i64> {
2665 let deleted = if let Some(id) = before_id {
2666 conn.execute("DELETE FROM memory_events WHERE id < ?", params![id])?
2667 } else if let Some(time) = before_time {
2668 conn.execute(
2669 "DELETE FROM memory_events WHERE created_at < ?",
2670 params![time.to_rfc3339()],
2671 )?
2672 } else if let Some(keep) = keep_recent {
2673 conn.execute(
2675 "DELETE FROM memory_events WHERE id NOT IN (
2676 SELECT id FROM memory_events ORDER BY id DESC LIMIT ?
2677 )",
2678 params![keep as i64],
2679 )?
2680 } else {
2681 conn.execute("DELETE FROM memory_events", [])?
2683 };
2684
2685 Ok(deleted as i64)
2686}
2687
2688#[derive(Debug, Clone, Serialize, Deserialize)]
2694pub struct SyncVersion {
2695 pub version: i64,
2696 pub last_modified: DateTime<Utc>,
2697 pub memory_count: i64,
2698 pub checksum: String,
2699}
2700
2701#[derive(Debug, Clone, Serialize, Deserialize)]
2703pub struct SyncTask {
2704 pub task_id: String,
2705 pub task_type: String,
2706 pub status: String,
2707 pub progress_percent: i32,
2708 pub traces_processed: i64,
2709 pub memories_created: i64,
2710 pub error_message: Option<String>,
2711 pub started_at: String,
2712 pub completed_at: Option<String>,
2713}
2714
2715pub fn get_sync_version(conn: &Connection) -> Result<SyncVersion> {
2717 let memory_count: i64 =
2718 conn.query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))?;
2719
2720 let last_modified: Option<String> = conn
2721 .query_row("SELECT MAX(updated_at) FROM memories", [], |row| row.get(0))
2722 .ok();
2723
2724 let version: i64 = conn
2725 .query_row("SELECT MAX(version) FROM sync_state", [], |row| row.get(0))
2726 .unwrap_or(0);
2727
2728 let checksum = format!(
2730 "{}-{}-{}",
2731 memory_count,
2732 version,
2733 last_modified.as_deref().unwrap_or("none")
2734 );
2735
2736 Ok(SyncVersion {
2737 version,
2738 last_modified: last_modified
2739 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
2740 .map(|dt| dt.with_timezone(&Utc))
2741 .unwrap_or_else(Utc::now),
2742 memory_count,
2743 checksum,
2744 })
2745}
2746
2747pub fn upsert_sync_task(conn: &Connection, task: &SyncTask) -> Result<()> {
2749 conn.execute(
2750 r#"
2751 INSERT INTO sync_tasks (
2752 task_id, task_type, status, progress_percent, traces_processed, memories_created,
2753 error_message, started_at, completed_at
2754 )
2755 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
2756 ON CONFLICT(task_id) DO UPDATE SET
2757 task_type = excluded.task_type,
2758 status = excluded.status,
2759 progress_percent = excluded.progress_percent,
2760 traces_processed = excluded.traces_processed,
2761 memories_created = excluded.memories_created,
2762 error_message = excluded.error_message,
2763 started_at = excluded.started_at,
2764 completed_at = excluded.completed_at
2765 "#,
2766 params![
2767 task.task_id,
2768 task.task_type,
2769 task.status,
2770 task.progress_percent,
2771 task.traces_processed,
2772 task.memories_created,
2773 task.error_message,
2774 task.started_at,
2775 task.completed_at
2776 ],
2777 )?;
2778
2779 Ok(())
2780}
2781
2782pub fn get_sync_task(conn: &Connection, task_id: &str) -> Result<Option<SyncTask>> {
2784 let mut stmt = conn.prepare(
2785 r#"
2786 SELECT task_id, task_type, status, progress_percent, traces_processed, memories_created,
2787 error_message, started_at, completed_at
2788 FROM sync_tasks
2789 WHERE task_id = ?
2790 "#,
2791 )?;
2792
2793 let mut rows = stmt.query(params![task_id])?;
2794 if let Some(row) = rows.next()? {
2795 Ok(Some(SyncTask {
2796 task_id: row.get("task_id")?,
2797 task_type: row.get("task_type")?,
2798 status: row.get("status")?,
2799 progress_percent: row.get("progress_percent")?,
2800 traces_processed: row.get("traces_processed")?,
2801 memories_created: row.get("memories_created")?,
2802 error_message: row.get("error_message")?,
2803 started_at: row.get("started_at")?,
2804 completed_at: row.get("completed_at")?,
2805 }))
2806 } else {
2807 Ok(None)
2808 }
2809}
2810
2811#[derive(Debug, Clone, Serialize, Deserialize)]
2813pub struct SyncDelta {
2814 pub created: Vec<Memory>,
2815 pub updated: Vec<Memory>,
2816 pub deleted: Vec<i64>,
2817 pub from_version: i64,
2818 pub to_version: i64,
2819}
2820
2821pub fn get_sync_delta(conn: &Connection, since_version: i64) -> Result<SyncDelta> {
2823 let current_version = get_sync_version(conn)?.version;
2824
2825 let events = poll_events(conn, Some(since_version), None, None, Some(10000))?;
2827
2828 let mut created_ids = std::collections::HashSet::new();
2829 let mut updated_ids = std::collections::HashSet::new();
2830 let mut deleted_ids = std::collections::HashSet::new();
2831
2832 for event in events {
2833 if let Some(memory_id) = event.memory_id {
2834 match event.event_type.as_str() {
2835 "created" => {
2836 created_ids.insert(memory_id);
2837 }
2838 "updated" => {
2839 if !created_ids.contains(&memory_id) {
2840 updated_ids.insert(memory_id);
2841 }
2842 }
2843 "deleted" => {
2844 created_ids.remove(&memory_id);
2845 updated_ids.remove(&memory_id);
2846 deleted_ids.insert(memory_id);
2847 }
2848 _ => {}
2849 }
2850 }
2851 }
2852
2853 let created: Vec<Memory> = created_ids
2854 .iter()
2855 .filter_map(|id| get_memory(conn, *id).ok())
2856 .collect();
2857
2858 let updated: Vec<Memory> = updated_ids
2859 .iter()
2860 .filter_map(|id| get_memory(conn, *id).ok())
2861 .collect();
2862
2863 Ok(SyncDelta {
2864 created,
2865 updated,
2866 deleted: deleted_ids.into_iter().collect(),
2867 from_version: since_version,
2868 to_version: current_version,
2869 })
2870}
2871
2872#[derive(Debug, Clone, Serialize, Deserialize)]
2874pub struct AgentSyncState {
2875 pub agent_id: String,
2876 pub last_sync_version: i64,
2877 pub last_sync_time: DateTime<Utc>,
2878 pub pending_changes: i64,
2879}
2880
2881pub fn get_agent_sync_state(conn: &Connection, agent_id: &str) -> Result<AgentSyncState> {
2883 let result: std::result::Result<(i64, String), rusqlite::Error> = conn.query_row(
2884 "SELECT last_sync_version, last_sync_time FROM agent_sync_state WHERE agent_id = ?",
2885 params![agent_id],
2886 |row| Ok((row.get(0)?, row.get(1)?)),
2887 );
2888
2889 match result {
2890 Ok((version, time_str)) => {
2891 let current_version = get_sync_version(conn)?.version;
2892 let pending = (current_version - version).max(0);
2893
2894 Ok(AgentSyncState {
2895 agent_id: agent_id.to_string(),
2896 last_sync_version: version,
2897 last_sync_time: DateTime::parse_from_rfc3339(&time_str)
2898 .map(|dt| dt.with_timezone(&Utc))
2899 .unwrap_or_else(|_| Utc::now()),
2900 pending_changes: pending,
2901 })
2902 }
2903 Err(_) => {
2904 Ok(AgentSyncState {
2906 agent_id: agent_id.to_string(),
2907 last_sync_version: 0,
2908 last_sync_time: Utc::now(),
2909 pending_changes: get_sync_version(conn)?.version,
2910 })
2911 }
2912 }
2913}
2914
2915pub fn update_agent_sync_state(conn: &Connection, agent_id: &str, version: i64) -> Result<()> {
2917 let now = Utc::now();
2918 conn.execute(
2919 "INSERT INTO agent_sync_state (agent_id, last_sync_version, last_sync_time)
2920 VALUES (?, ?, ?)
2921 ON CONFLICT(agent_id) DO UPDATE SET
2922 last_sync_version = excluded.last_sync_version,
2923 last_sync_time = excluded.last_sync_time",
2924 params![agent_id, version, now.to_rfc3339()],
2925 )?;
2926 Ok(())
2927}
2928
2929pub fn cleanup_sync_data(conn: &Connection, older_than_days: i64) -> Result<i64> {
2931 let cutoff = Utc::now() - chrono::Duration::days(older_than_days);
2932 let deleted = conn.execute(
2933 "DELETE FROM memory_events WHERE created_at < ?",
2934 params![cutoff.to_rfc3339()],
2935 )?;
2936 Ok(deleted as i64)
2937}
2938
2939#[derive(Debug, Clone, Serialize, Deserialize)]
2945pub struct SharedMemory {
2946 pub id: i64,
2947 pub memory_id: i64,
2948 pub from_agent: String,
2949 pub to_agent: String,
2950 pub message: Option<String>,
2951 pub acknowledged: bool,
2952 pub acknowledged_at: Option<DateTime<Utc>>,
2953 pub created_at: DateTime<Utc>,
2954}
2955
2956pub fn share_memory(
2958 conn: &Connection,
2959 memory_id: i64,
2960 from_agent: &str,
2961 to_agent: &str,
2962 message: Option<&str>,
2963) -> Result<i64> {
2964 let now = Utc::now();
2965
2966 let _ = get_memory(conn, memory_id)?;
2968
2969 conn.execute(
2970 "INSERT INTO shared_memories (memory_id, from_agent, to_agent, message, acknowledged, created_at)
2971 VALUES (?, ?, ?, ?, 0, ?)",
2972 params![memory_id, from_agent, to_agent, message, now.to_rfc3339()],
2973 )?;
2974
2975 let share_id = conn.last_insert_rowid();
2976
2977 record_event(
2979 conn,
2980 MemoryEventType::Shared,
2981 Some(memory_id),
2982 Some(from_agent),
2983 serde_json::json!({
2984 "to_agent": to_agent,
2985 "share_id": share_id,
2986 "message": message
2987 }),
2988 )?;
2989
2990 Ok(share_id)
2991}
2992
2993pub fn poll_shared_memories(
2995 conn: &Connection,
2996 to_agent: &str,
2997 include_acknowledged: bool,
2998) -> Result<Vec<SharedMemory>> {
2999 let query = if include_acknowledged {
3000 "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3001 FROM shared_memories WHERE to_agent = ?
3002 ORDER BY created_at DESC"
3003 } else {
3004 "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3005 FROM shared_memories WHERE to_agent = ? AND acknowledged = 0
3006 ORDER BY created_at DESC"
3007 };
3008
3009 let mut stmt = conn.prepare(query)?;
3010 let shares = stmt
3011 .query_map(params![to_agent], |row| {
3012 let created_str: String = row.get(7)?;
3013 let ack_str: Option<String> = row.get(6)?;
3014 Ok(SharedMemory {
3015 id: row.get(0)?,
3016 memory_id: row.get(1)?,
3017 from_agent: row.get(2)?,
3018 to_agent: row.get(3)?,
3019 message: row.get(4)?,
3020 acknowledged: row.get(5)?,
3021 acknowledged_at: ack_str.and_then(|s| {
3022 DateTime::parse_from_rfc3339(&s)
3023 .ok()
3024 .map(|dt| dt.with_timezone(&Utc))
3025 }),
3026 created_at: DateTime::parse_from_rfc3339(&created_str)
3027 .map(|dt| dt.with_timezone(&Utc))
3028 .unwrap_or_else(|_| Utc::now()),
3029 })
3030 })?
3031 .collect::<std::result::Result<Vec<_>, _>>()?;
3032
3033 Ok(shares)
3034}
3035
3036pub fn acknowledge_share(conn: &Connection, share_id: i64, agent_id: &str) -> Result<()> {
3038 let now = Utc::now();
3039
3040 let affected = conn.execute(
3041 "UPDATE shared_memories SET acknowledged = 1, acknowledged_at = ?
3042 WHERE id = ? AND to_agent = ?",
3043 params![now.to_rfc3339(), share_id, agent_id],
3044 )?;
3045
3046 if affected == 0 {
3047 return Err(EngramError::NotFound(share_id));
3048 }
3049
3050 Ok(())
3051}
3052
3053pub fn search_by_identity(
3059 conn: &Connection,
3060 identity: &str,
3061 workspace: Option<&str>,
3062 limit: Option<usize>,
3063) -> Result<Vec<Memory>> {
3064 let limit = limit.unwrap_or(50);
3065 let now = Utc::now().to_rfc3339();
3066
3067 let pattern = format!("%{}%", identity);
3070
3071 let query = if workspace.is_some() {
3072 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3073 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3074 m.visibility, m.version, m.has_embedding, m.metadata,
3075 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3076 FROM memories m
3077 LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3078 LEFT JOIN tags t ON mt.tag_id = t.id
3079 WHERE m.workspace = ? AND (m.content LIKE ? OR t.name LIKE ?)
3080 AND m.valid_to IS NULL
3081 AND (m.expires_at IS NULL OR m.expires_at > ?)
3082 ORDER BY m.importance DESC, m.created_at DESC
3083 LIMIT ?"
3084 } else {
3085 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3086 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3087 m.visibility, m.version, m.has_embedding, m.metadata,
3088 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3089 FROM memories m
3090 LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3091 LEFT JOIN tags t ON mt.tag_id = t.id
3092 WHERE (m.content LIKE ? OR t.name LIKE ?)
3093 AND m.valid_to IS NULL
3094 AND (m.expires_at IS NULL OR m.expires_at > ?)
3095 ORDER BY m.importance DESC, m.created_at DESC
3096 LIMIT ?"
3097 };
3098
3099 let mut stmt = conn.prepare(query)?;
3100
3101 let memories = if let Some(ws) = workspace {
3102 stmt.query_map(
3103 params![ws, &pattern, &pattern, &now, limit as i64],
3104 memory_from_row,
3105 )?
3106 .collect::<std::result::Result<Vec<_>, _>>()?
3107 } else {
3108 stmt.query_map(
3109 params![&pattern, &pattern, &now, limit as i64],
3110 memory_from_row,
3111 )?
3112 .collect::<std::result::Result<Vec<_>, _>>()?
3113 };
3114
3115 Ok(memories)
3116}
3117
3118pub fn search_sessions(
3120 conn: &Connection,
3121 query_text: &str,
3122 session_id: Option<&str>,
3123 workspace: Option<&str>,
3124 limit: Option<usize>,
3125) -> Result<Vec<Memory>> {
3126 let limit = limit.unwrap_or(20);
3127 let now = Utc::now().to_rfc3339();
3128 let pattern = format!("%{}%", query_text);
3129
3130 let mut conditions = vec![
3133 "m.memory_type = 'transcript_chunk'",
3134 "m.valid_to IS NULL",
3135 "(m.expires_at IS NULL OR m.expires_at > ?)",
3136 ];
3137 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3138
3139 let use_tag_join = session_id.is_some();
3141 if let Some(sid) = session_id {
3142 let tag_name = format!("session:{}", sid);
3143 conditions.push("t.name = ?");
3144 params_vec.push(Box::new(tag_name));
3145 }
3146
3147 if let Some(ws) = workspace {
3149 conditions.push("m.workspace = ?");
3150 params_vec.push(Box::new(ws.to_string()));
3151 }
3152
3153 conditions.push("m.content LIKE ?");
3155 params_vec.push(Box::new(pattern));
3156
3157 params_vec.push(Box::new(limit as i64));
3159
3160 let join_clause = if use_tag_join {
3162 "JOIN memory_tags mt ON m.id = mt.memory_id JOIN tags t ON mt.tag_id = t.id"
3163 } else {
3164 ""
3165 };
3166
3167 let query = format!(
3168 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3169 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3170 m.visibility, m.version, m.has_embedding, m.metadata,
3171 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash
3172 FROM memories m {} WHERE {} ORDER BY m.created_at DESC LIMIT ?",
3173 join_clause,
3174 conditions.join(" AND ")
3175 );
3176
3177 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
3178 let mut stmt = conn.prepare(&query)?;
3179 let memories = stmt
3180 .query_map(params_refs.as_slice(), memory_from_row)?
3181 .collect::<std::result::Result<Vec<_>, _>>()?;
3182
3183 Ok(memories)
3184}
3185
3186#[cfg(test)]
3187mod tests {
3188 use super::*;
3189 use crate::storage::Storage;
3190 use serde_json::json;
3191 use std::collections::HashMap;
3192
3193 #[test]
3194 fn test_list_memories_metadata_filter_types() {
3195 let storage = Storage::open_in_memory().unwrap();
3196
3197 storage
3198 .with_connection(|conn| {
3199 let mut metadata1 = HashMap::new();
3200 metadata1.insert("status".to_string(), json!("active"));
3201 metadata1.insert("count".to_string(), json!(3));
3202 metadata1.insert("flag".to_string(), json!(true));
3203
3204 let mut metadata2 = HashMap::new();
3205 metadata2.insert("status".to_string(), json!("inactive"));
3206 metadata2.insert("count".to_string(), json!(5));
3207 metadata2.insert("flag".to_string(), json!(false));
3208 metadata2.insert("optional".to_string(), json!("set"));
3209
3210 let memory1 = create_memory(
3211 conn,
3212 &CreateMemoryInput {
3213 content: "First".to_string(),
3214 memory_type: MemoryType::Note,
3215 tags: vec![],
3216 metadata: metadata1,
3217 importance: None,
3218 scope: Default::default(),
3219 workspace: None,
3220 tier: Default::default(),
3221 defer_embedding: true,
3222 ttl_seconds: None,
3223 dedup_mode: Default::default(),
3224 dedup_threshold: None,
3225 event_time: None,
3226 event_duration_seconds: None,
3227 trigger_pattern: None,
3228 summary_of_id: None,
3229 },
3230 )?;
3231 let memory2 = create_memory(
3232 conn,
3233 &CreateMemoryInput {
3234 content: "Second".to_string(),
3235 memory_type: MemoryType::Note,
3236 tags: vec![],
3237 metadata: metadata2,
3238 importance: None,
3239 scope: Default::default(),
3240 workspace: None,
3241 tier: Default::default(),
3242 defer_embedding: true,
3243 ttl_seconds: None,
3244 dedup_mode: Default::default(),
3245 dedup_threshold: None,
3246 event_time: None,
3247 event_duration_seconds: None,
3248 trigger_pattern: None,
3249 summary_of_id: None,
3250 },
3251 )?;
3252
3253 let mut filter = HashMap::new();
3254 filter.insert("status".to_string(), json!("active"));
3255 let results = list_memories(
3256 conn,
3257 &ListOptions {
3258 metadata_filter: Some(filter),
3259 ..Default::default()
3260 },
3261 )?;
3262 assert_eq!(results.len(), 1);
3263 assert_eq!(results[0].id, memory1.id);
3264
3265 let mut filter = HashMap::new();
3266 filter.insert("count".to_string(), json!(5));
3267 let results = list_memories(
3268 conn,
3269 &ListOptions {
3270 metadata_filter: Some(filter),
3271 ..Default::default()
3272 },
3273 )?;
3274 assert_eq!(results.len(), 1);
3275 assert_eq!(results[0].id, memory2.id);
3276
3277 let mut filter = HashMap::new();
3278 filter.insert("flag".to_string(), json!(true));
3279 let results = list_memories(
3280 conn,
3281 &ListOptions {
3282 metadata_filter: Some(filter),
3283 ..Default::default()
3284 },
3285 )?;
3286 assert_eq!(results.len(), 1);
3287 assert_eq!(results[0].id, memory1.id);
3288
3289 let mut filter = HashMap::new();
3290 filter.insert("optional".to_string(), serde_json::Value::Null);
3291 let results = list_memories(
3292 conn,
3293 &ListOptions {
3294 metadata_filter: Some(filter),
3295 ..Default::default()
3296 },
3297 )?;
3298 assert_eq!(results.len(), 1);
3299 assert_eq!(results[0].id, memory1.id);
3300
3301 Ok(())
3302 })
3303 .unwrap();
3304 }
3305
3306 #[test]
3307 fn test_memory_scope_isolation() {
3308 use crate::types::MemoryScope;
3309
3310 let storage = Storage::open_in_memory().unwrap();
3311
3312 storage
3313 .with_connection(|conn| {
3314 let user1_memory = create_memory(
3316 conn,
3317 &CreateMemoryInput {
3318 content: "User 1 memory".to_string(),
3319 memory_type: MemoryType::Note,
3320 tags: vec!["test".to_string()],
3321 metadata: HashMap::new(),
3322 importance: None,
3323 scope: MemoryScope::user("user-1"),
3324 workspace: None,
3325 tier: Default::default(),
3326 defer_embedding: true,
3327 ttl_seconds: None,
3328 dedup_mode: Default::default(),
3329 dedup_threshold: None,
3330 event_time: None,
3331 event_duration_seconds: None,
3332 trigger_pattern: None,
3333 summary_of_id: None,
3334 },
3335 )?;
3336
3337 let user2_memory = create_memory(
3339 conn,
3340 &CreateMemoryInput {
3341 content: "User 2 memory".to_string(),
3342 memory_type: MemoryType::Note,
3343 tags: vec!["test".to_string()],
3344 metadata: HashMap::new(),
3345 importance: None,
3346 scope: MemoryScope::user("user-2"),
3347 workspace: None,
3348 tier: Default::default(),
3349 defer_embedding: true,
3350 ttl_seconds: None,
3351 dedup_mode: Default::default(),
3352 dedup_threshold: None,
3353 event_time: None,
3354 event_duration_seconds: None,
3355 trigger_pattern: None,
3356 summary_of_id: None,
3357 },
3358 )?;
3359
3360 let session_memory = create_memory(
3362 conn,
3363 &CreateMemoryInput {
3364 content: "Session memory".to_string(),
3365 memory_type: MemoryType::Note,
3366 tags: vec!["test".to_string()],
3367 metadata: HashMap::new(),
3368 importance: None,
3369 scope: MemoryScope::session("session-abc"),
3370 workspace: None,
3371 tier: Default::default(),
3372 defer_embedding: true,
3373 ttl_seconds: None,
3374 dedup_mode: Default::default(),
3375 dedup_threshold: None,
3376 event_time: None,
3377 event_duration_seconds: None,
3378 trigger_pattern: None,
3379 summary_of_id: None,
3380 },
3381 )?;
3382
3383 let global_memory = create_memory(
3385 conn,
3386 &CreateMemoryInput {
3387 content: "Global memory".to_string(),
3388 memory_type: MemoryType::Note,
3389 tags: vec!["test".to_string()],
3390 metadata: HashMap::new(),
3391 importance: None,
3392 scope: MemoryScope::Global,
3393 workspace: None,
3394 tier: Default::default(),
3395 defer_embedding: true,
3396 ttl_seconds: None,
3397 dedup_mode: Default::default(),
3398 dedup_threshold: None,
3399 event_time: None,
3400 event_duration_seconds: None,
3401 trigger_pattern: None,
3402 summary_of_id: None,
3403 },
3404 )?;
3405
3406 let all_results = list_memories(conn, &ListOptions::default())?;
3408 assert_eq!(all_results.len(), 4);
3409
3410 let user1_results = list_memories(
3412 conn,
3413 &ListOptions {
3414 scope: Some(MemoryScope::user("user-1")),
3415 ..Default::default()
3416 },
3417 )?;
3418 assert_eq!(user1_results.len(), 1);
3419 assert_eq!(user1_results[0].id, user1_memory.id);
3420 assert_eq!(user1_results[0].scope, MemoryScope::user("user-1"));
3421
3422 let user2_results = list_memories(
3424 conn,
3425 &ListOptions {
3426 scope: Some(MemoryScope::user("user-2")),
3427 ..Default::default()
3428 },
3429 )?;
3430 assert_eq!(user2_results.len(), 1);
3431 assert_eq!(user2_results[0].id, user2_memory.id);
3432
3433 let session_results = list_memories(
3435 conn,
3436 &ListOptions {
3437 scope: Some(MemoryScope::session("session-abc")),
3438 ..Default::default()
3439 },
3440 )?;
3441 assert_eq!(session_results.len(), 1);
3442 assert_eq!(session_results[0].id, session_memory.id);
3443
3444 let global_results = list_memories(
3446 conn,
3447 &ListOptions {
3448 scope: Some(MemoryScope::Global),
3449 ..Default::default()
3450 },
3451 )?;
3452 assert_eq!(global_results.len(), 1);
3453 assert_eq!(global_results[0].id, global_memory.id);
3454
3455 let retrieved = get_memory(conn, user1_memory.id)?;
3457 assert_eq!(retrieved.scope, MemoryScope::user("user-1"));
3458
3459 Ok(())
3460 })
3461 .unwrap();
3462 }
3463
3464 #[test]
3465 fn test_memory_scope_can_access() {
3466 use crate::types::MemoryScope;
3467
3468 assert!(MemoryScope::Global.can_access(&MemoryScope::user("user-1")));
3470 assert!(MemoryScope::Global.can_access(&MemoryScope::session("session-1")));
3471 assert!(MemoryScope::Global.can_access(&MemoryScope::agent("agent-1")));
3472 assert!(MemoryScope::Global.can_access(&MemoryScope::Global));
3473
3474 assert!(MemoryScope::user("user-1").can_access(&MemoryScope::user("user-1")));
3476 assert!(MemoryScope::session("s1").can_access(&MemoryScope::session("s1")));
3477 assert!(MemoryScope::agent("a1").can_access(&MemoryScope::agent("a1")));
3478
3479 assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::user("user-2")));
3481 assert!(!MemoryScope::session("s1").can_access(&MemoryScope::session("s2")));
3482 assert!(!MemoryScope::agent("a1").can_access(&MemoryScope::agent("a2")));
3483
3484 assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::session("s1")));
3486 assert!(!MemoryScope::session("s1").can_access(&MemoryScope::agent("a1")));
3487
3488 assert!(MemoryScope::user("user-1").can_access(&MemoryScope::Global));
3490 assert!(MemoryScope::session("s1").can_access(&MemoryScope::Global));
3491 assert!(MemoryScope::agent("a1").can_access(&MemoryScope::Global));
3492 }
3493
/// Creating a memory with `ttl_seconds` set must produce an `expires_at`
/// roughly that many seconds ahead; creating one without it must leave
/// `expires_at` empty.
#[test]
fn test_memory_ttl_creation() {
    /// Baseline note input; call sites override individual fields with
    /// struct-update syntax.
    fn note_input(content: &str) -> CreateMemoryInput {
        CreateMemoryInput {
            content: content.to_string(),
            memory_type: MemoryType::Note,
            tags: vec![],
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            // One-hour TTL on a daily-tier memory.
            let temporary = create_memory(
                conn,
                &CreateMemoryInput {
                    tier: MemoryTier::Daily,
                    ttl_seconds: Some(3600),
                    ..note_input("Temporary memory")
                },
            )?;

            assert!(temporary.expires_at.is_some());
            assert_eq!(temporary.tier, MemoryTier::Daily);

            // Allow a few seconds of slack around the requested hour so the
            // check does not depend on how fast the test runs.
            let deadline = temporary.expires_at.unwrap();
            let seconds_left = (deadline - Utc::now()).num_seconds();
            assert!(
                (3595..=3605).contains(&seconds_left),
                "Expected ~3600 seconds, got {}",
                seconds_left
            );

            // No TTL requested: the memory must not carry an expiry.
            let permanent = create_memory(conn, &note_input("Permanent memory"))?;
            assert!(permanent.expires_at.is_none());

            Ok(())
        })
        .unwrap();
}
3567
/// A memory whose `expires_at` lies in the past must vanish from
/// `list_memories` and make `get_memory` fail, while unexpired memories
/// remain reachable.
#[test]
fn test_expired_memories_excluded_from_queries() {
    /// Baseline note tagged `"test"`; call sites override what they need.
    fn tagged_note(content: &str) -> CreateMemoryInput {
        CreateMemoryInput {
            content: content.to_string(),
            memory_type: MemoryType::Note,
            tags: vec!["test".to_string()],
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            let doomed = create_memory(
                conn,
                &CreateMemoryInput {
                    tier: MemoryTier::Daily,
                    ttl_seconds: Some(3600),
                    ..tagged_note("Memory to expire")
                },
            )?;
            let survivor = create_memory(conn, &tagged_note("Active memory"))?;

            // While the TTL is still in the future both memories are listed.
            let before_expiry = list_memories(conn, &ListOptions::default())?;
            assert_eq!(before_expiry.len(), 2);

            // Backdate the expiry directly in the table to simulate the TTL
            // having elapsed.
            let one_hour_ago = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
            conn.execute(
                "UPDATE memories SET expires_at = ? WHERE id = ?",
                params![one_hour_ago, doomed.id],
            )?;

            let after_expiry = list_memories(conn, &ListOptions::default())?;
            assert_eq!(after_expiry.len(), 1);
            assert_eq!(after_expiry[0].id, survivor.id);

            // Direct lookup must agree with the listing.
            let expired_lookup = get_memory(conn, doomed.id);
            assert!(expired_lookup.is_err());

            let active_lookup = get_memory(conn, survivor.id);
            assert!(active_lookup.is_ok());

            Ok(())
        })
        .unwrap();
}
3648
/// `set_memory_expiration` must be able to add an expiry to a memory that
/// was created without one, and the test expects a TTL of `Some(0)` to
/// remove it again.
#[test]
fn test_set_memory_expiration() {
    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            let input = CreateMemoryInput {
                content: "Initially permanent".to_string(),
                memory_type: MemoryType::Note,
                tags: vec![],
                metadata: HashMap::new(),
                importance: None,
                scope: Default::default(),
                workspace: None,
                tier: Default::default(),
                defer_embedding: true,
                ttl_seconds: None,
                dedup_mode: Default::default(),
                dedup_threshold: None,
                event_time: None,
                event_duration_seconds: None,
                trigger_pattern: None,
                summary_of_id: None,
            };
            let created = create_memory(conn, &input)?;

            // Starts out without an expiry.
            assert!(created.expires_at.is_none());

            // A positive TTL attaches one.
            let with_ttl = set_memory_expiration(conn, created.id, Some(1800))?;
            assert!(with_ttl.expires_at.is_some());

            // A zero TTL is asserted to clear it.
            let cleared = set_memory_expiration(conn, created.id, Some(0))?;
            assert!(cleared.expires_at.is_none());

            Ok(())
        })
        .unwrap();
}
3692
/// `count_expired_memories` must report exactly the memories whose expiry
/// has passed, and `cleanup_expired_memories` must delete those and nothing
/// else.
#[test]
fn test_cleanup_expired_memories() {
    /// Baseline note input; call sites override individual fields.
    fn note_input(content: String) -> CreateMemoryInput {
        CreateMemoryInput {
            content,
            memory_type: MemoryType::Note,
            tags: vec![],
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            // Three memories with a TTL; collecting stops at the first
            // creation error, just like `?` inside a loop would.
            let expiring_ids = (0..3)
                .map(|n| {
                    create_memory(
                        conn,
                        &CreateMemoryInput {
                            tier: MemoryTier::Daily,
                            ttl_seconds: Some(3600),
                            ..note_input(format!("To expire {}", n))
                        },
                    )
                    .map(|created| created.id)
                })
                .collect::<std::result::Result<Vec<_>, _>>()?;

            // Two memories without any expiry.
            for n in 0..2 {
                create_memory(conn, &note_input(format!("Active {}", n)))?;
            }

            let everything = list_memories(conn, &ListOptions::default())?;
            assert_eq!(everything.len(), 5);

            // Backdate the expiry of the TTL memories so they count as expired.
            let one_hour_ago = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
            for memory_id in &expiring_ids {
                conn.execute(
                    "UPDATE memories SET expires_at = ? WHERE id = ?",
                    params![one_hour_ago, memory_id],
                )?;
            }

            let pending = count_expired_memories(conn)?;
            assert_eq!(pending, 3);

            let removed = cleanup_expired_memories(conn)?;
            assert_eq!(removed, 3);

            let survivors = list_memories(conn, &ListOptions::default())?;
            assert_eq!(survivors.len(), 2);

            // Nothing expired is left after cleanup.
            let pending_after = count_expired_memories(conn)?;
            assert_eq!(pending_after, 0);

            Ok(())
        })
        .unwrap();
}
3784
/// `compute_content_hash` is expected to ignore letter case and surrounding
/// whitespace, to distinguish genuinely different content, and to prefix
/// its output with `sha256:`.
#[test]
fn test_content_hash_computation() {
    let reference = compute_content_hash("Hello World");
    let lowercased = compute_content_hash("hello world");
    let padded = compute_content_hash(" hello world ");
    let punctuated = compute_content_hash("Hello World!");

    // Case and padding variants hash identically.
    assert_eq!(reference, lowercased);
    assert_eq!(lowercased, padded);

    // An extra character changes the hash.
    assert_ne!(reference, punctuated);

    // The algorithm is identified by a prefix.
    assert!(reference.starts_with("sha256:"));
}
3805
/// With `DedupMode::Reject`, creating a memory whose content already exists
/// must fail with `EngramError::Duplicate`.
#[test]
fn test_dedup_mode_reject() {
    use crate::types::DedupMode;

    /// Input with fixed content; only the tags and dedup mode vary.
    fn input_with(tags: Vec<String>, dedup_mode: DedupMode) -> CreateMemoryInput {
        CreateMemoryInput {
            content: "Unique content for testing".to_string(),
            memory_type: MemoryType::Note,
            tags,
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            // Seed the store with the content.
            let _seed = create_memory(conn, &input_with(vec![], DedupMode::Allow))?;

            // Same content again, this time asking for duplicates to be rejected.
            let second_attempt = create_memory(
                conn,
                &input_with(vec!["new-tag".to_string()], DedupMode::Reject),
            );

            assert!(second_attempt.is_err());
            let rejection = second_attempt.unwrap_err();
            assert!(matches!(
                rejection,
                crate::error::EngramError::Duplicate { .. }
            ));

            Ok(())
        })
        .unwrap();
}
3869
/// With `DedupMode::Skip`, creating a memory whose content already exists
/// must hand back the existing memory untouched and add no new row.
#[test]
fn test_dedup_mode_skip() {
    use crate::types::DedupMode;

    /// The first memory's input; the duplicate overrides tags, importance
    /// and dedup mode on top of it.
    fn original_input() -> CreateMemoryInput {
        CreateMemoryInput {
            content: "Skip test content".to_string(),
            memory_type: MemoryType::Note,
            tags: vec!["original".to_string()],
            metadata: HashMap::new(),
            importance: Some(0.5),
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: DedupMode::Allow,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            let first = create_memory(conn, &original_input())?;

            // Same content, different tags and importance, Skip mode.
            let second = create_memory(
                conn,
                &CreateMemoryInput {
                    tags: vec!["new-tag".to_string()],
                    importance: Some(0.9),
                    dedup_mode: DedupMode::Skip,
                    ..original_input()
                },
            )?;

            // The existing record is returned ...
            assert_eq!(first.id, second.id);
            // ... and none of the duplicate's fields leaked into it.
            assert_eq!(second.tags, vec!["original".to_string()]);
            assert!((second.importance - 0.5).abs() < 0.01);

            let stored = list_memories(conn, &ListOptions::default())?;
            assert_eq!(stored.len(), 1);

            Ok(())
        })
        .unwrap();
}
3937
/// With `DedupMode::Merge`, creating a memory whose content already exists
/// must fold the new tags and metadata into the existing memory instead of
/// adding a second row.
#[test]
fn test_dedup_mode_merge() {
    use crate::types::DedupMode;

    /// Fields common to both inputs; tags, metadata, importance and dedup
    /// mode are overridden per call.
    fn merge_base() -> CreateMemoryInput {
        CreateMemoryInput {
            content: "Merge test content".to_string(),
            memory_type: MemoryType::Note,
            tags: vec![],
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: DedupMode::Allow,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            let first = create_memory(
                conn,
                &CreateMemoryInput {
                    tags: vec!["tag1".to_string(), "tag2".to_string()],
                    metadata: std::iter::once((
                        "key1".to_string(),
                        serde_json::json!("value1"),
                    ))
                    .collect(),
                    importance: Some(0.5),
                    dedup_mode: DedupMode::Allow,
                    ..merge_base()
                },
            )?;

            // Same content; tag set overlaps on "tag2" and the metadata key
            // is new.
            let second = create_memory(
                conn,
                &CreateMemoryInput {
                    tags: vec!["tag2".to_string(), "tag3".to_string()],
                    metadata: std::iter::once((
                        "key2".to_string(),
                        serde_json::json!("value2"),
                    ))
                    .collect(),
                    importance: Some(0.8),
                    dedup_mode: DedupMode::Merge,
                    ..merge_base()
                },
            )?;

            // The existing record is reused.
            assert_eq!(first.id, second.id);

            // Tags are unioned without duplicating the shared one.
            assert!(second.tags.contains(&"tag1".to_string()));
            assert!(second.tags.contains(&"tag2".to_string()));
            assert!(second.tags.contains(&"tag3".to_string()));
            assert_eq!(second.tags.len(), 3);

            // Metadata from both inputs is present.
            assert!(second.metadata.contains_key("key1"));
            assert!(second.metadata.contains_key("key2"));

            let stored = list_memories(conn, &ListOptions::default())?;
            assert_eq!(stored.len(), 1);

            Ok(())
        })
        .unwrap();
}
4021
/// With `DedupMode::Allow`, identical content may be stored twice: two
/// distinct rows that share the same content hash.
#[test]
fn test_dedup_mode_allow() {
    use crate::types::DedupMode;

    /// Both memories are created from this exact input.
    fn duplicate_input() -> CreateMemoryInput {
        CreateMemoryInput {
            content: "Allow duplicates content".to_string(),
            memory_type: MemoryType::Note,
            tags: vec![],
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: DedupMode::Allow,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            let first = create_memory(conn, &duplicate_input())?;
            let second = create_memory(conn, &duplicate_input())?;

            // Two separate records were written ...
            assert_ne!(first.id, second.id);

            let stored = list_memories(conn, &ListOptions::default())?;
            assert_eq!(stored.len(), 2);

            // ... carrying the same hash because the content is the same.
            assert_eq!(first.content_hash, second.content_hash);

            Ok(())
        })
        .unwrap();
}
4090
/// `find_duplicates` must report one exact-hash pair when two of three
/// stored memories share identical content.
#[test]
fn test_find_duplicates_exact_hash() {
    use crate::types::DedupMode;

    /// Note input that always allows duplicates; content and tags vary.
    fn allow_input(content: &str, tags: Vec<String>) -> CreateMemoryInput {
        CreateMemoryInput {
            content: content.to_string(),
            memory_type: MemoryType::Note,
            tags,
            metadata: HashMap::new(),
            importance: None,
            scope: Default::default(),
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode: DedupMode::Allow,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            // Two memories with identical content but different tags.
            let _first_copy = create_memory(
                conn,
                &allow_input("Duplicate content", vec!["first".to_string()]),
            )?;
            let _second_copy = create_memory(
                conn,
                &allow_input("Duplicate content", vec!["second".to_string()]),
            )?;

            // One unrelated memory that must not be reported.
            let _unrelated = create_memory(conn, &allow_input("Unique content", vec![]))?;

            let pairs = find_duplicates(conn, 0.9)?;

            assert_eq!(pairs.len(), 1);

            // The pair is matched on hash and scored as (nearly) 1.0.
            assert_eq!(pairs[0].match_type, DuplicateMatchType::ExactHash);
            assert!((pairs[0].similarity_score - 1.0).abs() < 0.01);

            Ok(())
        })
        .unwrap();
}
4181
/// A freshly created memory must carry a `sha256:`-prefixed content hash,
/// and reading it back with `get_memory` must return the same hash.
#[test]
fn test_content_hash_stored_on_create() {
    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            let input = CreateMemoryInput {
                content: "Test content for hash".to_string(),
                memory_type: MemoryType::Note,
                tags: vec![],
                metadata: HashMap::new(),
                importance: None,
                scope: Default::default(),
                workspace: None,
                tier: Default::default(),
                defer_embedding: true,
                ttl_seconds: None,
                dedup_mode: Default::default(),
                dedup_threshold: None,
                event_time: None,
                event_duration_seconds: None,
                trigger_pattern: None,
                summary_of_id: None,
            };
            let created = create_memory(conn, &input)?;

            // The value returned by creation already has the hash.
            assert!(created.content_hash.is_some());
            let stored_hash = created.content_hash.as_ref().unwrap();
            assert!(stored_hash.starts_with("sha256:"));

            // The hash survives a round trip through the database.
            let reloaded = get_memory(conn, created.id)?;
            assert_eq!(reloaded.content_hash, created.content_hash);

            Ok(())
        })
        .unwrap();
}
4223
/// Changing a memory's content through `update_memory` must replace its
/// content hash with the hash of the new content.
#[test]
fn test_update_memory_recalculates_hash() {
    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            let create_input = CreateMemoryInput {
                content: "Original content".to_string(),
                memory_type: MemoryType::Note,
                tags: vec![],
                metadata: HashMap::new(),
                importance: None,
                scope: Default::default(),
                workspace: None,
                tier: Default::default(),
                defer_embedding: true,
                ttl_seconds: None,
                dedup_mode: Default::default(),
                dedup_threshold: None,
                event_time: None,
                event_duration_seconds: None,
                trigger_pattern: None,
                summary_of_id: None,
            };
            let created = create_memory(conn, &create_input)?;

            // Remember the hash of the original content for comparison.
            let hash_before = created.content_hash.clone();

            // Only the content changes; every other field is left alone.
            let content_only = UpdateMemoryInput {
                content: Some("Updated content".to_string()),
                memory_type: None,
                tags: None,
                metadata: None,
                importance: None,
                scope: None,
                ttl_seconds: None,
                event_time: None,
                trigger_pattern: None,
            };
            let refreshed = update_memory(conn, created.id, &content_only)?;

            assert_ne!(refreshed.content_hash, hash_before);
            assert!(refreshed.content_hash.is_some());

            // The new hash must be exactly what hashing the new text yields.
            let hash_of_new_text = compute_content_hash("Updated content");
            assert_eq!(
                refreshed.content_hash.as_ref().unwrap(),
                &hash_of_new_text
            );

            Ok(())
        })
        .unwrap();
}
4284
/// Duplicate detection must be confined to a scope: identical content held
/// by another user does not trigger `Reject`, but identical content within
/// the same user's scope does.
#[test]
fn test_dedup_scope_isolation() {
    use crate::types::{DedupMode, MemoryScope};

    /// Input with fixed content; tags, scope and dedup mode vary per call.
    fn shared_content_input(
        tags: Vec<String>,
        scope: MemoryScope,
        dedup_mode: DedupMode,
    ) -> CreateMemoryInput {
        CreateMemoryInput {
            content: "Shared content".to_string(),
            memory_type: MemoryType::Note,
            tags,
            metadata: HashMap::new(),
            importance: None,
            scope,
            workspace: None,
            tier: Default::default(),
            defer_embedding: true,
            ttl_seconds: None,
            dedup_mode,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();

    storage
        .with_transaction(|conn| {
            // user-1 stores the content first.
            let _owned_by_user1 = create_memory(
                conn,
                &shared_content_input(
                    vec!["user1".to_string()],
                    MemoryScope::user("user-1"),
                    DedupMode::Allow,
                ),
            )?;

            // user-2 stores the same content with Reject; user-1's copy lives
            // in a different scope, so this must succeed.
            let cross_scope_attempt = create_memory(
                conn,
                &shared_content_input(
                    vec!["user2".to_string()],
                    MemoryScope::user("user-2"),
                    DedupMode::Reject,
                ),
            );

            assert!(cross_scope_attempt.is_ok());
            let _owned_by_user2 = cross_scope_attempt.unwrap();

            // A second copy inside user-2's own scope is a real duplicate.
            let same_scope_attempt = create_memory(
                conn,
                &shared_content_input(
                    vec![],
                    MemoryScope::user("user-2"),
                    DedupMode::Reject,
                ),
            );

            assert!(same_scope_attempt.is_err());
            assert!(matches!(
                same_scope_attempt.unwrap_err(),
                crate::error::EngramError::Duplicate { .. }
            ));

            // Only the two accepted memories exist.
            let stored = list_memories(conn, &ListOptions::default())?;
            assert_eq!(stored.len(), 2);

            Ok(())
        })
        .unwrap();
}
4382
/// `find_similar_by_embedding` must return the stored memory closest to the
/// query vector when it clears the threshold, return nothing when no stored
/// vector clears it, and return nothing when queried from another scope.
#[test]
fn test_find_similar_by_embedding() {
    /// Writes `embedding` for `memory_id` as little-endian `f32` bytes and
    /// flags the memory row as having an embedding.
    fn store_test_embedding(
        conn: &Connection,
        memory_id: i64,
        embedding: &[f32],
    ) -> crate::error::Result<()> {
        let mut blob: Vec<u8> = Vec::with_capacity(embedding.len() * 4);
        for component in embedding {
            blob.extend_from_slice(&component.to_le_bytes());
        }
        let dimensions = embedding.len() as i32;

        conn.execute(
            "INSERT INTO embeddings (memory_id, embedding, model, dimensions, created_at)
 VALUES (?, ?, ?, ?, datetime('now'))",
            params![memory_id, blob, "test", dimensions],
        )?;
        conn.execute(
            "UPDATE memories SET has_embedding = 1 WHERE id = ?",
            params![memory_id],
        )?;
        Ok(())
    }

    /// Global-scope note with a single tag; embeddings are attached by hand
    /// afterwards via `store_test_embedding`.
    fn language_note(content: &str, tag: &str) -> CreateMemoryInput {
        CreateMemoryInput {
            content: content.to_string(),
            memory_type: MemoryType::Note,
            tags: vec![tag.to_string()],
            metadata: HashMap::new(),
            importance: None,
            scope: MemoryScope::Global,
            workspace: None,
            tier: Default::default(),
            defer_embedding: false,
            ttl_seconds: None,
            dedup_mode: DedupMode::Allow,
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        }
    }

    let storage = Storage::open_in_memory().unwrap();
    storage
        .with_transaction(|conn| {
            let rust_note = create_memory(
                conn,
                &language_note("Rust is a systems programming language", "rust"),
            )?;
            let rust_vector = vec![0.8, 0.4, 0.2, 0.1];
            store_test_embedding(conn, rust_note.id, &rust_vector)?;

            let python_note = create_memory(
                conn,
                &language_note("Python is a scripting language", "python"),
            )?;
            let python_vector = vec![0.1, 0.2, 0.8, 0.4];
            store_test_embedding(conn, python_note.id, &python_vector)?;

            // A query that is almost the Rust vector must match it even at a
            // strict threshold.
            let near_rust_query = vec![0.79, 0.41, 0.21, 0.11];
            let strict_hit = find_similar_by_embedding(
                conn,
                &near_rust_query,
                &MemoryScope::Global,
                None,
                0.95,
            )?;
            assert!(strict_hit.is_some());
            let (matched, score) = strict_hit.unwrap();
            assert_eq!(matched.id, rust_note.id);
            assert!(score > 0.95);

            // The same query still finds something at a lenient threshold.
            let lenient_hit = find_similar_by_embedding(
                conn,
                &near_rust_query,
                &MemoryScope::Global,
                None,
                0.5,
            )?;
            assert!(lenient_hit.is_some());

            // A query pointing along the last axis only is far from both
            // stored vectors, so nothing clears 0.99.
            let distant_query = vec![0.0, 0.0, 0.0, 1.0];
            let strict_miss = find_similar_by_embedding(
                conn,
                &distant_query,
                &MemoryScope::Global,
                None,
                0.99,
            )?;
            assert!(strict_miss.is_none());

            // Searching from a user scope must not surface the stored
            // global-scope memories.
            let other_scope_miss = find_similar_by_embedding(
                conn,
                &near_rust_query,
                &MemoryScope::User {
                    user_id: "other-user".to_string(),
                },
                None,
                0.5,
            )?;
            assert!(other_scope_miss.is_none());

            Ok(())
        })
        .unwrap();
}
4513}