1use async_trait::async_trait;
9use atomr_persistence::{Journal, JournalError, PersistentRepr};
10
/// Starting position accepted by the tag queries in this file.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum Offset {
    /// No position supplied; this is the `Default` value.
    #[default]
    NoOffset,
    /// A position expressed as a sequence number.
    Sequence(u64),
    /// A position expressed as a time-derived value.
    /// NOTE(review): unit and epoch are not visible in this file — confirm.
    TimeBased(u128),
}

impl Offset {
    /// Converts the offset into a sequence number when it has one.
    ///
    /// * `NoOffset` maps to `Some(0)`.
    /// * `Sequence(n)` maps to `Some(n)`.
    /// * `TimeBased(_)` has no sequence form and maps to `None`.
    pub fn as_sequence(self) -> Option<u64> {
        let seq = match self {
            Self::TimeBased(_) => return None,
            Self::NoOffset => 0,
            Self::Sequence(seq) => seq,
        };
        Some(seq)
    }
}
33
/// One event as handed back by the query methods in this file.
#[derive(Debug, Clone)]
pub struct EventEnvelope {
    /// Identifier of the entity the event was stored under.
    pub persistence_id: String,

    /// Sequence number of the event within its `persistence_id`.
    pub sequence_nr: u64,

    /// Raw event bytes, carried through unchanged.
    pub payload: Vec<u8>,

    /// Position of the event as a plain number. The `From<PersistentRepr>`
    /// conversion in this file fills it with the record's `sequence_nr`.
    pub offset: u64,

    /// Tags attached to the event.
    pub tags: Vec<String>,
}
42
43impl From<PersistentRepr> for EventEnvelope {
44 fn from(r: PersistentRepr) -> Self {
45 Self {
46 persistence_id: r.persistence_id,
47 sequence_nr: r.sequence_nr,
48 payload: r.payload,
49 offset: r.sequence_nr,
50 tags: r.tags,
51 }
52 }
53}
54
55#[async_trait]
60pub trait ReadJournal: Send + Sync + 'static {
61 async fn events_by_persistence_id(
64 &self,
65 persistence_id: &str,
66 from_sequence_nr: u64,
67 to_sequence_nr: u64,
68 ) -> Result<Vec<EventEnvelope>, JournalError>;
69
70 async fn current_events_by_persistence_id(
73 &self,
74 persistence_id: &str,
75 from: u64,
76 to: u64,
77 ) -> Result<Vec<EventEnvelope>, JournalError> {
78 self.events_by_persistence_id(persistence_id, from, to).await
79 }
80
81 async fn events_by_tag(&self, _tag: &str, _offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
85 Ok(Vec::new())
86 }
87
88 async fn current_events_by_tag(
89 &self,
90 tag: &str,
91 offset: Offset,
92 ) -> Result<Vec<EventEnvelope>, JournalError> {
93 self.events_by_tag(tag, offset).await
94 }
95
96 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
99 Ok(Vec::new())
100 }
101
102 async fn current_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
103 self.all_persistence_ids().await
104 }
105}
106
107pub struct SimpleReadJournal<J: Journal> {
108 journal: std::sync::Arc<J>,
109}
110
111impl<J: Journal> SimpleReadJournal<J> {
112 pub fn new(journal: std::sync::Arc<J>) -> Self {
113 Self { journal }
114 }
115}
116
117#[async_trait]
118impl<J: Journal> ReadJournal for SimpleReadJournal<J> {
119 async fn events_by_persistence_id(
120 &self,
121 persistence_id: &str,
122 from: u64,
123 to: u64,
124 ) -> Result<Vec<EventEnvelope>, JournalError> {
125 let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
126 Ok(reprs.into_iter().map(Into::into).collect())
127 }
128
129 async fn events_by_tag(&self, tag: &str, offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
130 let from_seq = offset.as_sequence().unwrap_or(0);
131 let backend_results = self.journal.events_by_tag(tag, from_seq, u64::MAX).await?;
133 if !backend_results.is_empty() {
134 return Ok(backend_results.into_iter().map(Into::into).collect());
135 }
136 let ids = self.journal.all_persistence_ids().await?;
139 let mut out = Vec::new();
140 for id in ids {
141 let reprs = self.journal.replay_messages(&id, from_seq, u64::MAX, u64::MAX).await?;
142 for r in reprs {
143 if r.tags.iter().any(|t| t == tag) {
144 out.push(r.into());
145 }
146 }
147 }
148 Ok(out)
149 }
150
151 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
152 self.journal.all_persistence_ids().await
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_persistence::{InMemoryJournal, Journal, PersistentRepr};
    use std::sync::Arc;

    /// Builds a non-deleted record for `pid` at `seq` with the given tags.
    /// The payload is the single byte `seq as u8`.
    fn repr(pid: &str, seq: u64, tags: &[&str]) -> PersistentRepr {
        let tags = tags.iter().map(|&t| String::from(t)).collect();
        PersistentRepr {
            persistence_id: String::from(pid),
            sequence_nr: seq,
            payload: vec![seq as u8],
            manifest: "evt".into(),
            writer_uuid: "w".into(),
            deleted: false,
            tags,
        }
    }

    /// Writes `events` into a fresh in-memory journal and wraps it in a
    /// `SimpleReadJournal`.
    async fn seeded(events: Vec<PersistentRepr>) -> SimpleReadJournal<InMemoryJournal> {
        let journal = Arc::new(InMemoryJournal::default());
        journal.write_messages(events).await.unwrap();
        SimpleReadJournal::new(journal)
    }

    #[tokio::test]
    async fn events_by_persistence_id_replays_range() {
        let query = seeded(vec![repr("a", 1, &[]), repr("a", 2, &[]), repr("a", 3, &[])]).await;
        let events = query.events_by_persistence_id("a", 1, 2).await.unwrap();
        // Exactly sequence numbers 1 and 2, in that order.
        let seqs: Vec<u64> = events.iter().map(|e| e.sequence_nr).collect();
        assert_eq!(seqs, vec![1, 2]);
    }

    #[tokio::test]
    async fn current_variant_matches_live() {
        let query = seeded(vec![repr("a", 1, &[])]).await;
        let live = query.events_by_persistence_id("a", 1, 99).await.unwrap();
        let snapshot = query.current_events_by_persistence_id("a", 1, 99).await.unwrap();
        assert_eq!(live.len(), snapshot.len());
    }

    #[tokio::test]
    async fn offset_sequence_round_trips() {
        let cases = [
            (Offset::Sequence(7), Some(7)),
            (Offset::NoOffset, Some(0)),
            (Offset::TimeBased(123), None),
        ];
        for (offset, expected) in cases {
            assert_eq!(offset.as_sequence(), expected);
        }
    }

    #[tokio::test]
    async fn events_by_tag_returns_tagged_events_across_pids() {
        let query = seeded(vec![
            repr("a", 1, &["red"]),
            repr("a", 2, &["blue"]),
            repr("b", 1, &["red", "hot"]),
            repr("b", 2, &["green"]),
        ])
        .await;
        // (tag, number of events expected to carry it)
        for (tag, expected) in [("red", 2), ("blue", 1), ("nope", 0)] {
            let hits = query.events_by_tag(tag, Offset::NoOffset).await.unwrap();
            assert_eq!(hits.len(), expected);
        }
    }

    #[tokio::test]
    async fn events_by_tag_respects_offset() {
        let query = seeded(vec![repr("a", 1, &["t"]), repr("a", 2, &["t"]), repr("a", 3, &["t"])]).await;
        let from_two = query.events_by_tag("t", Offset::Sequence(2)).await.unwrap();
        assert_eq!(from_two.len(), 2);
        assert_eq!(from_two.first().map(|e| e.sequence_nr), Some(2));
    }

    #[tokio::test]
    async fn all_persistence_ids_lists_distinct_writers() {
        let query = seeded(vec![repr("a", 1, &[]), repr("b", 1, &[]), repr("a", 2, &[])]).await;
        let mut ids = query.all_persistence_ids().await.unwrap();
        // Sort first: the order the journal returns ids in is not relied on.
        ids.sort_unstable();
        assert_eq!(ids, ["a", "b"]);
    }
}