Skip to main content

atomr_persistence/
journal.rs

1//! Journal plugin trait and an in-memory implementation.
2//! akka.net: `IJournal`, `AsyncWriteJournal`, `MemoryJournal`.
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9use thiserror::Error;
10
11#[derive(Debug, Clone, Default)]
12pub struct PersistentRepr {
13    pub persistence_id: String,
14    pub sequence_nr: u64,
15    pub payload: Vec<u8>,
16    pub manifest: String,
17    pub writer_uuid: String,
18    pub deleted: bool,
19    pub tags: Vec<String>,
20}
21
22#[derive(Debug, Error)]
23pub enum JournalError {
24    #[error("sequence nr {expected} expected, got {got}")]
25    SequenceOutOfOrder { expected: u64, got: u64 },
26    #[error("persistence id not found: {0}")]
27    NotFound(String),
28    #[error("backend error: {0}")]
29    Backend(String),
30}
31
32impl JournalError {
33    pub fn backend(err: impl std::fmt::Display) -> Self {
34        Self::Backend(err.to_string())
35    }
36}
37
38#[async_trait]
39pub trait Journal: Send + Sync + 'static {
40    async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError>;
41
42    async fn delete_messages_to(&self, persistence_id: &str, to_sequence_nr: u64)
43        -> Result<(), JournalError>;
44
45    async fn replay_messages(
46        &self,
47        persistence_id: &str,
48        from_sequence_nr: u64,
49        to_sequence_nr: u64,
50        max: u64,
51    ) -> Result<Vec<PersistentRepr>, JournalError>;
52
53    async fn highest_sequence_nr(
54        &self,
55        persistence_id: &str,
56        from_sequence_nr: u64,
57    ) -> Result<u64, JournalError>;
58
59    async fn events_by_tag(
60        &self,
61        _tag: &str,
62        _from_offset: u64,
63        _max: u64,
64    ) -> Result<Vec<PersistentRepr>, JournalError> {
65        Ok(Vec::new())
66    }
67}
68
69#[derive(Default)]
70pub struct InMemoryJournal {
71    streams: RwLock<HashMap<String, Vec<PersistentRepr>>>,
72}
73
74impl InMemoryJournal {
75    pub fn new() -> Arc<Self> {
76        Arc::new(Self::default())
77    }
78}
79
80#[async_trait]
81impl Journal for InMemoryJournal {
82    async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError> {
83        let mut map = self.streams.write();
84        for msg in messages {
85            let entry = map.entry(msg.persistence_id.clone()).or_default();
86            let expected = entry.last().map(|r| r.sequence_nr + 1).unwrap_or(1);
87            if msg.sequence_nr != expected {
88                return Err(JournalError::SequenceOutOfOrder { expected, got: msg.sequence_nr });
89            }
90            entry.push(msg);
91        }
92        Ok(())
93    }
94
95    async fn delete_messages_to(
96        &self,
97        persistence_id: &str,
98        to_sequence_nr: u64,
99    ) -> Result<(), JournalError> {
100        let mut map = self.streams.write();
101        if let Some(stream) = map.get_mut(persistence_id) {
102            for r in stream.iter_mut() {
103                if r.sequence_nr <= to_sequence_nr {
104                    r.deleted = true;
105                }
106            }
107        }
108        Ok(())
109    }
110
111    async fn replay_messages(
112        &self,
113        persistence_id: &str,
114        from: u64,
115        to: u64,
116        max: u64,
117    ) -> Result<Vec<PersistentRepr>, JournalError> {
118        let map = self.streams.read();
119        Ok(map
120            .get(persistence_id)
121            .map(|v| {
122                v.iter()
123                    .filter(|r| !r.deleted && r.sequence_nr >= from && r.sequence_nr <= to)
124                    .take(max as usize)
125                    .cloned()
126                    .collect()
127            })
128            .unwrap_or_default())
129    }
130
131    async fn highest_sequence_nr(&self, pid: &str, _from: u64) -> Result<u64, JournalError> {
132        Ok(self.streams.read().get(pid).and_then(|v| v.last()).map(|r| r.sequence_nr).unwrap_or(0))
133    }
134}
135
136impl InMemoryJournal {
137    /// List all persistence ids currently stored. Used by the telemetry
138    /// `JournalAdmin` impl.
139    pub fn list_persistence_ids(&self) -> Vec<String> {
140        self.streams.read().keys().cloned().collect()
141    }
142
143    /// Number of non-deleted events stored for `persistence_id`.
144    pub fn event_count(&self, persistence_id: &str) -> u64 {
145        self.streams
146            .read()
147            .get(persistence_id)
148            .map(|v| v.iter().filter(|r| !r.deleted).count() as u64)
149            .unwrap_or(0)
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156
157    fn repr(pid: &str, nr: u64) -> PersistentRepr {
158        PersistentRepr {
159            persistence_id: pid.into(),
160            sequence_nr: nr,
161            payload: vec![nr as u8],
162            manifest: "m".into(),
163            writer_uuid: "w".into(),
164            deleted: false,
165            tags: Vec::new(),
166        }
167    }
168
169    #[tokio::test]
170    async fn write_and_replay() {
171        let j = InMemoryJournal::new();
172        j.write_messages(vec![repr("p", 1), repr("p", 2), repr("p", 3)]).await.unwrap();
173        let got = j.replay_messages("p", 1, 3, 10).await.unwrap();
174        assert_eq!(got.len(), 3);
175        assert_eq!(j.highest_sequence_nr("p", 0).await.unwrap(), 3);
176    }
177
178    #[tokio::test]
179    async fn out_of_order_rejected() {
180        let j = InMemoryJournal::new();
181        let err = j.write_messages(vec![repr("p", 2)]).await.unwrap_err();
182        matches!(err, JournalError::SequenceOutOfOrder { .. });
183    }
184}