1use chrono::{DateTime, Utc};
4use rusqlite::{params, Connection, OptionalExtension, Row};
5use serde::{Deserialize, Serialize};
6use sha2::{Digest, Sha256};
7use std::collections::HashMap;
8
9use crate::error::{EngramError, Result};
10use crate::types::*;
11
12pub fn memory_from_row(row: &Row) -> rusqlite::Result<Memory> {
14 let id: i64 = row.get("id")?;
15 let content: String = row.get("content")?;
16 let memory_type_str: String = row.get("memory_type")?;
17 let importance: f32 = row.get("importance")?;
18 let access_count: i32 = row.get("access_count")?;
19 let created_at: String = row.get("created_at")?;
20 let updated_at: String = row.get("updated_at")?;
21 let last_accessed_at: Option<String> = row.get("last_accessed_at")?;
22 let owner_id: Option<String> = row.get("owner_id")?;
23 let visibility_str: String = row.get("visibility")?;
24 let version: i32 = row.get("version")?;
25 let has_embedding: i32 = row.get("has_embedding")?;
26 let metadata_str: String = row.get("metadata")?;
27
28 let scope_type: String = row
30 .get("scope_type")
31 .unwrap_or_else(|_| "global".to_string());
32 let scope_id: Option<String> = row.get("scope_id").unwrap_or(None);
33
34 let expires_at: Option<String> = row.get("expires_at").unwrap_or(None);
36
37 let content_hash: Option<String> = row.get("content_hash").unwrap_or(None);
39
40 let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
41 let visibility = match visibility_str.as_str() {
42 "shared" => Visibility::Shared,
43 "public" => Visibility::Public,
44 _ => Visibility::Private,
45 };
46
47 let scope = match (scope_type.as_str(), scope_id) {
49 ("user", Some(id)) => MemoryScope::User { user_id: id },
50 ("session", Some(id)) => MemoryScope::Session { session_id: id },
51 ("agent", Some(id)) => MemoryScope::Agent { agent_id: id },
52 _ => MemoryScope::Global,
53 };
54
55 let metadata: HashMap<String, serde_json::Value> =
56 serde_json::from_str(&metadata_str).unwrap_or_default();
57
58 let workspace: String = row
60 .get("workspace")
61 .unwrap_or_else(|_| "default".to_string());
62
63 let tier_str: String = row.get("tier").unwrap_or_else(|_| "permanent".to_string());
65 let tier = tier_str.parse().unwrap_or_default();
66
67 let event_time: Option<String> = row.get("event_time").unwrap_or(None);
68 let event_duration_seconds: Option<i64> = row.get("event_duration_seconds").unwrap_or(None);
69 let trigger_pattern: Option<String> = row.get("trigger_pattern").unwrap_or(None);
70 let procedure_success_count: i32 = row.get("procedure_success_count").unwrap_or(0);
71 let procedure_failure_count: i32 = row.get("procedure_failure_count").unwrap_or(0);
72 let summary_of_id: Option<i64> = row.get("summary_of_id").unwrap_or(None);
73 let lifecycle_state_str: Option<String> = row.get("lifecycle_state").unwrap_or(None);
74
75 let lifecycle_state = lifecycle_state_str
76 .and_then(|s| s.parse().ok())
77 .unwrap_or(crate::types::LifecycleState::Active);
78
79 let media_url: Option<String> = row.get("media_url").unwrap_or(None);
81
82 Ok(Memory {
83 id,
84 content,
85 memory_type,
86 tags: vec![], metadata,
88 importance,
89 access_count,
90 created_at: DateTime::parse_from_rfc3339(&created_at)
91 .map(|dt| dt.with_timezone(&Utc))
92 .unwrap_or_else(|_| Utc::now()),
93 updated_at: DateTime::parse_from_rfc3339(&updated_at)
94 .map(|dt| dt.with_timezone(&Utc))
95 .unwrap_or_else(|_| Utc::now()),
96 last_accessed_at: last_accessed_at.and_then(|s| {
97 DateTime::parse_from_rfc3339(&s)
98 .map(|dt| dt.with_timezone(&Utc))
99 .ok()
100 }),
101 owner_id,
102 visibility,
103 scope,
104 workspace,
105 tier,
106 version,
107 has_embedding: has_embedding != 0,
108 expires_at: expires_at.and_then(|s| {
109 DateTime::parse_from_rfc3339(&s)
110 .map(|dt| dt.with_timezone(&Utc))
111 .ok()
112 }),
113 content_hash,
114 event_time: event_time.and_then(|s| {
115 DateTime::parse_from_rfc3339(&s)
116 .map(|dt| dt.with_timezone(&Utc))
117 .ok()
118 }),
119 event_duration_seconds,
120 trigger_pattern,
121 procedure_success_count,
122 procedure_failure_count,
123 summary_of_id,
124 lifecycle_state,
125 media_url,
126 })
127}
128
129pub(crate) fn metadata_value_to_param(
130 key: &str,
131 value: &serde_json::Value,
132 conditions: &mut Vec<String>,
133 params: &mut Vec<Box<dyn rusqlite::ToSql>>,
134) -> Result<()> {
135 match value {
136 serde_json::Value::String(s) => {
137 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
138 params.push(Box::new(s.clone()));
139 }
140 serde_json::Value::Number(n) => {
141 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
142 if let Some(i) = n.as_i64() {
143 params.push(Box::new(i));
144 } else if let Some(f) = n.as_f64() {
145 params.push(Box::new(f));
146 } else {
147 return Err(EngramError::InvalidInput("Invalid number".to_string()));
148 }
149 }
150 serde_json::Value::Bool(b) => {
151 conditions.push(format!("json_extract(m.metadata, '$.{}') = ?", key));
152 params.push(Box::new(*b));
153 }
154 serde_json::Value::Null => {
155 conditions.push(format!("json_extract(m.metadata, '$.{}') IS NULL", key));
156 }
157 _ => {
158 return Err(EngramError::InvalidInput(format!(
159 "Unsupported metadata filter value for key: {}",
160 key
161 )));
162 }
163 }
164
165 Ok(())
166}
167
168fn get_memory_internal(conn: &Connection, id: i64, track_access: bool) -> Result<Memory> {
169 let now = Utc::now().to_rfc3339();
170
171 let mut stmt = conn.prepare_cached(
172 "SELECT id, content, memory_type, importance, access_count,
173 created_at, updated_at, last_accessed_at, owner_id,
174 visibility, version, has_embedding, metadata,
175 scope_type, scope_id, workspace, tier, expires_at, content_hash,
176 event_time, event_duration_seconds, trigger_pattern, procedure_success_count,
177 procedure_failure_count, summary_of_id, lifecycle_state, media_url
178 FROM memories
179 WHERE id = ? AND valid_to IS NULL
180 AND (expires_at IS NULL OR expires_at > ?)",
181 )?;
182
183 let mut memory = stmt
184 .query_row(params![id, now], memory_from_row)
185 .map_err(|_| EngramError::NotFound(id))?;
186
187 memory.tags = load_tags(conn, id)?;
188
189 if track_access {
190 let now = Utc::now().to_rfc3339();
192 conn.execute(
193 "UPDATE memories SET access_count = access_count + 1, last_accessed_at = ?
194 WHERE id = ?",
195 params![now, id],
196 )?;
197 }
198
199 Ok(memory)
200}
201
202pub fn load_tags(conn: &Connection, memory_id: i64) -> Result<Vec<String>> {
204 let mut stmt = conn.prepare_cached(
205 "SELECT t.name FROM tags t
206 JOIN memory_tags mt ON t.id = mt.tag_id
207 WHERE mt.memory_id = ?",
208 )?;
209
210 let tags: Vec<String> = stmt
211 .query_map([memory_id], |row| row.get(0))?
212 .filter_map(|r| r.ok())
213 .collect();
214
215 Ok(tags)
216}
217
218pub fn compute_content_hash(content: &str) -> String {
220 let normalized = content
222 .to_lowercase()
223 .split_whitespace()
224 .collect::<Vec<_>>()
225 .join(" ");
226
227 let mut hasher = Sha256::new();
228 hasher.update(normalized.as_bytes());
229 format!("sha256:{}", hex::encode(hasher.finalize()))
230}
231
232pub fn find_by_content_hash(
240 conn: &Connection,
241 content_hash: &str,
242 scope: &MemoryScope,
243 workspace: Option<&str>,
244) -> Result<Option<Memory>> {
245 let now = Utc::now().to_rfc3339();
246 let scope_type = scope.scope_type();
247 let scope_id = scope.scope_id().map(|s| s.to_string());
248 let workspace = workspace.unwrap_or("default");
249
250 let mut stmt = conn.prepare_cached(
251 "SELECT id, content, memory_type, importance, access_count,
252 created_at, updated_at, last_accessed_at, owner_id,
253 visibility, version, has_embedding, metadata,
254 scope_type, scope_id, workspace, tier, expires_at, content_hash,
255 event_time, event_duration_seconds, trigger_pattern, procedure_success_count,
256 procedure_failure_count, summary_of_id, lifecycle_state, media_url
257 FROM memories
258 WHERE content_hash = ? AND valid_to IS NULL
259 AND (expires_at IS NULL OR expires_at > ?)
260 AND scope_type = ?
261 AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
262 AND workspace = ?
263 LIMIT 1",
264 )?;
265
266 let result = stmt
267 .query_row(
268 params![content_hash, now, scope_type, scope_id, scope_id, workspace],
269 memory_from_row,
270 )
271 .ok();
272
273 if let Some(mut memory) = result {
274 memory.tags = load_tags(conn, memory.id)?;
275 Ok(Some(memory))
276 } else {
277 Ok(None)
278 }
279}
280
281pub fn find_similar_by_embedding(
286 conn: &Connection,
287 query_embedding: &[f32],
288 scope: &MemoryScope,
289 workspace: Option<&str>,
290 threshold: f32,
291) -> Result<Option<(Memory, f32)>> {
292 use crate::embedding::{cosine_similarity, get_embedding};
293
294 let now = Utc::now().to_rfc3339();
295 let scope_type = scope.scope_type();
296 let scope_id = scope.scope_id().map(|s| s.to_string());
297 let workspace = workspace.unwrap_or("default");
298
299 let mut stmt = conn.prepare_cached(
301 "SELECT id, content, memory_type, importance, access_count,
302 created_at, updated_at, last_accessed_at, owner_id,
303 visibility, version, has_embedding, metadata,
304 scope_type, scope_id, workspace, tier, expires_at, content_hash,
305 event_time, event_duration_seconds, trigger_pattern, procedure_success_count,
306 procedure_failure_count, summary_of_id, lifecycle_state, media_url
307 FROM memories
308 WHERE has_embedding = 1 AND valid_to IS NULL
309 AND (expires_at IS NULL OR expires_at > ?)
310 AND scope_type = ?
311 AND (scope_id = ? OR (scope_id IS NULL AND ? IS NULL))
312 AND workspace = ?",
313 )?;
314
315 let memories: Vec<Memory> = stmt
316 .query_map(
317 params![now, scope_type, scope_id, scope_id, workspace],
318 memory_from_row,
319 )?
320 .filter_map(|r| r.ok())
321 .collect();
322
323 let mut best_match: Option<(Memory, f32)> = None;
324
325 for memory in memories {
326 if let Ok(Some(embedding)) = get_embedding(conn, memory.id) {
327 let similarity = cosine_similarity(query_embedding, &embedding);
328 if similarity >= threshold {
329 match &best_match {
330 None => best_match = Some((memory, similarity)),
331 Some((_, best_score)) if similarity > *best_score => {
332 best_match = Some((memory, similarity));
333 }
334 _ => {}
335 }
336 }
337 }
338 }
339
340 if let Some((mut memory, score)) = best_match {
342 memory.tags = load_tags(conn, memory.id)?;
343 Ok(Some((memory, score)))
344 } else {
345 Ok(None)
346 }
347}
348
349#[derive(Debug, Clone, serde::Serialize)]
351pub struct DuplicatePair {
352 pub memory_a: Memory,
353 pub memory_b: Memory,
354 pub similarity_score: f64,
355 pub match_type: DuplicateMatchType,
356}
357
358#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
360#[serde(rename_all = "snake_case")]
361pub enum DuplicateMatchType {
362 ExactHash,
364 HighSimilarity,
366 EmbeddingSimilarity,
368}
369
370pub fn find_duplicates(conn: &Connection, threshold: f64) -> Result<Vec<DuplicatePair>> {
378 find_duplicates_in_workspace(conn, threshold, None)
379}
380
381pub fn find_duplicates_in_workspace(
383 conn: &Connection,
384 threshold: f64,
385 workspace: Option<&str>,
386) -> Result<Vec<DuplicatePair>> {
387 let now = Utc::now().to_rfc3339();
388 let mut duplicates = Vec::new();
389
390 let (hash_sql, hash_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace
392 {
393 (
394 "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
395 FROM memories
396 WHERE content_hash IS NOT NULL
397 AND valid_to IS NULL
398 AND (expires_at IS NULL OR expires_at > ?)
399 AND workspace = ?
400 GROUP BY content_hash, scope_type, scope_id, workspace
401 HAVING COUNT(*) > 1",
402 vec![Box::new(now.clone()), Box::new(ws.to_string())],
403 )
404 } else {
405 (
406 "SELECT content_hash, scope_type, scope_id, GROUP_CONCAT(id) as ids
407 FROM memories
408 WHERE content_hash IS NOT NULL
409 AND valid_to IS NULL
410 AND (expires_at IS NULL OR expires_at > ?)
411 GROUP BY content_hash, scope_type, scope_id, workspace
412 HAVING COUNT(*) > 1",
413 vec![Box::new(now.clone())],
414 )
415 };
416
417 let mut hash_stmt = conn.prepare_cached(hash_sql)?;
418 let hash_rows = hash_stmt.query_map(
419 rusqlite::params_from_iter(hash_params.iter().map(|p| p.as_ref())),
420 |row| {
421 let ids_str: String = row.get(3)?;
422 Ok(ids_str)
423 },
424 )?;
425
426 for ids_result in hash_rows {
427 let ids_str = ids_result?;
428 let ids: Vec<i64> = ids_str
429 .split(',')
430 .filter_map(|s| s.trim().parse().ok())
431 .collect();
432
433 for i in 0..ids.len() {
436 for j in (i + 1)..ids.len() {
437 let memory_a = get_memory_internal(conn, ids[i], false)?;
438 let memory_b = get_memory_internal(conn, ids[j], false)?;
439 duplicates.push(DuplicatePair {
440 memory_a,
441 memory_b,
442 similarity_score: 1.0, match_type: DuplicateMatchType::ExactHash,
444 });
445 }
446 }
447 }
448
449 let (sim_sql, sim_params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
451 (
452 "SELECT DISTINCT c.from_id, c.to_id, c.score
453 FROM crossrefs c
454 JOIN memories m1 ON c.from_id = m1.id
455 JOIN memories m2 ON c.to_id = m2.id
456 WHERE c.score >= ?
457 AND m1.valid_to IS NULL
458 AND m2.valid_to IS NULL
459 AND (m1.expires_at IS NULL OR m1.expires_at > ?)
460 AND (m2.expires_at IS NULL OR m2.expires_at > ?)
461 AND c.from_id < c.to_id
462 AND m1.scope_type = m2.scope_type
463 AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
464 AND m1.workspace = ?
465 AND m2.workspace = ?
466 ORDER BY c.score DESC",
467 vec![
468 Box::new(threshold),
469 Box::new(now.clone()),
470 Box::new(now.clone()),
471 Box::new(ws.to_string()),
472 Box::new(ws.to_string()),
473 ],
474 )
475 } else {
476 (
477 "SELECT DISTINCT c.from_id, c.to_id, c.score
478 FROM crossrefs c
479 JOIN memories m1 ON c.from_id = m1.id
480 JOIN memories m2 ON c.to_id = m2.id
481 WHERE c.score >= ?
482 AND m1.valid_to IS NULL
483 AND m2.valid_to IS NULL
484 AND (m1.expires_at IS NULL OR m1.expires_at > ?)
485 AND (m2.expires_at IS NULL OR m2.expires_at > ?)
486 AND c.from_id < c.to_id
487 AND m1.scope_type = m2.scope_type
488 AND (m1.scope_id = m2.scope_id OR (m1.scope_id IS NULL AND m2.scope_id IS NULL))
489 AND m1.workspace = m2.workspace
490 ORDER BY c.score DESC",
491 vec![
492 Box::new(threshold),
493 Box::new(now.clone()),
494 Box::new(now.clone()),
495 ],
496 )
497 };
498
499 let mut sim_stmt = conn.prepare_cached(sim_sql)?;
500 let sim_rows = sim_stmt.query_map(
501 rusqlite::params_from_iter(sim_params.iter().map(|p| p.as_ref())),
502 |row| {
503 Ok((
504 row.get::<_, i64>(0)?,
505 row.get::<_, i64>(1)?,
506 row.get::<_, f64>(2)?,
507 ))
508 },
509 )?;
510
511 for row_result in sim_rows {
512 let (from_id, to_id, score) = row_result?;
513
514 let already_found = duplicates.iter().any(|d| {
516 (d.memory_a.id == from_id && d.memory_b.id == to_id)
517 || (d.memory_a.id == to_id && d.memory_b.id == from_id)
518 });
519
520 if !already_found {
521 let memory_a = get_memory_internal(conn, from_id, false)?;
523 let memory_b = get_memory_internal(conn, to_id, false)?;
524 duplicates.push(DuplicatePair {
525 memory_a,
526 memory_b,
527 similarity_score: score,
528 match_type: DuplicateMatchType::HighSimilarity,
529 });
530 }
531 }
532
533 Ok(duplicates)
534}
535
536pub fn find_duplicates_by_embedding(
540 conn: &Connection,
541 threshold: f32,
542 workspace: Option<&str>,
543 limit: usize,
544) -> Result<Vec<DuplicatePair>> {
545 use crate::embedding::{cosine_similarity, get_embedding};
546
547 let now = Utc::now().to_rfc3339();
548
549 let (sql, params_vec): (&str, Vec<Box<dyn rusqlite::ToSql>>) = if let Some(ws) = workspace {
551 (
552 "SELECT id FROM memories
553 WHERE has_embedding = 1 AND valid_to IS NULL
554 AND (expires_at IS NULL OR expires_at > ?)
555 AND COALESCE(lifecycle_state, 'active') = 'active'
556 AND workspace = ?
557 ORDER BY id",
558 vec![Box::new(now), Box::new(ws.to_string())],
559 )
560 } else {
561 (
562 "SELECT id FROM memories
563 WHERE has_embedding = 1 AND valid_to IS NULL
564 AND (expires_at IS NULL OR expires_at > ?)
565 AND COALESCE(lifecycle_state, 'active') = 'active'
566 ORDER BY id",
567 vec![Box::new(now)],
568 )
569 };
570
571 let mut stmt = conn.prepare(sql)?;
572 let ids: Vec<i64> = stmt
573 .query_map(
574 rusqlite::params_from_iter(params_vec.iter().map(|p| p.as_ref())),
575 |row| row.get(0),
576 )?
577 .filter_map(|r| r.ok())
578 .collect();
579
580 let mut embeddings: Vec<(i64, Vec<f32>)> = Vec::with_capacity(ids.len());
582 for &id in &ids {
583 if let Ok(Some(emb)) = get_embedding(conn, id) {
584 embeddings.push((id, emb));
585 }
586 }
587
588 let mut duplicates = Vec::new();
589
590 for i in 0..embeddings.len() {
592 if duplicates.len() >= limit {
593 break;
594 }
595 for j in (i + 1)..embeddings.len() {
596 if duplicates.len() >= limit {
597 break;
598 }
599 let sim = cosine_similarity(&embeddings[i].1, &embeddings[j].1);
600 if sim >= threshold {
601 let memory_a = get_memory_internal(conn, embeddings[i].0, false)?;
602 let memory_b = get_memory_internal(conn, embeddings[j].0, false)?;
603 duplicates.push(DuplicatePair {
604 memory_a,
605 memory_b,
606 similarity_score: sim as f64,
607 match_type: DuplicateMatchType::EmbeddingSimilarity,
608 });
609 }
610 }
611 }
612
613 duplicates.sort_by(|a, b| {
615 b.similarity_score
616 .partial_cmp(&a.similarity_score)
617 .unwrap_or(std::cmp::Ordering::Equal)
618 });
619
620 Ok(duplicates)
621}
622
623pub fn create_memory(conn: &Connection, input: &CreateMemoryInput) -> Result<Memory> {
625 let now = Utc::now();
626 let now_str = now.to_rfc3339();
627 let metadata_json = serde_json::to_string(&input.metadata)?;
628 let importance = input.importance.unwrap_or(0.5);
629
630 let content_hash = compute_content_hash(&input.content);
632
633 let workspace = match &input.workspace {
635 Some(ws) => crate::types::normalize_workspace(ws)
636 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?,
637 None => "default".to_string(),
638 };
639
640 if input.dedup_mode != DedupMode::Allow {
642 if let Some(existing) =
643 find_by_content_hash(conn, &content_hash, &input.scope, Some(&workspace))?
644 {
645 match input.dedup_mode {
646 DedupMode::Reject => {
647 return Err(EngramError::Duplicate {
648 existing_id: existing.id,
649 message: format!(
650 "Duplicate memory detected (id={}). Content hash: {}",
651 existing.id, content_hash
652 ),
653 });
654 }
655 DedupMode::Skip => {
656 return Ok(existing);
658 }
659 DedupMode::Merge => {
660 let mut merged_tags = existing.tags.clone();
662 for tag in &input.tags {
663 if !merged_tags.contains(tag) {
664 merged_tags.push(tag.clone());
665 }
666 }
667
668 let mut merged_metadata = existing.metadata.clone();
669 for (key, value) in &input.metadata {
670 merged_metadata.insert(key.clone(), value.clone());
671 }
672
673 let update_input = UpdateMemoryInput {
674 content: None, memory_type: None,
676 tags: Some(merged_tags),
677 metadata: Some(merged_metadata),
678 importance: input.importance, scope: None,
680 ttl_seconds: input.ttl_seconds, event_time: None,
682 trigger_pattern: None,
683 media_url: input.media_url.clone().map(Some),
684 };
685
686 return update_memory(conn, existing.id, &update_input);
687 }
688 DedupMode::Allow => unreachable!(),
689 }
690 }
691 }
692
693 let scope_type = input.scope.scope_type();
695 let scope_id = input.scope.scope_id().map(|s| s.to_string());
696
697 let tier = input.tier;
701
702 let expires_at = match tier {
707 MemoryTier::Permanent => {
708 if input.ttl_seconds.is_some() && input.ttl_seconds != Some(0) {
710 return Err(EngramError::InvalidInput(
711 "Permanent tier memories cannot have a TTL. Use Daily tier for expiring memories.".to_string()
712 ));
713 }
714 None
715 }
716 MemoryTier::Daily => {
717 let ttl = input.ttl_seconds.filter(|&t| t > 0).unwrap_or(86400); Some((now + chrono::Duration::seconds(ttl)).to_rfc3339())
720 }
721 };
722
723 let event_time = input.event_time.map(|dt| dt.to_rfc3339());
724
725 conn.execute(
726 "INSERT INTO memories (content, memory_type, importance, metadata, created_at, updated_at, valid_from, scope_type, scope_id, workspace, tier, expires_at, content_hash, event_time, event_duration_seconds, trigger_pattern, summary_of_id, media_url)
727 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
728 params![
729 input.content,
730 input.memory_type.as_str(),
731 importance,
732 metadata_json,
733 now_str,
734 now_str,
735 now_str,
736 scope_type,
737 scope_id,
738 workspace,
739 tier.as_str(),
740 expires_at,
741 content_hash,
742 event_time,
743 input.event_duration_seconds,
744 input.trigger_pattern,
745 input.summary_of_id,
746 input.media_url,
747 ],
748 )?;
749
750 let id = conn.last_insert_rowid();
751
752 for tag in &input.tags {
754 ensure_tag(conn, tag)?;
755 conn.execute(
756 "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
757 SELECT ?, id FROM tags WHERE name = ?",
758 params![id, tag],
759 )?;
760 }
761
762 if !input.defer_embedding {
764 conn.execute(
765 "INSERT INTO embedding_queue (memory_id, status, queued_at)
766 VALUES (?, 'pending', ?)",
767 params![id, now_str],
768 )?;
769 }
770
771 let tags_json = serde_json::to_string(&input.tags)?;
773 conn.execute(
774 "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
775 VALUES (?, 1, ?, ?, ?, ?)",
776 params![id, input.content, tags_json, metadata_json, now_str],
777 )?;
778
779 record_event(
781 conn,
782 MemoryEventType::Created,
783 Some(id),
784 None,
785 serde_json::json!({
786 "workspace": input.workspace.as_deref().unwrap_or("default"),
787 "memory_type": input.memory_type.as_str(),
788 }),
789 )?;
790
791 conn.execute(
793 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
794 [],
795 )?;
796
797 get_memory_internal(conn, id, false)
798}
799
800fn ensure_tag(conn: &Connection, tag: &str) -> Result<i64> {
802 conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", params![tag])?;
803
804 let id: i64 = conn.query_row("SELECT id FROM tags WHERE name = ?", params![tag], |row| {
805 row.get(0)
806 })?;
807
808 Ok(id)
809}
810
811pub fn get_memory(conn: &Connection, id: i64) -> Result<Memory> {
813 get_memory_internal(conn, id, true)
814}
815
816pub fn update_memory(conn: &Connection, id: i64, input: &UpdateMemoryInput) -> Result<Memory> {
818 let current = get_memory_internal(conn, id, false)?;
820 let now = Utc::now().to_rfc3339();
821
822 let mut updates = vec!["updated_at = ?".to_string()];
824 let mut values: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now.clone())];
825
826 if let Some(ref content) = input.content {
827 updates.push("content = ?".to_string());
828 values.push(Box::new(content.clone()));
829 let new_hash = compute_content_hash(content);
831 updates.push("content_hash = ?".to_string());
832 values.push(Box::new(new_hash));
833 }
834
835 if let Some(ref memory_type) = input.memory_type {
836 updates.push("memory_type = ?".to_string());
837 values.push(Box::new(memory_type.as_str().to_string()));
838 }
839
840 if let Some(importance) = input.importance {
841 updates.push("importance = ?".to_string());
842 values.push(Box::new(importance));
843 }
844
845 if let Some(ref metadata) = input.metadata {
846 let metadata_json = serde_json::to_string(metadata)?;
847 updates.push("metadata = ?".to_string());
848 values.push(Box::new(metadata_json));
849 }
850
851 if let Some(ref scope) = input.scope {
852 updates.push("scope_type = ?".to_string());
853 values.push(Box::new(scope.scope_type().to_string()));
854 updates.push("scope_id = ?".to_string());
855 values.push(Box::new(scope.scope_id().map(|s| s.to_string())));
856 }
857
858 if let Some(event_time) = &input.event_time {
860 updates.push("event_time = ?".to_string());
861 let value = event_time.as_ref().map(|dt| dt.to_rfc3339());
862 values.push(Box::new(value));
863 }
864
865 if let Some(trigger_pattern) = &input.trigger_pattern {
867 updates.push("trigger_pattern = ?".to_string());
868 values.push(Box::new(trigger_pattern.clone()));
869 }
870
871 if let Some(media_url) = &input.media_url {
873 updates.push("media_url = ?".to_string());
874 values.push(Box::new(media_url.clone()));
875 }
876
877 if let Some(ttl) = input.ttl_seconds {
883 if ttl <= 0 {
884 if current.tier == MemoryTier::Daily {
887 return Err(crate::error::EngramError::InvalidInput(
888 "Cannot remove expiration from a Daily tier memory. Use promote_to_permanent first.".to_string()
889 ));
890 }
891 updates.push("expires_at = NULL".to_string());
892 } else {
893 if current.tier == MemoryTier::Permanent {
896 return Err(crate::error::EngramError::InvalidInput(
897 "Cannot set expiration on a Permanent tier memory. Permanent memories cannot expire.".to_string()
898 ));
899 }
900 let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
901 updates.push("expires_at = ?".to_string());
902 values.push(Box::new(expires_at));
903 }
904 }
905
906 updates.push("version = version + 1".to_string());
908
909 let sql = format!("UPDATE memories SET {} WHERE id = ?", updates.join(", "));
911 values.push(Box::new(id));
912
913 let params: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
914 conn.execute(&sql, params.as_slice())?;
915
916 if let Some(ref tags) = input.tags {
918 conn.execute("DELETE FROM memory_tags WHERE memory_id = ?", params![id])?;
919 for tag in tags {
920 ensure_tag(conn, tag)?;
921 conn.execute(
922 "INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
923 SELECT ?, id FROM tags WHERE name = ?",
924 params![id, tag],
925 )?;
926 }
927 }
928
929 let new_content = input.content.as_ref().unwrap_or(¤t.content);
931 let new_tags = input.tags.as_ref().unwrap_or(¤t.tags);
932 let new_metadata = input.metadata.as_ref().unwrap_or(¤t.metadata);
933 let tags_json = serde_json::to_string(new_tags)?;
934 let metadata_json = serde_json::to_string(new_metadata)?;
935
936 conn.execute(
937 "INSERT INTO memory_versions (memory_id, version, content, tags, metadata, created_at)
938 VALUES (?, (SELECT version FROM memories WHERE id = ?), ?, ?, ?, ?)",
939 params![id, id, new_content, tags_json, metadata_json, now],
940 )?;
941
942 if input.content.is_some() {
944 conn.execute(
945 "INSERT OR REPLACE INTO embedding_queue (memory_id, status, queued_at)
946 VALUES (?, 'pending', ?)",
947 params![id, now],
948 )?;
949 conn.execute(
950 "UPDATE memories SET has_embedding = 0 WHERE id = ?",
951 params![id],
952 )?;
953 }
954
955 let mut changed_fields = Vec::new();
957 if input.content.is_some() {
958 changed_fields.push("content");
959 }
960 if input.tags.is_some() {
961 changed_fields.push("tags");
962 }
963 if input.metadata.is_some() {
964 changed_fields.push("metadata");
965 }
966 if input.importance.is_some() {
967 changed_fields.push("importance");
968 }
969 if input.ttl_seconds.is_some() {
970 changed_fields.push("ttl");
971 }
972
973 record_event(
975 conn,
976 MemoryEventType::Updated,
977 Some(id),
978 None,
979 serde_json::json!({
980 "changed_fields": changed_fields,
981 }),
982 )?;
983
984 conn.execute(
986 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
987 [],
988 )?;
989
990 get_memory_internal(conn, id, false)
991}
992
993pub fn promote_to_permanent(conn: &Connection, id: i64) -> Result<Memory> {
1004 let memory = get_memory_internal(conn, id, false)?;
1005
1006 if memory.tier == MemoryTier::Permanent {
1007 return Err(EngramError::InvalidInput(format!(
1008 "Memory {} is already in the Permanent tier",
1009 id
1010 )));
1011 }
1012
1013 let now = Utc::now().to_rfc3339();
1014
1015 conn.execute(
1016 "UPDATE memories SET tier = 'permanent', expires_at = NULL, updated_at = ?, version = version + 1 WHERE id = ?",
1017 params![now, id],
1018 )?;
1019
1020 record_event(
1022 conn,
1023 MemoryEventType::Updated,
1024 Some(id),
1025 None,
1026 serde_json::json!({
1027 "changed_fields": ["tier", "expires_at"],
1028 "action": "promote_to_permanent",
1029 }),
1030 )?;
1031
1032 conn.execute(
1034 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1035 [],
1036 )?;
1037
1038 tracing::info!(memory_id = id, "Promoted memory to permanent tier");
1039
1040 get_memory_internal(conn, id, false)
1041}
1042
1043pub fn move_to_workspace(conn: &Connection, id: i64, workspace: &str) -> Result<Memory> {
1053 let _memory = get_memory_internal(conn, id, false)?;
1055
1056 let normalized = crate::types::normalize_workspace(workspace)
1058 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1059
1060 let now = Utc::now().to_rfc3339();
1061
1062 conn.execute(
1063 "UPDATE memories SET workspace = ?, updated_at = ?, version = version + 1 WHERE id = ?",
1064 params![normalized, now, id],
1065 )?;
1066
1067 record_event(
1069 conn,
1070 MemoryEventType::Updated,
1071 Some(id),
1072 None,
1073 serde_json::json!({
1074 "changed_fields": ["workspace"],
1075 "action": "move_to_workspace",
1076 "new_workspace": normalized,
1077 }),
1078 )?;
1079
1080 conn.execute(
1082 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1083 [],
1084 )?;
1085
1086 tracing::info!(memory_id = id, workspace = %normalized, "Moved memory to workspace");
1087
1088 get_memory_internal(conn, id, false)
1089}
1090
1091pub fn list_workspaces(conn: &Connection) -> Result<Vec<WorkspaceStats>> {
1096 let now = Utc::now().to_rfc3339();
1097
1098 let mut stmt = conn.prepare(
1099 r#"
1100 SELECT
1101 workspace,
1102 COUNT(*) as memory_count,
1103 SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1104 SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1105 MIN(created_at) as first_memory_at,
1106 MAX(created_at) as last_memory_at,
1107 AVG(importance) as avg_importance
1108 FROM memories
1109 WHERE valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1110 GROUP BY workspace
1111 ORDER BY memory_count DESC
1112 "#,
1113 )?;
1114
1115 let workspaces: Vec<WorkspaceStats> = stmt
1116 .query_map(params![now], |row| {
1117 let workspace: String = row.get(0)?;
1118 let memory_count: i64 = row.get(1)?;
1119 let permanent_count: i64 = row.get(2)?;
1120 let daily_count: i64 = row.get(3)?;
1121 let first_memory_at: Option<String> = row.get(4)?;
1122 let last_memory_at: Option<String> = row.get(5)?;
1123 let avg_importance: Option<f64> = row.get(6)?;
1124
1125 Ok(WorkspaceStats {
1126 workspace,
1127 memory_count,
1128 permanent_count,
1129 daily_count,
1130 first_memory_at: first_memory_at.and_then(|s| {
1131 DateTime::parse_from_rfc3339(&s)
1132 .map(|dt| dt.with_timezone(&Utc))
1133 .ok()
1134 }),
1135 last_memory_at: last_memory_at.and_then(|s| {
1136 DateTime::parse_from_rfc3339(&s)
1137 .map(|dt| dt.with_timezone(&Utc))
1138 .ok()
1139 }),
1140 top_tags: vec![], avg_importance: avg_importance.map(|v| v as f32),
1142 })
1143 })?
1144 .filter_map(|r| r.ok())
1145 .collect();
1146
1147 Ok(workspaces)
1148}
1149
1150pub fn get_workspace_stats(conn: &Connection, workspace: &str) -> Result<WorkspaceStats> {
1152 let normalized = crate::types::normalize_workspace(workspace)
1153 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1154
1155 let now = Utc::now().to_rfc3339();
1156
1157 let stats = conn
1158 .query_row(
1159 r#"
1160 SELECT
1161 workspace,
1162 COUNT(*) as memory_count,
1163 SUM(CASE WHEN tier = 'permanent' THEN 1 ELSE 0 END) as permanent_count,
1164 SUM(CASE WHEN tier = 'daily' THEN 1 ELSE 0 END) as daily_count,
1165 MIN(created_at) as first_memory_at,
1166 MAX(created_at) as last_memory_at,
1167 AVG(importance) as avg_importance
1168 FROM memories
1169 WHERE workspace = ? AND valid_to IS NULL AND (expires_at IS NULL OR expires_at > ?)
1170 GROUP BY workspace
1171 "#,
1172 params![normalized, now],
1173 |row| {
1174 let workspace: String = row.get(0)?;
1175 let memory_count: i64 = row.get(1)?;
1176 let permanent_count: i64 = row.get(2)?;
1177 let daily_count: i64 = row.get(3)?;
1178 let first_memory_at: Option<String> = row.get(4)?;
1179 let last_memory_at: Option<String> = row.get(5)?;
1180 let avg_importance: Option<f64> = row.get(6)?;
1181
1182 Ok(WorkspaceStats {
1183 workspace,
1184 memory_count,
1185 permanent_count,
1186 daily_count,
1187 first_memory_at: first_memory_at.and_then(|s| {
1188 DateTime::parse_from_rfc3339(&s)
1189 .map(|dt| dt.with_timezone(&Utc))
1190 .ok()
1191 }),
1192 last_memory_at: last_memory_at.and_then(|s| {
1193 DateTime::parse_from_rfc3339(&s)
1194 .map(|dt| dt.with_timezone(&Utc))
1195 .ok()
1196 }),
1197 top_tags: vec![],
1198 avg_importance: avg_importance.map(|v| v as f32),
1199 })
1200 },
1201 )
1202 .map_err(|e| match e {
1203 rusqlite::Error::QueryReturnedNoRows => {
1204 EngramError::NotFound(0) }
1206 _ => EngramError::Database(e),
1207 })?;
1208
1209 Ok(stats)
1210}
1211
1212pub fn delete_workspace(conn: &Connection, workspace: &str, move_to_default: bool) -> Result<i64> {
1221 let normalized = crate::types::normalize_workspace(workspace)
1222 .map_err(|e| EngramError::InvalidInput(format!("Invalid workspace: {}", e)))?;
1223
1224 if normalized == "default" {
1225 return Err(EngramError::InvalidInput(
1226 "Cannot delete the default workspace".to_string(),
1227 ));
1228 }
1229
1230 let now = Utc::now().to_rfc3339();
1231
1232 let affected_ids: Vec<i64> = {
1234 let mut stmt =
1235 conn.prepare("SELECT id FROM memories WHERE workspace = ? AND valid_to IS NULL")?;
1236 let rows = stmt.query_map(params![&normalized], |row| row.get(0))?;
1237 rows.collect::<std::result::Result<Vec<_>, _>>()?
1238 };
1239
1240 let affected = affected_ids.len() as i64;
1241
1242 if affected > 0 {
1243 if move_to_default {
1244 conn.execute(
1246 "UPDATE memories SET workspace = 'default', updated_at = ?, version = version + 1 WHERE workspace = ? AND valid_to IS NULL",
1247 params![&now, &normalized],
1248 )?;
1249 } else {
1250 conn.execute(
1252 "UPDATE memories SET valid_to = ? WHERE workspace = ? AND valid_to IS NULL",
1253 params![&now, &normalized],
1254 )?;
1255 }
1256
1257 let event_type = if move_to_default {
1259 MemoryEventType::Updated
1260 } else {
1261 MemoryEventType::Deleted
1262 };
1263
1264 for memory_id in &affected_ids {
1265 record_event(
1266 conn,
1267 event_type.clone(),
1268 Some(*memory_id),
1269 None,
1270 serde_json::json!({
1271 "action": "delete_workspace",
1272 "workspace": normalized,
1273 "move_to_default": move_to_default,
1274 }),
1275 )?;
1276 }
1277 }
1278
1279 conn.execute(
1281 "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1282 params![affected],
1283 )?;
1284
1285 tracing::info!(
1286 workspace = %normalized,
1287 move_to_default,
1288 affected,
1289 "Deleted workspace"
1290 );
1291
1292 Ok(affected)
1293}
1294
1295pub fn delete_memory(conn: &Connection, id: i64) -> Result<()> {
1297 let now = Utc::now().to_rfc3339();
1298
1299 let memory_info: Option<(String, String)> = conn
1301 .query_row(
1302 "SELECT workspace, memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1303 params![id],
1304 |row| Ok((row.get(0)?, row.get(1)?)),
1305 )
1306 .ok();
1307
1308 let affected = conn.execute(
1309 "UPDATE memories SET valid_to = ? WHERE id = ? AND valid_to IS NULL",
1310 params![now, id],
1311 )?;
1312
1313 if affected == 0 {
1314 return Err(EngramError::NotFound(id));
1315 }
1316
1317 conn.execute(
1319 "UPDATE crossrefs SET valid_to = ? WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL",
1320 params![now, id, id],
1321 )?;
1322
1323 let (workspace, memory_type) =
1325 memory_info.unwrap_or(("default".to_string(), "unknown".to_string()));
1326 record_event(
1327 conn,
1328 MemoryEventType::Deleted,
1329 Some(id),
1330 None,
1331 serde_json::json!({
1332 "workspace": workspace,
1333 "memory_type": memory_type,
1334 }),
1335 )?;
1336
1337 conn.execute(
1339 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1340 [],
1341 )?;
1342
1343 Ok(())
1344}
1345
1346pub fn list_memories(conn: &Connection, options: &ListOptions) -> Result<Vec<Memory>> {
1348 let now = Utc::now().to_rfc3339();
1349
1350 let mut sql = String::from(
1351 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1352 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1353 m.visibility, m.version, m.has_embedding, m.metadata,
1354 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1355 m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
1356 m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url
1357 FROM memories m",
1358 );
1359
1360 let mut conditions = vec!["m.valid_to IS NULL".to_string()];
1361 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1362
1363 conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
1365 params.push(Box::new(now));
1366
1367 if let Some(ref tags) = options.tags {
1369 if !tags.is_empty() {
1370 sql.push_str(
1371 " JOIN memory_tags mt ON m.id = mt.memory_id
1372 JOIN tags t ON mt.tag_id = t.id",
1373 );
1374 let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
1375 conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1376 for tag in tags {
1377 params.push(Box::new(tag.clone()));
1378 }
1379 }
1380 }
1381
1382 if let Some(ref memory_type) = options.memory_type {
1384 conditions.push("m.memory_type = ?".to_string());
1385 params.push(Box::new(memory_type.as_str().to_string()));
1386 }
1387
1388 if let Some(ref metadata_filter) = options.metadata_filter {
1390 for (key, value) in metadata_filter {
1391 metadata_value_to_param(key, value, &mut conditions, &mut params)?;
1392 }
1393 }
1394
1395 if let Some(ref scope) = options.scope {
1397 conditions.push("m.scope_type = ?".to_string());
1398 params.push(Box::new(scope.scope_type().to_string()));
1399 if let Some(scope_id) = scope.scope_id() {
1400 conditions.push("m.scope_id = ?".to_string());
1401 params.push(Box::new(scope_id.to_string()));
1402 } else {
1403 conditions.push("m.scope_id IS NULL".to_string());
1404 }
1405 }
1406
1407 if let Some(ref workspace) = options.workspace {
1409 conditions.push("m.workspace = ?".to_string());
1410 params.push(Box::new(workspace.clone()));
1411 }
1412
1413 if let Some(ref tier) = options.tier {
1415 conditions.push("m.tier = ?".to_string());
1416 params.push(Box::new(tier.as_str().to_string()));
1417 }
1418
1419 sql.push_str(" WHERE ");
1420 sql.push_str(&conditions.join(" AND "));
1421
1422 let sort_field = match options.sort_by.unwrap_or_default() {
1424 SortField::CreatedAt => "m.created_at",
1425 SortField::UpdatedAt => "m.updated_at",
1426 SortField::LastAccessedAt => "m.last_accessed_at",
1427 SortField::Importance => "m.importance",
1428 SortField::AccessCount => "m.access_count",
1429 };
1430 let sort_order = match options.sort_order.unwrap_or_default() {
1431 SortOrder::Asc => "ASC",
1432 SortOrder::Desc => "DESC",
1433 };
1434 sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
1435
1436 let limit = options.limit.unwrap_or(100);
1438 let offset = options.offset.unwrap_or(0);
1439 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
1440
1441 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1442 let mut stmt = conn.prepare(&sql)?;
1443
1444 let memories: Vec<Memory> = stmt
1445 .query_map(param_refs.as_slice(), memory_from_row)?
1446 .filter_map(|r| r.ok())
1447 .map(|mut m| {
1448 m.tags = load_tags(conn, m.id).unwrap_or_default();
1449 m
1450 })
1451 .collect();
1452
1453 Ok(memories)
1454}
1455
1456pub fn get_episodic_timeline(
1458 conn: &Connection,
1459 start_time: Option<DateTime<Utc>>,
1460 end_time: Option<DateTime<Utc>>,
1461 workspace: Option<&str>,
1462 tags: Option<&[String]>,
1463 limit: i64,
1464) -> Result<Vec<Memory>> {
1465 let now = Utc::now().to_rfc3339();
1466
1467 let mut sql = String::from(
1468 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
1469 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1470 m.visibility, m.version, m.has_embedding, m.metadata,
1471 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1472 m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
1473 m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url,
1474 m.event_time, m.event_duration_seconds, m.trigger_pattern,
1475 m.procedure_success_count, m.procedure_failure_count, m.summary_of_id,
1476 m.lifecycle_state
1477 FROM memories m",
1478 );
1479
1480 let mut conditions = vec![
1481 "m.valid_to IS NULL".to_string(),
1482 "(m.expires_at IS NULL OR m.expires_at > ?)".to_string(),
1483 "m.memory_type = 'episodic'".to_string(),
1484 "m.event_time IS NOT NULL".to_string(),
1485 ];
1486 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1487
1488 if let Some(start) = start_time {
1489 conditions.push("m.event_time >= ?".to_string());
1490 params.push(Box::new(start.to_rfc3339()));
1491 }
1492
1493 if let Some(end) = end_time {
1494 conditions.push("m.event_time <= ?".to_string());
1495 params.push(Box::new(end.to_rfc3339()));
1496 }
1497
1498 if let Some(ws) = workspace {
1499 conditions.push("m.workspace = ?".to_string());
1500 params.push(Box::new(ws.to_string()));
1501 }
1502
1503 if let Some(tag_list) = tags {
1504 if !tag_list.is_empty() {
1505 sql.push_str(
1506 " JOIN memory_tags mt ON m.id = mt.memory_id
1507 JOIN tags t ON mt.tag_id = t.id",
1508 );
1509 let placeholders: Vec<String> = tag_list.iter().map(|_| "?".to_string()).collect();
1510 conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
1511 for tag in tag_list {
1512 params.push(Box::new(tag.clone()));
1513 }
1514 }
1515 }
1516
1517 sql.push_str(" WHERE ");
1518 sql.push_str(&conditions.join(" AND "));
1519 sql.push_str(" ORDER BY m.event_time ASC");
1520 sql.push_str(&format!(" LIMIT {}", limit));
1521
1522 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1523 let mut stmt = conn.prepare(&sql)?;
1524
1525 let memories: Vec<Memory> = stmt
1526 .query_map(param_refs.as_slice(), memory_from_row)?
1527 .filter_map(|r| r.ok())
1528 .map(|mut m| {
1529 m.tags = load_tags(conn, m.id).unwrap_or_default();
1530 m
1531 })
1532 .collect();
1533
1534 Ok(memories)
1535}
1536
1537pub fn get_procedural_memories(
1539 conn: &Connection,
1540 trigger_pattern: Option<&str>,
1541 workspace: Option<&str>,
1542 min_success_rate: Option<f32>,
1543 limit: i64,
1544) -> Result<Vec<Memory>> {
1545 let now = Utc::now().to_rfc3339();
1546
1547 let sql_base = "SELECT m.id, m.content, m.memory_type, m.importance, m.access_count,
1548 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
1549 m.visibility, m.version, m.has_embedding, m.metadata,
1550 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
1551 m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
1552 m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url,
1553 m.event_time, m.event_duration_seconds, m.trigger_pattern,
1554 m.procedure_success_count, m.procedure_failure_count, m.summary_of_id,
1555 m.lifecycle_state
1556 FROM memories m";
1557
1558 let mut conditions = vec![
1559 "m.valid_to IS NULL".to_string(),
1560 "(m.expires_at IS NULL OR m.expires_at > ?)".to_string(),
1561 "m.memory_type = 'procedural'".to_string(),
1562 ];
1563 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1564
1565 if let Some(pattern) = trigger_pattern {
1566 conditions.push("m.trigger_pattern LIKE ?".to_string());
1567 params.push(Box::new(format!("%{}%", pattern)));
1568 }
1569
1570 if let Some(ws) = workspace {
1571 conditions.push("m.workspace = ?".to_string());
1572 params.push(Box::new(ws.to_string()));
1573 }
1574
1575 if let Some(min_rate) = min_success_rate {
1576 conditions.push("(m.procedure_success_count + m.procedure_failure_count) > 0".to_string());
1579 conditions.push(
1580 "CAST(m.procedure_success_count AS REAL) / (m.procedure_success_count + m.procedure_failure_count) >= ?"
1581 .to_string(),
1582 );
1583 params.push(Box::new(min_rate as f64));
1584 }
1585
1586 let sql = format!(
1587 "{} WHERE {} ORDER BY m.procedure_success_count DESC LIMIT {}",
1588 sql_base,
1589 conditions.join(" AND "),
1590 limit
1591 );
1592
1593 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1594 let mut stmt = conn.prepare(&sql)?;
1595
1596 let memories: Vec<Memory> = stmt
1597 .query_map(param_refs.as_slice(), memory_from_row)?
1598 .filter_map(|r| r.ok())
1599 .map(|mut m| {
1600 m.tags = load_tags(conn, m.id).unwrap_or_default();
1601 m
1602 })
1603 .collect();
1604
1605 Ok(memories)
1606}
1607
1608pub fn record_procedure_outcome(
1610 conn: &Connection,
1611 memory_id: i64,
1612 success: bool,
1613) -> Result<Memory> {
1614 let column = if success {
1615 "procedure_success_count"
1616 } else {
1617 "procedure_failure_count"
1618 };
1619
1620 let now = Utc::now().to_rfc3339();
1621
1622 let memory_type: String = conn
1624 .query_row(
1625 "SELECT memory_type FROM memories WHERE id = ? AND valid_to IS NULL",
1626 params![memory_id],
1627 |row| row.get(0),
1628 )
1629 .map_err(|_| EngramError::NotFound(memory_id))?;
1630
1631 if memory_type != "procedural" {
1632 return Err(EngramError::InvalidInput(format!(
1633 "Memory {} is type '{}', not 'procedural'",
1634 memory_id, memory_type
1635 )));
1636 }
1637
1638 conn.execute(
1639 &format!(
1640 "UPDATE memories SET {} = {} + 1, updated_at = ? WHERE id = ?",
1641 column, column
1642 ),
1643 params![now, memory_id],
1644 )?;
1645
1646 get_memory(conn, memory_id)
1647}
1648
1649pub fn create_crossref(conn: &Connection, input: &CreateCrossRefInput) -> Result<CrossReference> {
1651 let now = Utc::now().to_rfc3339();
1652
1653 let _ = get_memory_internal(conn, input.from_id, false)?;
1655 let _ = get_memory_internal(conn, input.to_id, false)?;
1656
1657 let strength = input.strength.unwrap_or(1.0);
1658
1659 conn.execute(
1660 "INSERT INTO crossrefs (from_id, to_id, edge_type, score, strength, source, source_context, pinned, created_at, valid_from)
1661 VALUES (?, ?, ?, 1.0, ?, 'manual', ?, ?, ?, ?)
1662 ON CONFLICT(from_id, to_id, edge_type) DO UPDATE SET
1663 strength = excluded.strength,
1664 source_context = COALESCE(excluded.source_context, crossrefs.source_context),
1665 pinned = excluded.pinned",
1666 params![
1667 input.from_id,
1668 input.to_id,
1669 input.edge_type.as_str(),
1670 strength,
1671 input.source_context,
1672 input.pinned,
1673 now,
1674 now,
1675 ],
1676 )?;
1677
1678 get_crossref(conn, input.from_id, input.to_id, input.edge_type)
1679}
1680
1681pub fn get_crossref(
1683 conn: &Connection,
1684 from_id: i64,
1685 to_id: i64,
1686 edge_type: EdgeType,
1687) -> Result<CrossReference> {
1688 let mut stmt = conn.prepare_cached(
1689 "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1690 source_context, created_at, valid_from, valid_to, pinned, metadata
1691 FROM crossrefs
1692 WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1693 )?;
1694
1695 let crossref = stmt.query_row(params![from_id, to_id, edge_type.as_str()], |row| {
1696 let edge_type_str: String = row.get("edge_type")?;
1697 let source_str: String = row.get("source")?;
1698 let created_at_str: String = row.get("created_at")?;
1699 let valid_from_str: String = row.get("valid_from")?;
1700 let valid_to_str: Option<String> = row.get("valid_to")?;
1701 let metadata_str: String = row.get("metadata")?;
1702
1703 Ok(CrossReference {
1704 from_id: row.get("from_id")?,
1705 to_id: row.get("to_id")?,
1706 edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1707 score: row.get("score")?,
1708 confidence: row.get("confidence")?,
1709 strength: row.get("strength")?,
1710 source: match source_str.as_str() {
1711 "manual" => RelationSource::Manual,
1712 "llm" => RelationSource::Llm,
1713 _ => RelationSource::Auto,
1714 },
1715 source_context: row.get("source_context")?,
1716 created_at: DateTime::parse_from_rfc3339(&created_at_str)
1717 .map(|dt| dt.with_timezone(&Utc))
1718 .unwrap_or_else(|_| Utc::now()),
1719 valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1720 .map(|dt| dt.with_timezone(&Utc))
1721 .unwrap_or_else(|_| Utc::now()),
1722 valid_to: valid_to_str.and_then(|s| {
1723 DateTime::parse_from_rfc3339(&s)
1724 .map(|dt| dt.with_timezone(&Utc))
1725 .ok()
1726 }),
1727 pinned: row.get::<_, i32>("pinned")? != 0,
1728 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1729 })
1730 })?;
1731
1732 Ok(crossref)
1733}
1734
1735pub fn get_related(conn: &Connection, memory_id: i64) -> Result<Vec<CrossReference>> {
1737 let mut stmt = conn.prepare_cached(
1738 "SELECT from_id, to_id, edge_type, score, confidence, strength, source,
1739 source_context, created_at, valid_from, valid_to, pinned, metadata
1740 FROM crossrefs
1741 WHERE (from_id = ? OR to_id = ?) AND valid_to IS NULL
1742 ORDER BY score DESC",
1743 )?;
1744
1745 let crossrefs: Vec<CrossReference> = stmt
1746 .query_map(params![memory_id, memory_id], |row| {
1747 let edge_type_str: String = row.get("edge_type")?;
1748 let source_str: String = row.get("source")?;
1749 let created_at_str: String = row.get("created_at")?;
1750 let valid_from_str: String = row.get("valid_from")?;
1751 let valid_to_str: Option<String> = row.get("valid_to")?;
1752 let metadata_str: String = row.get("metadata")?;
1753
1754 Ok(CrossReference {
1755 from_id: row.get("from_id")?,
1756 to_id: row.get("to_id")?,
1757 edge_type: edge_type_str.parse().unwrap_or(EdgeType::RelatedTo),
1758 score: row.get("score")?,
1759 confidence: row.get("confidence")?,
1760 strength: row.get("strength")?,
1761 source: match source_str.as_str() {
1762 "manual" => RelationSource::Manual,
1763 "llm" => RelationSource::Llm,
1764 _ => RelationSource::Auto,
1765 },
1766 source_context: row.get("source_context")?,
1767 created_at: DateTime::parse_from_rfc3339(&created_at_str)
1768 .map(|dt| dt.with_timezone(&Utc))
1769 .unwrap_or_else(|_| Utc::now()),
1770 valid_from: DateTime::parse_from_rfc3339(&valid_from_str)
1771 .map(|dt| dt.with_timezone(&Utc))
1772 .unwrap_or_else(|_| Utc::now()),
1773 valid_to: valid_to_str.and_then(|s| {
1774 DateTime::parse_from_rfc3339(&s)
1775 .map(|dt| dt.with_timezone(&Utc))
1776 .ok()
1777 }),
1778 pinned: row.get::<_, i32>("pinned")? != 0,
1779 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1780 })
1781 })?
1782 .filter_map(|r| r.ok())
1783 .collect();
1784
1785 Ok(crossrefs)
1786}
1787
1788pub fn delete_crossref(
1790 conn: &Connection,
1791 from_id: i64,
1792 to_id: i64,
1793 edge_type: EdgeType,
1794) -> Result<()> {
1795 let now = Utc::now().to_rfc3339();
1796
1797 let affected = conn.execute(
1798 "UPDATE crossrefs SET valid_to = ?
1799 WHERE from_id = ? AND to_id = ? AND edge_type = ? AND valid_to IS NULL",
1800 params![now, from_id, to_id, edge_type.as_str()],
1801 )?;
1802
1803 if affected == 0 {
1804 return Err(EngramError::NotFound(from_id));
1805 }
1806
1807 Ok(())
1808}
1809
1810pub fn set_memory_expiration(
1817 conn: &Connection,
1818 id: i64,
1819 ttl_seconds: Option<i64>,
1820) -> Result<Memory> {
1821 let _ = get_memory_internal(conn, id, false)?;
1823
1824 match ttl_seconds {
1825 Some(0) => {
1826 conn.execute(
1828 "UPDATE memories SET expires_at = NULL, updated_at = ? WHERE id = ?",
1829 params![Utc::now().to_rfc3339(), id],
1830 )?;
1831 }
1832 Some(ttl) => {
1833 let expires_at = (Utc::now() + chrono::Duration::seconds(ttl)).to_rfc3339();
1835 conn.execute(
1836 "UPDATE memories SET expires_at = ?, updated_at = ? WHERE id = ?",
1837 params![expires_at, Utc::now().to_rfc3339(), id],
1838 )?;
1839 }
1840 None => {
1841 return get_memory_internal(conn, id, false);
1843 }
1844 }
1845
1846 record_event(
1848 conn,
1849 MemoryEventType::Updated,
1850 Some(id),
1851 None,
1852 serde_json::json!({
1853 "changed_fields": ["expires_at"],
1854 "action": "set_expiration",
1855 }),
1856 )?;
1857
1858 conn.execute(
1860 "UPDATE sync_state SET pending_changes = pending_changes + 1, version = (SELECT MAX(id) FROM memory_events) WHERE id = 1",
1861 [],
1862 )?;
1863
1864 get_memory_internal(conn, id, false)
1865}
1866
1867pub fn cleanup_expired_memories(conn: &Connection) -> Result<i64> {
1871 let now = Utc::now().to_rfc3339();
1872
1873 let affected = conn.execute(
1875 "UPDATE memories SET valid_to = ?
1876 WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1877 params![now, now],
1878 )?;
1879
1880 if affected > 0 {
1881 conn.execute(
1883 "UPDATE crossrefs SET valid_to = ?
1884 WHERE valid_to IS NULL AND (
1885 from_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1886 OR to_id IN (SELECT id FROM memories WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?)
1887 )",
1888 params![now, now, now],
1889 )?;
1890
1891 conn.execute(
1894 "DELETE FROM memory_entities
1895 WHERE memory_id IN (
1896 SELECT id FROM memories
1897 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1898 )",
1899 params![now],
1900 )?;
1901
1902 conn.execute(
1904 "DELETE FROM memory_tags
1905 WHERE memory_id IN (
1906 SELECT id FROM memories
1907 WHERE valid_to IS NOT NULL AND expires_at IS NOT NULL AND expires_at <= ?
1908 )",
1909 params![now],
1910 )?;
1911
1912 record_event(
1914 conn,
1915 MemoryEventType::Deleted,
1916 None, None,
1918 serde_json::json!({
1919 "action": "cleanup_expired",
1920 "affected_count": affected,
1921 }),
1922 )?;
1923
1924 conn.execute(
1926 "UPDATE sync_state SET pending_changes = pending_changes + ?, version = (SELECT COALESCE(MAX(id), 0) FROM memory_events) WHERE id = 1",
1927 params![affected as i64],
1928 )?;
1929 }
1930
1931 Ok(affected as i64)
1932}
1933
1934pub fn count_expired_memories(conn: &Connection) -> Result<i64> {
1936 let now = Utc::now().to_rfc3339();
1937
1938 let count: i64 = conn.query_row(
1939 "SELECT COUNT(*) FROM memories
1940 WHERE expires_at IS NOT NULL AND expires_at <= ? AND valid_to IS NULL",
1941 params![now],
1942 |row| row.get(0),
1943 )?;
1944
1945 Ok(count)
1946}
1947
1948#[derive(Debug, Clone, Serialize, Deserialize)]
1950pub struct RetentionPolicy {
1951 pub id: i64,
1952 pub workspace: String,
1953 pub max_age_days: Option<i64>,
1954 pub max_memories: Option<i64>,
1955 pub compress_after_days: Option<i64>,
1956 pub compress_max_importance: f32,
1957 pub compress_min_access: i32,
1958 pub auto_delete_after_days: Option<i64>,
1959 pub exclude_types: Vec<String>,
1960 pub created_at: String,
1961 pub updated_at: String,
1962}
1963
1964pub fn get_retention_policy(conn: &Connection, workspace: &str) -> Result<Option<RetentionPolicy>> {
1966 conn.query_row(
1967 "SELECT id, workspace, max_age_days, max_memories, compress_after_days,
1968 compress_max_importance, compress_min_access, auto_delete_after_days,
1969 exclude_types, created_at, updated_at
1970 FROM retention_policies WHERE workspace = ?",
1971 params![workspace],
1972 |row| {
1973 let exclude_str: Option<String> = row.get(8)?;
1974 Ok(RetentionPolicy {
1975 id: row.get(0)?,
1976 workspace: row.get(1)?,
1977 max_age_days: row.get(2)?,
1978 max_memories: row.get(3)?,
1979 compress_after_days: row.get(4)?,
1980 compress_max_importance: row.get::<_, f32>(5).unwrap_or(0.3),
1981 compress_min_access: row.get::<_, i32>(6).unwrap_or(3),
1982 auto_delete_after_days: row.get(7)?,
1983 exclude_types: exclude_str
1984 .map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
1985 .unwrap_or_default(),
1986 created_at: row.get(9)?,
1987 updated_at: row.get(10)?,
1988 })
1989 },
1990 )
1991 .optional()
1992 .map_err(EngramError::from)
1993}
1994
1995pub fn list_retention_policies(conn: &Connection) -> Result<Vec<RetentionPolicy>> {
1997 let mut stmt = conn.prepare(
1998 "SELECT id, workspace, max_age_days, max_memories, compress_after_days,
1999 compress_max_importance, compress_min_access, auto_delete_after_days,
2000 exclude_types, created_at, updated_at
2001 FROM retention_policies ORDER BY workspace",
2002 )?;
2003
2004 let policies = stmt
2005 .query_map([], |row| {
2006 let exclude_str: Option<String> = row.get(8)?;
2007 Ok(RetentionPolicy {
2008 id: row.get(0)?,
2009 workspace: row.get(1)?,
2010 max_age_days: row.get(2)?,
2011 max_memories: row.get(3)?,
2012 compress_after_days: row.get(4)?,
2013 compress_max_importance: row.get::<_, f32>(5).unwrap_or(0.3),
2014 compress_min_access: row.get::<_, i32>(6).unwrap_or(3),
2015 auto_delete_after_days: row.get(7)?,
2016 exclude_types: exclude_str
2017 .map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
2018 .unwrap_or_default(),
2019 created_at: row.get(9)?,
2020 updated_at: row.get(10)?,
2021 })
2022 })?
2023 .filter_map(|r| r.ok())
2024 .collect();
2025
2026 Ok(policies)
2027}
2028
2029pub fn set_retention_policy(
2031 conn: &Connection,
2032 workspace: &str,
2033 max_age_days: Option<i64>,
2034 max_memories: Option<i64>,
2035 compress_after_days: Option<i64>,
2036 compress_max_importance: Option<f32>,
2037 compress_min_access: Option<i32>,
2038 auto_delete_after_days: Option<i64>,
2039 exclude_types: Option<Vec<String>>,
2040) -> Result<RetentionPolicy> {
2041 let now = Utc::now().to_rfc3339();
2042 let exclude_str = exclude_types.map(|v| v.join(","));
2043
2044 conn.execute(
2045 "INSERT INTO retention_policies (workspace, max_age_days, max_memories, compress_after_days,
2046 compress_max_importance, compress_min_access, auto_delete_after_days, exclude_types,
2047 created_at, updated_at)
2048 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)
2049 ON CONFLICT(workspace) DO UPDATE SET
2050 max_age_days = COALESCE(?2, max_age_days),
2051 max_memories = COALESCE(?3, max_memories),
2052 compress_after_days = COALESCE(?4, compress_after_days),
2053 compress_max_importance = COALESCE(?5, compress_max_importance),
2054 compress_min_access = COALESCE(?6, compress_min_access),
2055 auto_delete_after_days = COALESCE(?7, auto_delete_after_days),
2056 exclude_types = COALESCE(?8, exclude_types),
2057 updated_at = ?9",
2058 params![
2059 workspace,
2060 max_age_days,
2061 max_memories,
2062 compress_after_days,
2063 compress_max_importance.unwrap_or(0.3),
2064 compress_min_access.unwrap_or(3),
2065 auto_delete_after_days,
2066 exclude_str,
2067 now,
2068 ],
2069 )?;
2070
2071 get_retention_policy(conn, workspace)?.ok_or_else(|| EngramError::NotFound(0))
2072}
2073
2074pub fn delete_retention_policy(conn: &Connection, workspace: &str) -> Result<bool> {
2076 let affected = conn.execute(
2077 "DELETE FROM retention_policies WHERE workspace = ?",
2078 params![workspace],
2079 )?;
2080 Ok(affected > 0)
2081}
2082
2083pub fn apply_retention_policies(conn: &Connection) -> Result<i64> {
2085 let policies = list_retention_policies(conn)?;
2086 let mut total_affected = 0i64;
2087
2088 for policy in &policies {
2089 if let Some(compress_days) = policy.compress_after_days {
2091 let compressed = compress_old_memories(
2092 conn,
2093 compress_days,
2094 policy.compress_max_importance,
2095 policy.compress_min_access,
2096 100,
2097 )?;
2098 total_affected += compressed;
2099 }
2100
2101 if let Some(max_mem) = policy.max_memories {
2103 let count: i64 = conn
2104 .query_row(
2105 "SELECT COUNT(*) FROM memories WHERE workspace = ? AND valid_to IS NULL
2106 AND COALESCE(lifecycle_state, 'active') = 'active'",
2107 params![policy.workspace],
2108 |row| row.get(0),
2109 )
2110 .unwrap_or(0);
2111
2112 if count > max_mem {
2113 let excess = count - max_mem;
2115 let archived = conn.execute(
2116 "UPDATE memories SET lifecycle_state = 'archived'
2117 WHERE id IN (
2118 SELECT id FROM memories
2119 WHERE workspace = ? AND valid_to IS NULL
2120 AND COALESCE(lifecycle_state, 'active') = 'active'
2121 AND memory_type NOT IN ('summary', 'checkpoint')
2122 ORDER BY importance ASC, access_count ASC, created_at ASC
2123 LIMIT ?
2124 )",
2125 params![policy.workspace, excess],
2126 )?;
2127 total_affected += archived as i64;
2128 }
2129 }
2130
2131 if let Some(delete_days) = policy.auto_delete_after_days {
2133 let cutoff = (Utc::now() - chrono::Duration::days(delete_days)).to_rfc3339();
2134 let now = Utc::now().to_rfc3339();
2135 let deleted = conn.execute(
2136 "UPDATE memories SET valid_to = ?
2137 WHERE workspace = ? AND valid_to IS NULL
2138 AND lifecycle_state = 'archived'
2139 AND created_at < ?",
2140 params![now, policy.workspace, cutoff],
2141 )?;
2142 total_affected += deleted as i64;
2143 }
2144 }
2145
2146 Ok(total_affected)
2147}
2148
2149pub fn compress_old_memories(
2152 conn: &Connection,
2153 max_age_days: i64,
2154 max_importance: f32,
2155 min_access_count: i32,
2156 batch_limit: usize,
2157) -> Result<i64> {
2158 let cutoff = (Utc::now() - chrono::Duration::days(max_age_days)).to_rfc3339();
2159 let now = Utc::now().to_rfc3339();
2160
2161 let mut stmt = conn.prepare(
2163 "SELECT id, content, memory_type, importance, tags, workspace
2164 FROM (
2165 SELECT m.id, m.content, m.memory_type, m.importance, m.access_count, m.workspace,
2166 COALESCE(m.lifecycle_state, 'active') as lifecycle_state,
2167 (SELECT GROUP_CONCAT(t.name, ',') FROM memory_tags mt JOIN tags t ON mt.tag_id = t.id WHERE mt.memory_id = m.id) as tags
2168 FROM memories m
2169 WHERE m.valid_to IS NULL
2170 AND (m.expires_at IS NULL OR m.expires_at > ?1)
2171 AND m.created_at < ?2
2172 AND m.importance <= ?3
2173 AND m.access_count < ?4
2174 AND m.memory_type NOT IN ('summary', 'checkpoint')
2175 AND COALESCE(m.lifecycle_state, 'active') = 'active'
2176 ORDER BY m.created_at ASC
2177 LIMIT ?5
2178 )",
2179 )?;
2180
2181 let candidates: Vec<(i64, String, String, f32, Option<String>, String)> = stmt
2182 .query_map(
2183 params![
2184 now,
2185 cutoff,
2186 max_importance,
2187 min_access_count,
2188 batch_limit as i64
2189 ],
2190 |row| {
2191 Ok((
2192 row.get(0)?,
2193 row.get(1)?,
2194 row.get(2)?,
2195 row.get(3)?,
2196 row.get(4)?,
2197 row.get::<_, String>(5)
2198 .unwrap_or_else(|_| "default".to_string()),
2199 ))
2200 },
2201 )?
2202 .filter_map(|r| r.ok())
2203 .collect();
2204
2205 let mut archived = 0i64;
2206
2207 for (id, content, memory_type, importance, tags_csv, workspace) in &candidates {
2208 let summary_text = if content.len() > 200 {
2210 let head: String = content.chars().take(120).collect();
2211 let tail: String = content
2212 .chars()
2213 .rev()
2214 .take(60)
2215 .collect::<String>()
2216 .chars()
2217 .rev()
2218 .collect();
2219 format!("{}...{}", head, tail)
2220 } else {
2221 content.clone()
2222 };
2223
2224 let tags: Vec<String> = tags_csv
2225 .as_deref()
2226 .unwrap_or("")
2227 .split(',')
2228 .filter(|s| !s.is_empty())
2229 .map(|s| s.to_string())
2230 .collect();
2231
2232 let input = CreateMemoryInput {
2233 content: format!("[Archived {}] {}", memory_type, summary_text),
2234 memory_type: MemoryType::Summary,
2235 importance: Some(*importance),
2236 tags,
2237 workspace: Some(workspace.clone()),
2238 tier: MemoryTier::Permanent,
2239 summary_of_id: Some(*id),
2240 ..Default::default()
2241 };
2242
2243 if create_memory(conn, &input).is_ok()
2244 && conn
2245 .execute(
2246 "UPDATE memories SET lifecycle_state = 'archived' WHERE id = ? AND valid_to IS NULL",
2247 params![id],
2248 )
2249 .is_ok()
2250 {
2251 archived += 1;
2252 }
2253 }
2254
2255 Ok(archived)
2256}
2257
2258#[derive(Debug, Clone, serde::Serialize)]
2261pub struct CompactMemoryRow {
2262 pub id: i64,
2264 pub preview: String,
2266 pub truncated: bool,
2268 pub memory_type: MemoryType,
2270 pub tags: Vec<String>,
2272 pub importance: f32,
2274 pub created_at: DateTime<Utc>,
2276 pub updated_at: DateTime<Utc>,
2278 pub workspace: String,
2280 pub tier: MemoryTier,
2282 pub content_length: usize,
2284 pub line_count: usize,
2286}
2287
2288pub fn list_memories_compact(
2298 conn: &Connection,
2299 options: &ListOptions,
2300 preview_chars: Option<usize>,
2301) -> Result<Vec<CompactMemoryRow>> {
2302 use crate::intelligence::compact_preview;
2303
2304 let now = Utc::now().to_rfc3339();
2305 let max_preview = preview_chars.unwrap_or(100);
2306
2307 let mut sql = String::from(
2308 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance,
2309 m.created_at, m.updated_at, m.workspace, m.tier
2310 FROM memories m",
2311 );
2312
2313 let mut conditions = vec!["m.valid_to IS NULL".to_string()];
2314 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
2315
2316 conditions.push("(m.expires_at IS NULL OR m.expires_at > ?)".to_string());
2318 params.push(Box::new(now));
2319
2320 if let Some(ref tags) = options.tags {
2322 if !tags.is_empty() {
2323 sql.push_str(
2324 " JOIN memory_tags mt ON m.id = mt.memory_id
2325 JOIN tags t ON mt.tag_id = t.id",
2326 );
2327 let placeholders: Vec<String> = tags.iter().map(|_| "?".to_string()).collect();
2328 conditions.push(format!("t.name IN ({})", placeholders.join(", ")));
2329 for tag in tags {
2330 params.push(Box::new(tag.clone()));
2331 }
2332 }
2333 }
2334
2335 if let Some(ref memory_type) = options.memory_type {
2337 conditions.push("m.memory_type = ?".to_string());
2338 params.push(Box::new(memory_type.as_str().to_string()));
2339 }
2340
2341 if let Some(ref metadata_filter) = options.metadata_filter {
2343 for (key, value) in metadata_filter {
2344 metadata_value_to_param(key, value, &mut conditions, &mut params)?;
2345 }
2346 }
2347
2348 if let Some(ref scope) = options.scope {
2350 conditions.push("m.scope_type = ?".to_string());
2351 params.push(Box::new(scope.scope_type().to_string()));
2352 if let Some(scope_id) = scope.scope_id() {
2353 conditions.push("m.scope_id = ?".to_string());
2354 params.push(Box::new(scope_id.to_string()));
2355 } else {
2356 conditions.push("m.scope_id IS NULL".to_string());
2357 }
2358 }
2359
2360 if let Some(ref workspace) = options.workspace {
2362 conditions.push("m.workspace = ?".to_string());
2363 params.push(Box::new(workspace.clone()));
2364 }
2365
2366 if let Some(ref tier) = options.tier {
2368 conditions.push("m.tier = ?".to_string());
2369 params.push(Box::new(tier.as_str().to_string()));
2370 }
2371
2372 sql.push_str(" WHERE ");
2373 sql.push_str(&conditions.join(" AND "));
2374
2375 let sort_field = match options.sort_by.unwrap_or_default() {
2377 SortField::CreatedAt => "m.created_at",
2378 SortField::UpdatedAt => "m.updated_at",
2379 SortField::LastAccessedAt => "m.last_accessed_at",
2380 SortField::Importance => "m.importance",
2381 SortField::AccessCount => "m.access_count",
2382 };
2383 let sort_order = match options.sort_order.unwrap_or_default() {
2384 SortOrder::Asc => "ASC",
2385 SortOrder::Desc => "DESC",
2386 };
2387 sql.push_str(&format!(" ORDER BY {} {}", sort_field, sort_order));
2388
2389 let limit = options.limit.unwrap_or(100);
2391 let offset = options.offset.unwrap_or(0);
2392 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
2393
2394 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
2395 let mut stmt = conn.prepare(&sql)?;
2396
2397 let memories: Vec<CompactMemoryRow> = stmt
2398 .query_map(param_refs.as_slice(), |row| {
2399 let id: i64 = row.get("id")?;
2400 let content: String = row.get("content")?;
2401 let memory_type_str: String = row.get("memory_type")?;
2402 let importance: f32 = row.get("importance")?;
2403 let created_at_str: String = row.get("created_at")?;
2404 let updated_at_str: String = row.get("updated_at")?;
2405 let workspace: String = row.get("workspace")?;
2406 let tier_str: String = row.get("tier")?;
2407
2408 let memory_type = memory_type_str.parse().unwrap_or(MemoryType::Note);
2409 let tier = tier_str.parse().unwrap_or_default();
2410
2411 let (preview, truncated) = compact_preview(&content, max_preview);
2413 let content_length = content.len();
2414 let line_count = content.lines().count();
2415
2416 Ok(CompactMemoryRow {
2417 id,
2418 preview,
2419 truncated,
2420 memory_type,
2421 tags: vec![], importance,
2423 created_at: DateTime::parse_from_rfc3339(&created_at_str)
2424 .map(|dt| dt.with_timezone(&Utc))
2425 .unwrap_or_else(|_| Utc::now()),
2426 updated_at: DateTime::parse_from_rfc3339(&updated_at_str)
2427 .map(|dt| dt.with_timezone(&Utc))
2428 .unwrap_or_else(|_| Utc::now()),
2429 workspace,
2430 tier,
2431 content_length,
2432 line_count,
2433 })
2434 })?
2435 .filter_map(|r| r.ok())
2436 .map(|mut m| {
2437 m.tags = load_tags(conn, m.id).unwrap_or_default();
2438 m
2439 })
2440 .collect();
2441
2442 Ok(memories)
2443}
2444
2445pub fn get_stats(conn: &Connection) -> Result<StorageStats> {
2447 let total_memories: i64 = conn.query_row(
2448 "SELECT COUNT(*) FROM memories WHERE valid_to IS NULL",
2449 [],
2450 |row| row.get(0),
2451 )?;
2452
2453 let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2454
2455 let total_crossrefs: i64 = conn.query_row(
2456 "SELECT COUNT(*) FROM crossrefs WHERE valid_to IS NULL",
2457 [],
2458 |row| row.get(0),
2459 )?;
2460
2461 let total_versions: i64 =
2462 conn.query_row("SELECT COUNT(*) FROM memory_versions", [], |row| row.get(0))?;
2463
2464 let _total_identities: i64 =
2465 conn.query_row("SELECT COUNT(*) FROM identities", [], |row| row.get(0))?;
2466
2467 let _total_entities: i64 =
2468 conn.query_row("SELECT COUNT(*) FROM entities", [], |row| row.get(0))?;
2469
2470 let db_size_bytes: i64 = conn.query_row(
2471 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
2472 [],
2473 |row| row.get(0),
2474 )?;
2475
2476 let _schema_version: i32 = conn
2477 .query_row("SELECT MAX(version) FROM schema_version", [], |row| {
2478 row.get(0)
2479 })
2480 .unwrap_or(0);
2481
2482 let mut workspace_stmt = conn.prepare(
2483 "SELECT workspace, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY workspace",
2484 )?;
2485 let workspaces: HashMap<String, i64> = workspace_stmt
2486 .query_map([], |row| {
2487 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2488 })?
2489 .filter_map(|r| r.ok())
2490 .collect();
2491
2492 let mut type_stmt = conn.prepare(
2493 "SELECT memory_type, COUNT(*) FROM memories WHERE valid_to IS NULL GROUP BY memory_type",
2494 )?;
2495 let type_counts: HashMap<String, i64> = type_stmt
2496 .query_map([], |row| {
2497 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2498 })?
2499 .filter_map(|r| r.ok())
2500 .collect();
2501
2502 let mut tier_stmt = conn.prepare(
2503 "SELECT COALESCE(tier, 'permanent'), COUNT(*) FROM memories GROUP BY COALESCE(tier, 'permanent')",
2504 )?;
2505 let tier_counts: HashMap<String, i64> = tier_stmt
2506 .query_map([], |row| {
2507 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
2508 })?
2509 .filter_map(|r| r.ok())
2510 .collect();
2511
2512 let memories_with_embeddings: i64 = conn.query_row(
2513 "SELECT COUNT(*) FROM memories WHERE has_embedding = 1 AND valid_to IS NULL",
2514 [],
2515 |row| row.get(0),
2516 )?;
2517
2518 let memories_pending_embedding: i64 = conn.query_row(
2519 "SELECT COUNT(*) FROM embedding_queue WHERE status = 'pending'",
2520 [],
2521 |row| row.get(0),
2522 )?;
2523
2524 let (last_sync, sync_pending): (Option<String>, i64) = conn.query_row(
2525 "SELECT last_sync, pending_changes FROM sync_state WHERE id = 1",
2526 [],
2527 |row| Ok((row.get(0)?, row.get(1)?)),
2528 )?;
2529
2530 Ok(StorageStats {
2531 total_memories,
2532 total_tags,
2533 total_crossrefs,
2534 total_versions,
2535 total_identities: 0,
2536 total_entities: 0,
2537 db_size_bytes,
2538 memories_with_embeddings,
2539 memories_pending_embedding,
2540 last_sync: last_sync.and_then(|s| {
2541 DateTime::parse_from_rfc3339(&s)
2542 .map(|dt| dt.with_timezone(&Utc))
2543 .ok()
2544 }),
2545 sync_pending: sync_pending > 0,
2546 storage_mode: "sqlite".to_string(),
2547 schema_version: 0,
2548 workspaces,
2549 type_counts,
2550 tier_counts,
2551 })
2552}
2553
2554pub fn get_memory_versions(conn: &Connection, memory_id: i64) -> Result<Vec<MemoryVersion>> {
2556 let mut stmt = conn.prepare_cached(
2557 "SELECT version, content, tags, metadata, created_at, created_by, change_summary
2558 FROM memory_versions WHERE memory_id = ? ORDER BY version DESC",
2559 )?;
2560
2561 let versions: Vec<MemoryVersion> = stmt
2562 .query_map([memory_id], |row| {
2563 let tags_str: String = row.get("tags")?;
2564 let metadata_str: String = row.get("metadata")?;
2565 let created_at_str: String = row.get("created_at")?;
2566
2567 Ok(MemoryVersion {
2568 version: row.get("version")?,
2569 content: row.get("content")?,
2570 tags: serde_json::from_str(&tags_str).unwrap_or_default(),
2571 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
2572 created_at: DateTime::parse_from_rfc3339(&created_at_str)
2573 .map(|dt| dt.with_timezone(&Utc))
2574 .unwrap_or_else(|_| Utc::now()),
2575 created_by: row.get("created_by")?,
2576 change_summary: row.get("change_summary")?,
2577 })
2578 })?
2579 .filter_map(|r| r.ok())
2580 .collect();
2581
2582 Ok(versions)
2583}
2584
2585#[derive(Debug, Clone, serde::Serialize)]
2591pub struct BatchCreateResult {
2592 pub created: Vec<Memory>,
2593 pub failed: Vec<BatchError>,
2594 pub total_created: usize,
2595 pub total_failed: usize,
2596}
2597
2598#[derive(Debug, Clone, serde::Serialize)]
2600pub struct BatchDeleteResult {
2601 pub deleted: Vec<i64>,
2602 pub failed: Vec<BatchError>,
2603 pub total_deleted: usize,
2604 pub total_failed: usize,
2605}
2606
2607#[derive(Debug, Clone, serde::Serialize)]
2609pub struct BatchError {
2610 pub index: usize,
2611 pub id: Option<i64>,
2612 pub error: String,
2613}
2614
2615pub fn create_memory_batch(
2617 conn: &Connection,
2618 inputs: &[CreateMemoryInput],
2619) -> Result<BatchCreateResult> {
2620 let mut created = Vec::new();
2621 let mut failed = Vec::new();
2622
2623 for (index, input) in inputs.iter().enumerate() {
2624 match create_memory(conn, input) {
2625 Ok(memory) => created.push(memory),
2626 Err(e) => failed.push(BatchError {
2627 index,
2628 id: None,
2629 error: e.to_string(),
2630 }),
2631 }
2632 }
2633
2634 Ok(BatchCreateResult {
2635 total_created: created.len(),
2636 total_failed: failed.len(),
2637 created,
2638 failed,
2639 })
2640}
2641
2642pub fn delete_memory_batch(conn: &Connection, ids: &[i64]) -> Result<BatchDeleteResult> {
2644 let mut deleted = Vec::new();
2645 let mut failed = Vec::new();
2646
2647 for (index, &id) in ids.iter().enumerate() {
2648 match delete_memory(conn, id) {
2649 Ok(()) => deleted.push(id),
2650 Err(e) => failed.push(BatchError {
2651 index,
2652 id: Some(id),
2653 error: e.to_string(),
2654 }),
2655 }
2656 }
2657
2658 Ok(BatchDeleteResult {
2659 total_deleted: deleted.len(),
2660 total_failed: failed.len(),
2661 deleted,
2662 failed,
2663 })
2664}
2665
2666#[derive(Debug, Clone, serde::Serialize)]
2672pub struct TagInfo {
2673 pub name: String,
2674 pub count: i64,
2675 pub last_used: Option<DateTime<Utc>>,
2676}
2677
2678pub fn list_tags(conn: &Connection) -> Result<Vec<TagInfo>> {
2680 let mut stmt = conn.prepare(
2681 r#"
2682 SELECT t.name, COUNT(mt.memory_id) as count,
2683 MAX(m.updated_at) as last_used
2684 FROM tags t
2685 LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2686 LEFT JOIN memories m ON mt.memory_id = m.id AND m.valid_to IS NULL
2687 GROUP BY t.id, t.name
2688 ORDER BY count DESC, t.name ASC
2689 "#,
2690 )?;
2691
2692 let tags: Vec<TagInfo> = stmt
2693 .query_map([], |row| {
2694 let name: String = row.get(0)?;
2695 let count: i64 = row.get(1)?;
2696 let last_used: Option<String> = row.get(2)?;
2697
2698 Ok(TagInfo {
2699 name,
2700 count,
2701 last_used: last_used.and_then(|s| {
2702 DateTime::parse_from_rfc3339(&s)
2703 .map(|dt| dt.with_timezone(&Utc))
2704 .ok()
2705 }),
2706 })
2707 })?
2708 .filter_map(|r| r.ok())
2709 .collect();
2710
2711 Ok(tags)
2712}
2713
2714#[derive(Debug, Clone, serde::Serialize)]
2716pub struct TagHierarchyNode {
2717 pub name: String,
2718 pub full_path: String,
2719 pub count: i64,
2720 pub children: Vec<TagHierarchyNode>,
2721}
2722
2723pub fn get_tag_hierarchy(conn: &Connection) -> Result<Vec<TagHierarchyNode>> {
2725 let tags = list_tags(conn)?;
2726
2727 let mut root_nodes: HashMap<String, TagHierarchyNode> = HashMap::new();
2729
2730 for tag in tags {
2731 let parts: Vec<&str> = tag.name.split('/').collect();
2732 if parts.is_empty() {
2733 continue;
2734 }
2735
2736 let root_name = parts[0].to_string();
2737 if !root_nodes.contains_key(&root_name) {
2738 root_nodes.insert(
2739 root_name.clone(),
2740 TagHierarchyNode {
2741 name: root_name.clone(),
2742 full_path: root_name.clone(),
2743 count: 0,
2744 children: Vec::new(),
2745 },
2746 );
2747 }
2748
2749 if parts.len() == 1 {
2751 if let Some(node) = root_nodes.get_mut(&root_name) {
2752 node.count += tag.count;
2753 }
2754 } else {
2755 if let Some(node) = root_nodes.get_mut(&root_name) {
2758 node.count += tag.count;
2759 }
2760 }
2761 }
2762
2763 Ok(root_nodes.into_values().collect())
2764}
2765
2766#[derive(Debug, Clone, serde::Serialize)]
2768pub struct TagValidationResult {
2769 pub valid: bool,
2770 pub orphaned_tags: Vec<String>,
2771 pub empty_tags: Vec<String>,
2772 pub duplicate_assignments: Vec<(i64, String)>,
2773 pub total_tags: i64,
2774 pub total_assignments: i64,
2775}
2776
2777pub fn validate_tags(conn: &Connection) -> Result<TagValidationResult> {
2779 let orphaned: Vec<String> = conn
2781 .prepare(
2782 "SELECT t.name FROM tags t
2783 LEFT JOIN memory_tags mt ON t.id = mt.tag_id
2784 WHERE mt.tag_id IS NULL",
2785 )?
2786 .query_map([], |row| row.get(0))?
2787 .filter_map(|r| r.ok())
2788 .collect();
2789
2790 let empty: Vec<String> = conn
2792 .prepare("SELECT name FROM tags WHERE name = '' OR name IS NULL")?
2793 .query_map([], |row| row.get(0))?
2794 .filter_map(|r| r.ok())
2795 .collect();
2796
2797 let total_tags: i64 = conn.query_row("SELECT COUNT(*) FROM tags", [], |row| row.get(0))?;
2799 let total_assignments: i64 =
2800 conn.query_row("SELECT COUNT(*) FROM memory_tags", [], |row| row.get(0))?;
2801
2802 Ok(TagValidationResult {
2803 valid: orphaned.is_empty() && empty.is_empty(),
2804 orphaned_tags: orphaned,
2805 empty_tags: empty,
2806 duplicate_assignments: vec![], total_tags,
2808 total_assignments,
2809 })
2810}
2811
2812#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2818pub struct ExportedMemory {
2819 pub id: i64,
2820 pub content: String,
2821 pub memory_type: String,
2822 pub tags: Vec<String>,
2823 pub metadata: HashMap<String, serde_json::Value>,
2824 pub importance: f32,
2825 pub workspace: String,
2826 pub tier: String,
2827 pub created_at: String,
2828 pub updated_at: String,
2829}
2830
2831#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2833pub struct ExportData {
2834 pub version: String,
2835 pub exported_at: String,
2836 pub memory_count: usize,
2837 pub memories: Vec<ExportedMemory>,
2838}
2839
2840pub fn export_memories(conn: &Connection) -> Result<ExportData> {
2842 let memories = list_memories(
2843 conn,
2844 &ListOptions {
2845 limit: Some(100000),
2846 ..Default::default()
2847 },
2848 )?;
2849
2850 let exported: Vec<ExportedMemory> = memories
2851 .into_iter()
2852 .map(|m| ExportedMemory {
2853 id: m.id,
2854 content: m.content,
2855 memory_type: m.memory_type.as_str().to_string(),
2856 tags: m.tags,
2857 metadata: m.metadata,
2858 importance: m.importance,
2859 workspace: m.workspace,
2860 tier: m.tier.as_str().to_string(),
2861 created_at: m.created_at.to_rfc3339(),
2862 updated_at: m.updated_at.to_rfc3339(),
2863 })
2864 .collect();
2865
2866 Ok(ExportData {
2867 version: "1.0".to_string(),
2868 exported_at: Utc::now().to_rfc3339(),
2869 memory_count: exported.len(),
2870 memories: exported,
2871 })
2872}
2873
2874#[derive(Debug, Clone, serde::Serialize)]
2876pub struct ImportResult {
2877 pub imported: usize,
2878 pub skipped: usize,
2879 pub failed: usize,
2880 pub errors: Vec<String>,
2881}
2882
2883pub fn import_memories(
2885 conn: &Connection,
2886 data: &ExportData,
2887 skip_duplicates: bool,
2888) -> Result<ImportResult> {
2889 let mut imported = 0;
2890 let mut skipped = 0;
2891 let mut failed = 0;
2892 let mut errors = Vec::new();
2893
2894 for mem in &data.memories {
2895 let memory_type = mem.memory_type.parse().unwrap_or(MemoryType::Note);
2896 let tier = mem.tier.parse().unwrap_or(MemoryTier::Permanent);
2897
2898 let input = CreateMemoryInput {
2899 content: mem.content.clone(),
2900 memory_type,
2901 tags: mem.tags.clone(),
2902 metadata: mem.metadata.clone(),
2903 importance: Some(mem.importance),
2904 scope: MemoryScope::Global,
2905 workspace: Some(mem.workspace.clone()),
2906 tier,
2907 defer_embedding: false,
2908 ttl_seconds: None,
2909 dedup_mode: if skip_duplicates {
2910 DedupMode::Skip
2911 } else {
2912 DedupMode::Allow
2913 },
2914 dedup_threshold: None,
2915 event_time: None,
2916 event_duration_seconds: None,
2917 trigger_pattern: None,
2918 summary_of_id: None,
2919 media_url: None,
2920 };
2921
2922 match create_memory(conn, &input) {
2923 Ok(_) => imported += 1,
2924 Err(EngramError::Duplicate { .. }) => skipped += 1,
2925 Err(e) => {
2926 failed += 1;
2927 errors.push(format!("Failed to import memory {}: {}", mem.id, e));
2928 }
2929 }
2930 }
2931
2932 Ok(ImportResult {
2933 imported,
2934 skipped,
2935 failed,
2936 errors,
2937 })
2938}
2939
2940pub fn rebuild_embeddings(conn: &Connection) -> Result<i64> {
2946 let now = Utc::now().to_rfc3339();
2947
2948 conn.execute("DELETE FROM embedding_queue", [])?;
2950
2951 let count = conn.execute(
2953 "INSERT INTO embedding_queue (memory_id, status, queued_at)
2954 SELECT id, 'pending', ? FROM memories WHERE valid_to IS NULL",
2955 params![now],
2956 )?;
2957
2958 conn.execute(
2960 "UPDATE memories SET has_embedding = 0 WHERE valid_to IS NULL",
2961 [],
2962 )?;
2963
2964 Ok(count as i64)
2965}
2966
2967pub fn rebuild_crossrefs(conn: &Connection) -> Result<i64> {
2969 let now = Utc::now().to_rfc3339();
2970
2971 let deleted = conn.execute(
2973 "UPDATE crossrefs SET valid_to = ? WHERE source = 'auto' AND valid_to IS NULL",
2974 params![now],
2975 )?;
2976
2977 Ok(deleted as i64)
2981}
2982
2983pub fn create_section_memory(
2989 conn: &Connection,
2990 title: &str,
2991 content: &str,
2992 parent_id: Option<i64>,
2993 level: i32,
2994 workspace: Option<&str>,
2995) -> Result<Memory> {
2996 let mut metadata = HashMap::new();
2997 metadata.insert("section_title".to_string(), serde_json::json!(title));
2998 metadata.insert("section_level".to_string(), serde_json::json!(level));
2999 if let Some(pid) = parent_id {
3000 metadata.insert("parent_memory_id".to_string(), serde_json::json!(pid));
3001 }
3002
3003 let input = CreateMemoryInput {
3004 content: format!("# {}\n\n{}", title, content),
3005 memory_type: MemoryType::Context,
3006 tags: vec!["section".to_string()],
3007 metadata,
3008 importance: Some(0.6),
3009 scope: MemoryScope::Global,
3010 workspace: workspace.map(String::from),
3011 tier: MemoryTier::Permanent,
3012 defer_embedding: false,
3013 ttl_seconds: None,
3014 dedup_mode: DedupMode::Skip,
3015 dedup_threshold: None,
3016 event_time: None,
3017 event_duration_seconds: None,
3018 trigger_pattern: None,
3019 summary_of_id: None,
3020 media_url: None,
3021 };
3022
3023 create_memory(conn, &input)
3024}
3025
3026pub fn create_checkpoint(
3028 conn: &Connection,
3029 session_id: &str,
3030 summary: &str,
3031 context: &HashMap<String, serde_json::Value>,
3032 workspace: Option<&str>,
3033) -> Result<Memory> {
3034 let mut metadata = context.clone();
3035 metadata.insert(
3036 "checkpoint_session".to_string(),
3037 serde_json::json!(session_id),
3038 );
3039 metadata.insert(
3040 "checkpoint_time".to_string(),
3041 serde_json::json!(Utc::now().to_rfc3339()),
3042 );
3043
3044 let input = CreateMemoryInput {
3045 content: format!("Session Checkpoint: {}\n\n{}", session_id, summary),
3046 memory_type: MemoryType::Context,
3047 tags: vec!["checkpoint".to_string(), format!("session:{}", session_id)],
3048 metadata,
3049 importance: Some(0.7),
3050 scope: MemoryScope::Global,
3051 workspace: workspace.map(String::from),
3052 tier: MemoryTier::Permanent,
3053 defer_embedding: false,
3054 ttl_seconds: None,
3055 dedup_mode: DedupMode::Allow,
3056 dedup_threshold: None,
3057 event_time: None,
3058 event_duration_seconds: None,
3059 trigger_pattern: None,
3060 summary_of_id: None,
3061 media_url: None,
3062 };
3063
3064 create_memory(conn, &input)
3065}
3066
3067pub fn boost_memory(
3069 conn: &Connection,
3070 id: i64,
3071 boost_amount: f32,
3072 duration_seconds: Option<i64>,
3073) -> Result<Memory> {
3074 let memory = get_memory(conn, id)?;
3075 let new_importance = (memory.importance + boost_amount).min(1.0);
3076 let now = Utc::now();
3077
3078 conn.execute(
3080 "UPDATE memories SET importance = ?, updated_at = ? WHERE id = ?",
3081 params![new_importance, now.to_rfc3339(), id],
3082 )?;
3083
3084 if let Some(duration) = duration_seconds {
3086 let expires = now + chrono::Duration::seconds(duration);
3087 let mut metadata = memory.metadata.clone();
3088 metadata.insert(
3089 "boost_expires".to_string(),
3090 serde_json::json!(expires.to_rfc3339()),
3091 );
3092 metadata.insert(
3093 "boost_original_importance".to_string(),
3094 serde_json::json!(memory.importance),
3095 );
3096
3097 let metadata_json = serde_json::to_string(&metadata)?;
3098 conn.execute(
3099 "UPDATE memories SET metadata = ? WHERE id = ?",
3100 params![metadata_json, id],
3101 )?;
3102 }
3103
3104 get_memory(conn, id)
3105}
3106
3107#[derive(Debug, Clone, Serialize, Deserialize)]
3113pub enum MemoryEventType {
3114 Created,
3115 Updated,
3116 Deleted,
3117 Linked,
3118 Unlinked,
3119 Shared,
3120 Synced,
3121}
3122
3123impl std::fmt::Display for MemoryEventType {
3124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3125 match self {
3126 MemoryEventType::Created => write!(f, "created"),
3127 MemoryEventType::Updated => write!(f, "updated"),
3128 MemoryEventType::Deleted => write!(f, "deleted"),
3129 MemoryEventType::Linked => write!(f, "linked"),
3130 MemoryEventType::Unlinked => write!(f, "unlinked"),
3131 MemoryEventType::Shared => write!(f, "shared"),
3132 MemoryEventType::Synced => write!(f, "synced"),
3133 }
3134 }
3135}
3136
3137impl std::str::FromStr for MemoryEventType {
3138 type Err = EngramError;
3139 fn from_str(s: &str) -> Result<Self> {
3140 match s {
3141 "created" => Ok(MemoryEventType::Created),
3142 "updated" => Ok(MemoryEventType::Updated),
3143 "deleted" => Ok(MemoryEventType::Deleted),
3144 "linked" => Ok(MemoryEventType::Linked),
3145 "unlinked" => Ok(MemoryEventType::Unlinked),
3146 "shared" => Ok(MemoryEventType::Shared),
3147 "synced" => Ok(MemoryEventType::Synced),
3148 _ => Err(EngramError::InvalidInput(format!(
3149 "Invalid event type: {}",
3150 s
3151 ))),
3152 }
3153 }
3154}
3155
3156#[derive(Debug, Clone, Serialize, Deserialize)]
3158pub struct MemoryEvent {
3159 pub id: i64,
3160 pub event_type: String,
3161 pub memory_id: Option<i64>,
3162 pub agent_id: Option<String>,
3163 pub data: serde_json::Value,
3164 pub created_at: DateTime<Utc>,
3165}
3166
3167pub fn record_event(
3169 conn: &Connection,
3170 event_type: MemoryEventType,
3171 memory_id: Option<i64>,
3172 agent_id: Option<&str>,
3173 data: serde_json::Value,
3174) -> Result<i64> {
3175 let now = Utc::now();
3176 let data_json = serde_json::to_string(&data)?;
3177
3178 conn.execute(
3179 "INSERT INTO memory_events (event_type, memory_id, agent_id, data, created_at)
3180 VALUES (?, ?, ?, ?, ?)",
3181 params![
3182 event_type.to_string(),
3183 memory_id,
3184 agent_id,
3185 data_json,
3186 now.to_rfc3339()
3187 ],
3188 )?;
3189
3190 Ok(conn.last_insert_rowid())
3191}
3192
3193pub fn poll_events(
3195 conn: &Connection,
3196 since_id: Option<i64>,
3197 since_time: Option<DateTime<Utc>>,
3198 agent_id: Option<&str>,
3199 limit: Option<usize>,
3200) -> Result<Vec<MemoryEvent>> {
3201 let limit = limit.unwrap_or(100);
3202
3203 let (query, params): (&str, Vec<Box<dyn rusqlite::ToSql>>) =
3204 match (since_id, since_time, agent_id) {
3205 (Some(id), _, Some(agent)) => (
3206 "SELECT id, event_type, memory_id, agent_id, data, created_at
3207 FROM memory_events WHERE id > ? AND (agent_id = ? OR agent_id IS NULL)
3208 ORDER BY id ASC LIMIT ?",
3209 vec![
3210 Box::new(id),
3211 Box::new(agent.to_string()),
3212 Box::new(limit as i64),
3213 ],
3214 ),
3215 (Some(id), _, None) => (
3216 "SELECT id, event_type, memory_id, agent_id, data, created_at
3217 FROM memory_events WHERE id > ?
3218 ORDER BY id ASC LIMIT ?",
3219 vec![Box::new(id), Box::new(limit as i64)],
3220 ),
3221 (None, Some(time), Some(agent)) => (
3222 "SELECT id, event_type, memory_id, agent_id, data, created_at
3223 FROM memory_events WHERE created_at > ? AND (agent_id = ? OR agent_id IS NULL)
3224 ORDER BY id ASC LIMIT ?",
3225 vec![
3226 Box::new(time.to_rfc3339()),
3227 Box::new(agent.to_string()),
3228 Box::new(limit as i64),
3229 ],
3230 ),
3231 (None, Some(time), None) => (
3232 "SELECT id, event_type, memory_id, agent_id, data, created_at
3233 FROM memory_events WHERE created_at > ?
3234 ORDER BY id ASC LIMIT ?",
3235 vec![Box::new(time.to_rfc3339()), Box::new(limit as i64)],
3236 ),
3237 (None, None, Some(agent)) => (
3238 "SELECT id, event_type, memory_id, agent_id, data, created_at
3239 FROM memory_events WHERE agent_id = ? OR agent_id IS NULL
3240 ORDER BY id DESC LIMIT ?",
3241 vec![Box::new(agent.to_string()), Box::new(limit as i64)],
3242 ),
3243 (None, None, None) => (
3244 "SELECT id, event_type, memory_id, agent_id, data, created_at
3245 FROM memory_events ORDER BY id DESC LIMIT ?",
3246 vec![Box::new(limit as i64)],
3247 ),
3248 };
3249
3250 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3251 let mut stmt = conn.prepare(query)?;
3252 let events = stmt
3253 .query_map(params_refs.as_slice(), |row| {
3254 let data_str: String = row.get(4)?;
3255 let created_str: String = row.get(5)?;
3256 Ok(MemoryEvent {
3257 id: row.get(0)?,
3258 event_type: row.get(1)?,
3259 memory_id: row.get(2)?,
3260 agent_id: row.get(3)?,
3261 data: serde_json::from_str(&data_str).unwrap_or(serde_json::json!({})),
3262 created_at: DateTime::parse_from_rfc3339(&created_str)
3263 .map(|dt| dt.with_timezone(&Utc))
3264 .unwrap_or_else(|_| Utc::now()),
3265 })
3266 })?
3267 .collect::<std::result::Result<Vec<_>, _>>()?;
3268
3269 Ok(events)
3270}
3271
3272pub fn clear_events(
3274 conn: &Connection,
3275 before_id: Option<i64>,
3276 before_time: Option<DateTime<Utc>>,
3277 keep_recent: Option<usize>,
3278) -> Result<i64> {
3279 let deleted = if let Some(id) = before_id {
3280 conn.execute("DELETE FROM memory_events WHERE id < ?", params![id])?
3281 } else if let Some(time) = before_time {
3282 conn.execute(
3283 "DELETE FROM memory_events WHERE created_at < ?",
3284 params![time.to_rfc3339()],
3285 )?
3286 } else if let Some(keep) = keep_recent {
3287 conn.execute(
3289 "DELETE FROM memory_events WHERE id NOT IN (
3290 SELECT id FROM memory_events ORDER BY id DESC LIMIT ?
3291 )",
3292 params![keep as i64],
3293 )?
3294 } else {
3295 conn.execute("DELETE FROM memory_events", [])?
3297 };
3298
3299 Ok(deleted as i64)
3300}
3301
3302#[derive(Debug, Clone, Serialize, Deserialize)]
3308pub struct SyncVersion {
3309 pub version: i64,
3310 pub last_modified: DateTime<Utc>,
3311 pub memory_count: i64,
3312 pub checksum: String,
3313}
3314
3315#[derive(Debug, Clone, Serialize, Deserialize)]
3317pub struct SyncTask {
3318 pub task_id: String,
3319 pub task_type: String,
3320 pub status: String,
3321 pub progress_percent: i32,
3322 pub traces_processed: i64,
3323 pub memories_created: i64,
3324 pub error_message: Option<String>,
3325 pub started_at: String,
3326 pub completed_at: Option<String>,
3327}
3328
3329pub fn get_sync_version(conn: &Connection) -> Result<SyncVersion> {
3331 let memory_count: i64 =
3332 conn.query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))?;
3333
3334 let last_modified: Option<String> = conn
3335 .query_row("SELECT MAX(updated_at) FROM memories", [], |row| row.get(0))
3336 .ok();
3337
3338 let version: i64 = conn
3339 .query_row("SELECT MAX(version) FROM sync_state", [], |row| row.get(0))
3340 .unwrap_or(0);
3341
3342 let checksum = format!(
3344 "{}-{}-{}",
3345 memory_count,
3346 version,
3347 last_modified.as_deref().unwrap_or("none")
3348 );
3349
3350 Ok(SyncVersion {
3351 version,
3352 last_modified: last_modified
3353 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
3354 .map(|dt| dt.with_timezone(&Utc))
3355 .unwrap_or_else(Utc::now),
3356 memory_count,
3357 checksum,
3358 })
3359}
3360
3361pub fn upsert_sync_task(conn: &Connection, task: &SyncTask) -> Result<()> {
3363 conn.execute(
3364 r#"
3365 INSERT INTO sync_tasks (
3366 task_id, task_type, status, progress_percent, traces_processed, memories_created,
3367 error_message, started_at, completed_at
3368 )
3369 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
3370 ON CONFLICT(task_id) DO UPDATE SET
3371 task_type = excluded.task_type,
3372 status = excluded.status,
3373 progress_percent = excluded.progress_percent,
3374 traces_processed = excluded.traces_processed,
3375 memories_created = excluded.memories_created,
3376 error_message = excluded.error_message,
3377 started_at = excluded.started_at,
3378 completed_at = excluded.completed_at
3379 "#,
3380 params![
3381 task.task_id,
3382 task.task_type,
3383 task.status,
3384 task.progress_percent,
3385 task.traces_processed,
3386 task.memories_created,
3387 task.error_message,
3388 task.started_at,
3389 task.completed_at
3390 ],
3391 )?;
3392
3393 Ok(())
3394}
3395
3396pub fn get_sync_task(conn: &Connection, task_id: &str) -> Result<Option<SyncTask>> {
3398 let mut stmt = conn.prepare(
3399 r#"
3400 SELECT task_id, task_type, status, progress_percent, traces_processed, memories_created,
3401 error_message, started_at, completed_at
3402 FROM sync_tasks
3403 WHERE task_id = ?
3404 "#,
3405 )?;
3406
3407 let mut rows = stmt.query(params![task_id])?;
3408 if let Some(row) = rows.next()? {
3409 Ok(Some(SyncTask {
3410 task_id: row.get("task_id")?,
3411 task_type: row.get("task_type")?,
3412 status: row.get("status")?,
3413 progress_percent: row.get("progress_percent")?,
3414 traces_processed: row.get("traces_processed")?,
3415 memories_created: row.get("memories_created")?,
3416 error_message: row.get("error_message")?,
3417 started_at: row.get("started_at")?,
3418 completed_at: row.get("completed_at")?,
3419 }))
3420 } else {
3421 Ok(None)
3422 }
3423}
3424
3425#[derive(Debug, Clone, Serialize, Deserialize)]
3427pub struct SyncDelta {
3428 pub created: Vec<Memory>,
3429 pub updated: Vec<Memory>,
3430 pub deleted: Vec<i64>,
3431 pub from_version: i64,
3432 pub to_version: i64,
3433}
3434
3435pub fn get_sync_delta(conn: &Connection, since_version: i64) -> Result<SyncDelta> {
3437 let current_version = get_sync_version(conn)?.version;
3438
3439 let events = poll_events(conn, Some(since_version), None, None, Some(10000))?;
3441
3442 let mut created_ids = std::collections::HashSet::new();
3443 let mut updated_ids = std::collections::HashSet::new();
3444 let mut deleted_ids = std::collections::HashSet::new();
3445
3446 for event in events {
3447 if let Some(memory_id) = event.memory_id {
3448 match event.event_type.as_str() {
3449 "created" => {
3450 created_ids.insert(memory_id);
3451 }
3452 "updated" => {
3453 if !created_ids.contains(&memory_id) {
3454 updated_ids.insert(memory_id);
3455 }
3456 }
3457 "deleted" => {
3458 created_ids.remove(&memory_id);
3459 updated_ids.remove(&memory_id);
3460 deleted_ids.insert(memory_id);
3461 }
3462 _ => {}
3463 }
3464 }
3465 }
3466
3467 let created: Vec<Memory> = created_ids
3468 .iter()
3469 .filter_map(|id| get_memory(conn, *id).ok())
3470 .collect();
3471
3472 let updated: Vec<Memory> = updated_ids
3473 .iter()
3474 .filter_map(|id| get_memory(conn, *id).ok())
3475 .collect();
3476
3477 Ok(SyncDelta {
3478 created,
3479 updated,
3480 deleted: deleted_ids.into_iter().collect(),
3481 from_version: since_version,
3482 to_version: current_version,
3483 })
3484}
3485
3486#[derive(Debug, Clone, Serialize, Deserialize)]
3488pub struct AgentSyncState {
3489 pub agent_id: String,
3490 pub last_sync_version: i64,
3491 pub last_sync_time: DateTime<Utc>,
3492 pub pending_changes: i64,
3493}
3494
3495pub fn get_agent_sync_state(conn: &Connection, agent_id: &str) -> Result<AgentSyncState> {
3497 let result: std::result::Result<(i64, String), rusqlite::Error> = conn.query_row(
3498 "SELECT last_sync_version, last_sync_time FROM agent_sync_state WHERE agent_id = ?",
3499 params![agent_id],
3500 |row| Ok((row.get(0)?, row.get(1)?)),
3501 );
3502
3503 match result {
3504 Ok((version, time_str)) => {
3505 let current_version = get_sync_version(conn)?.version;
3506 let pending = (current_version - version).max(0);
3507
3508 Ok(AgentSyncState {
3509 agent_id: agent_id.to_string(),
3510 last_sync_version: version,
3511 last_sync_time: DateTime::parse_from_rfc3339(&time_str)
3512 .map(|dt| dt.with_timezone(&Utc))
3513 .unwrap_or_else(|_| Utc::now()),
3514 pending_changes: pending,
3515 })
3516 }
3517 Err(_) => {
3518 Ok(AgentSyncState {
3520 agent_id: agent_id.to_string(),
3521 last_sync_version: 0,
3522 last_sync_time: Utc::now(),
3523 pending_changes: get_sync_version(conn)?.version,
3524 })
3525 }
3526 }
3527}
3528
3529pub fn update_agent_sync_state(conn: &Connection, agent_id: &str, version: i64) -> Result<()> {
3531 let now = Utc::now();
3532 conn.execute(
3533 "INSERT INTO agent_sync_state (agent_id, last_sync_version, last_sync_time)
3534 VALUES (?, ?, ?)
3535 ON CONFLICT(agent_id) DO UPDATE SET
3536 last_sync_version = excluded.last_sync_version,
3537 last_sync_time = excluded.last_sync_time",
3538 params![agent_id, version, now.to_rfc3339()],
3539 )?;
3540 Ok(())
3541}
3542
3543pub fn cleanup_sync_data(conn: &Connection, older_than_days: i64) -> Result<i64> {
3545 let cutoff = Utc::now() - chrono::Duration::days(older_than_days);
3546 let deleted = conn.execute(
3547 "DELETE FROM memory_events WHERE created_at < ?",
3548 params![cutoff.to_rfc3339()],
3549 )?;
3550 Ok(deleted as i64)
3551}
3552
3553#[derive(Debug, Clone, Serialize, Deserialize)]
3559pub struct SharedMemory {
3560 pub id: i64,
3561 pub memory_id: i64,
3562 pub from_agent: String,
3563 pub to_agent: String,
3564 pub message: Option<String>,
3565 pub acknowledged: bool,
3566 pub acknowledged_at: Option<DateTime<Utc>>,
3567 pub created_at: DateTime<Utc>,
3568}
3569
3570pub fn share_memory(
3572 conn: &Connection,
3573 memory_id: i64,
3574 from_agent: &str,
3575 to_agent: &str,
3576 message: Option<&str>,
3577) -> Result<i64> {
3578 let now = Utc::now();
3579
3580 let _ = get_memory(conn, memory_id)?;
3582
3583 conn.execute(
3584 "INSERT INTO shared_memories (memory_id, from_agent, to_agent, message, acknowledged, created_at)
3585 VALUES (?, ?, ?, ?, 0, ?)",
3586 params![memory_id, from_agent, to_agent, message, now.to_rfc3339()],
3587 )?;
3588
3589 let share_id = conn.last_insert_rowid();
3590
3591 record_event(
3593 conn,
3594 MemoryEventType::Shared,
3595 Some(memory_id),
3596 Some(from_agent),
3597 serde_json::json!({
3598 "to_agent": to_agent,
3599 "share_id": share_id,
3600 "message": message
3601 }),
3602 )?;
3603
3604 Ok(share_id)
3605}
3606
3607pub fn poll_shared_memories(
3609 conn: &Connection,
3610 to_agent: &str,
3611 include_acknowledged: bool,
3612) -> Result<Vec<SharedMemory>> {
3613 let query = if include_acknowledged {
3614 "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3615 FROM shared_memories WHERE to_agent = ?
3616 ORDER BY created_at DESC"
3617 } else {
3618 "SELECT id, memory_id, from_agent, to_agent, message, acknowledged, acknowledged_at, created_at
3619 FROM shared_memories WHERE to_agent = ? AND acknowledged = 0
3620 ORDER BY created_at DESC"
3621 };
3622
3623 let mut stmt = conn.prepare(query)?;
3624 let shares = stmt
3625 .query_map(params![to_agent], |row| {
3626 let created_str: String = row.get(7)?;
3627 let ack_str: Option<String> = row.get(6)?;
3628 Ok(SharedMemory {
3629 id: row.get(0)?,
3630 memory_id: row.get(1)?,
3631 from_agent: row.get(2)?,
3632 to_agent: row.get(3)?,
3633 message: row.get(4)?,
3634 acknowledged: row.get(5)?,
3635 acknowledged_at: ack_str.and_then(|s| {
3636 DateTime::parse_from_rfc3339(&s)
3637 .ok()
3638 .map(|dt| dt.with_timezone(&Utc))
3639 }),
3640 created_at: DateTime::parse_from_rfc3339(&created_str)
3641 .map(|dt| dt.with_timezone(&Utc))
3642 .unwrap_or_else(|_| Utc::now()),
3643 })
3644 })?
3645 .collect::<std::result::Result<Vec<_>, _>>()?;
3646
3647 Ok(shares)
3648}
3649
3650pub fn acknowledge_share(conn: &Connection, share_id: i64, agent_id: &str) -> Result<()> {
3652 let now = Utc::now();
3653
3654 let affected = conn.execute(
3655 "UPDATE shared_memories SET acknowledged = 1, acknowledged_at = ?
3656 WHERE id = ? AND to_agent = ?",
3657 params![now.to_rfc3339(), share_id, agent_id],
3658 )?;
3659
3660 if affected == 0 {
3661 return Err(EngramError::NotFound(share_id));
3662 }
3663
3664 Ok(())
3665}
3666
3667pub fn search_by_identity(
3673 conn: &Connection,
3674 identity: &str,
3675 workspace: Option<&str>,
3676 limit: Option<usize>,
3677) -> Result<Vec<Memory>> {
3678 let limit = limit.unwrap_or(50);
3679 let now = Utc::now().to_rfc3339();
3680
3681 let pattern = format!("%{}%", identity);
3684
3685 let query = if workspace.is_some() {
3686 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3687 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3688 m.visibility, m.version, m.has_embedding, m.metadata,
3689 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
3690 m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
3691 m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url
3692 FROM memories m
3693 LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3694 LEFT JOIN tags t ON mt.tag_id = t.id
3695 WHERE m.workspace = ? AND (m.content LIKE ? OR t.name LIKE ?)
3696 AND m.valid_to IS NULL
3697 AND (m.expires_at IS NULL OR m.expires_at > ?)
3698 ORDER BY m.importance DESC, m.created_at DESC
3699 LIMIT ?"
3700 } else {
3701 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3702 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3703 m.visibility, m.version, m.has_embedding, m.metadata,
3704 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
3705 m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
3706 m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url
3707 FROM memories m
3708 LEFT JOIN memory_tags mt ON m.id = mt.memory_id
3709 LEFT JOIN tags t ON mt.tag_id = t.id
3710 WHERE (m.content LIKE ? OR t.name LIKE ?)
3711 AND m.valid_to IS NULL
3712 AND (m.expires_at IS NULL OR m.expires_at > ?)
3713 ORDER BY m.importance DESC, m.created_at DESC
3714 LIMIT ?"
3715 };
3716
3717 let mut stmt = conn.prepare(query)?;
3718
3719 let memories = if let Some(ws) = workspace {
3720 stmt.query_map(
3721 params![ws, &pattern, &pattern, &now, limit as i64],
3722 memory_from_row,
3723 )?
3724 .collect::<std::result::Result<Vec<_>, _>>()?
3725 } else {
3726 stmt.query_map(
3727 params![&pattern, &pattern, &now, limit as i64],
3728 memory_from_row,
3729 )?
3730 .collect::<std::result::Result<Vec<_>, _>>()?
3731 };
3732
3733 Ok(memories)
3734}
3735
3736pub fn search_sessions(
3738 conn: &Connection,
3739 query_text: &str,
3740 session_id: Option<&str>,
3741 workspace: Option<&str>,
3742 limit: Option<usize>,
3743) -> Result<Vec<Memory>> {
3744 let limit = limit.unwrap_or(20);
3745 let now = Utc::now().to_rfc3339();
3746 let pattern = format!("%{}%", query_text);
3747
3748 let mut conditions = vec![
3751 "m.memory_type = 'transcript_chunk'",
3752 "m.valid_to IS NULL",
3753 "(m.expires_at IS NULL OR m.expires_at > ?)",
3754 ];
3755 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3756
3757 let use_tag_join = session_id.is_some();
3759 if let Some(sid) = session_id {
3760 let tag_name = format!("session:{}", sid);
3761 conditions.push("t.name = ?");
3762 params_vec.push(Box::new(tag_name));
3763 }
3764
3765 if let Some(ws) = workspace {
3767 conditions.push("m.workspace = ?");
3768 params_vec.push(Box::new(ws.to_string()));
3769 }
3770
3771 conditions.push("m.content LIKE ?");
3773 params_vec.push(Box::new(pattern));
3774
3775 params_vec.push(Box::new(limit as i64));
3777
3778 let join_clause = if use_tag_join {
3780 "JOIN memory_tags mt ON m.id = mt.memory_id JOIN tags t ON mt.tag_id = t.id"
3781 } else {
3782 ""
3783 };
3784
3785 let query = format!(
3786 "SELECT DISTINCT m.id, m.content, m.memory_type, m.importance, m.access_count,
3787 m.created_at, m.updated_at, m.last_accessed_at, m.owner_id,
3788 m.visibility, m.version, m.has_embedding, m.metadata,
3789 m.scope_type, m.scope_id, m.workspace, m.tier, m.expires_at, m.content_hash,
3790 m.event_time, m.event_duration_seconds, m.trigger_pattern, m.procedure_success_count,
3791 m.procedure_failure_count, m.summary_of_id, m.lifecycle_state, m.media_url
3792 FROM memories m {} WHERE {} ORDER BY m.created_at DESC LIMIT ?",
3793 join_clause,
3794 conditions.join(" AND ")
3795 );
3796
3797 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
3798 let mut stmt = conn.prepare(&query)?;
3799 let memories = stmt
3800 .query_map(params_refs.as_slice(), memory_from_row)?
3801 .collect::<std::result::Result<Vec<_>, _>>()?;
3802
3803 Ok(memories)
3804}
3805
3806#[cfg(test)]
3807mod tests {
3808 use super::*;
3809 use crate::storage::Storage;
3810 use serde_json::json;
3811 use std::collections::HashMap;
3812
3813 #[test]
3814 fn test_list_memories_metadata_filter_types() {
3815 let storage = Storage::open_in_memory().unwrap();
3816
3817 storage
3818 .with_connection(|conn| {
3819 let mut metadata1 = HashMap::new();
3820 metadata1.insert("status".to_string(), json!("active"));
3821 metadata1.insert("count".to_string(), json!(3));
3822 metadata1.insert("flag".to_string(), json!(true));
3823
3824 let mut metadata2 = HashMap::new();
3825 metadata2.insert("status".to_string(), json!("inactive"));
3826 metadata2.insert("count".to_string(), json!(5));
3827 metadata2.insert("flag".to_string(), json!(false));
3828 metadata2.insert("optional".to_string(), json!("set"));
3829
3830 let memory1 = create_memory(
3831 conn,
3832 &CreateMemoryInput {
3833 content: "First".to_string(),
3834 memory_type: MemoryType::Note,
3835 tags: vec![],
3836 metadata: metadata1,
3837 importance: None,
3838 scope: Default::default(),
3839 workspace: None,
3840 tier: Default::default(),
3841 defer_embedding: true,
3842 ttl_seconds: None,
3843 dedup_mode: Default::default(),
3844 dedup_threshold: None,
3845 event_time: None,
3846 event_duration_seconds: None,
3847 trigger_pattern: None,
3848 summary_of_id: None,
3849 media_url: None,
3850 },
3851 )?;
3852 let memory2 = create_memory(
3853 conn,
3854 &CreateMemoryInput {
3855 content: "Second".to_string(),
3856 memory_type: MemoryType::Note,
3857 tags: vec![],
3858 metadata: metadata2,
3859 importance: None,
3860 scope: Default::default(),
3861 workspace: None,
3862 tier: Default::default(),
3863 defer_embedding: true,
3864 ttl_seconds: None,
3865 dedup_mode: Default::default(),
3866 dedup_threshold: None,
3867 event_time: None,
3868 event_duration_seconds: None,
3869 trigger_pattern: None,
3870 summary_of_id: None,
3871 media_url: None,
3872 },
3873 )?;
3874
3875 let mut filter = HashMap::new();
3876 filter.insert("status".to_string(), json!("active"));
3877 let results = list_memories(
3878 conn,
3879 &ListOptions {
3880 metadata_filter: Some(filter),
3881 ..Default::default()
3882 },
3883 )?;
3884 assert_eq!(results.len(), 1);
3885 assert_eq!(results[0].id, memory1.id);
3886
3887 let mut filter = HashMap::new();
3888 filter.insert("count".to_string(), json!(5));
3889 let results = list_memories(
3890 conn,
3891 &ListOptions {
3892 metadata_filter: Some(filter),
3893 ..Default::default()
3894 },
3895 )?;
3896 assert_eq!(results.len(), 1);
3897 assert_eq!(results[0].id, memory2.id);
3898
3899 let mut filter = HashMap::new();
3900 filter.insert("flag".to_string(), json!(true));
3901 let results = list_memories(
3902 conn,
3903 &ListOptions {
3904 metadata_filter: Some(filter),
3905 ..Default::default()
3906 },
3907 )?;
3908 assert_eq!(results.len(), 1);
3909 assert_eq!(results[0].id, memory1.id);
3910
3911 let mut filter = HashMap::new();
3912 filter.insert("optional".to_string(), serde_json::Value::Null);
3913 let results = list_memories(
3914 conn,
3915 &ListOptions {
3916 metadata_filter: Some(filter),
3917 ..Default::default()
3918 },
3919 )?;
3920 assert_eq!(results.len(), 1);
3921 assert_eq!(results[0].id, memory1.id);
3922
3923 Ok(())
3924 })
3925 .unwrap();
3926 }
3927
3928 #[test]
3929 fn test_memory_scope_isolation() {
3930 use crate::types::MemoryScope;
3931
3932 let storage = Storage::open_in_memory().unwrap();
3933
3934 storage
3935 .with_connection(|conn| {
3936 let user1_memory = create_memory(
3938 conn,
3939 &CreateMemoryInput {
3940 content: "User 1 memory".to_string(),
3941 memory_type: MemoryType::Note,
3942 tags: vec!["test".to_string()],
3943 metadata: HashMap::new(),
3944 importance: None,
3945 scope: MemoryScope::user("user-1"),
3946 workspace: None,
3947 tier: Default::default(),
3948 defer_embedding: true,
3949 ttl_seconds: None,
3950 dedup_mode: Default::default(),
3951 dedup_threshold: None,
3952 event_time: None,
3953 event_duration_seconds: None,
3954 trigger_pattern: None,
3955 summary_of_id: None,
3956 media_url: None,
3957 },
3958 )?;
3959
3960 let user2_memory = create_memory(
3962 conn,
3963 &CreateMemoryInput {
3964 content: "User 2 memory".to_string(),
3965 memory_type: MemoryType::Note,
3966 tags: vec!["test".to_string()],
3967 metadata: HashMap::new(),
3968 importance: None,
3969 scope: MemoryScope::user("user-2"),
3970 workspace: None,
3971 tier: Default::default(),
3972 defer_embedding: true,
3973 ttl_seconds: None,
3974 dedup_mode: Default::default(),
3975 dedup_threshold: None,
3976 event_time: None,
3977 event_duration_seconds: None,
3978 trigger_pattern: None,
3979 summary_of_id: None,
3980 media_url: None,
3981 },
3982 )?;
3983
3984 let session_memory = create_memory(
3986 conn,
3987 &CreateMemoryInput {
3988 content: "Session memory".to_string(),
3989 memory_type: MemoryType::Note,
3990 tags: vec!["test".to_string()],
3991 metadata: HashMap::new(),
3992 importance: None,
3993 scope: MemoryScope::session("session-abc"),
3994 workspace: None,
3995 tier: Default::default(),
3996 defer_embedding: true,
3997 ttl_seconds: None,
3998 dedup_mode: Default::default(),
3999 dedup_threshold: None,
4000 event_time: None,
4001 event_duration_seconds: None,
4002 trigger_pattern: None,
4003 summary_of_id: None,
4004 media_url: None,
4005 },
4006 )?;
4007
4008 let global_memory = create_memory(
4010 conn,
4011 &CreateMemoryInput {
4012 content: "Global memory".to_string(),
4013 memory_type: MemoryType::Note,
4014 tags: vec!["test".to_string()],
4015 metadata: HashMap::new(),
4016 importance: None,
4017 scope: MemoryScope::Global,
4018 workspace: None,
4019 tier: Default::default(),
4020 defer_embedding: true,
4021 ttl_seconds: None,
4022 dedup_mode: Default::default(),
4023 dedup_threshold: None,
4024 event_time: None,
4025 event_duration_seconds: None,
4026 trigger_pattern: None,
4027 summary_of_id: None,
4028 media_url: None,
4029 },
4030 )?;
4031
4032 let all_results = list_memories(conn, &ListOptions::default())?;
4034 assert_eq!(all_results.len(), 4);
4035
4036 let user1_results = list_memories(
4038 conn,
4039 &ListOptions {
4040 scope: Some(MemoryScope::user("user-1")),
4041 ..Default::default()
4042 },
4043 )?;
4044 assert_eq!(user1_results.len(), 1);
4045 assert_eq!(user1_results[0].id, user1_memory.id);
4046 assert_eq!(user1_results[0].scope, MemoryScope::user("user-1"));
4047
4048 let user2_results = list_memories(
4050 conn,
4051 &ListOptions {
4052 scope: Some(MemoryScope::user("user-2")),
4053 ..Default::default()
4054 },
4055 )?;
4056 assert_eq!(user2_results.len(), 1);
4057 assert_eq!(user2_results[0].id, user2_memory.id);
4058
4059 let session_results = list_memories(
4061 conn,
4062 &ListOptions {
4063 scope: Some(MemoryScope::session("session-abc")),
4064 ..Default::default()
4065 },
4066 )?;
4067 assert_eq!(session_results.len(), 1);
4068 assert_eq!(session_results[0].id, session_memory.id);
4069
4070 let global_results = list_memories(
4072 conn,
4073 &ListOptions {
4074 scope: Some(MemoryScope::Global),
4075 ..Default::default()
4076 },
4077 )?;
4078 assert_eq!(global_results.len(), 1);
4079 assert_eq!(global_results[0].id, global_memory.id);
4080
4081 let retrieved = get_memory(conn, user1_memory.id)?;
4083 assert_eq!(retrieved.scope, MemoryScope::user("user-1"));
4084
4085 Ok(())
4086 })
4087 .unwrap();
4088 }
4089
4090 #[test]
4091 fn test_memory_scope_can_access() {
4092 use crate::types::MemoryScope;
4093
4094 assert!(MemoryScope::Global.can_access(&MemoryScope::user("user-1")));
4096 assert!(MemoryScope::Global.can_access(&MemoryScope::session("session-1")));
4097 assert!(MemoryScope::Global.can_access(&MemoryScope::agent("agent-1")));
4098 assert!(MemoryScope::Global.can_access(&MemoryScope::Global));
4099
4100 assert!(MemoryScope::user("user-1").can_access(&MemoryScope::user("user-1")));
4102 assert!(MemoryScope::session("s1").can_access(&MemoryScope::session("s1")));
4103 assert!(MemoryScope::agent("a1").can_access(&MemoryScope::agent("a1")));
4104
4105 assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::user("user-2")));
4107 assert!(!MemoryScope::session("s1").can_access(&MemoryScope::session("s2")));
4108 assert!(!MemoryScope::agent("a1").can_access(&MemoryScope::agent("a2")));
4109
4110 assert!(!MemoryScope::user("user-1").can_access(&MemoryScope::session("s1")));
4112 assert!(!MemoryScope::session("s1").can_access(&MemoryScope::agent("a1")));
4113
4114 assert!(MemoryScope::user("user-1").can_access(&MemoryScope::Global));
4116 assert!(MemoryScope::session("s1").can_access(&MemoryScope::Global));
4117 assert!(MemoryScope::agent("a1").can_access(&MemoryScope::Global));
4118 }
4119
4120 #[test]
4121 fn test_memory_ttl_creation() {
4122 let storage = Storage::open_in_memory().unwrap();
4123
4124 storage
4125 .with_transaction(|conn| {
4126 let memory = create_memory(
4128 conn,
4129 &CreateMemoryInput {
4130 content: "Temporary memory".to_string(),
4131 memory_type: MemoryType::Note,
4132 tags: vec![],
4133 metadata: HashMap::new(),
4134 importance: None,
4135 scope: Default::default(),
4136 workspace: None,
4137 tier: MemoryTier::Daily, defer_embedding: true,
4139 ttl_seconds: Some(3600), dedup_mode: Default::default(),
4141 dedup_threshold: None,
4142 event_time: None,
4143 event_duration_seconds: None,
4144 trigger_pattern: None,
4145 summary_of_id: None,
4146 media_url: None,
4147 },
4148 )?;
4149
4150 assert!(memory.expires_at.is_some());
4152 assert_eq!(memory.tier, MemoryTier::Daily);
4153 let expires_at = memory.expires_at.unwrap();
4154 let now = Utc::now();
4155
4156 let diff = (expires_at - now).num_seconds();
4158 assert!(
4159 (3595..=3605).contains(&diff),
4160 "Expected ~3600 seconds, got {}",
4161 diff
4162 );
4163
4164 let permanent = create_memory(
4166 conn,
4167 &CreateMemoryInput {
4168 content: "Permanent memory".to_string(),
4169 memory_type: MemoryType::Note,
4170 tags: vec![],
4171 metadata: HashMap::new(),
4172 importance: None,
4173 scope: Default::default(),
4174 workspace: None,
4175 tier: Default::default(),
4176 defer_embedding: true,
4177 ttl_seconds: None,
4178 dedup_mode: Default::default(),
4179 dedup_threshold: None,
4180 event_time: None,
4181 event_duration_seconds: None,
4182 trigger_pattern: None,
4183 summary_of_id: None,
4184 media_url: None,
4185 },
4186 )?;
4187
4188 assert!(permanent.expires_at.is_none());
4190
4191 Ok(())
4192 })
4193 .unwrap();
4194 }
4195
4196 #[test]
4197 fn test_expired_memories_excluded_from_queries() {
4198 let storage = Storage::open_in_memory().unwrap();
4199
4200 storage
4201 .with_transaction(|conn| {
4202 let memory1 = create_memory(
4204 conn,
4205 &CreateMemoryInput {
4206 content: "Memory to expire".to_string(),
4207 memory_type: MemoryType::Note,
4208 tags: vec!["test".to_string()],
4209 metadata: HashMap::new(),
4210 importance: None,
4211 scope: Default::default(),
4212 workspace: None,
4213 tier: MemoryTier::Daily, defer_embedding: true,
4215 ttl_seconds: Some(3600), dedup_mode: Default::default(),
4217 dedup_threshold: None,
4218 event_time: None,
4219 event_duration_seconds: None,
4220 trigger_pattern: None,
4221 summary_of_id: None,
4222 media_url: None,
4223 },
4224 )?;
4225
4226 let active = create_memory(
4228 conn,
4229 &CreateMemoryInput {
4230 content: "Active memory".to_string(),
4231 memory_type: MemoryType::Note,
4232 tags: vec!["test".to_string()],
4233 metadata: HashMap::new(),
4234 importance: None,
4235 scope: Default::default(),
4236 workspace: None,
4237 tier: Default::default(),
4238 defer_embedding: true,
4239 ttl_seconds: None,
4240 dedup_mode: Default::default(),
4241 dedup_threshold: None,
4242 event_time: None,
4243 event_duration_seconds: None,
4244 trigger_pattern: None,
4245 summary_of_id: None,
4246 media_url: None,
4247 },
4248 )?;
4249
4250 let results = list_memories(conn, &ListOptions::default())?;
4252 assert_eq!(results.len(), 2);
4253
4254 let past = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
4256 conn.execute(
4257 "UPDATE memories SET expires_at = ? WHERE id = ?",
4258 params![past, memory1.id],
4259 )?;
4260
4261 let results = list_memories(conn, &ListOptions::default())?;
4263 assert_eq!(results.len(), 1);
4264 assert_eq!(results[0].id, active.id);
4265
4266 let get_result = get_memory(conn, memory1.id);
4268 assert!(get_result.is_err());
4269
4270 let get_result = get_memory(conn, active.id);
4272 assert!(get_result.is_ok());
4273
4274 Ok(())
4275 })
4276 .unwrap();
4277 }
4278
4279 #[test]
4280 fn test_set_memory_expiration() {
4281 let storage = Storage::open_in_memory().unwrap();
4282
4283 storage
4284 .with_transaction(|conn| {
4285 let memory = create_memory(
4287 conn,
4288 &CreateMemoryInput {
4289 content: "Initially permanent".to_string(),
4290 memory_type: MemoryType::Note,
4291 tags: vec![],
4292 metadata: HashMap::new(),
4293 importance: None,
4294 scope: Default::default(),
4295 workspace: None,
4296 tier: Default::default(),
4297 defer_embedding: true,
4298 ttl_seconds: None,
4299 dedup_mode: Default::default(),
4300 dedup_threshold: None,
4301 event_time: None,
4302 event_duration_seconds: None,
4303 trigger_pattern: None,
4304 summary_of_id: None,
4305 media_url: None,
4306 },
4307 )?;
4308
4309 assert!(memory.expires_at.is_none());
4310
4311 let updated = set_memory_expiration(conn, memory.id, Some(1800))?;
4313 assert!(updated.expires_at.is_some());
4314
4315 let permanent_again = set_memory_expiration(conn, memory.id, Some(0))?;
4317 assert!(permanent_again.expires_at.is_none());
4318
4319 Ok(())
4320 })
4321 .unwrap();
4322 }
4323
4324 #[test]
4325 fn test_cleanup_expired_memories() {
4326 let storage = Storage::open_in_memory().unwrap();
4327
4328 storage
4329 .with_transaction(|conn| {
4330 let mut expired_ids = vec![];
4332 for i in 0..3 {
4333 let mem = create_memory(
4334 conn,
4335 &CreateMemoryInput {
4336 content: format!("To expire {}", i),
4337 memory_type: MemoryType::Note,
4338 tags: vec![],
4339 metadata: HashMap::new(),
4340 importance: None,
4341 scope: Default::default(),
4342 workspace: None,
4343 tier: MemoryTier::Daily, defer_embedding: true,
4345 ttl_seconds: Some(3600), dedup_mode: Default::default(),
4347 dedup_threshold: None,
4348 event_time: None,
4349 event_duration_seconds: None,
4350 trigger_pattern: None,
4351 summary_of_id: None,
4352 media_url: None,
4353 },
4354 )?;
4355 expired_ids.push(mem.id);
4356 }
4357
4358 for i in 0..2 {
4360 create_memory(
4361 conn,
4362 &CreateMemoryInput {
4363 content: format!("Active {}", i),
4364 memory_type: MemoryType::Note,
4365 tags: vec![],
4366 metadata: HashMap::new(),
4367 importance: None,
4368 scope: Default::default(),
4369 workspace: None,
4370 tier: Default::default(),
4371 defer_embedding: true,
4372 ttl_seconds: None,
4373 dedup_mode: Default::default(),
4374 dedup_threshold: None,
4375 event_time: None,
4376 event_duration_seconds: None,
4377 trigger_pattern: None,
4378 summary_of_id: None,
4379 media_url: None,
4380 },
4381 )?;
4382 }
4383
4384 let results = list_memories(conn, &ListOptions::default())?;
4386 assert_eq!(results.len(), 5);
4387
4388 let past = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
4390 for id in &expired_ids {
4391 conn.execute(
4392 "UPDATE memories SET expires_at = ? WHERE id = ?",
4393 params![past, id],
4394 )?;
4395 }
4396
4397 let expired_count = count_expired_memories(conn)?;
4399 assert_eq!(expired_count, 3);
4400
4401 let deleted = cleanup_expired_memories(conn)?;
4403 assert_eq!(deleted, 3);
4404
4405 let remaining = list_memories(conn, &ListOptions::default())?;
4407 assert_eq!(remaining.len(), 2);
4408
4409 let expired_count = count_expired_memories(conn)?;
4411 assert_eq!(expired_count, 0);
4412
4413 Ok(())
4414 })
4415 .unwrap();
4416 }
4417
4418 #[test]
4421 fn test_content_hash_computation() {
4422 let hash1 = compute_content_hash("Hello World");
4424 let hash2 = compute_content_hash("hello world"); let hash3 = compute_content_hash(" hello world "); let hash4 = compute_content_hash("Hello World!"); assert_eq!(hash1, hash2);
4430 assert_eq!(hash2, hash3);
4431
4432 assert_ne!(hash1, hash4);
4434
4435 assert!(hash1.starts_with("sha256:"));
4437 }
4438
4439 #[test]
4440 fn test_dedup_mode_reject() {
4441 use crate::types::DedupMode;
4442
4443 let storage = Storage::open_in_memory().unwrap();
4444
4445 storage
4446 .with_transaction(|conn| {
4447 let _memory1 = create_memory(
4449 conn,
4450 &CreateMemoryInput {
4451 content: "Unique content for testing".to_string(),
4452 memory_type: MemoryType::Note,
4453 tags: vec![],
4454 metadata: HashMap::new(),
4455 importance: None,
4456 scope: Default::default(),
4457 workspace: None,
4458 tier: Default::default(),
4459 defer_embedding: true,
4460 ttl_seconds: None,
4461 dedup_mode: DedupMode::Allow, dedup_threshold: None,
4463 event_time: None,
4464 event_duration_seconds: None,
4465 trigger_pattern: None,
4466 summary_of_id: None,
4467 media_url: None,
4468 },
4469 )?;
4470
4471 let result = create_memory(
4473 conn,
4474 &CreateMemoryInput {
4475 content: "Unique content for testing".to_string(), memory_type: MemoryType::Note,
4477 tags: vec!["new-tag".to_string()],
4478 metadata: HashMap::new(),
4479 importance: None,
4480 scope: Default::default(),
4481 workspace: None,
4482 tier: Default::default(),
4483 defer_embedding: true,
4484 ttl_seconds: None,
4485 dedup_mode: DedupMode::Reject,
4486 dedup_threshold: None,
4487 event_time: None,
4488 event_duration_seconds: None,
4489 trigger_pattern: None,
4490 summary_of_id: None,
4491 media_url: None,
4492 },
4493 );
4494
4495 assert!(result.is_err());
4497 let err = result.unwrap_err();
4498 assert!(matches!(err, crate::error::EngramError::Duplicate { .. }));
4499
4500 Ok(())
4501 })
4502 .unwrap();
4503 }
4504
4505 #[test]
4506 fn test_dedup_mode_skip() {
4507 use crate::types::DedupMode;
4508
4509 let storage = Storage::open_in_memory().unwrap();
4510
4511 storage
4512 .with_transaction(|conn| {
4513 let memory1 = create_memory(
4515 conn,
4516 &CreateMemoryInput {
4517 content: "Skip test content".to_string(),
4518 memory_type: MemoryType::Note,
4519 tags: vec!["original".to_string()],
4520 metadata: HashMap::new(),
4521 importance: Some(0.5),
4522 scope: Default::default(),
4523 workspace: None,
4524 tier: Default::default(),
4525 defer_embedding: true,
4526 ttl_seconds: None,
4527 dedup_mode: DedupMode::Allow,
4528 dedup_threshold: None,
4529 event_time: None,
4530 event_duration_seconds: None,
4531 trigger_pattern: None,
4532 summary_of_id: None,
4533 media_url: None,
4534 },
4535 )?;
4536
4537 let memory2 = create_memory(
4539 conn,
4540 &CreateMemoryInput {
4541 content: "Skip test content".to_string(), memory_type: MemoryType::Note,
4543 tags: vec!["new-tag".to_string()], metadata: HashMap::new(),
4545 importance: Some(0.9), scope: Default::default(),
4547 workspace: None,
4548 tier: Default::default(),
4549 defer_embedding: true,
4550 ttl_seconds: None,
4551 dedup_mode: DedupMode::Skip,
4552 dedup_threshold: None,
4553 event_time: None,
4554 event_duration_seconds: None,
4555 trigger_pattern: None,
4556 summary_of_id: None,
4557 media_url: None,
4558 },
4559 )?;
4560
4561 assert_eq!(memory1.id, memory2.id);
4563 assert_eq!(memory2.tags, vec!["original".to_string()]); assert!((memory2.importance - 0.5).abs() < 0.01); let all = list_memories(conn, &ListOptions::default())?;
4568 assert_eq!(all.len(), 1);
4569
4570 Ok(())
4571 })
4572 .unwrap();
4573 }
4574
4575 #[test]
4576 fn test_dedup_mode_merge() {
4577 use crate::types::DedupMode;
4578
4579 let storage = Storage::open_in_memory().unwrap();
4580
4581 storage
4582 .with_transaction(|conn| {
4583 let memory1 = create_memory(
4585 conn,
4586 &CreateMemoryInput {
4587 content: "Merge test content".to_string(),
4588 memory_type: MemoryType::Note,
4589 tags: vec!["tag1".to_string(), "tag2".to_string()],
4590 metadata: {
4591 let mut m = HashMap::new();
4592 m.insert("key1".to_string(), serde_json::json!("value1"));
4593 m
4594 },
4595 importance: Some(0.5),
4596 scope: Default::default(),
4597 workspace: None,
4598 tier: Default::default(),
4599 defer_embedding: true,
4600 ttl_seconds: None,
4601 dedup_mode: DedupMode::Allow,
4602 dedup_threshold: None,
4603 event_time: None,
4604 event_duration_seconds: None,
4605 trigger_pattern: None,
4606 summary_of_id: None,
4607 media_url: None,
4608 },
4609 )?;
4610
4611 let memory2 = create_memory(
4613 conn,
4614 &CreateMemoryInput {
4615 content: "Merge test content".to_string(), memory_type: MemoryType::Note,
4617 tags: vec!["tag2".to_string(), "tag3".to_string()], metadata: {
4619 let mut m = HashMap::new();
4620 m.insert("key2".to_string(), serde_json::json!("value2"));
4621 m
4622 },
4623 importance: Some(0.8), scope: Default::default(),
4625 workspace: None,
4626 tier: Default::default(),
4627 defer_embedding: true,
4628 ttl_seconds: None,
4629 dedup_mode: DedupMode::Merge,
4630 dedup_threshold: None,
4631 event_time: None,
4632 event_duration_seconds: None,
4633 trigger_pattern: None,
4634 summary_of_id: None,
4635 media_url: None,
4636 },
4637 )?;
4638
4639 assert_eq!(memory1.id, memory2.id);
4641
4642 assert!(memory2.tags.contains(&"tag1".to_string()));
4644 assert!(memory2.tags.contains(&"tag2".to_string()));
4645 assert!(memory2.tags.contains(&"tag3".to_string()));
4646 assert_eq!(memory2.tags.len(), 3);
4647
4648 assert!(memory2.metadata.contains_key("key1"));
4650 assert!(memory2.metadata.contains_key("key2"));
4651
4652 let all = list_memories(conn, &ListOptions::default())?;
4654 assert_eq!(all.len(), 1);
4655
4656 Ok(())
4657 })
4658 .unwrap();
4659 }
4660
4661 #[test]
4662 fn test_dedup_mode_allow() {
4663 use crate::types::DedupMode;
4664
4665 let storage = Storage::open_in_memory().unwrap();
4666
4667 storage
4668 .with_transaction(|conn| {
4669 let memory1 = create_memory(
4671 conn,
4672 &CreateMemoryInput {
4673 content: "Allow duplicates content".to_string(),
4674 memory_type: MemoryType::Note,
4675 tags: vec![],
4676 metadata: HashMap::new(),
4677 importance: None,
4678 scope: Default::default(),
4679 workspace: None,
4680 tier: Default::default(),
4681 defer_embedding: true,
4682 ttl_seconds: None,
4683 dedup_mode: DedupMode::Allow,
4684 dedup_threshold: None,
4685 event_time: None,
4686 event_duration_seconds: None,
4687 trigger_pattern: None,
4688 summary_of_id: None,
4689 media_url: None,
4690 },
4691 )?;
4692
4693 let memory2 = create_memory(
4695 conn,
4696 &CreateMemoryInput {
4697 content: "Allow duplicates content".to_string(), memory_type: MemoryType::Note,
4699 tags: vec![],
4700 metadata: HashMap::new(),
4701 importance: None,
4702 scope: Default::default(),
4703 workspace: None,
4704 tier: Default::default(),
4705 defer_embedding: true,
4706 ttl_seconds: None,
4707 dedup_mode: DedupMode::Allow,
4708 dedup_threshold: None,
4709 event_time: None,
4710 event_duration_seconds: None,
4711 trigger_pattern: None,
4712 summary_of_id: None,
4713 media_url: None,
4714 },
4715 )?;
4716
4717 assert_ne!(memory1.id, memory2.id);
4719
4720 let all = list_memories(conn, &ListOptions::default())?;
4722 assert_eq!(all.len(), 2);
4723
4724 assert_eq!(memory1.content_hash, memory2.content_hash);
4726
4727 Ok(())
4728 })
4729 .unwrap();
4730 }
4731
4732 #[test]
4733 fn test_find_duplicates_exact_hash() {
4734 use crate::types::DedupMode;
4735
4736 let storage = Storage::open_in_memory().unwrap();
4737
4738 storage
4739 .with_transaction(|conn| {
4740 let _memory1 = create_memory(
4742 conn,
4743 &CreateMemoryInput {
4744 content: "Duplicate content".to_string(),
4745 memory_type: MemoryType::Note,
4746 tags: vec!["first".to_string()],
4747 metadata: HashMap::new(),
4748 importance: None,
4749 scope: Default::default(),
4750 workspace: None,
4751 tier: Default::default(),
4752 defer_embedding: true,
4753 ttl_seconds: None,
4754 dedup_mode: DedupMode::Allow,
4755 dedup_threshold: None,
4756 event_time: None,
4757 event_duration_seconds: None,
4758 trigger_pattern: None,
4759 summary_of_id: None,
4760 media_url: None,
4761 },
4762 )?;
4763
4764 let _memory2 = create_memory(
4765 conn,
4766 &CreateMemoryInput {
4767 content: "Duplicate content".to_string(), memory_type: MemoryType::Note,
4769 tags: vec!["second".to_string()],
4770 metadata: HashMap::new(),
4771 importance: None,
4772 scope: Default::default(),
4773 workspace: None,
4774 tier: Default::default(),
4775 defer_embedding: true,
4776 ttl_seconds: None,
4777 dedup_mode: DedupMode::Allow,
4778 dedup_threshold: None,
4779 event_time: None,
4780 event_duration_seconds: None,
4781 trigger_pattern: None,
4782 summary_of_id: None,
4783 media_url: None,
4784 },
4785 )?;
4786
4787 let _memory3 = create_memory(
4789 conn,
4790 &CreateMemoryInput {
4791 content: "Unique content".to_string(),
4792 memory_type: MemoryType::Note,
4793 tags: vec![],
4794 metadata: HashMap::new(),
4795 importance: None,
4796 scope: Default::default(),
4797 workspace: None,
4798 tier: Default::default(),
4799 defer_embedding: true,
4800 ttl_seconds: None,
4801 dedup_mode: DedupMode::Allow,
4802 dedup_threshold: None,
4803 event_time: None,
4804 event_duration_seconds: None,
4805 trigger_pattern: None,
4806 summary_of_id: None,
4807 media_url: None,
4808 },
4809 )?;
4810
4811 let duplicates = find_duplicates(conn, 0.9)?;
4813
4814 assert_eq!(duplicates.len(), 1);
4816
4817 assert_eq!(duplicates[0].match_type, DuplicateMatchType::ExactHash);
4819 assert!((duplicates[0].similarity_score - 1.0).abs() < 0.01);
4820
4821 Ok(())
4822 })
4823 .unwrap();
4824 }
4825
4826 #[test]
4827 fn test_content_hash_stored_on_create() {
4828 let storage = Storage::open_in_memory().unwrap();
4829
4830 storage
4831 .with_transaction(|conn| {
4832 let memory = create_memory(
4833 conn,
4834 &CreateMemoryInput {
4835 content: "Test content for hash".to_string(),
4836 memory_type: MemoryType::Note,
4837 tags: vec![],
4838 metadata: HashMap::new(),
4839 importance: None,
4840 scope: Default::default(),
4841 workspace: None,
4842 tier: Default::default(),
4843 defer_embedding: true,
4844 ttl_seconds: None,
4845 dedup_mode: Default::default(),
4846 dedup_threshold: None,
4847 event_time: None,
4848 event_duration_seconds: None,
4849 trigger_pattern: None,
4850 summary_of_id: None,
4851 media_url: None,
4852 },
4853 )?;
4854
4855 assert!(memory.content_hash.is_some());
4857 let hash = memory.content_hash.as_ref().unwrap();
4858 assert!(hash.starts_with("sha256:"));
4859
4860 let fetched = get_memory(conn, memory.id)?;
4862 assert_eq!(fetched.content_hash, memory.content_hash);
4863
4864 Ok(())
4865 })
4866 .unwrap();
4867 }
4868
4869 #[test]
4870 fn test_update_memory_recalculates_hash() {
4871 let storage = Storage::open_in_memory().unwrap();
4872
4873 storage
4874 .with_transaction(|conn| {
4875 let memory = create_memory(
4877 conn,
4878 &CreateMemoryInput {
4879 content: "Original content".to_string(),
4880 memory_type: MemoryType::Note,
4881 tags: vec![],
4882 metadata: HashMap::new(),
4883 importance: None,
4884 scope: Default::default(),
4885 workspace: None,
4886 tier: Default::default(),
4887 defer_embedding: true,
4888 ttl_seconds: None,
4889 dedup_mode: Default::default(),
4890 dedup_threshold: None,
4891 event_time: None,
4892 event_duration_seconds: None,
4893 trigger_pattern: None,
4894 summary_of_id: None,
4895 media_url: None,
4896 },
4897 )?;
4898
4899 let original_hash = memory.content_hash.clone();
4900
4901 let updated = update_memory(
4903 conn,
4904 memory.id,
4905 &UpdateMemoryInput {
4906 content: Some("Updated content".to_string()),
4907 memory_type: None,
4908 tags: None,
4909 metadata: None,
4910 importance: None,
4911 scope: None,
4912 ttl_seconds: None,
4913 event_time: None,
4914 trigger_pattern: None,
4915 media_url: None,
4916 },
4917 )?;
4918
4919 assert_ne!(updated.content_hash, original_hash);
4921 assert!(updated.content_hash.is_some());
4922
4923 let expected_hash = compute_content_hash("Updated content");
4925 assert_eq!(updated.content_hash.as_ref().unwrap(), &expected_hash);
4926
4927 Ok(())
4928 })
4929 .unwrap();
4930 }
4931
4932 #[test]
4933 fn test_dedup_scope_isolation() {
4934 use crate::types::{DedupMode, MemoryScope};
4935
4936 let storage = Storage::open_in_memory().unwrap();
4937
4938 storage
4939 .with_transaction(|conn| {
4940 let _user1_memory = create_memory(
4942 conn,
4943 &CreateMemoryInput {
4944 content: "Shared content".to_string(),
4945 memory_type: MemoryType::Note,
4946 tags: vec!["user1".to_string()],
4947 metadata: HashMap::new(),
4948 importance: None,
4949 scope: MemoryScope::user("user-1"),
4950 workspace: None,
4951 tier: Default::default(),
4952 defer_embedding: true,
4953 ttl_seconds: None,
4954 dedup_mode: DedupMode::Allow,
4955 dedup_threshold: None,
4956 event_time: None,
4957 event_duration_seconds: None,
4958 trigger_pattern: None,
4959 summary_of_id: None,
4960 media_url: None,
4961 },
4962 )?;
4963
4964 let user2_result = create_memory(
4967 conn,
4968 &CreateMemoryInput {
4969 content: "Shared content".to_string(), memory_type: MemoryType::Note,
4971 tags: vec!["user2".to_string()],
4972 metadata: HashMap::new(),
4973 importance: None,
4974 scope: MemoryScope::user("user-2"), workspace: None,
4976 tier: Default::default(),
4977 defer_embedding: true,
4978 ttl_seconds: None,
4979 dedup_mode: DedupMode::Reject, dedup_threshold: None,
4981 event_time: None,
4982 event_duration_seconds: None,
4983 trigger_pattern: None,
4984 summary_of_id: None,
4985 media_url: None,
4986 },
4987 );
4988
4989 assert!(user2_result.is_ok());
4991 let _user2_memory = user2_result.unwrap();
4992
4993 let duplicate_result = create_memory(
4995 conn,
4996 &CreateMemoryInput {
4997 content: "Shared content".to_string(), memory_type: MemoryType::Note,
4999 tags: vec![],
5000 metadata: HashMap::new(),
5001 importance: None,
5002 scope: MemoryScope::user("user-2"), workspace: None,
5004 tier: Default::default(),
5005 defer_embedding: true,
5006 ttl_seconds: None,
5007 dedup_mode: DedupMode::Reject, dedup_threshold: None,
5009 event_time: None,
5010 event_duration_seconds: None,
5011 trigger_pattern: None,
5012 summary_of_id: None,
5013 media_url: None,
5014 },
5015 );
5016
5017 assert!(duplicate_result.is_err());
5019 assert!(matches!(
5020 duplicate_result.unwrap_err(),
5021 crate::error::EngramError::Duplicate { .. }
5022 ));
5023
5024 let all = list_memories(conn, &ListOptions::default())?;
5026 assert_eq!(all.len(), 2);
5027
5028 Ok(())
5029 })
5030 .unwrap();
5031 }
5032
5033 #[test]
5034 fn test_find_similar_by_embedding() {
5035 fn store_test_embedding(
5037 conn: &Connection,
5038 memory_id: i64,
5039 embedding: &[f32],
5040 ) -> crate::error::Result<()> {
5041 let bytes: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
5042 conn.execute(
5043 "INSERT INTO embeddings (memory_id, embedding, model, dimensions, created_at)
5044 VALUES (?, ?, ?, ?, datetime('now'))",
5045 params![memory_id, bytes, "test", embedding.len() as i32],
5046 )?;
5047 conn.execute(
5049 "UPDATE memories SET has_embedding = 1 WHERE id = ?",
5050 params![memory_id],
5051 )?;
5052 Ok(())
5053 }
5054
5055 let storage = Storage::open_in_memory().unwrap();
5056 storage
5057 .with_transaction(|conn| {
5058 let memory1 = create_memory(
5060 conn,
5061 &CreateMemoryInput {
5062 content: "Rust is a systems programming language".to_string(),
5063 memory_type: MemoryType::Note,
5064 tags: vec!["rust".to_string()],
5065 metadata: std::collections::HashMap::new(),
5066 importance: None,
5067 scope: MemoryScope::Global,
5068 workspace: None,
5069 tier: Default::default(),
5070 defer_embedding: false,
5071 ttl_seconds: None,
5072 dedup_mode: DedupMode::Allow,
5073 dedup_threshold: None,
5074 event_time: None,
5075 event_duration_seconds: None,
5076 trigger_pattern: None,
5077 summary_of_id: None,
5078 media_url: None,
5079 },
5080 )?;
5081
5082 let embedding1 = vec![0.8, 0.4, 0.2, 0.1]; store_test_embedding(conn, memory1.id, &embedding1)?;
5085
5086 let memory2 = create_memory(
5088 conn,
5089 &CreateMemoryInput {
5090 content: "Python is a scripting language".to_string(),
5091 memory_type: MemoryType::Note,
5092 tags: vec!["python".to_string()],
5093 metadata: std::collections::HashMap::new(),
5094 importance: None,
5095 scope: MemoryScope::Global,
5096 workspace: None,
5097 tier: Default::default(),
5098 defer_embedding: false,
5099 ttl_seconds: None,
5100 dedup_mode: DedupMode::Allow,
5101 dedup_threshold: None,
5102 event_time: None,
5103 event_duration_seconds: None,
5104 trigger_pattern: None,
5105 summary_of_id: None,
5106 media_url: None,
5107 },
5108 )?;
5109
5110 let embedding2 = vec![0.1, 0.2, 0.8, 0.4]; store_test_embedding(conn, memory2.id, &embedding2)?;
5113
5114 let query_similar_to_1 = vec![0.79, 0.41, 0.21, 0.11]; let result = find_similar_by_embedding(
5117 conn,
5118 &query_similar_to_1,
5119 &MemoryScope::Global,
5120 None, 0.95, )?;
5123 assert!(result.is_some());
5124 let (found_memory, similarity) = result.unwrap();
5125 assert_eq!(found_memory.id, memory1.id);
5126 assert!(similarity > 0.95);
5127
5128 let result_low_threshold = find_similar_by_embedding(
5130 conn,
5131 &query_similar_to_1,
5132 &MemoryScope::Global,
5133 None,
5134 0.5,
5135 )?;
5136 assert!(result_low_threshold.is_some());
5137
5138 let query_orthogonal = vec![0.0, 0.0, 0.0, 1.0]; let result_no_match = find_similar_by_embedding(
5141 conn,
5142 &query_orthogonal,
5143 &MemoryScope::Global,
5144 None,
5145 0.99, )?;
5147 assert!(result_no_match.is_none());
5148
5149 let result_wrong_scope = find_similar_by_embedding(
5151 conn,
5152 &query_similar_to_1,
5153 &MemoryScope::User {
5154 user_id: "other-user".to_string(),
5155 },
5156 None,
5157 0.5,
5158 )?;
5159 assert!(result_wrong_scope.is_none());
5160
5161 Ok(())
5162 })
5163 .unwrap();
5164 }
5165
5166 fn open_test_storage() -> crate::storage::Storage {
5169 crate::storage::Storage::open_in_memory().expect("in-memory storage")
5170 }
5171
5172 #[test]
5173 fn test_create_image_memory_with_media_url() {
5174 let storage = open_test_storage();
5175 let memory = storage
5176 .with_transaction(|conn| {
5177 create_memory(
5178 conn,
5179 &CreateMemoryInput {
5180 content: "A screenshot of the dashboard".to_string(),
5181 memory_type: MemoryType::Image,
5182 media_url: Some("local:///tmp/dashboard.png".to_string()),
5183 ..Default::default()
5184 },
5185 )
5186 })
5187 .expect("create image memory");
5188 assert_eq!(memory.memory_type, MemoryType::Image);
5189 assert_eq!(
5190 memory.media_url.as_deref(),
5191 Some("local:///tmp/dashboard.png")
5192 );
5193 }
5194
5195 #[test]
5196 fn test_create_audio_memory_with_media_url() {
5197 let storage = open_test_storage();
5198 let memory = storage
5199 .with_transaction(|conn| {
5200 create_memory(
5201 conn,
5202 &CreateMemoryInput {
5203 content: "Meeting recording transcript".to_string(),
5204 memory_type: MemoryType::Audio,
5205 media_url: Some("local:///tmp/meeting.mp3".to_string()),
5206 ..Default::default()
5207 },
5208 )
5209 })
5210 .expect("create audio memory");
5211 assert_eq!(memory.memory_type, MemoryType::Audio);
5212 assert_eq!(
5213 memory.media_url.as_deref(),
5214 Some("local:///tmp/meeting.mp3")
5215 );
5216 }
5217
5218 #[test]
5219 fn test_create_video_memory_with_media_url() {
5220 let storage = open_test_storage();
5221 let memory = storage
5222 .with_transaction(|conn| {
5223 create_memory(
5224 conn,
5225 &CreateMemoryInput {
5226 content: "Keynote presentation video".to_string(),
5227 memory_type: MemoryType::Video,
5228 media_url: Some("https://cdn.example.com/keynote.mp4".to_string()),
5229 ..Default::default()
5230 },
5231 )
5232 })
5233 .expect("create video memory");
5234 assert_eq!(memory.memory_type, MemoryType::Video);
5235 assert_eq!(
5236 memory.media_url.as_deref(),
5237 Some("https://cdn.example.com/keynote.mp4")
5238 );
5239 }
5240
5241 #[test]
5242 fn test_get_memory_returns_media_url() {
5243 let storage = open_test_storage();
5244 let created = storage
5245 .with_transaction(|conn| {
5246 create_memory(
5247 conn,
5248 &CreateMemoryInput {
5249 content: "Image with URL".to_string(),
5250 memory_type: MemoryType::Image,
5251 media_url: Some("local:///tmp/image.png".to_string()),
5252 ..Default::default()
5253 },
5254 )
5255 })
5256 .expect("create");
5257 let fetched = storage
5258 .with_connection(|conn| get_memory(conn, created.id))
5259 .expect("get");
5260 assert_eq!(fetched.media_url, created.media_url);
5261 }
5262
5263 #[test]
5264 fn test_create_image_memory_without_media_url() {
5265 let storage = open_test_storage();
5266 let memory = storage
5267 .with_transaction(|conn| {
5268 create_memory(
5269 conn,
5270 &CreateMemoryInput {
5271 content: "Image described in text only".to_string(),
5272 memory_type: MemoryType::Image,
5273 media_url: None,
5274 ..Default::default()
5275 },
5276 )
5277 })
5278 .expect("create image memory without media_url");
5279 assert_eq!(memory.memory_type, MemoryType::Image);
5280 assert!(memory.media_url.is_none());
5281 }
5282
5283 #[test]
5284 fn test_update_memory_sets_media_url() {
5285 let storage = open_test_storage();
5286 let created = storage
5287 .with_transaction(|conn| {
5288 create_memory(
5289 conn,
5290 &CreateMemoryInput {
5291 content: "Image memory".to_string(),
5292 memory_type: MemoryType::Image,
5293 ..Default::default()
5294 },
5295 )
5296 })
5297 .expect("create");
5298 let updated = storage
5299 .with_transaction(|conn| {
5300 update_memory(
5301 conn,
5302 created.id,
5303 &UpdateMemoryInput {
5304 media_url: Some(Some("local:///tmp/updated.png".to_string())),
5305 content: None,
5306 memory_type: None,
5307 tags: None,
5308 metadata: None,
5309 importance: None,
5310 scope: None,
5311 ttl_seconds: None,
5312 event_time: None,
5313 trigger_pattern: None,
5314 },
5315 )
5316 })
5317 .expect("update");
5318 assert_eq!(
5319 updated.media_url.as_deref(),
5320 Some("local:///tmp/updated.png")
5321 );
5322 }
5323
5324 #[test]
5325 fn test_memory_type_is_multimodal() {
5326 assert!(MemoryType::Image.is_multimodal());
5327 assert!(MemoryType::Audio.is_multimodal());
5328 assert!(MemoryType::Video.is_multimodal());
5329 assert!(!MemoryType::Note.is_multimodal());
5330 assert!(!MemoryType::Episodic.is_multimodal());
5331 }
5332
5333 #[test]
5334 fn test_schema_migration_v34_idempotent() {
5335 use crate::storage::migrations::run_migrations;
5336 let conn =
5337 rusqlite::Connection::open_in_memory().expect("in-memory db");
5338 run_migrations(&conn).expect("run migrations");
5339 run_migrations(&conn).expect("idempotent second run");
5341 let version: i32 = conn
5342 .query_row(
5343 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
5344 [],
5345 |row| row.get(0),
5346 )
5347 .expect("query version");
5348 assert_eq!(version, 34);
5349 }
5350}