1use mem7_core::{MemoryAction, MemoryEvent, MemoryFilter, MemoryItem};
2use mem7_datetime::now_iso;
3use mem7_error::{Mem7Error, Result};
4use tracing::{info, instrument};
5use uuid::Uuid;
6
7use crate::engine::MemoryEngine;
8use crate::payload::{payload_to_event_metadata, payload_to_memory_item};
9use crate::require_scope;
10
11impl MemoryEngine {
12 #[instrument(skip(self))]
13 pub async fn get(&self, memory_id: Uuid) -> Result<MemoryItem> {
14 let entry = self
15 .vector_index
16 .get(&memory_id)
17 .await?
18 .ok_or_else(|| Mem7Error::NotFound(format!("memory {memory_id}")))?;
19
20 Ok(payload_to_memory_item(memory_id, &entry.1, None))
21 }
22
23 #[instrument(skip(self, filters))]
24 pub async fn get_all(
25 &self,
26 user_id: Option<&str>,
27 agent_id: Option<&str>,
28 run_id: Option<&str>,
29 filters: Option<&serde_json::Value>,
30 limit: Option<usize>,
31 ) -> Result<Vec<MemoryItem>> {
32 require_scope("get_all", user_id, agent_id, run_id)?;
33 let filter = MemoryFilter {
34 metadata: filters.cloned(),
35 ..MemoryFilter::from_session(user_id, agent_id, run_id)
36 };
37
38 let entries = self.vector_index.list(Some(&filter), limit).await?;
39
40 Ok(entries
41 .into_iter()
42 .map(|(id, payload)| payload_to_memory_item(id, &payload, None))
43 .collect())
44 }
45
46 #[instrument(skip(self, new_text))]
47 pub async fn update(&self, memory_id: Uuid, new_text: &str) -> Result<()> {
48 let entry = self
49 .vector_index
50 .get(&memory_id)
51 .await?
52 .ok_or_else(|| Mem7Error::NotFound(format!("memory {memory_id}")))?;
53
54 let old_text = entry
55 .1
56 .get("text")
57 .and_then(|v| v.as_str())
58 .map(String::from);
59
60 let vecs = self.embedder.embed(&[new_text.to_string()]).await?;
61 let vec = vecs.into_iter().next().unwrap_or_default();
62
63 let mut payload = entry.1.clone();
64 payload["text"] = serde_json::Value::String(new_text.to_string());
65 payload["updated_at"] = serde_json::Value::String(now_iso());
66 let audit = payload_to_event_metadata(&payload);
67
68 self.vector_index
69 .update(&memory_id, Some(&vec), Some(payload))
70 .await?;
71
72 self.history
73 .add_event(
74 memory_id,
75 old_text.as_deref(),
76 Some(new_text),
77 MemoryAction::Update,
78 audit,
79 )
80 .await?;
81
82 Ok(())
83 }
84
85 #[instrument(skip(self))]
86 pub async fn delete(&self, memory_id: Uuid) -> Result<()> {
87 let entry = self.vector_index.get(&memory_id).await?;
88 let old_text = entry
89 .as_ref()
90 .and_then(|(_, p)| p.get("text").and_then(|v| v.as_str()))
91 .map(String::from);
92 let audit = entry
93 .as_ref()
94 .map(|(_, payload)| {
95 let mut metadata = payload_to_event_metadata(payload);
96 metadata.is_deleted = true;
97 metadata
98 })
99 .unwrap_or_else(|| mem7_core::MemoryEventMetadata {
100 is_deleted: true,
101 ..Default::default()
102 });
103
104 self.vector_index.delete(&memory_id).await?;
105
106 self.history
107 .add_event(
108 memory_id,
109 old_text.as_deref(),
110 None,
111 MemoryAction::Delete,
112 audit,
113 )
114 .await?;
115
116 Ok(())
117 }
118
119 #[instrument(skip(self))]
120 pub async fn delete_all(
121 &self,
122 user_id: Option<&str>,
123 agent_id: Option<&str>,
124 run_id: Option<&str>,
125 ) -> Result<()> {
126 require_scope("delete_all", user_id, agent_id, run_id).map_err(|_| {
127 Mem7Error::Config(
128 "delete_all requires at least one of user_id, agent_id, or run_id; use reset() to clear all memories"
129 .into(),
130 )
131 })?;
132
133 let filter = MemoryFilter::from_session(user_id, agent_id, run_id);
134 let entries = self.vector_index.list(Some(&filter), None).await?;
135
136 if let Some(gp) = &self.graph_pipeline {
137 gp.store().delete_all(&filter).await?;
138 }
139
140 for (id, payload) in entries {
141 let old_text = payload
142 .get("text")
143 .and_then(|v| v.as_str())
144 .map(String::from);
145 let mut audit = payload_to_event_metadata(&payload);
146 audit.is_deleted = true;
147 self.vector_index.delete(&id).await?;
148 self.history
149 .add_event(id, old_text.as_deref(), None, MemoryAction::Delete, audit)
150 .await?;
151 }
152
153 Ok(())
154 }
155
156 #[instrument(skip(self))]
157 pub async fn history(&self, memory_id: Uuid) -> Result<Vec<MemoryEvent>> {
158 self.history.get_history(memory_id).await
159 }
160
161 #[instrument(skip(self))]
162 pub async fn reset(&self) -> Result<()> {
163 self.vector_index.reset().await?;
164 self.history.reset().await?;
165 if let Some(gp) = &self.graph_pipeline {
166 gp.store().reset().await?;
167 }
168 info!("MemoryEngine reset");
169 Ok(())
170 }
171}