atomr_persistence/
journal.rs1use std::collections::HashMap;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use thiserror::Error;
9
/// A single persisted record, as stored in and returned by a [`Journal`].
///
/// `Default` yields an empty, non-deleted record with sequence number `0`.
#[derive(Clone, Debug, Default)]
pub struct PersistentRepr {
    /// Key of the stream this record belongs to; the in-memory journal groups
    /// records by this value.
    pub persistence_id: String,
    /// Position of the record within its stream. The in-memory journal
    /// requires streams to start at `1` and grow by exactly one per record.
    pub sequence_nr: u64,
    /// Record body as raw bytes; this file never inspects it.
    pub payload: Vec<u8>,
    /// Free-form string carried alongside the payload.
    /// NOTE(review): presumably a type/version hint for decoding `payload` — confirm.
    pub manifest: String,
    /// Free-form string carried alongside the payload.
    /// NOTE(review): presumably identifies the writer instance — confirm.
    pub writer_uuid: String,
    /// Logical-deletion marker: the in-memory journal sets it instead of
    /// removing the record and skips flagged records on replay.
    pub deleted: bool,
    /// Labels matched (by exact string equality) by `Journal::events_by_tag`.
    pub tags: Vec<String>,
}
20
21#[derive(Debug, Error)]
22pub enum JournalError {
23 #[error("sequence nr {expected} expected, got {got}")]
24 SequenceOutOfOrder { expected: u64, got: u64 },
25 #[error("persistence id not found: {0}")]
26 NotFound(String),
27 #[error("backend error: {0}")]
28 Backend(String),
29}
30
31impl JournalError {
32 pub fn backend(err: impl std::fmt::Display) -> Self {
33 Self::Backend(err.to_string())
34 }
35}
36
37#[async_trait]
38pub trait Journal: Send + Sync + 'static {
39 async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError>;
40
41 async fn delete_messages_to(&self, persistence_id: &str, to_sequence_nr: u64)
42 -> Result<(), JournalError>;
43
44 async fn replay_messages(
45 &self,
46 persistence_id: &str,
47 from_sequence_nr: u64,
48 to_sequence_nr: u64,
49 max: u64,
50 ) -> Result<Vec<PersistentRepr>, JournalError>;
51
52 async fn highest_sequence_nr(
53 &self,
54 persistence_id: &str,
55 from_sequence_nr: u64,
56 ) -> Result<u64, JournalError>;
57
58 async fn events_by_tag(
59 &self,
60 _tag: &str,
61 _from_offset: u64,
62 _max: u64,
63 ) -> Result<Vec<PersistentRepr>, JournalError> {
64 Ok(Vec::new())
65 }
66
67 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
70 Ok(Vec::new())
71 }
72}
73
74#[derive(Default)]
75pub struct InMemoryJournal {
76 streams: RwLock<HashMap<String, Vec<PersistentRepr>>>,
77}
78
79impl InMemoryJournal {
80 pub fn new() -> Arc<Self> {
81 Arc::new(Self::default())
82 }
83}
84
85#[async_trait]
86impl Journal for InMemoryJournal {
87 async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError> {
88 let mut map = self.streams.write();
89 for msg in messages {
90 let entry = map.entry(msg.persistence_id.clone()).or_default();
91 let expected = entry.last().map(|r| r.sequence_nr + 1).unwrap_or(1);
92 if msg.sequence_nr != expected {
93 return Err(JournalError::SequenceOutOfOrder { expected, got: msg.sequence_nr });
94 }
95 entry.push(msg);
96 }
97 Ok(())
98 }
99
100 async fn delete_messages_to(
101 &self,
102 persistence_id: &str,
103 to_sequence_nr: u64,
104 ) -> Result<(), JournalError> {
105 let mut map = self.streams.write();
106 if let Some(stream) = map.get_mut(persistence_id) {
107 for r in stream.iter_mut() {
108 if r.sequence_nr <= to_sequence_nr {
109 r.deleted = true;
110 }
111 }
112 }
113 Ok(())
114 }
115
116 async fn replay_messages(
117 &self,
118 persistence_id: &str,
119 from: u64,
120 to: u64,
121 max: u64,
122 ) -> Result<Vec<PersistentRepr>, JournalError> {
123 let map = self.streams.read();
124 Ok(map
125 .get(persistence_id)
126 .map(|v| {
127 v.iter()
128 .filter(|r| !r.deleted && r.sequence_nr >= from && r.sequence_nr <= to)
129 .take(max as usize)
130 .cloned()
131 .collect()
132 })
133 .unwrap_or_default())
134 }
135
136 async fn highest_sequence_nr(&self, pid: &str, _from: u64) -> Result<u64, JournalError> {
137 Ok(self.streams.read().get(pid).and_then(|v| v.last()).map(|r| r.sequence_nr).unwrap_or(0))
138 }
139
140 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
141 Ok(self.streams.read().keys().cloned().collect())
142 }
143
144 async fn events_by_tag(
145 &self,
146 tag: &str,
147 from_offset: u64,
148 max: u64,
149 ) -> Result<Vec<PersistentRepr>, JournalError> {
150 let map = self.streams.read();
151 let mut out = Vec::new();
152 for (_pid, stream) in map.iter() {
153 for r in stream {
154 if r.deleted {
155 continue;
156 }
157 if r.sequence_nr < from_offset {
158 continue;
159 }
160 if r.tags.iter().any(|t| t == tag) {
161 out.push(r.clone());
162 if out.len() as u64 >= max {
163 return Ok(out);
164 }
165 }
166 }
167 }
168 Ok(out)
169 }
170}
171
172impl InMemoryJournal {
173 pub fn list_persistence_ids(&self) -> Vec<String> {
176 self.streams.read().keys().cloned().collect()
177 }
178
179 pub fn event_count(&self, persistence_id: &str) -> u64 {
181 self.streams
182 .read()
183 .get(persistence_id)
184 .map(|v| v.iter().filter(|r| !r.deleted).count() as u64)
185 .unwrap_or(0)
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal untagged, non-deleted record for stream `pid` at
    /// sequence number `nr`; the payload is the low byte of `nr`.
    fn repr(pid: &str, nr: u64) -> PersistentRepr {
        PersistentRepr {
            persistence_id: pid.into(),
            sequence_nr: nr,
            payload: vec![nr as u8],
            manifest: "m".into(),
            writer_uuid: "w".into(),
            deleted: false,
            tags: Vec::new(),
        }
    }

    /// Three in-sequence records can be written, replayed in full, and are
    /// reflected in the highest sequence number.
    #[tokio::test]
    async fn write_and_replay() {
        let j = InMemoryJournal::new();
        j.write_messages(vec![repr("p", 1), repr("p", 2), repr("p", 3)]).await.unwrap();
        let got = j.replay_messages("p", 1, 3, 10).await.unwrap();
        assert_eq!(got.len(), 3);
        assert_eq!(j.highest_sequence_nr("p", 0).await.unwrap(), 3);
    }

    /// A new stream must start at sequence number 1; starting at 2 is rejected.
    #[tokio::test]
    async fn out_of_order_rejected() {
        let j = InMemoryJournal::new();
        let err = j.write_messages(vec![repr("p", 2)]).await.unwrap_err();
        // `matches!` only evaluates to a bool; without `assert!` the check is
        // discarded and the test would pass for any error variant.
        assert!(
            matches!(err, JournalError::SequenceOutOfOrder { expected: 1, got: 2 }),
            "unexpected error: {:?}",
            err
        );
    }
}