1use arcan_core::protocol::AgentEvent;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fs::{create_dir_all, File, OpenOptions};
6use std::io::{BufRead, BufReader, Write};
7use std::path::{Path, PathBuf};
8use std::sync::RwLock;
9use thiserror::Error;
10use uuid::Uuid;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct EventRecord {
14 pub id: String,
15 pub session_id: String,
16 pub parent_id: Option<String>,
17 pub timestamp: DateTime<Utc>,
18 pub event: AgentEvent,
19}
20
21#[derive(Debug, Clone)]
22pub struct AppendEvent {
23 pub session_id: String,
24 pub parent_id: Option<String>,
25 pub event: AgentEvent,
26}
27
28pub trait SessionRepository: Send + Sync {
29 fn append(&self, request: AppendEvent) -> Result<EventRecord, StoreError>;
30 fn load_session(&self, session_id: &str) -> Result<Vec<EventRecord>, StoreError>;
31 fn load_children(&self, parent_id: &str) -> Result<Vec<EventRecord>, StoreError>;
32 fn head(&self, session_id: &str) -> Result<Option<EventRecord>, StoreError>;
33}
34
35#[derive(Default)]
36pub struct InMemorySessionRepository {
37 by_session: RwLock<HashMap<String, Vec<EventRecord>>>,
38}
39
40impl SessionRepository for InMemorySessionRepository {
41 fn append(&self, request: AppendEvent) -> Result<EventRecord, StoreError> {
42 let record = EventRecord {
43 id: Uuid::new_v4().to_string(),
44 session_id: request.session_id,
45 parent_id: request.parent_id,
46 timestamp: Utc::now(),
47 event: request.event,
48 };
49
50 let mut guard = self
51 .by_session
52 .write()
53 .map_err(|_| StoreError::PoisonedLock("in-memory write".to_string()))?;
54
55 guard
56 .entry(record.session_id.clone())
57 .or_default()
58 .push(record.clone());
59
60 Ok(record)
61 }
62
63 fn load_session(&self, session_id: &str) -> Result<Vec<EventRecord>, StoreError> {
64 let guard = self
65 .by_session
66 .read()
67 .map_err(|_| StoreError::PoisonedLock("in-memory read".to_string()))?;
68 Ok(guard.get(session_id).cloned().unwrap_or_default())
69 }
70
71 fn load_children(&self, parent_id: &str) -> Result<Vec<EventRecord>, StoreError> {
72 let guard = self
73 .by_session
74 .read()
75 .map_err(|_| StoreError::PoisonedLock("in-memory read".to_string()))?;
76
77 let mut out = Vec::new();
78 for records in guard.values() {
79 for record in records {
80 if record.parent_id.as_deref() == Some(parent_id) {
81 out.push(record.clone());
82 }
83 }
84 }
85
86 Ok(out)
87 }
88
89 fn head(&self, session_id: &str) -> Result<Option<EventRecord>, StoreError> {
90 let guard = self
91 .by_session
92 .read()
93 .map_err(|_| StoreError::PoisonedLock("in-memory read".to_string()))?;
94 Ok(guard
95 .get(session_id)
96 .and_then(|records| records.last().cloned()))
97 }
98}
99
/// [`SessionRepository`] that stores each session as a JSON Lines file.
///
/// Every session lives in `<root>/<session_id>.jsonl`, one serialized
/// [`EventRecord`] per line.
pub struct JsonlSessionRepository {
    /// Directory that holds the per-session `.jsonl` files.
    root: PathBuf,
}
103
104impl JsonlSessionRepository {
105 pub fn new(root: PathBuf) -> Self {
106 Self { root }
107 }
108
109 fn session_file(&self, session_id: &str) -> PathBuf {
110 self.root.join(format!("{session_id}.jsonl"))
111 }
112
113 fn ensure_root(&self) -> Result<(), StoreError> {
114 create_dir_all(&self.root).map_err(|source| StoreError::Io {
115 path: self.root.clone(),
116 source,
117 })
118 }
119
120 fn read_records(path: &Path) -> Result<Vec<EventRecord>, StoreError> {
121 if !path.exists() {
122 return Ok(Vec::new());
123 }
124
125 let file = File::open(path).map_err(|source| StoreError::Io {
126 path: path.to_path_buf(),
127 source,
128 })?;
129
130 let reader = BufReader::new(file);
131 let mut records = Vec::new();
132
133 for line in reader.lines() {
134 let line = line.map_err(|source| StoreError::Io {
135 path: path.to_path_buf(),
136 source,
137 })?;
138 if line.trim().is_empty() {
139 continue;
140 }
141
142 let record: EventRecord =
143 serde_json::from_str(&line).map_err(|source| StoreError::Serde { source })?;
144 records.push(record);
145 }
146
147 Ok(records)
148 }
149}
150
151impl SessionRepository for JsonlSessionRepository {
152 fn append(&self, request: AppendEvent) -> Result<EventRecord, StoreError> {
153 self.ensure_root()?;
154
155 let record = EventRecord {
156 id: Uuid::new_v4().to_string(),
157 session_id: request.session_id.clone(),
158 parent_id: request.parent_id,
159 timestamp: Utc::now(),
160 event: request.event,
161 };
162
163 let path = self.session_file(&request.session_id);
164 let mut file = OpenOptions::new()
165 .create(true)
166 .append(true)
167 .open(&path)
168 .map_err(|source| StoreError::Io {
169 path: path.clone(),
170 source,
171 })?;
172
173 let line = serde_json::to_string(&record).map_err(|source| StoreError::Serde { source })?;
174 file.write_all(line.as_bytes())
175 .and_then(|_| file.write_all(b"\n"))
176 .map_err(|source| StoreError::Io {
177 path: path.clone(),
178 source,
179 })?;
180
181 Ok(record)
182 }
183
184 fn load_session(&self, session_id: &str) -> Result<Vec<EventRecord>, StoreError> {
185 Self::read_records(&self.session_file(session_id))
186 }
187
188 fn load_children(&self, parent_id: &str) -> Result<Vec<EventRecord>, StoreError> {
189 self.ensure_root()?;
190
191 let mut out = Vec::new();
192 for entry in std::fs::read_dir(&self.root).map_err(|source| StoreError::Io {
193 path: self.root.clone(),
194 source,
195 })? {
196 let entry = entry.map_err(|source| StoreError::Io {
197 path: self.root.clone(),
198 source,
199 })?;
200
201 let path = entry.path();
202 if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
203 continue;
204 }
205
206 for record in Self::read_records(&path)? {
207 if record.parent_id.as_deref() == Some(parent_id) {
208 out.push(record);
209 }
210 }
211 }
212
213 Ok(out)
214 }
215
216 fn head(&self, session_id: &str) -> Result<Option<EventRecord>, StoreError> {
217 Ok(Self::read_records(&self.session_file(session_id))?.pop())
218 }
219}
220
221#[derive(Debug, Error)]
222pub enum StoreError {
223 #[error("IO error on {path}: {source}")]
224 Io {
225 path: PathBuf,
226 #[source]
227 source: std::io::Error,
228 },
229 #[error("serialization error: {source}")]
230 Serde {
231 #[source]
232 source: serde_json::Error,
233 },
234 #[error("in-memory store lock was poisoned: {0}")]
235 PoisonedLock(String),
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use arcan_core::protocol::{AgentEvent, RunStopReason};

    /// Builds a completed `RunFinished` event for the given run and session.
    fn make_event(run_id: &str, session_id: &str) -> AgentEvent {
        AgentEvent::RunFinished {
            run_id: run_id.to_string(),
            session_id: session_id.to_string(),
            reason: RunStopReason::Completed,
            total_iterations: 1,
            final_answer: Some("ok".to_string()),
        }
    }

    /// Appends a `RunFinished` event to `store` and returns the stored record.
    fn append_run(
        store: &dyn SessionRepository,
        session_id: &str,
        run_id: &str,
        parent_id: Option<String>,
    ) -> EventRecord {
        store
            .append(AppendEvent {
                session_id: session_id.to_string(),
                parent_id,
                event: make_event(run_id, session_id),
            })
            .expect("append should succeed")
    }

    #[test]
    fn appends_and_reads_head() {
        let store = InMemorySessionRepository::default();
        append_run(&store, "s1", "r1", None);

        let head = store
            .head("s1")
            .expect("head should load")
            .expect("head exists");
        assert_eq!(head.session_id, "s1");
    }

    #[test]
    fn load_session_returns_all_events_in_order() {
        let store = InMemorySessionRepository::default();
        for i in 0..5 {
            append_run(&store, "s1", &format!("r{i}"), None);
        }

        let records = store.load_session("s1").unwrap();
        assert_eq!(records.len(), 5);

        // Verify the order the test name promises, not just the count.
        for (i, record) in records.iter().enumerate() {
            let expected = format!("r{i}");
            assert!(
                matches!(
                    &record.event,
                    AgentEvent::RunFinished { run_id, .. } if *run_id == expected
                ),
                "record {i} is out of order"
            );
        }
    }

    #[test]
    fn empty_session_returns_empty() {
        let store = InMemorySessionRepository::default();
        let records = store.load_session("nonexistent").unwrap();
        assert!(records.is_empty());
        assert!(store.head("nonexistent").unwrap().is_none());
    }

    #[test]
    fn sessions_are_isolated() {
        let store = InMemorySessionRepository::default();
        append_run(&store, "a", "r1", None);
        append_run(&store, "b", "r2", None);

        assert_eq!(store.load_session("a").unwrap().len(), 1);
        assert_eq!(store.load_session("b").unwrap().len(), 1);
    }

    #[test]
    fn load_children_filters_by_parent() {
        let store = InMemorySessionRepository::default();
        let parent = append_run(&store, "s1", "r1", None);
        append_run(&store, "s1", "r2", Some(parent.id.clone()));
        append_run(&store, "s1", "r3", None);

        let children = store.load_children(&parent.id).unwrap();
        assert_eq!(children.len(), 1);
    }

    #[test]
    fn jsonl_repo_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonlSessionRepository::new(dir.path().to_path_buf());

        append_run(&store, "s1", "r1", None);
        append_run(&store, "s1", "r2", None);

        let records = store.load_session("s1").unwrap();
        assert_eq!(records.len(), 2);

        let head = store.head("s1").unwrap().unwrap();
        assert!(matches!(
            head.event,
            AgentEvent::RunFinished { ref run_id, .. } if run_id == "r2"
        ));
    }

    #[test]
    fn jsonl_repo_empty_session() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonlSessionRepository::new(dir.path().to_path_buf());
        assert!(store.load_session("nope").unwrap().is_empty());
        assert!(store.head("nope").unwrap().is_none());
    }

    #[test]
    fn jsonl_repo_load_children_filters_by_parent() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonlSessionRepository::new(dir.path().to_path_buf());

        let parent = append_run(&store, "s1", "r1", None);
        // The child lives in a different session file than its parent.
        append_run(&store, "s2", "r2", Some(parent.id.clone()));
        append_run(&store, "s1", "r3", None);

        let children = store.load_children(&parent.id).unwrap();
        assert_eq!(children.len(), 1);
        assert_eq!(children[0].session_id, "s2");
        assert_eq!(children[0].parent_id.as_deref(), Some(parent.id.as_str()));
    }
}