Skip to main content

arcan_store/
session.rs

1use arcan_core::protocol::AgentEvent;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fs::{create_dir_all, File, OpenOptions};
6use std::io::{BufRead, BufReader, Write};
7use std::path::{Path, PathBuf};
8use std::sync::RwLock;
9use thiserror::Error;
10use uuid::Uuid;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct EventRecord {
14    pub id: String,
15    pub session_id: String,
16    pub parent_id: Option<String>,
17    pub timestamp: DateTime<Utc>,
18    pub event: AgentEvent,
19}
20
21#[derive(Debug, Clone)]
22pub struct AppendEvent {
23    pub session_id: String,
24    pub parent_id: Option<String>,
25    pub event: AgentEvent,
26}
27
28pub trait SessionRepository: Send + Sync {
29    fn append(&self, request: AppendEvent) -> Result<EventRecord, StoreError>;
30    fn load_session(&self, session_id: &str) -> Result<Vec<EventRecord>, StoreError>;
31    fn load_children(&self, parent_id: &str) -> Result<Vec<EventRecord>, StoreError>;
32    fn head(&self, session_id: &str) -> Result<Option<EventRecord>, StoreError>;
33}
34
35#[derive(Default)]
36pub struct InMemorySessionRepository {
37    by_session: RwLock<HashMap<String, Vec<EventRecord>>>,
38}
39
40impl SessionRepository for InMemorySessionRepository {
41    fn append(&self, request: AppendEvent) -> Result<EventRecord, StoreError> {
42        let record = EventRecord {
43            id: Uuid::new_v4().to_string(),
44            session_id: request.session_id,
45            parent_id: request.parent_id,
46            timestamp: Utc::now(),
47            event: request.event,
48        };
49
50        let mut guard = self
51            .by_session
52            .write()
53            .map_err(|_| StoreError::PoisonedLock("in-memory write".to_string()))?;
54
55        guard
56            .entry(record.session_id.clone())
57            .or_default()
58            .push(record.clone());
59
60        Ok(record)
61    }
62
63    fn load_session(&self, session_id: &str) -> Result<Vec<EventRecord>, StoreError> {
64        let guard = self
65            .by_session
66            .read()
67            .map_err(|_| StoreError::PoisonedLock("in-memory read".to_string()))?;
68        Ok(guard.get(session_id).cloned().unwrap_or_default())
69    }
70
71    fn load_children(&self, parent_id: &str) -> Result<Vec<EventRecord>, StoreError> {
72        let guard = self
73            .by_session
74            .read()
75            .map_err(|_| StoreError::PoisonedLock("in-memory read".to_string()))?;
76
77        let mut out = Vec::new();
78        for records in guard.values() {
79            for record in records {
80                if record.parent_id.as_deref() == Some(parent_id) {
81                    out.push(record.clone());
82                }
83            }
84        }
85
86        Ok(out)
87    }
88
89    fn head(&self, session_id: &str) -> Result<Option<EventRecord>, StoreError> {
90        let guard = self
91            .by_session
92            .read()
93            .map_err(|_| StoreError::PoisonedLock("in-memory read".to_string()))?;
94        Ok(guard
95            .get(session_id)
96            .and_then(|records| records.last().cloned()))
97    }
98}
99
100pub struct JsonlSessionRepository {
101    root: PathBuf,
102}
103
104impl JsonlSessionRepository {
105    pub fn new(root: PathBuf) -> Self {
106        Self { root }
107    }
108
109    fn session_file(&self, session_id: &str) -> PathBuf {
110        self.root.join(format!("{session_id}.jsonl"))
111    }
112
113    fn ensure_root(&self) -> Result<(), StoreError> {
114        create_dir_all(&self.root).map_err(|source| StoreError::Io {
115            path: self.root.clone(),
116            source,
117        })
118    }
119
120    fn read_records(path: &Path) -> Result<Vec<EventRecord>, StoreError> {
121        if !path.exists() {
122            return Ok(Vec::new());
123        }
124
125        let file = File::open(path).map_err(|source| StoreError::Io {
126            path: path.to_path_buf(),
127            source,
128        })?;
129
130        let reader = BufReader::new(file);
131        let mut records = Vec::new();
132
133        for line in reader.lines() {
134            let line = line.map_err(|source| StoreError::Io {
135                path: path.to_path_buf(),
136                source,
137            })?;
138            if line.trim().is_empty() {
139                continue;
140            }
141
142            let record: EventRecord =
143                serde_json::from_str(&line).map_err(|source| StoreError::Serde { source })?;
144            records.push(record);
145        }
146
147        Ok(records)
148    }
149}
150
151impl SessionRepository for JsonlSessionRepository {
152    fn append(&self, request: AppendEvent) -> Result<EventRecord, StoreError> {
153        self.ensure_root()?;
154
155        let record = EventRecord {
156            id: Uuid::new_v4().to_string(),
157            session_id: request.session_id.clone(),
158            parent_id: request.parent_id,
159            timestamp: Utc::now(),
160            event: request.event,
161        };
162
163        let path = self.session_file(&request.session_id);
164        let mut file = OpenOptions::new()
165            .create(true)
166            .append(true)
167            .open(&path)
168            .map_err(|source| StoreError::Io {
169                path: path.clone(),
170                source,
171            })?;
172
173        let line = serde_json::to_string(&record).map_err(|source| StoreError::Serde { source })?;
174        file.write_all(line.as_bytes())
175            .and_then(|_| file.write_all(b"\n"))
176            .map_err(|source| StoreError::Io {
177                path: path.clone(),
178                source,
179            })?;
180
181        Ok(record)
182    }
183
184    fn load_session(&self, session_id: &str) -> Result<Vec<EventRecord>, StoreError> {
185        Self::read_records(&self.session_file(session_id))
186    }
187
188    fn load_children(&self, parent_id: &str) -> Result<Vec<EventRecord>, StoreError> {
189        self.ensure_root()?;
190
191        let mut out = Vec::new();
192        for entry in std::fs::read_dir(&self.root).map_err(|source| StoreError::Io {
193            path: self.root.clone(),
194            source,
195        })? {
196            let entry = entry.map_err(|source| StoreError::Io {
197                path: self.root.clone(),
198                source,
199            })?;
200
201            let path = entry.path();
202            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
203                continue;
204            }
205
206            for record in Self::read_records(&path)? {
207                if record.parent_id.as_deref() == Some(parent_id) {
208                    out.push(record);
209                }
210            }
211        }
212
213        Ok(out)
214    }
215
216    fn head(&self, session_id: &str) -> Result<Option<EventRecord>, StoreError> {
217        Ok(Self::read_records(&self.session_file(session_id))?.pop())
218    }
219}
220
221#[derive(Debug, Error)]
222pub enum StoreError {
223    #[error("IO error on {path}: {source}")]
224    Io {
225        path: PathBuf,
226        #[source]
227        source: std::io::Error,
228    },
229    #[error("serialization error: {source}")]
230    Serde {
231        #[source]
232        source: serde_json::Error,
233    },
234    #[error("in-memory store lock was poisoned: {0}")]
235    PoisonedLock(String),
236}
237
238#[cfg(test)]
239mod tests {
240    use super::*;
241    use arcan_core::protocol::{AgentEvent, RunStopReason};
242
243    fn make_event(run_id: &str, session_id: &str) -> AgentEvent {
244        AgentEvent::RunFinished {
245            run_id: run_id.to_string(),
246            session_id: session_id.to_string(),
247            reason: RunStopReason::Completed,
248            total_iterations: 1,
249            final_answer: Some("ok".to_string()),
250        }
251    }
252
253    #[test]
254    fn appends_and_reads_head() {
255        let store = InMemorySessionRepository::default();
256        store
257            .append(AppendEvent {
258                session_id: "s1".to_string(),
259                parent_id: None,
260                event: make_event("r1", "s1"),
261            })
262            .expect("append should succeed");
263
264        let head = store
265            .head("s1")
266            .expect("head should load")
267            .expect("head exists");
268        assert_eq!(head.session_id, "s1");
269    }
270
271    #[test]
272    fn load_session_returns_all_events_in_order() {
273        let store = InMemorySessionRepository::default();
274        for i in 0..5 {
275            store
276                .append(AppendEvent {
277                    session_id: "s1".to_string(),
278                    parent_id: None,
279                    event: make_event(&format!("r{i}"), "s1"),
280                })
281                .unwrap();
282        }
283
284        let records = store.load_session("s1").unwrap();
285        assert_eq!(records.len(), 5);
286    }
287
288    #[test]
289    fn empty_session_returns_empty() {
290        let store = InMemorySessionRepository::default();
291        let records = store.load_session("nonexistent").unwrap();
292        assert!(records.is_empty());
293        assert!(store.head("nonexistent").unwrap().is_none());
294    }
295
296    #[test]
297    fn sessions_are_isolated() {
298        let store = InMemorySessionRepository::default();
299        store
300            .append(AppendEvent {
301                session_id: "a".to_string(),
302                parent_id: None,
303                event: make_event("r1", "a"),
304            })
305            .unwrap();
306        store
307            .append(AppendEvent {
308                session_id: "b".to_string(),
309                parent_id: None,
310                event: make_event("r2", "b"),
311            })
312            .unwrap();
313
314        assert_eq!(store.load_session("a").unwrap().len(), 1);
315        assert_eq!(store.load_session("b").unwrap().len(), 1);
316    }
317
318    #[test]
319    fn load_children_filters_by_parent() {
320        let store = InMemorySessionRepository::default();
321        let parent = store
322            .append(AppendEvent {
323                session_id: "s1".to_string(),
324                parent_id: None,
325                event: make_event("r1", "s1"),
326            })
327            .unwrap();
328
329        store
330            .append(AppendEvent {
331                session_id: "s1".to_string(),
332                parent_id: Some(parent.id.clone()),
333                event: make_event("r2", "s1"),
334            })
335            .unwrap();
336
337        store
338            .append(AppendEvent {
339                session_id: "s1".to_string(),
340                parent_id: None,
341                event: make_event("r3", "s1"),
342            })
343            .unwrap();
344
345        let children = store.load_children(&parent.id).unwrap();
346        assert_eq!(children.len(), 1);
347    }
348
349    #[test]
350    fn jsonl_repo_round_trip() {
351        let dir = tempfile::tempdir().unwrap();
352        let store = JsonlSessionRepository::new(dir.path().to_path_buf());
353
354        store
355            .append(AppendEvent {
356                session_id: "s1".to_string(),
357                parent_id: None,
358                event: make_event("r1", "s1"),
359            })
360            .unwrap();
361
362        store
363            .append(AppendEvent {
364                session_id: "s1".to_string(),
365                parent_id: None,
366                event: make_event("r2", "s1"),
367            })
368            .unwrap();
369
370        let records = store.load_session("s1").unwrap();
371        assert_eq!(records.len(), 2);
372
373        let head = store.head("s1").unwrap().unwrap();
374        assert!(matches!(
375            head.event,
376            AgentEvent::RunFinished { ref run_id, .. } if run_id == "r2"
377        ));
378    }
379
380    #[test]
381    fn jsonl_repo_empty_session() {
382        let dir = tempfile::tempdir().unwrap();
383        let store = JsonlSessionRepository::new(dir.path().to_path_buf());
384        assert!(store.load_session("nope").unwrap().is_empty());
385        assert!(store.head("nope").unwrap().is_none());
386    }
387}