Skip to main content

agent_diva_core/session/
manager.rs

1//! Session manager for handling multiple sessions
2
3use super::store::Session;
4use std::collections::HashMap;
5use std::path::{Path, PathBuf};
6
/// Manages conversation sessions.
///
/// Sessions are persisted as one `.jsonl` file per key inside `sessions_dir`
/// and mirrored in an in-memory cache keyed by the session key.
#[derive(Debug)]
pub struct SessionManager {
    /// Directory holding the per-session files (`<workspace>/sessions`).
    sessions_dir: PathBuf,
    /// In-memory cache of sessions, keyed by session key.
    cache: HashMap<String, Session>,
}
15
16impl SessionManager {
17    /// Create a new session manager
18    pub fn new<P: AsRef<Path>>(workspace: P) -> Self {
19        let sessions_dir = workspace.as_ref().join("sessions");
20        Self {
21            sessions_dir,
22            cache: HashMap::new(),
23        }
24    }
25
26    /// Get or create a session
27    pub fn get_or_create(&mut self, key: impl Into<String>) -> &mut Session {
28        let key = key.into();
29
30        if !self.cache.contains_key(&key) {
31            let session = self.load(&key).unwrap_or_else(|| Session::new(&key));
32            self.cache.insert(key.clone(), session);
33        }
34
35        self.cache.get_mut(&key).unwrap()
36    }
37
    /// Get a session if it is already in the in-memory cache.
    ///
    /// This only consults the cache; it never reads from disk, so a session that
    /// exists on disk but has not been loaded yet yields `None`.
    pub fn get(&self, key: &str) -> Option<&Session> {
        self.cache.get(key)
    }
42
43    /// Get a session if it exists (cache or disk). Does not create.
44    pub fn get_or_load(&mut self, key: &str) -> Option<&Session> {
45        if !self.cache.contains_key(key) {
46            if let Some(session) = self.load(key) {
47                self.cache.insert(key.to_string(), session);
48            } else {
49                return None;
50            }
51        }
52        self.cache.get(key)
53    }
54
55    /// Load a session from disk
56    fn load(&self, key: &str) -> Option<Session> {
57        let path = self.session_path(key);
58
59        if !path.exists() {
60            return None;
61        }
62
63        let content = std::fs::read_to_string(&path).ok()?;
64        let mut messages = Vec::new();
65        let mut metadata = serde_json::Value::Object(serde_json::Map::new());
66        let mut created_at = None;
67        let mut last_consolidated: usize = 0;
68
69        for line in content.lines() {
70            let line = line.trim();
71            if line.is_empty() {
72                continue;
73            }
74
75            if let Ok(value) = serde_json::from_str::<serde_json::Value>(line) {
76                if value.get("_type").and_then(|v| v.as_str()) == Some("metadata") {
77                    metadata = value.get("metadata").cloned().unwrap_or(metadata);
78                    created_at = value
79                        .get("created_at")
80                        .and_then(|v| v.as_str())
81                        .and_then(|s| s.parse().ok());
82                    last_consolidated = value
83                        .get("last_consolidated")
84                        .and_then(|v| v.as_u64())
85                        .unwrap_or(0) as usize;
86                } else if let Ok(msg) = serde_json::from_value::<super::store::ChatMessage>(value) {
87                    messages.push(msg);
88                }
89            }
90        }
91
92        Some(Session {
93            key: key.to_string(),
94            messages,
95            created_at: created_at.unwrap_or_else(chrono::Utc::now),
96            updated_at: chrono::Utc::now(),
97            metadata,
98            last_consolidated,
99        })
100    }
101
102    /// Save a session to disk
103    pub fn save(&self, session: &Session) -> crate::Result<()> {
104        std::fs::create_dir_all(&self.sessions_dir)?;
105        let path = self.session_path(&session.key);
106
107        let mut lines = Vec::new();
108
109        // Write metadata
110        let metadata = serde_json::json!({
111            "_type": "metadata",
112            "created_at": session.created_at.to_rfc3339(),
113            "updated_at": session.updated_at.to_rfc3339(),
114            "metadata": session.metadata,
115            "last_consolidated": session.last_consolidated,
116        });
117        lines.push(serde_json::to_string(&metadata)?);
118
119        // Write messages
120        for msg in &session.messages {
121            lines.push(serde_json::to_string(msg)?);
122        }
123
124        std::fs::write(&path, lines.join("\n"))?;
125        Ok(())
126    }
127
128    /// Delete a session
129    pub fn delete(&mut self, key: &str) -> crate::Result<bool> {
130        self.cache.remove(key);
131
132        let path = self.session_path(key);
133        if path.exists() {
134            std::fs::remove_file(&path)?;
135            Ok(true)
136        } else {
137            Ok(false)
138        }
139    }
140
141    /// Archive an existing session and clear it from memory, forcing a fresh start
142    pub fn archive_and_reset(&mut self, key: &str) -> crate::Result<bool> {
143        self.cache.remove(key);
144
145        let path = self.session_path(key);
146        if path.exists() {
147            let safe_key = key.replace([':', '/', '\\'], "_");
148            let timestamp = chrono::Utc::now().timestamp_millis();
149            let archive_filename = format!("{}.reset.{}.jsonl", safe_key, timestamp);
150            let archive_path = self.sessions_dir.join(archive_filename);
151
152            std::fs::rename(&path, &archive_path)?;
153            Ok(true)
154        } else {
155            Ok(false)
156        }
157    }
158
159    /// List all sessions
160    pub fn list_sessions(&self) -> Vec<SessionInfo> {
161        let mut sessions = Vec::new();
162
163        if let Ok(entries) = std::fs::read_dir(&self.sessions_dir) {
164            for entry in entries.flatten() {
165                if let Some(name) = entry.file_name().to_str() {
166                    if name.ends_with(".jsonl") {
167                        let key = name.trim_end_matches(".jsonl").replace('_', ":");
168                        if let Ok(content) = std::fs::read_to_string(entry.path()) {
169                            if let Some(first_line) = content.lines().next() {
170                                if let Ok(value) =
171                                    serde_json::from_str::<serde_json::Value>(first_line)
172                                {
173                                    if value.get("_type").and_then(|v| v.as_str())
174                                        == Some("metadata")
175                                    {
176                                        sessions.push(SessionInfo {
177                                            key,
178                                            created_at: value
179                                                .get("created_at")
180                                                .and_then(|v| v.as_str())
181                                                .map(|s| s.to_string()),
182                                            updated_at: value
183                                                .get("updated_at")
184                                                .and_then(|v| v.as_str())
185                                                .map(|s| s.to_string()),
186                                            path: entry.path().to_string_lossy().to_string(),
187                                        });
188                                    }
189                                }
190                            }
191                        }
192                    }
193                }
194            }
195        }
196
197        sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
198        sessions
199    }
200
201    /// Get the file path for a session
202    fn session_path(&self, key: &str) -> PathBuf {
203        let safe_key = key.replace([':', '/', '\\'], "_");
204        self.sessions_dir.join(format!("{}.jsonl", safe_key))
205    }
206}
207
/// Summary of one session file, as returned by `SessionManager::list_sessions`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionInfo {
    /// Session key reported for the file.
    pub key: String,
    /// Creation time, copied as a string from the file's metadata header
    /// (`None` if the header has no string `created_at`).
    pub created_at: Option<String>,
    /// Last update time, copied as a string from the file's metadata header
    /// (`None` if the header has no string `updated_at`).
    pub updated_at: Option<String>,
    /// Path of the session file, converted to a string lossily.
    pub path: String,
}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224    use tempfile::TempDir;
225
226    #[test]
227    fn test_session_manager_creation() {
228        let temp_dir = TempDir::new().unwrap();
229        let manager = SessionManager::new(temp_dir.path());
230        assert!(manager.list_sessions().is_empty());
231    }
232
233    #[test]
234    fn test_get_or_create_session() {
235        let temp_dir = TempDir::new().unwrap();
236        let mut manager = SessionManager::new(temp_dir.path());
237
238        let session = manager.get_or_create("telegram:123");
239        session.add_message("user", "Hello");
240
241        assert_eq!(session.messages.len(), 1);
242        assert_eq!(session.key, "telegram:123");
243    }
244
245    #[test]
246    fn test_save_and_load_session() {
247        let temp_dir = TempDir::new().unwrap();
248        let mut manager = SessionManager::new(temp_dir.path());
249
250        // Create and modify session
251        let session = manager.get_or_create("test:456");
252        session.add_message("user", "Test message");
253        let key = session.key.clone();
254
255        // Save the session
256        manager.save(&manager.cache.get(&key).unwrap()).unwrap();
257
258        // Clear cache and reload
259        manager.cache.clear();
260        let session = manager.get_or_create("test:456");
261
262        assert_eq!(session.messages.len(), 1);
263        assert_eq!(session.messages[0].content, "Test message");
264    }
265
266    #[test]
267    fn test_archive_and_reset_session() {
268        let temp_dir = TempDir::new().unwrap();
269        let mut manager = SessionManager::new(temp_dir.path());
270
271        // Create and modify session
272        let session = manager.get_or_create("archive:789");
273        session.add_message("user", "Message to be archived");
274        let key = session.key.clone();
275
276        // Save it so it exists on disk
277        manager.save(&manager.cache.get(&key).unwrap()).unwrap();
278
279        // Archive it
280        let archived = manager.archive_and_reset(&key).unwrap();
281        assert!(archived);
282
283        // Check it's removed from cache
284        assert!(manager.cache.get(&key).is_none());
285
286        // Get or create should now be empty
287        let new_session = manager.get_or_create("archive:789");
288        assert_eq!(new_session.messages.len(), 0);
289
290        // Check if the original file is gone but there's a file with .reset. in it
291        let mut reset_files_count = 0;
292        for entry in std::fs::read_dir(temp_dir.path().join("sessions")).unwrap() {
293            let entry = entry.unwrap();
294            let file_name = entry.file_name().into_string().unwrap();
295            if file_name.contains(".reset.") {
296                reset_files_count += 1;
297            } else if file_name == "archive_789.jsonl" {
298                // Should not find the active file since it wasn't saved yet
299                panic!("Original file still exists!");
300            }
301        }
302        assert_eq!(
303            reset_files_count, 1,
304            "Should have exactly one archived file"
305        );
306    }
307
308    #[test]
309    fn test_get_or_load_cache_hit() {
310        let temp_dir = TempDir::new().unwrap();
311        let mut manager = SessionManager::new(temp_dir.path());
312
313        let session = manager.get_or_create("gui:chat-1");
314        session.add_message("user", "Hello");
315        let key = session.key.clone();
316        manager.save(&manager.cache.get(&key).unwrap()).unwrap();
317
318        // Session is in cache; get_or_load should return it
319        let loaded = manager.get_or_load("gui:chat-1");
320        assert!(loaded.is_some());
321        assert_eq!(loaded.unwrap().key, "gui:chat-1");
322        assert_eq!(loaded.unwrap().messages.len(), 1);
323    }
324
325    #[test]
326    fn test_get_or_load_disk_exists_cache_miss() {
327        let temp_dir = TempDir::new().unwrap();
328        let mut manager = SessionManager::new(temp_dir.path());
329
330        // Create and save session
331        let session = manager.get_or_create("gui:chat-2");
332        session.add_message("user", "From disk");
333        let key = session.key.clone();
334        manager.save(&manager.cache.get(&key).unwrap()).unwrap();
335
336        // Clear cache to simulate "not loaded this run"
337        manager.cache.clear();
338
339        // get_or_load should load from disk
340        let loaded = manager.get_or_load("gui:chat-2");
341        assert!(loaded.is_some());
342        assert_eq!(loaded.unwrap().key, "gui:chat-2");
343        assert_eq!(loaded.unwrap().messages[0].content, "From disk");
344    }
345
346    #[test]
347    fn test_get_or_load_disk_not_exists() {
348        let temp_dir = TempDir::new().unwrap();
349        let mut manager = SessionManager::new(temp_dir.path());
350
351        // Session never created; no file on disk
352        let loaded = manager.get_or_load("gui:nonexistent");
353        assert!(loaded.is_none());
354    }
355}