1use async_trait::async_trait;
9use atomr_persistence::{Journal, JournalError, PersistentRepr};
10
/// Position from which a tag query should start reading.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum Offset {
    /// No starting position was given; this is the `Default` value.
    #[default]
    NoOffset,
    /// Start at the given sequence number.
    Sequence(u64),
    /// A time-derived position. It has no sequence-number equivalent
    /// (see [`Offset::as_sequence`]).
    // NOTE(review): the unit of the `u128` is not visible here (other APIs in
    // this file use nanoseconds) — confirm before relying on it.
    TimeBased(u128),
}

impl Offset {
    /// Converts this offset into a plain sequence number when one exists.
    ///
    /// Returns `Some(0)` for [`Offset::NoOffset`], `Some(n)` for
    /// [`Offset::Sequence`], and `None` for [`Offset::TimeBased`].
    pub fn as_sequence(self) -> Option<u64> {
        let sequence = match self {
            // Time-based offsets cannot be expressed as a sequence number.
            Self::TimeBased(_) => return None,
            Self::NoOffset => 0,
            Self::Sequence(n) => n,
        };
        Some(sequence)
    }
}
33
/// A single journal event as handed to read-side consumers.
///
/// Envelopes are plain data and compare by value: two envelopes are equal
/// when every field is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventEnvelope {
    /// Identifier of the persistent entity that wrote the event.
    pub persistence_id: String,
    /// Sequence number of the event within its `persistence_id`.
    pub sequence_nr: u64,
    /// Serialized event bytes, carried through untouched.
    pub payload: Vec<u8>,
    /// Position of the event in the query result. The `From<PersistentRepr>`
    /// conversion in this file sets it to the event's sequence number.
    pub offset: u64,
    /// Tags attached to the event; tag queries match against these.
    pub tags: Vec<String>,
    /// Manifest string copied from the journal record.
    // NOTE(review): presumably a type hint for deserializing `payload` — confirm.
    pub manifest: String,
}
46
47impl From<PersistentRepr> for EventEnvelope {
48 fn from(r: PersistentRepr) -> Self {
49 Self {
50 persistence_id: r.persistence_id,
51 sequence_nr: r.sequence_nr,
52 payload: r.payload,
53 offset: r.sequence_nr,
54 tags: r.tags,
55 manifest: r.manifest,
56 }
57 }
58}
59
60#[async_trait]
65pub trait ReadJournal: Send + Sync + 'static {
66 async fn events_by_persistence_id(
69 &self,
70 persistence_id: &str,
71 from_sequence_nr: u64,
72 to_sequence_nr: u64,
73 ) -> Result<Vec<EventEnvelope>, JournalError>;
74
75 async fn current_events_by_persistence_id(
78 &self,
79 persistence_id: &str,
80 from: u64,
81 to: u64,
82 ) -> Result<Vec<EventEnvelope>, JournalError> {
83 self.events_by_persistence_id(persistence_id, from, to).await
84 }
85
86 async fn events_by_tag(&self, _tag: &str, _offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
90 Ok(Vec::new())
91 }
92
93 async fn current_events_by_tag(
94 &self,
95 tag: &str,
96 offset: Offset,
97 ) -> Result<Vec<EventEnvelope>, JournalError> {
98 self.events_by_tag(tag, offset).await
99 }
100
101 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
104 Ok(Vec::new())
105 }
106
107 async fn current_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
108 self.all_persistence_ids().await
109 }
110
111 async fn events_as_of(
120 &self,
121 pid: &str,
122 _system_time_nanos: u128,
123 ) -> Result<Vec<EventEnvelope>, JournalError> {
124 self.current_events_by_persistence_id(pid, 0, u64::MAX).await
125 }
126
127 async fn events_valid_as_of(
135 &self,
136 pid: &str,
137 _valid_time_nanos: u128,
138 _system_time_nanos: u128,
139 ) -> Result<Vec<EventEnvelope>, JournalError> {
140 self.current_events_by_persistence_id(pid, 0, u64::MAX).await
141 }
142}
143
144pub struct SimpleReadJournal<J: Journal> {
145 journal: std::sync::Arc<J>,
146}
147
148impl<J: Journal> SimpleReadJournal<J> {
149 pub fn new(journal: std::sync::Arc<J>) -> Self {
150 Self { journal }
151 }
152}
153
154#[async_trait]
155impl<J: Journal> ReadJournal for SimpleReadJournal<J> {
156 async fn events_by_persistence_id(
157 &self,
158 persistence_id: &str,
159 from: u64,
160 to: u64,
161 ) -> Result<Vec<EventEnvelope>, JournalError> {
162 let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
163 Ok(reprs.into_iter().map(Into::into).collect())
164 }
165
166 async fn events_by_tag(&self, tag: &str, offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
167 let from_seq = offset.as_sequence().unwrap_or(0);
168 let backend_results = self.journal.events_by_tag(tag, from_seq, u64::MAX).await?;
170 if !backend_results.is_empty() {
171 return Ok(backend_results.into_iter().map(Into::into).collect());
172 }
173 let ids = self.journal.all_persistence_ids().await?;
176 let mut out = Vec::new();
177 for id in ids {
178 let reprs = self.journal.replay_messages(&id, from_seq, u64::MAX, u64::MAX).await?;
179 for r in reprs {
180 if r.tags.iter().any(|t| t == tag) {
181 out.push(r.into());
182 }
183 }
184 }
185 Ok(out)
186 }
187
188 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
189 self.journal.all_persistence_ids().await
190 }
191}
192
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_persistence::{InMemoryJournal, Journal, PersistentRepr};
    use std::sync::Arc;

    /// Builds a journal record for `pid` at `seq` carrying `tags`; the payload
    /// is the single byte `seq as u8`.
    fn repr(pid: &str, seq: u64, tags: &[&str]) -> PersistentRepr {
        let tags = tags.iter().map(|tag| tag.to_string()).collect();
        PersistentRepr {
            persistence_id: pid.into(),
            sequence_nr: seq,
            payload: vec![seq as u8],
            manifest: "evt".into(),
            writer_uuid: "w".into(),
            deleted: false,
            tags,
        }
    }

    /// Writes `events` into a fresh in-memory journal and wraps it for querying.
    async fn seeded(events: Vec<PersistentRepr>) -> SimpleReadJournal<InMemoryJournal> {
        let journal = Arc::new(InMemoryJournal::default());
        journal.write_messages(events).await.unwrap();
        SimpleReadJournal::new(journal)
    }

    #[tokio::test]
    async fn events_by_persistence_id_replays_range() {
        let query = seeded(vec![repr("a", 1, &[]), repr("a", 2, &[]), repr("a", 3, &[])]).await;

        let events = query.events_by_persistence_id("a", 1, 2).await.unwrap();

        // Exactly the two events inside the requested range, in order.
        let sequence_nrs: Vec<u64> = events.iter().map(|event| event.sequence_nr).collect();
        assert_eq!(sequence_nrs, vec![1, 2]);
    }

    #[tokio::test]
    async fn current_variant_matches_live() {
        let query = seeded(vec![repr("a", 1, &[])]).await;

        let live = query.events_by_persistence_id("a", 1, 99).await.unwrap();
        let current = query.current_events_by_persistence_id("a", 1, 99).await.unwrap();

        assert_eq!(live.len(), current.len());
    }

    #[tokio::test]
    async fn offset_sequence_round_trips() {
        assert_eq!(Offset::Sequence(7).as_sequence(), Some(7));
        assert_eq!(Offset::NoOffset.as_sequence(), Some(0));
        assert_eq!(Offset::TimeBased(123).as_sequence(), None);
    }

    #[tokio::test]
    async fn events_by_tag_returns_tagged_events_across_pids() {
        let query = seeded(vec![
            repr("a", 1, &["red"]),
            repr("a", 2, &["blue"]),
            repr("b", 1, &["red", "hot"]),
            repr("b", 2, &["green"]),
        ])
        .await;

        let red = query.events_by_tag("red", Offset::NoOffset).await.unwrap();
        assert_eq!(red.len(), 2);

        let blue = query.events_by_tag("blue", Offset::NoOffset).await.unwrap();
        assert_eq!(blue.len(), 1);

        let unknown = query.events_by_tag("nope", Offset::NoOffset).await.unwrap();
        assert!(unknown.is_empty());
    }

    #[tokio::test]
    async fn events_by_tag_respects_offset() {
        let query = seeded(vec![repr("a", 1, &["t"]), repr("a", 2, &["t"]), repr("a", 3, &["t"])]).await;

        let from_two = query.events_by_tag("t", Offset::Sequence(2)).await.unwrap();

        assert_eq!(from_two.len(), 2);
        assert_eq!(from_two[0].sequence_nr, 2);
    }

    #[tokio::test]
    async fn events_as_of_default_returns_current_events() {
        let query = seeded(vec![repr("a", 1, &[]), repr("a", 2, &[])]).await;

        // The default temporal queries ignore their timestamps entirely.
        let as_of = query.events_as_of("a", 12345).await.unwrap();
        assert_eq!(as_of.len(), 2);

        let bitemporal = query.events_valid_as_of("a", 1, 2).await.unwrap();
        assert_eq!(bitemporal.len(), 2);
    }

    #[tokio::test]
    async fn all_persistence_ids_lists_distinct_writers() {
        let query = seeded(vec![repr("a", 1, &[]), repr("b", 1, &[]), repr("a", 2, &[])]).await;

        let mut ids = query.all_persistence_ids().await.unwrap();
        // Sorted so the comparison does not depend on the order returned.
        ids.sort();

        assert_eq!(ids, vec!["a".to_string(), "b".to_string()]);
    }
}