Skip to main content

mem7_store/
crud.rs

1use mem7_core::{MemoryAction, MemoryEvent, MemoryFilter, MemoryItem};
2use mem7_datetime::now_iso;
3use mem7_error::{Mem7Error, Result};
4use tracing::{info, instrument};
5use uuid::Uuid;
6
7use crate::engine::MemoryEngine;
8use crate::payload::payload_to_memory_item;
9
10impl MemoryEngine {
11    #[instrument(skip(self))]
12    pub async fn get(&self, memory_id: Uuid) -> Result<MemoryItem> {
13        let entry = self
14            .vector_index
15            .get(&memory_id)
16            .await?
17            .ok_or_else(|| Mem7Error::NotFound(format!("memory {memory_id}")))?;
18
19        Ok(payload_to_memory_item(memory_id, &entry.1, None))
20    }
21
22    #[instrument(skip(self, filters))]
23    pub async fn get_all(
24        &self,
25        user_id: Option<&str>,
26        agent_id: Option<&str>,
27        run_id: Option<&str>,
28        filters: Option<&serde_json::Value>,
29        limit: Option<usize>,
30    ) -> Result<Vec<MemoryItem>> {
31        let filter = MemoryFilter {
32            metadata: filters.cloned(),
33            ..MemoryFilter::from_session(user_id, agent_id, run_id)
34        };
35
36        let entries = self.vector_index.list(Some(&filter), limit).await?;
37
38        Ok(entries
39            .into_iter()
40            .map(|(id, payload)| payload_to_memory_item(id, &payload, None))
41            .collect())
42    }
43
44    #[instrument(skip(self, new_text))]
45    pub async fn update(&self, memory_id: Uuid, new_text: &str) -> Result<()> {
46        let entry = self
47            .vector_index
48            .get(&memory_id)
49            .await?
50            .ok_or_else(|| Mem7Error::NotFound(format!("memory {memory_id}")))?;
51
52        let old_text = entry
53            .1
54            .get("text")
55            .and_then(|v| v.as_str())
56            .map(String::from);
57
58        let vecs = self.embedder.embed(&[new_text.to_string()]).await?;
59        let vec = vecs.into_iter().next().unwrap_or_default();
60
61        let mut payload = entry.1.clone();
62        payload["text"] = serde_json::Value::String(new_text.to_string());
63        payload["updated_at"] = serde_json::Value::String(now_iso());
64
65        self.vector_index
66            .update(&memory_id, Some(&vec), Some(payload))
67            .await?;
68
69        self.history
70            .add_event(
71                memory_id,
72                old_text.as_deref(),
73                Some(new_text),
74                MemoryAction::Update,
75            )
76            .await?;
77
78        Ok(())
79    }
80
81    #[instrument(skip(self))]
82    pub async fn delete(&self, memory_id: Uuid) -> Result<()> {
83        let entry = self.vector_index.get(&memory_id).await?;
84        let old_text = entry
85            .as_ref()
86            .and_then(|(_, p)| p.get("text").and_then(|v| v.as_str()))
87            .map(String::from);
88
89        self.vector_index.delete(&memory_id).await?;
90
91        self.history
92            .add_event(memory_id, old_text.as_deref(), None, MemoryAction::Delete)
93            .await?;
94
95        Ok(())
96    }
97
98    #[instrument(skip(self))]
99    pub async fn delete_all(
100        &self,
101        user_id: Option<&str>,
102        agent_id: Option<&str>,
103        run_id: Option<&str>,
104    ) -> Result<()> {
105        let filter = MemoryFilter::from_session(user_id, agent_id, run_id);
106
107        let entries = self.vector_index.list(Some(&filter), None).await?;
108        for (id, _) in entries {
109            self.vector_index.delete(&id).await?;
110        }
111
112        if let Some(gp) = &self.graph_pipeline {
113            gp.store().delete_all(&filter).await?;
114        }
115
116        Ok(())
117    }
118
119    #[instrument(skip(self))]
120    pub async fn history(&self, memory_id: Uuid) -> Result<Vec<MemoryEvent>> {
121        self.history.get_history(memory_id).await
122    }
123
124    #[instrument(skip(self))]
125    pub async fn reset(&self) -> Result<()> {
126        self.vector_index.reset().await?;
127        self.history.reset().await?;
128        if let Some(gp) = &self.graph_pipeline {
129            gp.store().reset().await?;
130        }
131        info!("MemoryEngine reset");
132        Ok(())
133    }
134}