1use async_trait::async_trait;
9use atomr_persistence::{Journal, JournalError, PersistentRepr};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15#[non_exhaustive]
16#[derive(Default)]
17pub enum Offset {
18 #[default]
19 NoOffset,
20 Sequence(u64),
21 TimeBased(u128),
22}
23
24impl Offset {
25 pub fn as_sequence(self) -> Option<u64> {
26 match self {
27 Self::NoOffset => Some(0),
28 Self::Sequence(n) => Some(n),
29 Self::TimeBased(_) => None,
30 }
31 }
32}
33
34#[derive(Debug, Clone)]
35pub struct EventEnvelope {
36 pub persistence_id: String,
37 pub sequence_nr: u64,
38 pub payload: Vec<u8>,
39 pub offset: u64,
40 pub tags: Vec<String>,
41 pub manifest: String,
45}
46
47impl From<PersistentRepr> for EventEnvelope {
48 fn from(r: PersistentRepr) -> Self {
49 Self {
50 persistence_id: r.persistence_id,
51 sequence_nr: r.sequence_nr,
52 payload: r.payload,
53 offset: r.sequence_nr,
54 tags: r.tags,
55 manifest: r.manifest,
56 }
57 }
58}
59
60#[async_trait]
65pub trait ReadJournal: Send + Sync + 'static {
66 async fn events_by_persistence_id(
69 &self,
70 persistence_id: &str,
71 from_sequence_nr: u64,
72 to_sequence_nr: u64,
73 ) -> Result<Vec<EventEnvelope>, JournalError>;
74
75 async fn current_events_by_persistence_id(
78 &self,
79 persistence_id: &str,
80 from: u64,
81 to: u64,
82 ) -> Result<Vec<EventEnvelope>, JournalError> {
83 self.events_by_persistence_id(persistence_id, from, to).await
84 }
85
86 async fn events_by_tag(&self, _tag: &str, _offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
90 Ok(Vec::new())
91 }
92
93 async fn current_events_by_tag(
94 &self,
95 tag: &str,
96 offset: Offset,
97 ) -> Result<Vec<EventEnvelope>, JournalError> {
98 self.events_by_tag(tag, offset).await
99 }
100
101 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
104 Ok(Vec::new())
105 }
106
107 async fn current_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
108 self.all_persistence_ids().await
109 }
110}
111
112pub struct SimpleReadJournal<J: Journal> {
113 journal: std::sync::Arc<J>,
114}
115
116impl<J: Journal> SimpleReadJournal<J> {
117 pub fn new(journal: std::sync::Arc<J>) -> Self {
118 Self { journal }
119 }
120}
121
122#[async_trait]
123impl<J: Journal> ReadJournal for SimpleReadJournal<J> {
124 async fn events_by_persistence_id(
125 &self,
126 persistence_id: &str,
127 from: u64,
128 to: u64,
129 ) -> Result<Vec<EventEnvelope>, JournalError> {
130 let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
131 Ok(reprs.into_iter().map(Into::into).collect())
132 }
133
134 async fn events_by_tag(&self, tag: &str, offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
135 let from_seq = offset.as_sequence().unwrap_or(0);
136 let backend_results = self.journal.events_by_tag(tag, from_seq, u64::MAX).await?;
138 if !backend_results.is_empty() {
139 return Ok(backend_results.into_iter().map(Into::into).collect());
140 }
141 let ids = self.journal.all_persistence_ids().await?;
144 let mut out = Vec::new();
145 for id in ids {
146 let reprs = self.journal.replay_messages(&id, from_seq, u64::MAX, u64::MAX).await?;
147 for r in reprs {
148 if r.tags.iter().any(|t| t == tag) {
149 out.push(r.into());
150 }
151 }
152 }
153 Ok(out)
154 }
155
156 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
157 self.journal.all_persistence_ids().await
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use super::*;
164 use atomr_persistence::{InMemoryJournal, Journal, PersistentRepr};
165 use std::sync::Arc;
166
167 fn repr(pid: &str, seq: u64, tags: &[&str]) -> PersistentRepr {
168 PersistentRepr {
169 persistence_id: pid.into(),
170 sequence_nr: seq,
171 payload: vec![seq as u8],
172 manifest: "evt".into(),
173 writer_uuid: "w".into(),
174 deleted: false,
175 tags: tags.iter().map(|s| s.to_string()).collect(),
176 }
177 }
178
179 #[tokio::test]
180 async fn events_by_persistence_id_replays_range() {
181 let j = Arc::new(InMemoryJournal::default());
182 j.write_messages(vec![repr("a", 1, &[]), repr("a", 2, &[]), repr("a", 3, &[])]).await.unwrap();
183 let q = SimpleReadJournal::new(j);
184 let evs = q.events_by_persistence_id("a", 1, 2).await.unwrap();
185 assert_eq!(evs.len(), 2);
186 assert_eq!(evs[0].sequence_nr, 1);
187 assert_eq!(evs[1].sequence_nr, 2);
188 }
189
190 #[tokio::test]
191 async fn current_variant_matches_live() {
192 let j = Arc::new(InMemoryJournal::default());
193 j.write_messages(vec![repr("a", 1, &[])]).await.unwrap();
194 let q = SimpleReadJournal::new(j);
195 let live = q.events_by_persistence_id("a", 1, 99).await.unwrap();
196 let snap = q.current_events_by_persistence_id("a", 1, 99).await.unwrap();
197 assert_eq!(live.len(), snap.len());
198 }
199
200 #[tokio::test]
201 async fn offset_sequence_round_trips() {
202 assert_eq!(Offset::Sequence(7).as_sequence(), Some(7));
203 assert_eq!(Offset::NoOffset.as_sequence(), Some(0));
204 assert_eq!(Offset::TimeBased(123).as_sequence(), None);
205 }
206
207 #[tokio::test]
208 async fn events_by_tag_returns_tagged_events_across_pids() {
209 let j = Arc::new(InMemoryJournal::default());
210 j.write_messages(vec![
211 repr("a", 1, &["red"]),
212 repr("a", 2, &["blue"]),
213 repr("b", 1, &["red", "hot"]),
214 repr("b", 2, &["green"]),
215 ])
216 .await
217 .unwrap();
218 let q = SimpleReadJournal::new(j);
219 let red = q.events_by_tag("red", Offset::NoOffset).await.unwrap();
220 assert_eq!(red.len(), 2);
221 let blue = q.events_by_tag("blue", Offset::NoOffset).await.unwrap();
222 assert_eq!(blue.len(), 1);
223 let none = q.events_by_tag("nope", Offset::NoOffset).await.unwrap();
224 assert!(none.is_empty());
225 }
226
227 #[tokio::test]
228 async fn events_by_tag_respects_offset() {
229 let j = Arc::new(InMemoryJournal::default());
230 j.write_messages(vec![repr("a", 1, &["t"]), repr("a", 2, &["t"]), repr("a", 3, &["t"])])
231 .await
232 .unwrap();
233 let q = SimpleReadJournal::new(j);
234 let from2 = q.events_by_tag("t", Offset::Sequence(2)).await.unwrap();
235 assert_eq!(from2.len(), 2);
236 assert_eq!(from2[0].sequence_nr, 2);
237 }
238
239 #[tokio::test]
240 async fn all_persistence_ids_lists_distinct_writers() {
241 let j = Arc::new(InMemoryJournal::default());
242 j.write_messages(vec![repr("a", 1, &[]), repr("b", 1, &[]), repr("a", 2, &[])]).await.unwrap();
243 let q = SimpleReadJournal::new(j);
244 let mut ids = q.all_persistence_ids().await.unwrap();
245 ids.sort();
246 assert_eq!(ids, vec!["a".to_string(), "b".to_string()]);
247 }
248}