1use mem7_core::{MemoryAction, MemoryEvent, MemoryFilter, MemoryItem};
2use mem7_datetime::now_iso;
3use mem7_error::{Mem7Error, Result};
4use tracing::{info, instrument};
5use uuid::Uuid;
6
7use crate::engine::MemoryEngine;
8use crate::payload::payload_to_memory_item;
9
10impl MemoryEngine {
11 #[instrument(skip(self))]
12 pub async fn get(&self, memory_id: Uuid) -> Result<MemoryItem> {
13 let entry = self
14 .vector_index
15 .get(&memory_id)
16 .await?
17 .ok_or_else(|| Mem7Error::NotFound(format!("memory {memory_id}")))?;
18
19 Ok(payload_to_memory_item(memory_id, &entry.1, None))
20 }
21
22 #[instrument(skip(self, filters))]
23 pub async fn get_all(
24 &self,
25 user_id: Option<&str>,
26 agent_id: Option<&str>,
27 run_id: Option<&str>,
28 filters: Option<&serde_json::Value>,
29 limit: Option<usize>,
30 ) -> Result<Vec<MemoryItem>> {
31 let filter = MemoryFilter {
32 metadata: filters.cloned(),
33 ..MemoryFilter::from_session(user_id, agent_id, run_id)
34 };
35
36 let entries = self.vector_index.list(Some(&filter), limit).await?;
37
38 Ok(entries
39 .into_iter()
40 .map(|(id, payload)| payload_to_memory_item(id, &payload, None))
41 .collect())
42 }
43
44 #[instrument(skip(self, new_text))]
45 pub async fn update(&self, memory_id: Uuid, new_text: &str) -> Result<()> {
46 let entry = self
47 .vector_index
48 .get(&memory_id)
49 .await?
50 .ok_or_else(|| Mem7Error::NotFound(format!("memory {memory_id}")))?;
51
52 let old_text = entry
53 .1
54 .get("text")
55 .and_then(|v| v.as_str())
56 .map(String::from);
57
58 let vecs = self.embedder.embed(&[new_text.to_string()]).await?;
59 let vec = vecs.into_iter().next().unwrap_or_default();
60
61 let mut payload = entry.1.clone();
62 payload["text"] = serde_json::Value::String(new_text.to_string());
63 payload["updated_at"] = serde_json::Value::String(now_iso());
64
65 self.vector_index
66 .update(&memory_id, Some(&vec), Some(payload))
67 .await?;
68
69 self.history
70 .add_event(
71 memory_id,
72 old_text.as_deref(),
73 Some(new_text),
74 MemoryAction::Update,
75 )
76 .await?;
77
78 Ok(())
79 }
80
81 #[instrument(skip(self))]
82 pub async fn delete(&self, memory_id: Uuid) -> Result<()> {
83 let entry = self.vector_index.get(&memory_id).await?;
84 let old_text = entry
85 .as_ref()
86 .and_then(|(_, p)| p.get("text").and_then(|v| v.as_str()))
87 .map(String::from);
88
89 self.vector_index.delete(&memory_id).await?;
90
91 self.history
92 .add_event(memory_id, old_text.as_deref(), None, MemoryAction::Delete)
93 .await?;
94
95 Ok(())
96 }
97
98 #[instrument(skip(self))]
99 pub async fn delete_all(
100 &self,
101 user_id: Option<&str>,
102 agent_id: Option<&str>,
103 run_id: Option<&str>,
104 ) -> Result<()> {
105 let filter = MemoryFilter::from_session(user_id, agent_id, run_id);
106
107 let entries = self.vector_index.list(Some(&filter), None).await?;
108 for (id, _) in entries {
109 self.vector_index.delete(&id).await?;
110 }
111
112 if let Some(gp) = &self.graph_pipeline {
113 gp.store().delete_all(&filter).await?;
114 }
115
116 Ok(())
117 }
118
119 #[instrument(skip(self))]
120 pub async fn history(&self, memory_id: Uuid) -> Result<Vec<MemoryEvent>> {
121 self.history.get_history(memory_id).await
122 }
123
124 #[instrument(skip(self))]
125 pub async fn reset(&self) -> Result<()> {
126 self.vector_index.reset().await?;
127 self.history.reset().await?;
128 if let Some(gp) = &self.graph_pipeline {
129 gp.store().reset().await?;
130 }
131 info!("MemoryEngine reset");
132 Ok(())
133 }
134}