Skip to main content

construct/channels/
session_store.rs

//! JSONL-based session persistence for channel conversations.
//!
//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
//! as a JSONL file in `{workspace}/sessions/`. New messages are appended
//! one per line as JSON without touching earlier lines; only the rollback
//! (`remove_last`) and `compact` operations rewrite a session file. On daemon
//! restart, sessions are loaded from disk to restore conversation context.
7
8use crate::channels::session_backend::SessionBackend;
9use crate::providers::traits::ChatMessage;
10use std::io::{BufRead, Write};
11use std::path::{Path, PathBuf};
12
/// Append-only JSONL session store for channel conversations.
///
/// The store keeps no open file handles or cached messages; it only remembers
/// the directory that holds one `.jsonl` file per session.
#[derive(Debug, Clone)]
pub struct SessionStore {
    /// Directory containing the per-session `.jsonl` files.
    sessions_dir: PathBuf,
}
17
18impl SessionStore {
19    /// Create a new session store, ensuring the sessions directory exists.
20    pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
21        let sessions_dir = workspace_dir.join("sessions");
22        std::fs::create_dir_all(&sessions_dir)?;
23        Ok(Self { sessions_dir })
24    }
25
26    /// Compute the file path for a session key, sanitizing for filesystem safety.
27    fn session_path(&self, session_key: &str) -> PathBuf {
28        let safe_key: String = session_key
29            .chars()
30            .map(|c| {
31                if c.is_alphanumeric() || c == '_' || c == '-' {
32                    c
33                } else {
34                    '_'
35                }
36            })
37            .collect();
38        self.sessions_dir.join(format!("{safe_key}.jsonl"))
39    }
40
41    /// Load all messages for a session from its JSONL file.
42    /// Returns an empty vec if the file does not exist or is unreadable.
43    pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
44        let path = self.session_path(session_key);
45        let file = match std::fs::File::open(&path) {
46            Ok(f) => f,
47            Err(_) => return Vec::new(),
48        };
49
50        let reader = std::io::BufReader::new(file);
51        let mut messages = Vec::new();
52
53        for line in reader.lines() {
54            let Ok(line) = line else { continue };
55            let trimmed = line.trim();
56            if trimmed.is_empty() {
57                continue;
58            }
59            if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
60                messages.push(msg);
61            }
62        }
63
64        messages
65    }
66
67    /// Append a single message to the session JSONL file.
68    pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
69        let path = self.session_path(session_key);
70        let mut file = std::fs::OpenOptions::new()
71            .create(true)
72            .append(true)
73            .open(&path)?;
74
75        let json = serde_json::to_string(message)
76            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
77
78        writeln!(file, "{json}")?;
79        Ok(())
80    }
81
82    /// Remove the last message from a session's JSONL file.
83    ///
84    /// Rewrite approach: load all messages, drop the last, rewrite. This is
85    /// O(n) but rollbacks are rare.
86    pub fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
87        let mut messages = self.load(session_key);
88        if messages.is_empty() {
89            return Ok(false);
90        }
91        messages.pop();
92        self.rewrite(session_key, &messages)?;
93        Ok(true)
94    }
95
96    /// Compact a session file by rewriting only valid messages (removes corrupt lines).
97    pub fn compact(&self, session_key: &str) -> std::io::Result<()> {
98        let messages = self.load(session_key);
99        self.rewrite(session_key, &messages)
100    }
101
102    fn rewrite(&self, session_key: &str, messages: &[ChatMessage]) -> std::io::Result<()> {
103        let path = self.session_path(session_key);
104        let mut file = std::fs::File::create(&path)?;
105        for msg in messages {
106            let json = serde_json::to_string(msg)
107                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
108            writeln!(file, "{json}")?;
109        }
110        Ok(())
111    }
112
113    /// Delete a session's JSONL file. Returns `true` if the file existed.
114    pub fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
115        let path = self.session_path(session_key);
116        if !path.exists() {
117            return Ok(false);
118        }
119        std::fs::remove_file(&path)?;
120        Ok(true)
121    }
122
123    /// List all session keys that have files on disk.
124    pub fn list_sessions(&self) -> Vec<String> {
125        let entries = match std::fs::read_dir(&self.sessions_dir) {
126            Ok(e) => e,
127            Err(_) => return Vec::new(),
128        };
129
130        entries
131            .filter_map(|entry| {
132                let entry = entry.ok()?;
133                let name = entry.file_name().into_string().ok()?;
134                name.strip_suffix(".jsonl").map(String::from)
135            })
136            .collect()
137    }
138}
139
140impl SessionBackend for SessionStore {
141    fn load(&self, session_key: &str) -> Vec<ChatMessage> {
142        self.load(session_key)
143    }
144
145    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
146        self.append(session_key, message)
147    }
148
149    fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
150        self.remove_last(session_key)
151    }
152
153    fn list_sessions(&self) -> Vec<String> {
154        self.list_sessions()
155    }
156
157    fn compact(&self, session_key: &str) -> std::io::Result<()> {
158        self.compact(session_key)
159    }
160
161    fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
162        self.delete_session(session_key)
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Create a store inside a fresh temporary workspace.
    ///
    /// The `TempDir` guard is returned alongside the store because dropping it
    /// deletes the directory; callers keep it alive for the whole test.
    fn temp_store() -> (TempDir, SessionStore) {
        let tmp = TempDir::new().unwrap();
        let store = SessionStore::new(tmp.path()).unwrap();
        (tmp, store)
    }

    #[test]
    fn round_trip_append_and_load() {
        let (_tmp, store) = temp_store();
        let key = "telegram_user123";

        store.append(key, &ChatMessage::user("hello")).unwrap();
        store
            .append(key, &ChatMessage::assistant("hi there"))
            .unwrap();

        let loaded = store.load(key);
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].role, "user");
        assert_eq!(loaded[0].content, "hello");
        assert_eq!(loaded[1].role, "assistant");
        assert_eq!(loaded[1].content, "hi there");
    }

    #[test]
    fn load_nonexistent_session_returns_empty() {
        let (_tmp, store) = temp_store();

        assert!(store.load("nonexistent").is_empty());
    }

    #[test]
    fn key_sanitization() {
        let (_tmp, store) = temp_store();
        // Separators and colons are not filesystem-safe and must be sanitized.
        let key = "slack/thread:123/user";

        store.append(key, &ChatMessage::user("test")).unwrap();

        assert_eq!(store.load(key).len(), 1);
    }

    #[test]
    fn list_sessions_returns_keys() {
        let (_tmp, store) = temp_store();

        for (key, text) in [("telegram_alice", "hi"), ("discord_bob", "hey")] {
            store.append(key, &ChatMessage::user(text)).unwrap();
        }

        let mut listed = store.list_sessions();
        listed.sort();
        assert_eq!(listed.len(), 2);
        assert!(listed.contains(&"discord_bob".to_string()));
        assert!(listed.contains(&"telegram_alice".to_string()));
    }

    #[test]
    fn append_is_truly_append_only() {
        let (_tmp, store) = temp_store();
        let key = "test_session";

        for text in ["msg1", "msg2"] {
            store.append(key, &ChatMessage::user(text)).unwrap();
        }

        // Inspect the raw file: two appends must yield exactly two lines.
        let raw = std::fs::read_to_string(store.session_path(key)).unwrap();
        let line_count = raw.trim().lines().count();
        assert_eq!(line_count, 2);
    }

    #[test]
    fn remove_last_drops_final_message() {
        let (_tmp, store) = temp_store();
        let key = "rm_test";

        for text in ["first", "second"] {
            store.append(key, &ChatMessage::user(text)).unwrap();
        }

        assert!(store.remove_last(key).unwrap());
        let remaining = store.load(key);
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].content, "first");
    }

    #[test]
    fn remove_last_empty_returns_false() {
        let (_tmp, store) = temp_store();

        let removed = store.remove_last("nonexistent").unwrap();
        assert!(!removed);
    }

    #[test]
    fn compact_removes_corrupt_lines() {
        let (_tmp, store) = temp_store();
        let key = "compact_test";

        // Hand-write a file with one corrupt line between two valid messages.
        let path = store.session_path(key);
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        let mut file = std::fs::File::create(&path).unwrap();
        writeln!(file, r#"{{"role":"user","content":"ok"}}"#).unwrap();
        writeln!(file, "corrupt line").unwrap();
        writeln!(file, r#"{{"role":"assistant","content":"hi"}}"#).unwrap();

        store.compact(key).unwrap();

        let compacted = std::fs::read_to_string(&path).unwrap();
        assert_eq!(compacted.trim().lines().count(), 2);
    }

    #[test]
    fn session_backend_trait_works_via_dyn() {
        let (_tmp, store) = temp_store();
        let backend: &dyn SessionBackend = &store;
        let key = "trait_test";

        backend.append(key, &ChatMessage::user("hello")).unwrap();

        assert_eq!(backend.load(key).len(), 1);
    }

    #[test]
    fn handles_corrupt_lines_gracefully() {
        let (_tmp, store) = temp_store();
        let key = "corrupt_test";

        // Layout on disk: valid message, garbage, valid message.
        let path = store.session_path(key);
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        let mut file = std::fs::File::create(&path).unwrap();
        writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
        writeln!(file, "this is not valid json").unwrap();
        writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();

        let loaded = store.load(key);
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].content, "hello");
        assert_eq!(loaded[1].content, "world");
    }

    #[test]
    fn delete_session_removes_jsonl_file() {
        let (_tmp, store) = temp_store();
        let key = "delete_test";

        store.append(key, &ChatMessage::user("hello")).unwrap();
        assert_eq!(store.load(key).len(), 1);

        assert!(store.delete_session(key).unwrap());
        assert!(store.load(key).is_empty());
        assert!(!store.session_path(key).exists());
    }

    #[test]
    fn delete_session_nonexistent_returns_false() {
        let (_tmp, store) = temp_store();

        assert!(!store.delete_session("nonexistent").unwrap());
    }

    #[test]
    fn delete_session_via_trait() {
        let (_tmp, store) = temp_store();
        let backend: &dyn SessionBackend = &store;
        let key = "trait_delete";

        backend.append(key, &ChatMessage::user("hello")).unwrap();
        assert_eq!(backend.load(key).len(), 1);

        assert!(backend.delete_session(key).unwrap());
        assert!(backend.load(key).is_empty());
    }
}