Skip to main content

macp_storage/storage/
file.rs

1use crate::log_store::LogEntry;
2use crate::registry::PersistedSession;
3use macp_core::session::Session;
4use std::fs;
5use std::io;
6use std::path::{Path, PathBuf};
7use tokio::fs as tfs;
8use tokio::io::AsyncWriteExt;
9
10use super::StorageBackend;
11
12pub struct FileBackend {
13    base_dir: PathBuf,
14}
15
16impl FileBackend {
17    pub fn new(base_dir: PathBuf) -> io::Result<Self> {
18        fs::create_dir_all(base_dir.join("sessions"))?;
19        Ok(Self { base_dir })
20    }
21
22    fn session_dir(&self, session_id: &str) -> PathBuf {
23        self.base_dir.join("sessions").join(session_id)
24    }
25
26    pub(crate) fn session_file(&self, session_id: &str) -> PathBuf {
27        self.session_dir(session_id).join("session.json")
28    }
29
30    pub(crate) fn log_file(&self, session_id: &str) -> PathBuf {
31        self.session_dir(session_id).join("log.jsonl")
32    }
33
34    async fn atomic_write(path: &Path, data: &[u8]) -> io::Result<()> {
35        let tmp_path = path.with_extension("json.tmp");
36        tfs::write(&tmp_path, data).await?;
37        tfs::rename(&tmp_path, path).await
38    }
39}
40
41#[async_trait::async_trait]
42impl StorageBackend for FileBackend {
43    async fn create_session_storage(&self, session_id: &str) -> io::Result<()> {
44        tfs::create_dir_all(self.session_dir(session_id)).await
45    }
46
47    async fn save_session(&self, session: &Session) -> io::Result<()> {
48        let persisted = PersistedSession::from(session);
49        let bytes = serde_json::to_vec_pretty(&persisted)
50            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
51        Self::atomic_write(&self.session_file(&session.session_id), &bytes).await
52    }
53
54    async fn load_session(&self, session_id: &str) -> io::Result<Option<Session>> {
55        let path = self.session_file(session_id);
56        if tfs::metadata(&path).await.is_err() {
57            return Ok(None);
58        }
59        let bytes = tfs::read(&path).await?;
60        let persisted: PersistedSession = serde_json::from_slice(&bytes)
61            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
62        Ok(Some(Session::from(persisted)))
63    }
64
65    async fn load_all_sessions(&self) -> io::Result<Vec<Session>> {
66        let ids = self.list_session_ids().await?;
67        let mut sessions = Vec::new();
68        for id in ids {
69            match self.load_session(&id).await {
70                Ok(Some(s)) => sessions.push(s),
71                Ok(None) => {}
72                Err(e) => {
73                    eprintln!("warning: failed to load session {id}: {e}; skipping");
74                }
75            }
76        }
77        Ok(sessions)
78    }
79
80    async fn delete_session(&self, session_id: &str) -> io::Result<()> {
81        let dir = self.session_dir(session_id);
82        if tfs::metadata(&dir).await.is_ok() {
83            tfs::remove_dir_all(&dir).await?;
84        }
85        Ok(())
86    }
87
88    async fn list_session_ids(&self) -> io::Result<Vec<String>> {
89        let sessions_dir = self.base_dir.join("sessions");
90        if tfs::metadata(&sessions_dir).await.is_err() {
91            return Ok(vec![]);
92        }
93        let mut ids = Vec::new();
94        let mut entries = tfs::read_dir(&sessions_dir).await?;
95        while let Some(entry) = entries.next_entry().await? {
96            if !entry.file_type().await?.is_dir() {
97                continue;
98            }
99            ids.push(entry.file_name().to_string_lossy().to_string());
100        }
101        Ok(ids)
102    }
103
104    async fn append_log_entry(&self, session_id: &str, entry: &LogEntry) -> io::Result<()> {
105        let path = self.log_file(session_id);
106        let mut line = serde_json::to_string(entry)
107            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
108        line.push('\n');
109
110        let mut file = tfs::OpenOptions::new()
111            .create(true)
112            .append(true)
113            .open(&path)
114            .await?;
115        file.write_all(line.as_bytes()).await?;
116        file.sync_data().await?;
117        Ok(())
118    }
119
120    async fn load_log(&self, session_id: &str) -> io::Result<Vec<LogEntry>> {
121        let path = self.log_file(session_id);
122        if tfs::metadata(&path).await.is_err() {
123            return Ok(vec![]);
124        }
125        let content = tfs::read_to_string(&path).await?;
126        let mut entries = Vec::new();
127        for (line_num, line) in content.lines().enumerate() {
128            if line.trim().is_empty() {
129                continue;
130            }
131            match serde_json::from_str::<LogEntry>(line) {
132                Ok(entry) => entries.push(entry),
133                Err(e) => {
134                    eprintln!(
135                        "warning: failed to parse log entry at {}:{}: {e}; skipping",
136                        path.display(),
137                        line_num + 1
138                    );
139                }
140            }
141        }
142        Ok(entries)
143    }
144
145    async fn replace_log(&self, session_id: &str, entries: &[LogEntry]) -> io::Result<()> {
146        let path = self.log_file(session_id);
147        let mut data = String::new();
148        for entry in entries {
149            let line = serde_json::to_string(entry)
150                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
151            data.push_str(&line);
152            data.push('\n');
153        }
154        let tmp_path = path.with_extension("jsonl.tmp");
155        tfs::write(&tmp_path, data.as_bytes()).await?;
156        tfs::rename(&tmp_path, &path).await
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::log_store::EntryKind;
    use macp_core::session::SessionState;
    use std::collections::{HashMap, HashSet};

    /// Builds a backend rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is handed back so the directory stays alive for
    /// the whole test body.
    fn fresh_backend() -> (tempfile::TempDir, FileBackend) {
        let tmp = tempfile::tempdir().unwrap();
        let backend = FileBackend::new(tmp.path().to_path_buf()).unwrap();
        (tmp, backend)
    }

    /// A fully populated open session with the given id.
    fn sample_session(id: &str) -> Session {
        Session {
            session_id: id.into(),
            state: SessionState::Open,
            ttl_expiry: 61_000,
            ttl_ms: 60_000,
            started_at_unix_ms: 1_000,
            resolution: None,
            mode: "macp.mode.decision.v1".into(),
            mode_state: vec![1, 2, 3],
            participants: vec!["alice".into(), "bob".into()],
            seen_message_ids: HashSet::from(["m1".into()]),
            intent: "test intent".into(),
            mode_version: "1.0.0".into(),
            configuration_version: "cfg-1".into(),
            policy_version: "pol-1".into(),
            context_id: "test-ctx".to_string(),
            extensions: HashMap::new(),
            roots: vec![macp_pb::pb::Root {
                uri: "root://1".into(),
                name: "r1".into(),
            }],
            initiator_sender: "alice".into(),
            participant_message_counts: HashMap::new(),
            participant_last_seen: HashMap::new(),
            policy_definition: None,
            suspended_at_ms: None,
            accumulated_suspended_ms: 0,
        }
    }

    /// A minimal incoming log entry with the given message id.
    fn sample_entry(id: &str) -> LogEntry {
        let stamp = 1_700_000_000_000;
        LogEntry {
            message_id: id.into(),
            received_at_ms: stamp,
            sender: "alice".into(),
            message_type: "Message".into(),
            raw_payload: vec![],
            entry_kind: EntryKind::Incoming,
            session_id: String::new(),
            mode: String::new(),
            macp_version: String::new(),
            timestamp_unix_ms: stamp,
        }
    }

    #[tokio::test]
    async fn file_backend_session_round_trip() {
        let (_tmp, backend) = fresh_backend();

        backend.create_session_storage("s1").await.unwrap();
        backend.save_session(&sample_session("s1")).await.unwrap();

        let restored = backend.load_session("s1").await.unwrap().unwrap();
        assert_eq!(restored.session_id, "s1");
        assert_eq!(restored.ttl_ms, 60_000);
        assert_eq!(restored.mode_version, "1.0.0");
        assert!(restored.seen_message_ids.contains("m1"));
        assert_eq!(restored.participants, vec!["alice", "bob"]);
    }

    #[tokio::test]
    async fn file_backend_log_append_and_load() {
        let (_tmp, backend) = fresh_backend();
        backend.create_session_storage("s1").await.unwrap();

        let expected_ids = ["m1", "m2", "m3"];
        for id in expected_ids {
            backend
                .append_log_entry("s1", &sample_entry(id))
                .await
                .unwrap();
        }

        let log = backend.load_log("s1").await.unwrap();
        assert_eq!(log.len(), 3);
        // Entries must come back in append order.
        for (idx, expected) in expected_ids.iter().enumerate() {
            assert_eq!(log[idx].message_id, *expected);
        }
    }

    #[tokio::test]
    async fn file_backend_load_all_sessions() {
        let (_tmp, backend) = fresh_backend();

        let expected_ids = ["s1", "s2", "s3"];
        for id in expected_ids {
            backend.create_session_storage(id).await.unwrap();
            backend.save_session(&sample_session(id)).await.unwrap();
        }

        // Directory iteration order is unspecified, so sort before comparing.
        let mut sessions = backend.load_all_sessions().await.unwrap();
        sessions.sort_by(|a, b| a.session_id.cmp(&b.session_id));
        assert_eq!(sessions.len(), 3);
        for (idx, expected) in expected_ids.iter().enumerate() {
            assert_eq!(sessions[idx].session_id, *expected);
        }
    }

    #[tokio::test]
    async fn append_only_no_full_rewrite() {
        let (_tmp, backend) = fresh_backend();
        backend.create_session_storage("s1").await.unwrap();

        for i in 0..100 {
            let entry = sample_entry(&format!("m{i}"));
            backend.append_log_entry("s1", &entry).await.unwrap();
        }

        // One physical line per appended entry.
        let raw = fs::read_to_string(backend.log_file("s1")).unwrap();
        assert_eq!(raw.lines().count(), 100);

        let log = backend.load_log("s1").await.unwrap();
        assert_eq!(log.len(), 100);
    }

    #[tokio::test]
    async fn write_ordering_log_before_session() {
        let (_tmp, backend) = fresh_backend();
        backend.create_session_storage("s1").await.unwrap();

        backend
            .append_log_entry("s1", &sample_entry("m1"))
            .await
            .unwrap();

        // The log is readable even though no snapshot has been saved yet.
        let log = backend.load_log("s1").await.unwrap();
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].message_id, "m1");

        assert!(backend.load_session("s1").await.unwrap().is_none());

        backend.save_session(&sample_session("s1")).await.unwrap();
        assert!(backend.load_session("s1").await.unwrap().is_some());
    }

    #[tokio::test]
    async fn delete_session_removes_directory() {
        let (_tmp, backend) = fresh_backend();

        backend.create_session_storage("s1").await.unwrap();
        backend.save_session(&sample_session("s1")).await.unwrap();
        backend
            .append_log_entry("s1", &sample_entry("m1"))
            .await
            .unwrap();

        assert!(backend.load_session("s1").await.unwrap().is_some());

        backend.delete_session("s1").await.unwrap();
        assert!(backend.load_session("s1").await.unwrap().is_none());
        assert!(backend.load_log("s1").await.unwrap().is_empty());

        // Idempotent
        backend.delete_session("s1").await.unwrap();
    }

    #[tokio::test]
    async fn list_session_ids_returns_directories() {
        let (_tmp, backend) = fresh_backend();

        for id in ["s1", "s2", "s3"] {
            backend.create_session_storage(id).await.unwrap();
        }

        let mut ids = backend.list_session_ids().await.unwrap();
        ids.sort();
        assert_eq!(ids, vec!["s1", "s2", "s3"]);
    }

    #[tokio::test]
    async fn replace_log_atomically_overwrites() {
        let (_tmp, backend) = fresh_backend();
        backend.create_session_storage("s1").await.unwrap();

        for i in 0..10 {
            let entry = sample_entry(&format!("m{i}"));
            backend.append_log_entry("s1", &entry).await.unwrap();
        }
        assert_eq!(backend.load_log("s1").await.unwrap().len(), 10);

        let replacement = vec![sample_entry("checkpoint")];
        backend.replace_log("s1", &replacement).await.unwrap();

        let log = backend.load_log("s1").await.unwrap();
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].message_id, "checkpoint");
    }

    #[tokio::test]
    async fn ttl_ms_backward_compat_deserialization() {
        let (_tmp, backend) = fresh_backend();
        backend.create_session_storage("s1").await.unwrap();

        // Snapshot written without a `ttl_ms` field.
        let legacy = serde_json::json!({
            "session_id": "s1",
            "state": "Open",
            "ttl_expiry": 61000,
            "started_at_unix_ms": 1000,
            "resolution": null,
            "mode": "macp.mode.decision.v1",
            "mode_state": [],
            "participants": ["alice"],
            "seen_message_ids": [],
            "intent": "",
            "mode_version": "1.0.0",
            "configuration_version": "cfg",
            "policy_version": "pol",
            "context": [],
            "roots": [],
            "initiator_sender": "alice"
        });
        let bytes = serde_json::to_vec_pretty(&legacy).unwrap();
        fs::write(backend.session_file("s1"), bytes).unwrap();

        let restored = backend.load_session("s1").await.unwrap().unwrap();
        assert_eq!(restored.ttl_ms, 60_000);
    }
}