atomr_persistence/
journal.rs1use std::collections::HashMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9use thiserror::Error;
10
11#[derive(Debug, Clone, Default)]
12pub struct PersistentRepr {
13 pub persistence_id: String,
14 pub sequence_nr: u64,
15 pub payload: Vec<u8>,
16 pub manifest: String,
17 pub writer_uuid: String,
18 pub deleted: bool,
19 pub tags: Vec<String>,
20}
21
22#[derive(Debug, Error)]
23pub enum JournalError {
24 #[error("sequence nr {expected} expected, got {got}")]
25 SequenceOutOfOrder { expected: u64, got: u64 },
26 #[error("persistence id not found: {0}")]
27 NotFound(String),
28 #[error("backend error: {0}")]
29 Backend(String),
30}
31
32impl JournalError {
33 pub fn backend(err: impl std::fmt::Display) -> Self {
34 Self::Backend(err.to_string())
35 }
36}
37
38#[async_trait]
39pub trait Journal: Send + Sync + 'static {
40 async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError>;
41
42 async fn delete_messages_to(&self, persistence_id: &str, to_sequence_nr: u64)
43 -> Result<(), JournalError>;
44
45 async fn replay_messages(
46 &self,
47 persistence_id: &str,
48 from_sequence_nr: u64,
49 to_sequence_nr: u64,
50 max: u64,
51 ) -> Result<Vec<PersistentRepr>, JournalError>;
52
53 async fn highest_sequence_nr(
54 &self,
55 persistence_id: &str,
56 from_sequence_nr: u64,
57 ) -> Result<u64, JournalError>;
58
59 async fn events_by_tag(
60 &self,
61 _tag: &str,
62 _from_offset: u64,
63 _max: u64,
64 ) -> Result<Vec<PersistentRepr>, JournalError> {
65 Ok(Vec::new())
66 }
67}
68
69#[derive(Default)]
70pub struct InMemoryJournal {
71 streams: RwLock<HashMap<String, Vec<PersistentRepr>>>,
72}
73
74impl InMemoryJournal {
75 pub fn new() -> Arc<Self> {
76 Arc::new(Self::default())
77 }
78}
79
80#[async_trait]
81impl Journal for InMemoryJournal {
82 async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError> {
83 let mut map = self.streams.write();
84 for msg in messages {
85 let entry = map.entry(msg.persistence_id.clone()).or_default();
86 let expected = entry.last().map(|r| r.sequence_nr + 1).unwrap_or(1);
87 if msg.sequence_nr != expected {
88 return Err(JournalError::SequenceOutOfOrder { expected, got: msg.sequence_nr });
89 }
90 entry.push(msg);
91 }
92 Ok(())
93 }
94
95 async fn delete_messages_to(
96 &self,
97 persistence_id: &str,
98 to_sequence_nr: u64,
99 ) -> Result<(), JournalError> {
100 let mut map = self.streams.write();
101 if let Some(stream) = map.get_mut(persistence_id) {
102 for r in stream.iter_mut() {
103 if r.sequence_nr <= to_sequence_nr {
104 r.deleted = true;
105 }
106 }
107 }
108 Ok(())
109 }
110
111 async fn replay_messages(
112 &self,
113 persistence_id: &str,
114 from: u64,
115 to: u64,
116 max: u64,
117 ) -> Result<Vec<PersistentRepr>, JournalError> {
118 let map = self.streams.read();
119 Ok(map
120 .get(persistence_id)
121 .map(|v| {
122 v.iter()
123 .filter(|r| !r.deleted && r.sequence_nr >= from && r.sequence_nr <= to)
124 .take(max as usize)
125 .cloned()
126 .collect()
127 })
128 .unwrap_or_default())
129 }
130
131 async fn highest_sequence_nr(&self, pid: &str, _from: u64) -> Result<u64, JournalError> {
132 Ok(self.streams.read().get(pid).and_then(|v| v.last()).map(|r| r.sequence_nr).unwrap_or(0))
133 }
134}
135
136impl InMemoryJournal {
137 pub fn list_persistence_ids(&self) -> Vec<String> {
140 self.streams.read().keys().cloned().collect()
141 }
142
143 pub fn event_count(&self, persistence_id: &str) -> u64 {
145 self.streams
146 .read()
147 .get(persistence_id)
148 .map(|v| v.iter().filter(|r| !r.deleted).count() as u64)
149 .unwrap_or(0)
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156
157 fn repr(pid: &str, nr: u64) -> PersistentRepr {
158 PersistentRepr {
159 persistence_id: pid.into(),
160 sequence_nr: nr,
161 payload: vec![nr as u8],
162 manifest: "m".into(),
163 writer_uuid: "w".into(),
164 deleted: false,
165 tags: Vec::new(),
166 }
167 }
168
169 #[tokio::test]
170 async fn write_and_replay() {
171 let j = InMemoryJournal::new();
172 j.write_messages(vec![repr("p", 1), repr("p", 2), repr("p", 3)]).await.unwrap();
173 let got = j.replay_messages("p", 1, 3, 10).await.unwrap();
174 assert_eq!(got.len(), 3);
175 assert_eq!(j.highest_sequence_nr("p", 0).await.unwrap(), 3);
176 }
177
178 #[tokio::test]
179 async fn out_of_order_rejected() {
180 let j = InMemoryJournal::new();
181 let err = j.write_messages(vec![repr("p", 2)]).await.unwrap_err();
182 matches!(err, JournalError::SequenceOutOfOrder { .. });
183 }
184}