agent_diva_core/session/
manager.rs1use super::store::Session;
4use std::collections::HashMap;
5use std::path::{Path, PathBuf};
6
7#[derive(Debug)]
9pub struct SessionManager {
10 sessions_dir: PathBuf,
12 cache: HashMap<String, Session>,
14}
15
16impl SessionManager {
17 pub fn new<P: AsRef<Path>>(workspace: P) -> Self {
19 let sessions_dir = workspace.as_ref().join("sessions");
20 Self {
21 sessions_dir,
22 cache: HashMap::new(),
23 }
24 }
25
26 pub fn get_or_create(&mut self, key: impl Into<String>) -> &mut Session {
28 let key = key.into();
29
30 if !self.cache.contains_key(&key) {
31 let session = self.load(&key).unwrap_or_else(|| Session::new(&key));
32 self.cache.insert(key.clone(), session);
33 }
34
35 self.cache.get_mut(&key).unwrap()
36 }
37
38 pub fn get(&self, key: &str) -> Option<&Session> {
40 self.cache.get(key)
41 }
42
43 pub fn get_or_load(&mut self, key: &str) -> Option<&Session> {
45 if !self.cache.contains_key(key) {
46 if let Some(session) = self.load(key) {
47 self.cache.insert(key.to_string(), session);
48 } else {
49 return None;
50 }
51 }
52 self.cache.get(key)
53 }
54
55 fn load(&self, key: &str) -> Option<Session> {
57 let path = self.session_path(key);
58
59 if !path.exists() {
60 return None;
61 }
62
63 let content = std::fs::read_to_string(&path).ok()?;
64 let mut messages = Vec::new();
65 let mut metadata = serde_json::Value::Object(serde_json::Map::new());
66 let mut created_at = None;
67 let mut last_consolidated: usize = 0;
68
69 for line in content.lines() {
70 let line = line.trim();
71 if line.is_empty() {
72 continue;
73 }
74
75 if let Ok(value) = serde_json::from_str::<serde_json::Value>(line) {
76 if value.get("_type").and_then(|v| v.as_str()) == Some("metadata") {
77 metadata = value.get("metadata").cloned().unwrap_or(metadata);
78 created_at = value
79 .get("created_at")
80 .and_then(|v| v.as_str())
81 .and_then(|s| s.parse().ok());
82 last_consolidated = value
83 .get("last_consolidated")
84 .and_then(|v| v.as_u64())
85 .unwrap_or(0) as usize;
86 } else if let Ok(msg) = serde_json::from_value::<super::store::ChatMessage>(value) {
87 messages.push(msg);
88 }
89 }
90 }
91
92 Some(Session {
93 key: key.to_string(),
94 messages,
95 created_at: created_at.unwrap_or_else(chrono::Utc::now),
96 updated_at: chrono::Utc::now(),
97 metadata,
98 last_consolidated,
99 })
100 }
101
102 pub fn save(&self, session: &Session) -> crate::Result<()> {
104 std::fs::create_dir_all(&self.sessions_dir)?;
105 let path = self.session_path(&session.key);
106
107 let mut lines = Vec::new();
108
109 let metadata = serde_json::json!({
111 "_type": "metadata",
112 "created_at": session.created_at.to_rfc3339(),
113 "updated_at": session.updated_at.to_rfc3339(),
114 "metadata": session.metadata,
115 "last_consolidated": session.last_consolidated,
116 });
117 lines.push(serde_json::to_string(&metadata)?);
118
119 for msg in &session.messages {
121 lines.push(serde_json::to_string(msg)?);
122 }
123
124 std::fs::write(&path, lines.join("\n"))?;
125 Ok(())
126 }
127
128 pub fn delete(&mut self, key: &str) -> crate::Result<bool> {
130 self.cache.remove(key);
131
132 let path = self.session_path(key);
133 if path.exists() {
134 std::fs::remove_file(&path)?;
135 Ok(true)
136 } else {
137 Ok(false)
138 }
139 }
140
141 pub fn archive_and_reset(&mut self, key: &str) -> crate::Result<bool> {
143 self.cache.remove(key);
144
145 let path = self.session_path(key);
146 if path.exists() {
147 let safe_key = key.replace([':', '/', '\\'], "_");
148 let timestamp = chrono::Utc::now().timestamp_millis();
149 let archive_filename = format!("{}.reset.{}.jsonl", safe_key, timestamp);
150 let archive_path = self.sessions_dir.join(archive_filename);
151
152 std::fs::rename(&path, &archive_path)?;
153 Ok(true)
154 } else {
155 Ok(false)
156 }
157 }
158
159 pub fn list_sessions(&self) -> Vec<SessionInfo> {
161 let mut sessions = Vec::new();
162
163 if let Ok(entries) = std::fs::read_dir(&self.sessions_dir) {
164 for entry in entries.flatten() {
165 if let Some(name) = entry.file_name().to_str() {
166 if name.ends_with(".jsonl") {
167 let key = name.trim_end_matches(".jsonl").replace('_', ":");
168 if let Ok(content) = std::fs::read_to_string(entry.path()) {
169 if let Some(first_line) = content.lines().next() {
170 if let Ok(value) =
171 serde_json::from_str::<serde_json::Value>(first_line)
172 {
173 if value.get("_type").and_then(|v| v.as_str())
174 == Some("metadata")
175 {
176 sessions.push(SessionInfo {
177 key,
178 created_at: value
179 .get("created_at")
180 .and_then(|v| v.as_str())
181 .map(|s| s.to_string()),
182 updated_at: value
183 .get("updated_at")
184 .and_then(|v| v.as_str())
185 .map(|s| s.to_string()),
186 path: entry.path().to_string_lossy().to_string(),
187 });
188 }
189 }
190 }
191 }
192 }
193 }
194 }
195 }
196
197 sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
198 sessions
199 }
200
201 fn session_path(&self, key: &str) -> PathBuf {
203 let safe_key = key.replace([':', '/', '\\'], "_");
204 self.sessions_dir.join(format!("{}.jsonl", safe_key))
205 }
206}
207
208#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
210pub struct SessionInfo {
211 pub key: String,
213 pub created_at: Option<String>,
215 pub updated_at: Option<String>,
217 pub path: String,
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use tempfile::TempDir;
225
226 #[test]
227 fn test_session_manager_creation() {
228 let temp_dir = TempDir::new().unwrap();
229 let manager = SessionManager::new(temp_dir.path());
230 assert!(manager.list_sessions().is_empty());
231 }
232
233 #[test]
234 fn test_get_or_create_session() {
235 let temp_dir = TempDir::new().unwrap();
236 let mut manager = SessionManager::new(temp_dir.path());
237
238 let session = manager.get_or_create("telegram:123");
239 session.add_message("user", "Hello");
240
241 assert_eq!(session.messages.len(), 1);
242 assert_eq!(session.key, "telegram:123");
243 }
244
245 #[test]
246 fn test_save_and_load_session() {
247 let temp_dir = TempDir::new().unwrap();
248 let mut manager = SessionManager::new(temp_dir.path());
249
250 let session = manager.get_or_create("test:456");
252 session.add_message("user", "Test message");
253 let key = session.key.clone();
254
255 manager.save(&manager.cache.get(&key).unwrap()).unwrap();
257
258 manager.cache.clear();
260 let session = manager.get_or_create("test:456");
261
262 assert_eq!(session.messages.len(), 1);
263 assert_eq!(session.messages[0].content, "Test message");
264 }
265
266 #[test]
267 fn test_archive_and_reset_session() {
268 let temp_dir = TempDir::new().unwrap();
269 let mut manager = SessionManager::new(temp_dir.path());
270
271 let session = manager.get_or_create("archive:789");
273 session.add_message("user", "Message to be archived");
274 let key = session.key.clone();
275
276 manager.save(&manager.cache.get(&key).unwrap()).unwrap();
278
279 let archived = manager.archive_and_reset(&key).unwrap();
281 assert!(archived);
282
283 assert!(manager.cache.get(&key).is_none());
285
286 let new_session = manager.get_or_create("archive:789");
288 assert_eq!(new_session.messages.len(), 0);
289
290 let mut reset_files_count = 0;
292 for entry in std::fs::read_dir(temp_dir.path().join("sessions")).unwrap() {
293 let entry = entry.unwrap();
294 let file_name = entry.file_name().into_string().unwrap();
295 if file_name.contains(".reset.") {
296 reset_files_count += 1;
297 } else if file_name == "archive_789.jsonl" {
298 panic!("Original file still exists!");
300 }
301 }
302 assert_eq!(
303 reset_files_count, 1,
304 "Should have exactly one archived file"
305 );
306 }
307
308 #[test]
309 fn test_get_or_load_cache_hit() {
310 let temp_dir = TempDir::new().unwrap();
311 let mut manager = SessionManager::new(temp_dir.path());
312
313 let session = manager.get_or_create("gui:chat-1");
314 session.add_message("user", "Hello");
315 let key = session.key.clone();
316 manager.save(&manager.cache.get(&key).unwrap()).unwrap();
317
318 let loaded = manager.get_or_load("gui:chat-1");
320 assert!(loaded.is_some());
321 assert_eq!(loaded.unwrap().key, "gui:chat-1");
322 assert_eq!(loaded.unwrap().messages.len(), 1);
323 }
324
325 #[test]
326 fn test_get_or_load_disk_exists_cache_miss() {
327 let temp_dir = TempDir::new().unwrap();
328 let mut manager = SessionManager::new(temp_dir.path());
329
330 let session = manager.get_or_create("gui:chat-2");
332 session.add_message("user", "From disk");
333 let key = session.key.clone();
334 manager.save(&manager.cache.get(&key).unwrap()).unwrap();
335
336 manager.cache.clear();
338
339 let loaded = manager.get_or_load("gui:chat-2");
341 assert!(loaded.is_some());
342 assert_eq!(loaded.unwrap().key, "gui:chat-2");
343 assert_eq!(loaded.unwrap().messages[0].content, "From disk");
344 }
345
346 #[test]
347 fn test_get_or_load_disk_not_exists() {
348 let temp_dir = TempDir::new().unwrap();
349 let mut manager = SessionManager::new(temp_dir.path());
350
351 let loaded = manager.get_or_load("gui:nonexistent");
353 assert!(loaded.is_none());
354 }
355}