Skip to main content

atomr_persistence/
journal.rs

1//! Journal plugin trait and an in-memory implementation.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use thiserror::Error;
9
/// One journal record: an event payload plus the bookkeeping fields stored with it.
///
/// `Default` produces an empty record (sequence number `0`, empty strings and
/// vectors, `deleted == false`).
#[derive(Clone, Debug, Default)]
pub struct PersistentRepr {
    /// Identifies the event stream this record belongs to.
    pub persistence_id: String,
    /// Position of the record within its stream.
    pub sequence_nr: u64,
    /// Raw event bytes; never inspected by the journal code in this file.
    pub payload: Vec<u8>,
    /// NOTE(review): presumably a type hint for decoding `payload` — this file
    /// only stores it; confirm against the serialization layer.
    pub manifest: String,
    /// NOTE(review): presumably identifies the writer instance — this file only
    /// stores it; confirm against callers.
    pub writer_uuid: String,
    /// Soft-delete marker; the in-memory journal sets it instead of removing records.
    pub deleted: bool,
    /// Labels attached to the record, matched by tag queries.
    pub tags: Vec<String>,
}
20
21#[derive(Debug, Error)]
22pub enum JournalError {
23    #[error("sequence nr {expected} expected, got {got}")]
24    SequenceOutOfOrder { expected: u64, got: u64 },
25    #[error("persistence id not found: {0}")]
26    NotFound(String),
27    #[error("backend error: {0}")]
28    Backend(String),
29}
30
31impl JournalError {
32    pub fn backend(err: impl std::fmt::Display) -> Self {
33        Self::Backend(err.to_string())
34    }
35}
36
37#[async_trait]
38pub trait Journal: Send + Sync + 'static {
39    async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError>;
40
41    async fn delete_messages_to(&self, persistence_id: &str, to_sequence_nr: u64)
42        -> Result<(), JournalError>;
43
44    async fn replay_messages(
45        &self,
46        persistence_id: &str,
47        from_sequence_nr: u64,
48        to_sequence_nr: u64,
49        max: u64,
50    ) -> Result<Vec<PersistentRepr>, JournalError>;
51
52    async fn highest_sequence_nr(
53        &self,
54        persistence_id: &str,
55        from_sequence_nr: u64,
56    ) -> Result<u64, JournalError>;
57
58    async fn events_by_tag(
59        &self,
60        _tag: &str,
61        _from_offset: u64,
62        _max: u64,
63    ) -> Result<Vec<PersistentRepr>, JournalError> {
64        Ok(Vec::new())
65    }
66
67    /// Distinct persistence ids known to the backend. Default impl
68    /// returns empty so backends without an id index opt in.
69    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
70        Ok(Vec::new())
71    }
72}
73
74#[derive(Default)]
75pub struct InMemoryJournal {
76    streams: RwLock<HashMap<String, Vec<PersistentRepr>>>,
77}
78
79impl InMemoryJournal {
80    pub fn new() -> Arc<Self> {
81        Arc::new(Self::default())
82    }
83}
84
85#[async_trait]
86impl Journal for InMemoryJournal {
87    async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError> {
88        let mut map = self.streams.write();
89        for msg in messages {
90            let entry = map.entry(msg.persistence_id.clone()).or_default();
91            let expected = entry.last().map(|r| r.sequence_nr + 1).unwrap_or(1);
92            if msg.sequence_nr != expected {
93                return Err(JournalError::SequenceOutOfOrder { expected, got: msg.sequence_nr });
94            }
95            entry.push(msg);
96        }
97        Ok(())
98    }
99
100    async fn delete_messages_to(
101        &self,
102        persistence_id: &str,
103        to_sequence_nr: u64,
104    ) -> Result<(), JournalError> {
105        let mut map = self.streams.write();
106        if let Some(stream) = map.get_mut(persistence_id) {
107            for r in stream.iter_mut() {
108                if r.sequence_nr <= to_sequence_nr {
109                    r.deleted = true;
110                }
111            }
112        }
113        Ok(())
114    }
115
116    async fn replay_messages(
117        &self,
118        persistence_id: &str,
119        from: u64,
120        to: u64,
121        max: u64,
122    ) -> Result<Vec<PersistentRepr>, JournalError> {
123        let map = self.streams.read();
124        Ok(map
125            .get(persistence_id)
126            .map(|v| {
127                v.iter()
128                    .filter(|r| !r.deleted && r.sequence_nr >= from && r.sequence_nr <= to)
129                    .take(max as usize)
130                    .cloned()
131                    .collect()
132            })
133            .unwrap_or_default())
134    }
135
136    async fn highest_sequence_nr(&self, pid: &str, _from: u64) -> Result<u64, JournalError> {
137        Ok(self.streams.read().get(pid).and_then(|v| v.last()).map(|r| r.sequence_nr).unwrap_or(0))
138    }
139
140    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
141        Ok(self.streams.read().keys().cloned().collect())
142    }
143
144    async fn events_by_tag(
145        &self,
146        tag: &str,
147        from_offset: u64,
148        max: u64,
149    ) -> Result<Vec<PersistentRepr>, JournalError> {
150        let map = self.streams.read();
151        let mut out = Vec::new();
152        for (_pid, stream) in map.iter() {
153            for r in stream {
154                if r.deleted {
155                    continue;
156                }
157                if r.sequence_nr < from_offset {
158                    continue;
159                }
160                if r.tags.iter().any(|t| t == tag) {
161                    out.push(r.clone());
162                    if out.len() as u64 >= max {
163                        return Ok(out);
164                    }
165                }
166            }
167        }
168        Ok(out)
169    }
170}
171
172impl InMemoryJournal {
173    /// List all persistence ids currently stored. Used by the telemetry
174    /// `JournalAdmin` impl.
175    pub fn list_persistence_ids(&self) -> Vec<String> {
176        self.streams.read().keys().cloned().collect()
177    }
178
179    /// Number of non-deleted events stored for `persistence_id`.
180    pub fn event_count(&self, persistence_id: &str) -> u64 {
181        self.streams
182            .read()
183            .get(persistence_id)
184            .map(|v| v.iter().filter(|r| !r.deleted).count() as u64)
185            .unwrap_or(0)
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an untagged, non-deleted record for stream `pid` at sequence
    /// number `nr`, with a one-byte payload derived from `nr`.
    fn repr(pid: &str, nr: u64) -> PersistentRepr {
        PersistentRepr {
            persistence_id: pid.into(),
            sequence_nr: nr,
            payload: vec![nr as u8],
            manifest: "m".into(),
            writer_uuid: "w".into(),
            deleted: false,
            tags: Vec::new(),
        }
    }

    #[tokio::test]
    async fn write_and_replay() {
        let j = InMemoryJournal::new();
        j.write_messages(vec![repr("p", 1), repr("p", 2), repr("p", 3)]).await.unwrap();
        let got = j.replay_messages("p", 1, 3, 10).await.unwrap();
        assert_eq!(got.len(), 3);
        assert_eq!(j.highest_sequence_nr("p", 0).await.unwrap(), 3);
    }

    #[tokio::test]
    async fn out_of_order_rejected() {
        let j = InMemoryJournal::new();
        let err = j.write_messages(vec![repr("p", 2)]).await.unwrap_err();
        // `matches!` only yields a bool; without `assert!` the check is a no-op.
        assert!(
            matches!(err, JournalError::SequenceOutOfOrder { expected: 1, got: 2 }),
            "unexpected error: {:?}",
            err
        );
    }
}
220}