macp_storage/
log_store.rs1use std::collections::HashMap;
2use tokio::sync::RwLock;
3
4#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
5pub enum EntryKind {
6 Incoming,
7 Internal,
8 Checkpoint,
9}
10
11#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12pub struct LogEntry {
13 pub message_id: String,
14 pub received_at_ms: i64,
15 pub sender: String,
16 pub message_type: String,
17 pub raw_payload: Vec<u8>,
18 pub entry_kind: EntryKind,
19 #[serde(default)]
20 pub session_id: String,
21 #[serde(default)]
22 pub mode: String,
23 #[serde(default)]
24 pub macp_version: String,
25 #[serde(default)]
27 pub timestamp_unix_ms: i64,
28}
29
30pub struct LogStore {
31 logs: RwLock<HashMap<String, Vec<LogEntry>>>,
32}
33
34impl Default for LogStore {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl LogStore {
41 pub fn new() -> Self {
42 Self {
43 logs: RwLock::new(HashMap::new()),
44 }
45 }
46
47 pub async fn create_session_log(&self, session_id: &str) {
48 let mut guard = self.logs.write().await;
49 guard.entry(session_id.to_string()).or_default();
50 }
51
52 pub async fn append(&self, session_id: &str, entry: LogEntry) {
53 let mut guard = self.logs.write().await;
54 guard.entry(session_id.to_string()).or_default().push(entry);
55 }
56
57 pub async fn get_log(&self, session_id: &str) -> Option<Vec<LogEntry>> {
58 let guard = self.logs.read().await;
59 guard.get(session_id).cloned()
60 }
61
62 pub async fn get_incoming_after(
67 &self,
68 session_id: &str,
69 after_sequence: u64,
70 ) -> Vec<(u64, LogEntry)> {
71 let guard = self.logs.read().await;
72 guard
73 .get(session_id)
74 .map(|entries| {
75 entries
76 .iter()
77 .enumerate()
78 .filter(|(_, e)| e.entry_kind == EntryKind::Incoming)
79 .filter(|(idx, _)| *idx as u64 >= after_sequence)
80 .map(|(idx, e)| (idx as u64, e.clone()))
81 .collect()
82 })
83 .unwrap_or_default()
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use super::*;
90
91 fn entry(id: &str, kind: EntryKind) -> LogEntry {
92 LogEntry {
93 message_id: id.into(),
94 received_at_ms: 1_700_000_000_000,
95 sender: "test".into(),
96 message_type: "Message".into(),
97 raw_payload: vec![],
98 entry_kind: kind,
99 session_id: String::new(),
100 mode: String::new(),
101 macp_version: String::new(),
102 timestamp_unix_ms: 1_700_000_000_000,
103 }
104 }
105
106 #[tokio::test]
107 async fn create_append_get_round_trip() {
108 let store = LogStore::new();
109 store.create_session_log("s1").await;
110 store.append("s1", entry("m1", EntryKind::Incoming)).await;
111 store.append("s1", entry("m2", EntryKind::Incoming)).await;
112
113 let log = store.get_log("s1").await.unwrap();
114 assert_eq!(log.len(), 2);
115 assert_eq!(log[0].message_id, "m1");
116 assert_eq!(log[1].message_id, "m2");
117 }
118
119 #[tokio::test]
120 async fn get_incoming_after_filters_by_sequence_and_kind() {
121 let store = LogStore::new();
122 store.create_session_log("s1").await;
123 store.append("s1", entry("m0", EntryKind::Incoming)).await;
124 store.append("s1", entry("m1", EntryKind::Internal)).await;
125 store.append("s1", entry("m2", EntryKind::Incoming)).await;
126 store.append("s1", entry("m3", EntryKind::Incoming)).await;
127 store.append("s1", entry("m4", EntryKind::Checkpoint)).await;
128
129 let all = store.get_incoming_after("s1", 0).await;
131 assert_eq!(all.len(), 3);
132 assert_eq!(all[0].1.message_id, "m0");
133 assert_eq!(all[1].1.message_id, "m2");
134 assert_eq!(all[2].1.message_id, "m3");
135
136 let after2 = store.get_incoming_after("s1", 2).await;
138 assert_eq!(after2.len(), 2);
139 assert_eq!(after2[0].0, 2); assert_eq!(after2[0].1.message_id, "m2");
141 assert_eq!(after2[1].0, 3);
142 assert_eq!(after2[1].1.message_id, "m3");
143
144 let empty = store.get_incoming_after("nope", 0).await;
146 assert!(empty.is_empty());
147 }
148}