mem0_rust/history/
sqlite.rs1use crate::models::{EventType, HistoryEntry};
2use chrono::{DateTime, Utc};
3use rusqlite::{params, Connection};
4use std::path::Path;
5use std::sync::{Arc, Mutex};
6use uuid::Uuid;
7use crate::errors::MemoryError;
8
9pub struct HistoryManager {
10 conn: Arc<Mutex<Connection>>,
11}
12
13impl HistoryManager {
14 pub fn new(path: impl AsRef<Path>) -> Result<Self, MemoryError> {
15 if let Some(parent) = path.as_ref().parent() {
17 std::fs::create_dir_all(parent).map_err(|e| MemoryError::History(e.to_string()))?;
18 }
19
20 let conn = Connection::open(path).map_err(|e| MemoryError::History(e.to_string()))?;
21
22 conn.execute(
24 "CREATE TABLE IF NOT EXISTS history (
25 id TEXT PRIMARY KEY,
26 memory_id TEXT NOT NULL,
27 previous_content TEXT,
28 new_content TEXT NOT NULL,
29 event TEXT NOT NULL,
30 timestamp TEXT NOT NULL,
31 user_id TEXT,
32 agent_id TEXT,
33 run_id TEXT
34 )",
35 [],
36 ).map_err(|e| MemoryError::History(e.to_string()))?;
37
38 Ok(Self {
39 conn: Arc::new(Mutex::new(conn)),
40 })
41 }
42
43 #[allow(clippy::too_many_arguments)]
44 pub fn add_history(
45 &self,
46 memory_id: Uuid,
47 previous_content: Option<String>,
48 new_content: String,
49 event: EventType,
50 timestamp: DateTime<Utc>,
51 user_id: Option<String>,
52 agent_id: Option<String>,
53 run_id: Option<String>,
54 ) -> Result<(), MemoryError> {
55 let conn = self.conn.lock().unwrap();
56 let id = Uuid::new_v4().to_string();
57
58 let event_str = serde_json::to_string(&event)
60 .map_err(|e| MemoryError::History(format!("Failed to serialize event: {}", e)))?;
61 let event_str = event_str.trim_matches('"');
62
63 conn.execute(
64 "INSERT INTO history (id, memory_id, previous_content, new_content, event, timestamp, user_id, agent_id, run_id)
65 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
66 params![
67 id,
68 memory_id.to_string(),
69 previous_content,
70 new_content,
71 event_str,
72 timestamp.to_rfc3339(),
73 user_id,
74 agent_id,
75 run_id,
76 ],
77 ).map_err(|e| MemoryError::History(e.to_string()))?;
78
79 Ok(())
80 }
81
82 pub fn get_history(&self, memory_id: Uuid) -> Result<Vec<HistoryEntry>, MemoryError> {
83 let conn = self.conn.lock().unwrap();
84
85 let mut stmt = conn.prepare(
86 "SELECT id, memory_id, previous_content, new_content, event, timestamp
87 FROM history WHERE memory_id = ?1 ORDER BY timestamp DESC"
88 ).map_err(|e| MemoryError::History(e.to_string()))?;
89
90 let rows = stmt.query_map(params![memory_id.to_string()], |row| {
91 let event_str: String = row.get(4)?;
92 let event = match event_str.as_str() {
93 "ADD" => EventType::Add,
94 "UPDATE" => EventType::Update,
95 "DELETE" => EventType::Delete,
96 _ => EventType::Noop,
97 };
98
99 let timestamp_str: String = row.get(5)?;
100 let timestamp = DateTime::parse_from_rfc3339(×tamp_str)
101 .map(|dt| dt.with_timezone(&Utc))
102 .unwrap_or(Utc::now());
103
104 Ok(HistoryEntry {
105 id: Uuid::parse_str(&row.get::<_, String>(0)?).unwrap_or_default(),
106 memory_id: Uuid::parse_str(&row.get::<_, String>(1)?).unwrap_or_default(),
107 previous_content: row.get(2)?,
108 new_content: row.get(3)?,
109 event,
110 timestamp,
111 })
112 }).map_err(|e| MemoryError::History(e.to_string()))?;
113
114 let mut history = Vec::new();
115 for row in rows {
116 history.push(row.map_err(|e| MemoryError::History(e.to_string()))?);
117 }
118
119 Ok(history)
120 }
121
122 pub fn reset(&self) -> Result<(), MemoryError> {
123 let conn = self.conn.lock().unwrap();
124 conn.execute("DELETE FROM history", []).map_err(|e| MemoryError::History(e.to_string()))?;
125 Ok(())
126 }
127}