Skip to main content

mem7_store/
crud.rs

1use mem7_core::{MemoryAction, MemoryEvent, MemoryFilter, MemoryItem};
2use mem7_datetime::now_iso;
3use mem7_error::{Mem7Error, Result};
4use tracing::{info, instrument};
5use uuid::Uuid;
6
7use crate::engine::MemoryEngine;
8use crate::payload::{payload_to_event_metadata, payload_to_memory_item};
9use crate::require_scope;
10
11impl MemoryEngine {
12    #[instrument(skip(self))]
13    pub async fn get(&self, memory_id: Uuid) -> Result<MemoryItem> {
14        let entry = self
15            .vector_index
16            .get(&memory_id)
17            .await?
18            .ok_or_else(|| Mem7Error::NotFound(format!("memory {memory_id}")))?;
19
20        Ok(payload_to_memory_item(memory_id, &entry.1, None))
21    }
22
23    #[instrument(skip(self, filters))]
24    pub async fn get_all(
25        &self,
26        user_id: Option<&str>,
27        agent_id: Option<&str>,
28        run_id: Option<&str>,
29        filters: Option<&serde_json::Value>,
30        limit: Option<usize>,
31    ) -> Result<Vec<MemoryItem>> {
32        require_scope("get_all", user_id, agent_id, run_id)?;
33        let filter = MemoryFilter {
34            metadata: filters.cloned(),
35            ..MemoryFilter::from_session(user_id, agent_id, run_id)
36        };
37
38        let entries = self.vector_index.list(Some(&filter), limit).await?;
39
40        Ok(entries
41            .into_iter()
42            .map(|(id, payload)| payload_to_memory_item(id, &payload, None))
43            .collect())
44    }
45
46    #[instrument(skip(self, new_text))]
47    pub async fn update(&self, memory_id: Uuid, new_text: &str) -> Result<()> {
48        let entry = self
49            .vector_index
50            .get(&memory_id)
51            .await?
52            .ok_or_else(|| Mem7Error::NotFound(format!("memory {memory_id}")))?;
53
54        let old_text = entry
55            .1
56            .get("text")
57            .and_then(|v| v.as_str())
58            .map(String::from);
59
60        let vecs = self.embedder.embed(&[new_text.to_string()]).await?;
61        let vec = vecs.into_iter().next().unwrap_or_default();
62
63        let mut payload = entry.1.clone();
64        payload["text"] = serde_json::Value::String(new_text.to_string());
65        payload["updated_at"] = serde_json::Value::String(now_iso());
66        let audit = payload_to_event_metadata(&payload);
67
68        self.vector_index
69            .update(&memory_id, Some(&vec), Some(payload))
70            .await?;
71
72        self.history
73            .add_event(
74                memory_id,
75                old_text.as_deref(),
76                Some(new_text),
77                MemoryAction::Update,
78                audit,
79            )
80            .await?;
81
82        Ok(())
83    }
84
85    #[instrument(skip(self))]
86    pub async fn delete(&self, memory_id: Uuid) -> Result<()> {
87        let entry = self.vector_index.get(&memory_id).await?;
88        let old_text = entry
89            .as_ref()
90            .and_then(|(_, p)| p.get("text").and_then(|v| v.as_str()))
91            .map(String::from);
92        let audit = entry
93            .as_ref()
94            .map(|(_, payload)| {
95                let mut metadata = payload_to_event_metadata(payload);
96                metadata.is_deleted = true;
97                metadata
98            })
99            .unwrap_or_else(|| mem7_core::MemoryEventMetadata {
100                is_deleted: true,
101                ..Default::default()
102            });
103
104        self.vector_index.delete(&memory_id).await?;
105
106        self.history
107            .add_event(
108                memory_id,
109                old_text.as_deref(),
110                None,
111                MemoryAction::Delete,
112                audit,
113            )
114            .await?;
115
116        Ok(())
117    }
118
119    #[instrument(skip(self))]
120    pub async fn delete_all(
121        &self,
122        user_id: Option<&str>,
123        agent_id: Option<&str>,
124        run_id: Option<&str>,
125    ) -> Result<()> {
126        require_scope("delete_all", user_id, agent_id, run_id).map_err(|_| {
127            Mem7Error::Config(
128                "delete_all requires at least one of user_id, agent_id, or run_id; use reset() to clear all memories"
129                    .into(),
130            )
131        })?;
132
133        let filter = MemoryFilter::from_session(user_id, agent_id, run_id);
134        let entries = self.vector_index.list(Some(&filter), None).await?;
135
136        if let Some(gp) = &self.graph_pipeline {
137            gp.store().delete_all(&filter).await?;
138        }
139
140        for (id, payload) in entries {
141            let old_text = payload
142                .get("text")
143                .and_then(|v| v.as_str())
144                .map(String::from);
145            let mut audit = payload_to_event_metadata(&payload);
146            audit.is_deleted = true;
147            self.vector_index.delete(&id).await?;
148            self.history
149                .add_event(id, old_text.as_deref(), None, MemoryAction::Delete, audit)
150                .await?;
151        }
152
153        Ok(())
154    }
155
156    #[instrument(skip(self))]
157    pub async fn history(&self, memory_id: Uuid) -> Result<Vec<MemoryEvent>> {
158        self.history.get_history(memory_id).await
159    }
160
161    #[instrument(skip(self))]
162    pub async fn reset(&self) -> Result<()> {
163        self.vector_index.reset().await?;
164        self.history.reset().await?;
165        if let Some(gp) = &self.graph_pipeline {
166            gp.store().reset().await?;
167        }
168        info!("MemoryEngine reset");
169        Ok(())
170    }
171}