1use async_trait::async_trait;
9use atomr_persistence::{Journal, JournalError, PersistentRepr};
10
/// Position marker accepted by tag-based queries.
///
/// The type is `#[non_exhaustive]`, so matches written outside this crate
/// need a wildcard arm.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum Offset {
    /// No position supplied; this is the `Default` value.
    #[default]
    NoOffset,
    /// A position expressed as a `u64` sequence number.
    Sequence(u64),
    /// A position expressed as a `u128` time value.
    /// NOTE(review): unit and epoch are not visible here — confirm with the writer side.
    TimeBased(u128),
}
23
24impl Offset {
25 pub fn as_sequence(self) -> Option<u64> {
26 match self {
27 Self::NoOffset => Some(0),
28 Self::Sequence(n) => Some(n),
29 Self::TimeBased(_) => None,
30 }
31 }
32}
33
/// A single event as returned by the query side.
#[derive(Clone, Debug)]
pub struct EventEnvelope {
    /// Identifier of the persistent entity the event belongs to.
    pub persistence_id: String,
    /// Sequence number of the event.
    pub sequence_nr: u64,
    /// Raw event bytes.
    pub payload: Vec<u8>,
    /// Position of the event. The `From<PersistentRepr>` conversion in this
    /// file fills it with the event's sequence number.
    pub offset: u64,
    /// Tags attached to the event.
    pub tags: Vec<String>,
}
42
43impl From<PersistentRepr> for EventEnvelope {
44 fn from(r: PersistentRepr) -> Self {
45 Self {
46 persistence_id: r.persistence_id,
47 sequence_nr: r.sequence_nr,
48 payload: r.payload,
49 offset: r.sequence_nr,
50 tags: r.tags,
51 }
52 }
53}
54
55#[async_trait]
60pub trait ReadJournal: Send + Sync + 'static {
61 async fn events_by_persistence_id(
64 &self,
65 persistence_id: &str,
66 from_sequence_nr: u64,
67 to_sequence_nr: u64,
68 ) -> Result<Vec<EventEnvelope>, JournalError>;
69
70 async fn current_events_by_persistence_id(
73 &self,
74 persistence_id: &str,
75 from: u64,
76 to: u64,
77 ) -> Result<Vec<EventEnvelope>, JournalError> {
78 self.events_by_persistence_id(persistence_id, from, to).await
79 }
80
81 async fn events_by_tag(&self, _tag: &str, _offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
85 Ok(Vec::new())
86 }
87
88 async fn current_events_by_tag(
89 &self,
90 tag: &str,
91 offset: Offset,
92 ) -> Result<Vec<EventEnvelope>, JournalError> {
93 self.events_by_tag(tag, offset).await
94 }
95
96 async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
99 Ok(Vec::new())
100 }
101
102 async fn current_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
103 self.all_persistence_ids().await
104 }
105}
106
107pub struct SimpleReadJournal<J: Journal> {
108 journal: std::sync::Arc<J>,
109}
110
111impl<J: Journal> SimpleReadJournal<J> {
112 pub fn new(journal: std::sync::Arc<J>) -> Self {
113 Self { journal }
114 }
115}
116
117#[async_trait]
118impl<J: Journal> ReadJournal for SimpleReadJournal<J> {
119 async fn events_by_persistence_id(
120 &self,
121 persistence_id: &str,
122 from: u64,
123 to: u64,
124 ) -> Result<Vec<EventEnvelope>, JournalError> {
125 let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
126 Ok(reprs.into_iter().map(Into::into).collect())
127 }
128
129 async fn events_by_tag(&self, tag: &str, offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
130 let from_seq = offset.as_sequence().unwrap_or(0);
131 let ids = self.current_persistence_ids().await?;
139 let mut out = Vec::new();
140 for id in ids {
141 let reprs = self.journal.replay_messages(&id, from_seq, u64::MAX, u64::MAX).await?;
142 for r in reprs {
143 if r.tags.iter().any(|t| t == tag) {
144 out.push(r.into());
145 }
146 }
147 }
148 Ok(out)
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_persistence::{InMemoryJournal, Journal, PersistentRepr};
    use std::sync::Arc;

    /// Builds a record for `pid` at sequence number `seq` carrying `tags`.
    /// The payload is the single byte `seq as u8`.
    fn repr(pid: &str, seq: u64, tags: &[&str]) -> PersistentRepr {
        let tags = tags.iter().map(|s| s.to_string()).collect();
        PersistentRepr {
            persistence_id: pid.into(),
            sequence_nr: seq,
            payload: vec![seq as u8],
            manifest: "evt".into(),
            writer_uuid: "w".into(),
            deleted: false,
            tags,
        }
    }

    /// Querying 1..=2 out of three stored events yields exactly events 1 and 2.
    #[tokio::test]
    async fn events_by_persistence_id_replays_range() {
        let journal = Arc::new(InMemoryJournal::default());
        let batch = vec![repr("a", 1, &[]), repr("a", 2, &[]), repr("a", 3, &[])];
        journal.write_messages(batch).await.unwrap();

        let query = SimpleReadJournal::new(journal);
        let events = query.events_by_persistence_id("a", 1, 2).await.unwrap();

        let seqs: Vec<u64> = events.iter().map(|e| e.sequence_nr).collect();
        assert_eq!(seqs, vec![1, 2]);
    }

    /// The `current_*` query returns as many events as the plain one.
    #[tokio::test]
    async fn current_variant_matches_live() {
        let journal = Arc::new(InMemoryJournal::default());
        journal.write_messages(vec![repr("a", 1, &[])]).await.unwrap();

        let query = SimpleReadJournal::new(journal);
        let live_count = query.events_by_persistence_id("a", 1, 99).await.unwrap().len();
        let snapshot_count = query.current_events_by_persistence_id("a", 1, 99).await.unwrap().len();
        assert_eq!(live_count, snapshot_count);
    }

    /// `as_sequence` maps each variant to its expected sequence form.
    #[tokio::test]
    async fn offset_sequence_round_trips() {
        let cases = [
            (Offset::Sequence(7), Some(7)),
            (Offset::NoOffset, Some(0)),
            (Offset::TimeBased(123), None),
        ];
        for (offset, expected) in cases {
            assert_eq!(offset.as_sequence(), expected);
        }
    }
}