1use crate::log_store::LogEntry;
2use crate::registry::PersistedSession;
3use macp_core::session::Session;
4use std::fs;
5use std::io;
6use std::path::{Path, PathBuf};
7use tokio::fs as tfs;
8use tokio::io::AsyncWriteExt;
9
10use super::StorageBackend;
11
12pub struct FileBackend {
13 base_dir: PathBuf,
14}
15
16impl FileBackend {
17 pub fn new(base_dir: PathBuf) -> io::Result<Self> {
18 fs::create_dir_all(base_dir.join("sessions"))?;
19 Ok(Self { base_dir })
20 }
21
22 fn session_dir(&self, session_id: &str) -> PathBuf {
23 self.base_dir.join("sessions").join(session_id)
24 }
25
26 pub(crate) fn session_file(&self, session_id: &str) -> PathBuf {
27 self.session_dir(session_id).join("session.json")
28 }
29
30 pub(crate) fn log_file(&self, session_id: &str) -> PathBuf {
31 self.session_dir(session_id).join("log.jsonl")
32 }
33
34 async fn atomic_write(path: &Path, data: &[u8]) -> io::Result<()> {
35 let tmp_path = path.with_extension("json.tmp");
36 tfs::write(&tmp_path, data).await?;
37 tfs::rename(&tmp_path, path).await
38 }
39}
40
41#[async_trait::async_trait]
42impl StorageBackend for FileBackend {
43 async fn create_session_storage(&self, session_id: &str) -> io::Result<()> {
44 tfs::create_dir_all(self.session_dir(session_id)).await
45 }
46
47 async fn save_session(&self, session: &Session) -> io::Result<()> {
48 let persisted = PersistedSession::from(session);
49 let bytes = serde_json::to_vec_pretty(&persisted)
50 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
51 Self::atomic_write(&self.session_file(&session.session_id), &bytes).await
52 }
53
54 async fn load_session(&self, session_id: &str) -> io::Result<Option<Session>> {
55 let path = self.session_file(session_id);
56 if tfs::metadata(&path).await.is_err() {
57 return Ok(None);
58 }
59 let bytes = tfs::read(&path).await?;
60 let persisted: PersistedSession = serde_json::from_slice(&bytes)
61 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
62 Ok(Some(Session::from(persisted)))
63 }
64
65 async fn load_all_sessions(&self) -> io::Result<Vec<Session>> {
66 let ids = self.list_session_ids().await?;
67 let mut sessions = Vec::new();
68 for id in ids {
69 match self.load_session(&id).await {
70 Ok(Some(s)) => sessions.push(s),
71 Ok(None) => {}
72 Err(e) => {
73 eprintln!("warning: failed to load session {id}: {e}; skipping");
74 }
75 }
76 }
77 Ok(sessions)
78 }
79
80 async fn delete_session(&self, session_id: &str) -> io::Result<()> {
81 let dir = self.session_dir(session_id);
82 if tfs::metadata(&dir).await.is_ok() {
83 tfs::remove_dir_all(&dir).await?;
84 }
85 Ok(())
86 }
87
88 async fn list_session_ids(&self) -> io::Result<Vec<String>> {
89 let sessions_dir = self.base_dir.join("sessions");
90 if tfs::metadata(&sessions_dir).await.is_err() {
91 return Ok(vec![]);
92 }
93 let mut ids = Vec::new();
94 let mut entries = tfs::read_dir(&sessions_dir).await?;
95 while let Some(entry) = entries.next_entry().await? {
96 if !entry.file_type().await?.is_dir() {
97 continue;
98 }
99 ids.push(entry.file_name().to_string_lossy().to_string());
100 }
101 Ok(ids)
102 }
103
104 async fn append_log_entry(&self, session_id: &str, entry: &LogEntry) -> io::Result<()> {
105 let path = self.log_file(session_id);
106 let mut line = serde_json::to_string(entry)
107 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
108 line.push('\n');
109
110 let mut file = tfs::OpenOptions::new()
111 .create(true)
112 .append(true)
113 .open(&path)
114 .await?;
115 file.write_all(line.as_bytes()).await?;
116 file.sync_data().await?;
117 Ok(())
118 }
119
120 async fn load_log(&self, session_id: &str) -> io::Result<Vec<LogEntry>> {
121 let path = self.log_file(session_id);
122 if tfs::metadata(&path).await.is_err() {
123 return Ok(vec![]);
124 }
125 let content = tfs::read_to_string(&path).await?;
126 let mut entries = Vec::new();
127 for (line_num, line) in content.lines().enumerate() {
128 if line.trim().is_empty() {
129 continue;
130 }
131 match serde_json::from_str::<LogEntry>(line) {
132 Ok(entry) => entries.push(entry),
133 Err(e) => {
134 eprintln!(
135 "warning: failed to parse log entry at {}:{}: {e}; skipping",
136 path.display(),
137 line_num + 1
138 );
139 }
140 }
141 }
142 Ok(entries)
143 }
144
145 async fn replace_log(&self, session_id: &str, entries: &[LogEntry]) -> io::Result<()> {
146 let path = self.log_file(session_id);
147 let mut data = String::new();
148 for entry in entries {
149 let line = serde_json::to_string(entry)
150 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
151 data.push_str(&line);
152 data.push('\n');
153 }
154 let tmp_path = path.with_extension("jsonl.tmp");
155 tfs::write(&tmp_path, data.as_bytes()).await?;
156 tfs::rename(&tmp_path, &path).await
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163 use crate::log_store::EntryKind;
164 use macp_core::session::SessionState;
165 use std::collections::HashSet;
166
167 fn sample_session(id: &str) -> Session {
168 Session {
169 session_id: id.into(),
170 state: SessionState::Open,
171 ttl_expiry: 61_000,
172 ttl_ms: 60_000,
173 started_at_unix_ms: 1_000,
174 resolution: None,
175 mode: "macp.mode.decision.v1".into(),
176 mode_state: vec![1, 2, 3],
177 participants: vec!["alice".into(), "bob".into()],
178 seen_message_ids: HashSet::from(["m1".into()]),
179 intent: "test intent".into(),
180 mode_version: "1.0.0".into(),
181 configuration_version: "cfg-1".into(),
182 policy_version: "pol-1".into(),
183 context_id: "test-ctx".to_string(),
184 extensions: std::collections::HashMap::new(),
185 roots: vec![macp_pb::pb::Root {
186 uri: "root://1".into(),
187 name: "r1".into(),
188 }],
189 initiator_sender: "alice".into(),
190 participant_message_counts: std::collections::HashMap::new(),
191 participant_last_seen: std::collections::HashMap::new(),
192 policy_definition: None,
193 suspended_at_ms: None,
194 accumulated_suspended_ms: 0,
195 }
196 }
197
198 fn sample_entry(id: &str) -> LogEntry {
199 LogEntry {
200 message_id: id.into(),
201 received_at_ms: 1_700_000_000_000,
202 sender: "alice".into(),
203 message_type: "Message".into(),
204 raw_payload: vec![],
205 entry_kind: EntryKind::Incoming,
206 session_id: String::new(),
207 mode: String::new(),
208 macp_version: String::new(),
209 timestamp_unix_ms: 1_700_000_000_000,
210 }
211 }
212
213 #[tokio::test]
214 async fn file_backend_session_round_trip() {
215 let dir = tempfile::tempdir().unwrap();
216 let backend = FileBackend::new(dir.path().to_path_buf()).unwrap();
217
218 let session = sample_session("s1");
219 backend.create_session_storage("s1").await.unwrap();
220 backend.save_session(&session).await.unwrap();
221
222 let loaded = backend.load_session("s1").await.unwrap().unwrap();
223 assert_eq!(loaded.session_id, "s1");
224 assert_eq!(loaded.ttl_ms, 60_000);
225 assert_eq!(loaded.mode_version, "1.0.0");
226 assert!(loaded.seen_message_ids.contains("m1"));
227 assert_eq!(loaded.participants, vec!["alice", "bob"]);
228 }
229
230 #[tokio::test]
231 async fn file_backend_log_append_and_load() {
232 let dir = tempfile::tempdir().unwrap();
233 let backend = FileBackend::new(dir.path().to_path_buf()).unwrap();
234
235 backend.create_session_storage("s1").await.unwrap();
236 backend
237 .append_log_entry("s1", &sample_entry("m1"))
238 .await
239 .unwrap();
240 backend
241 .append_log_entry("s1", &sample_entry("m2"))
242 .await
243 .unwrap();
244 backend
245 .append_log_entry("s1", &sample_entry("m3"))
246 .await
247 .unwrap();
248
249 let log = backend.load_log("s1").await.unwrap();
250 assert_eq!(log.len(), 3);
251 assert_eq!(log[0].message_id, "m1");
252 assert_eq!(log[1].message_id, "m2");
253 assert_eq!(log[2].message_id, "m3");
254 }
255
256 #[tokio::test]
257 async fn file_backend_load_all_sessions() {
258 let dir = tempfile::tempdir().unwrap();
259 let backend = FileBackend::new(dir.path().to_path_buf()).unwrap();
260
261 for id in ["s1", "s2", "s3"] {
262 backend.create_session_storage(id).await.unwrap();
263 backend.save_session(&sample_session(id)).await.unwrap();
264 }
265
266 let mut sessions = backend.load_all_sessions().await.unwrap();
267 sessions.sort_by(|a, b| a.session_id.cmp(&b.session_id));
268 assert_eq!(sessions.len(), 3);
269 assert_eq!(sessions[0].session_id, "s1");
270 assert_eq!(sessions[1].session_id, "s2");
271 assert_eq!(sessions[2].session_id, "s3");
272 }
273
274 #[tokio::test]
275 async fn append_only_no_full_rewrite() {
276 let dir = tempfile::tempdir().unwrap();
277 let backend = FileBackend::new(dir.path().to_path_buf()).unwrap();
278 backend.create_session_storage("s1").await.unwrap();
279
280 for i in 0..100 {
281 backend
282 .append_log_entry("s1", &sample_entry(&format!("m{}", i)))
283 .await
284 .unwrap();
285 }
286
287 let content = fs::read_to_string(backend.log_file("s1")).unwrap();
288 let line_count = content.lines().count();
289 assert_eq!(line_count, 100);
290
291 let log = backend.load_log("s1").await.unwrap();
292 assert_eq!(log.len(), 100);
293 }
294
295 #[tokio::test]
296 async fn write_ordering_log_before_session() {
297 let dir = tempfile::tempdir().unwrap();
298 let backend = FileBackend::new(dir.path().to_path_buf()).unwrap();
299 backend.create_session_storage("s1").await.unwrap();
300
301 backend
302 .append_log_entry("s1", &sample_entry("m1"))
303 .await
304 .unwrap();
305
306 let log = backend.load_log("s1").await.unwrap();
307 assert_eq!(log.len(), 1);
308 assert_eq!(log[0].message_id, "m1");
309
310 assert!(backend.load_session("s1").await.unwrap().is_none());
311
312 backend.save_session(&sample_session("s1")).await.unwrap();
313 assert!(backend.load_session("s1").await.unwrap().is_some());
314 }
315
316 #[tokio::test]
317 async fn delete_session_removes_directory() {
318 let dir = tempfile::tempdir().unwrap();
319 let backend = FileBackend::new(dir.path().to_path_buf()).unwrap();
320
321 backend.create_session_storage("s1").await.unwrap();
322 backend.save_session(&sample_session("s1")).await.unwrap();
323 backend
324 .append_log_entry("s1", &sample_entry("m1"))
325 .await
326 .unwrap();
327
328 assert!(backend.load_session("s1").await.unwrap().is_some());
329
330 backend.delete_session("s1").await.unwrap();
331 assert!(backend.load_session("s1").await.unwrap().is_none());
332 assert!(backend.load_log("s1").await.unwrap().is_empty());
333
334 backend.delete_session("s1").await.unwrap();
336 }
337
338 #[tokio::test]
339 async fn list_session_ids_returns_directories() {
340 let dir = tempfile::tempdir().unwrap();
341 let backend = FileBackend::new(dir.path().to_path_buf()).unwrap();
342
343 for id in ["s1", "s2", "s3"] {
344 backend.create_session_storage(id).await.unwrap();
345 }
346
347 let mut ids = backend.list_session_ids().await.unwrap();
348 ids.sort();
349 assert_eq!(ids, vec!["s1", "s2", "s3"]);
350 }
351
352 #[tokio::test]
353 async fn replace_log_atomically_overwrites() {
354 let dir = tempfile::tempdir().unwrap();
355 let backend = FileBackend::new(dir.path().to_path_buf()).unwrap();
356 backend.create_session_storage("s1").await.unwrap();
357
358 for i in 0..10 {
359 backend
360 .append_log_entry("s1", &sample_entry(&format!("m{i}")))
361 .await
362 .unwrap();
363 }
364 assert_eq!(backend.load_log("s1").await.unwrap().len(), 10);
365
366 let replacement = vec![sample_entry("checkpoint")];
367 backend.replace_log("s1", &replacement).await.unwrap();
368
369 let log = backend.load_log("s1").await.unwrap();
370 assert_eq!(log.len(), 1);
371 assert_eq!(log[0].message_id, "checkpoint");
372 }
373
374 #[tokio::test]
375 async fn ttl_ms_backward_compat_deserialization() {
376 let dir = tempfile::tempdir().unwrap();
377 let base = dir.path();
378 let backend = FileBackend::new(base.to_path_buf()).unwrap();
379 backend.create_session_storage("s1").await.unwrap();
380
381 let json = serde_json::json!({
382 "session_id": "s1",
383 "state": "Open",
384 "ttl_expiry": 61000,
385 "started_at_unix_ms": 1000,
386 "resolution": null,
387 "mode": "macp.mode.decision.v1",
388 "mode_state": [],
389 "participants": ["alice"],
390 "seen_message_ids": [],
391 "intent": "",
392 "mode_version": "1.0.0",
393 "configuration_version": "cfg",
394 "policy_version": "pol",
395 "context": [],
396 "roots": [],
397 "initiator_sender": "alice"
398 });
399 fs::write(
400 backend.session_file("s1"),
401 serde_json::to_vec_pretty(&json).unwrap(),
402 )
403 .unwrap();
404
405 let loaded = backend.load_session("s1").await.unwrap().unwrap();
406 assert_eq!(loaded.ttl_ms, 60_000);
407 }
408}