Skip to main content

macp_storage/
log_store.rs

1use std::collections::HashMap;
2use tokio::sync::RwLock;
3
4#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
5pub enum EntryKind {
6    Incoming,
7    Internal,
8    Checkpoint,
9}
10
11#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12pub struct LogEntry {
13    pub message_id: String,
14    pub received_at_ms: i64,
15    pub sender: String,
16    pub message_type: String,
17    pub raw_payload: Vec<u8>,
18    pub entry_kind: EntryKind,
19    #[serde(default)]
20    pub session_id: String,
21    #[serde(default)]
22    pub mode: String,
23    #[serde(default)]
24    pub macp_version: String,
25    /// Original envelope timestamp for replay determinism.
26    #[serde(default)]
27    pub timestamp_unix_ms: i64,
28}
29
30pub struct LogStore {
31    logs: RwLock<HashMap<String, Vec<LogEntry>>>,
32}
33
34impl Default for LogStore {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl LogStore {
41    pub fn new() -> Self {
42        Self {
43            logs: RwLock::new(HashMap::new()),
44        }
45    }
46
47    pub async fn create_session_log(&self, session_id: &str) {
48        let mut guard = self.logs.write().await;
49        guard.entry(session_id.to_string()).or_default();
50    }
51
52    pub async fn append(&self, session_id: &str, entry: LogEntry) {
53        let mut guard = self.logs.write().await;
54        guard.entry(session_id.to_string()).or_default().push(entry);
55    }
56
57    pub async fn get_log(&self, session_id: &str) -> Option<Vec<LogEntry>> {
58        let guard = self.logs.read().await;
59        guard.get(session_id).cloned()
60    }
61
62    /// Returns incoming log entries with 0-based index >= `after_sequence`.
63    /// Only `Incoming` entries are returned (Internal/Checkpoint are filtered).
64    /// RFC-MACP-0006-A1: used by StreamSession passive subscribe to replay
65    /// accepted session history to late-joining agents.
66    pub async fn get_incoming_after(
67        &self,
68        session_id: &str,
69        after_sequence: u64,
70    ) -> Vec<(u64, LogEntry)> {
71        let guard = self.logs.read().await;
72        guard
73            .get(session_id)
74            .map(|entries| {
75                entries
76                    .iter()
77                    .enumerate()
78                    .filter(|(_, e)| e.entry_kind == EntryKind::Incoming)
79                    .filter(|(idx, _)| *idx as u64 >= after_sequence)
80                    .map(|(idx, e)| (idx as u64, e.clone()))
81                    .collect()
82            })
83            .unwrap_or_default()
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a log entry with the given message id and kind; every other
    /// field carries a fixed placeholder value.
    fn entry(id: &str, kind: EntryKind) -> LogEntry {
        const FIXED_TS_MS: i64 = 1_700_000_000_000;
        LogEntry {
            message_id: id.to_owned(),
            received_at_ms: FIXED_TS_MS,
            sender: String::from("test"),
            message_type: String::from("Message"),
            raw_payload: Vec::new(),
            entry_kind: kind,
            session_id: String::new(),
            mode: String::new(),
            macp_version: String::new(),
            timestamp_unix_ms: FIXED_TS_MS,
        }
    }

    /// Entries appended to a created session come back from `get_log` in order.
    #[tokio::test]
    async fn create_append_get_round_trip() {
        let store = LogStore::new();
        store.create_session_log("s1").await;
        for id in ["m1", "m2"] {
            store.append("s1", entry(id, EntryKind::Incoming)).await;
        }

        let log = store.get_log("s1").await.unwrap();
        let ids: Vec<&str> = log.iter().map(|e| e.message_id.as_str()).collect();
        assert_eq!(ids, vec!["m1", "m2"]);
    }

    /// `get_incoming_after` drops non-`Incoming` entries and entries below the
    /// requested index, while reporting indices from the unfiltered log.
    #[tokio::test]
    async fn get_incoming_after_filters_by_sequence_and_kind() {
        let store = LogStore::new();
        store.create_session_log("s1").await;
        let seeded = [
            ("m0", EntryKind::Incoming),
            ("m1", EntryKind::Internal),
            ("m2", EntryKind::Incoming),
            ("m3", EntryKind::Incoming),
            ("m4", EntryKind::Checkpoint),
        ];
        for (id, kind) in seeded {
            store.append("s1", entry(id, kind)).await;
        }

        // Offset 0 yields every Incoming entry.
        let all = store.get_incoming_after("s1", 0).await;
        let all_ids: Vec<&str> = all.iter().map(|(_, e)| e.message_id.as_str()).collect();
        assert_eq!(all_ids, vec!["m0", "m2", "m3"]);

        // Offset 2 drops indices 0 and 1 and keeps the original indices.
        let after2 = store.get_incoming_after("s1", 2).await;
        let indexed: Vec<(u64, &str)> = after2
            .iter()
            .map(|(seq, e)| (*seq, e.message_id.as_str()))
            .collect();
        assert_eq!(indexed, vec![(2u64, "m2"), (3u64, "m3")]);

        // Unknown sessions produce an empty result rather than an error.
        assert!(store.get_incoming_after("nope", 0).await.is_empty());
    }
}