Skip to main content

atomr_persistence_query/
lib.rs

1//! atomr-persistence-query.
2//!
3//! Phase 11 of `docs/full-port-plan.md` extends the read-journal
4//! surface to match upstream: `events_by_persistence_id`,
5//! `events_by_tag`, `current_*` variants, `all_persistence_ids`, and
6//! a typed [`Offset`] type.
7
8use async_trait::async_trait;
9use atomr_persistence::{Journal, JournalError, PersistentRepr};
10
11/// Typed read-journal offset. The in-memory backend uses `Sequence`
12/// numbers; a SQL backend might emit `TimeBased` UUIDs. `NoOffset`
13/// means "from the start."
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15#[non_exhaustive]
16#[derive(Default)]
17pub enum Offset {
18    #[default]
19    NoOffset,
20    Sequence(u64),
21    TimeBased(u128),
22}
23
24impl Offset {
25    pub fn as_sequence(self) -> Option<u64> {
26        match self {
27            Self::NoOffset => Some(0),
28            Self::Sequence(n) => Some(n),
29            Self::TimeBased(_) => None,
30        }
31    }
32}
33
34#[derive(Debug, Clone)]
35pub struct EventEnvelope {
36    pub persistence_id: String,
37    pub sequence_nr: u64,
38    pub payload: Vec<u8>,
39    pub offset: u64,
40    pub tags: Vec<String>,
41    /// Manifest tag the event was written with. Mirrors
42    /// [`PersistentRepr::manifest`]; used by readers that dispatch
43    /// decoding based on event-schema version.
44    pub manifest: String,
45}
46
47impl From<PersistentRepr> for EventEnvelope {
48    fn from(r: PersistentRepr) -> Self {
49        Self {
50            persistence_id: r.persistence_id,
51            sequence_nr: r.sequence_nr,
52            payload: r.payload,
53            offset: r.sequence_nr,
54            tags: r.tags,
55            manifest: r.manifest,
56        }
57    }
58}
59
60/// Read-journal surface. `current_*` variants take a snapshot at call
61/// time; the non-current variants are tail-following (live) — backends
62/// that only support snapshots return the snapshot and let callers
63/// re-poll.
64#[async_trait]
65pub trait ReadJournal: Send + Sync + 'static {
66    /// Replay events for a single persistence id, sequence-number
67    /// bounded.
68    async fn events_by_persistence_id(
69        &self,
70        persistence_id: &str,
71        from_sequence_nr: u64,
72        to_sequence_nr: u64,
73    ) -> Result<Vec<EventEnvelope>, JournalError>;
74
75    /// Snapshot variant of [`Self::events_by_persistence_id`] —
76    /// default impl is identical (in-memory journals don't tail).
77    async fn current_events_by_persistence_id(
78        &self,
79        persistence_id: &str,
80        from: u64,
81        to: u64,
82    ) -> Result<Vec<EventEnvelope>, JournalError> {
83        self.events_by_persistence_id(persistence_id, from, to).await
84    }
85
86    /// All events with a given tag, returned in offset order.
87    /// Default impl is empty so backends without tag indexing don't
88    /// silently mis-behave.
89    async fn events_by_tag(&self, _tag: &str, _offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
90        Ok(Vec::new())
91    }
92
93    async fn current_events_by_tag(
94        &self,
95        tag: &str,
96        offset: Offset,
97    ) -> Result<Vec<EventEnvelope>, JournalError> {
98        self.events_by_tag(tag, offset).await
99    }
100
101    /// Distinct persistence ids known to the backend. Default impl
102    /// returns empty (backends without an id index opt in).
103    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
104        Ok(Vec::new())
105    }
106
107    async fn current_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
108        self.all_persistence_ids().await
109    }
110}
111
112pub struct SimpleReadJournal<J: Journal> {
113    journal: std::sync::Arc<J>,
114}
115
116impl<J: Journal> SimpleReadJournal<J> {
117    pub fn new(journal: std::sync::Arc<J>) -> Self {
118        Self { journal }
119    }
120}
121
122#[async_trait]
123impl<J: Journal> ReadJournal for SimpleReadJournal<J> {
124    async fn events_by_persistence_id(
125        &self,
126        persistence_id: &str,
127        from: u64,
128        to: u64,
129    ) -> Result<Vec<EventEnvelope>, JournalError> {
130        let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
131        Ok(reprs.into_iter().map(Into::into).collect())
132    }
133
134    async fn events_by_tag(&self, tag: &str, offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
135        let from_seq = offset.as_sequence().unwrap_or(0);
136        // Prefer the backend's indexed query when available.
137        let backend_results = self.journal.events_by_tag(tag, from_seq, u64::MAX).await?;
138        if !backend_results.is_empty() {
139            return Ok(backend_results.into_iter().map(Into::into).collect());
140        }
141        // Fall back to scanning per-pid when the backend hasn't
142        // implemented its own tag index.
143        let ids = self.journal.all_persistence_ids().await?;
144        let mut out = Vec::new();
145        for id in ids {
146            let reprs = self.journal.replay_messages(&id, from_seq, u64::MAX, u64::MAX).await?;
147            for r in reprs {
148                if r.tags.iter().any(|t| t == tag) {
149                    out.push(r.into());
150                }
151            }
152        }
153        Ok(out)
154    }
155
156    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
157        self.journal.all_persistence_ids().await
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use super::*;
164    use atomr_persistence::{InMemoryJournal, Journal, PersistentRepr};
165    use std::sync::Arc;
166
167    fn repr(pid: &str, seq: u64, tags: &[&str]) -> PersistentRepr {
168        PersistentRepr {
169            persistence_id: pid.into(),
170            sequence_nr: seq,
171            payload: vec![seq as u8],
172            manifest: "evt".into(),
173            writer_uuid: "w".into(),
174            deleted: false,
175            tags: tags.iter().map(|s| s.to_string()).collect(),
176        }
177    }
178
179    #[tokio::test]
180    async fn events_by_persistence_id_replays_range() {
181        let j = Arc::new(InMemoryJournal::default());
182        j.write_messages(vec![repr("a", 1, &[]), repr("a", 2, &[]), repr("a", 3, &[])]).await.unwrap();
183        let q = SimpleReadJournal::new(j);
184        let evs = q.events_by_persistence_id("a", 1, 2).await.unwrap();
185        assert_eq!(evs.len(), 2);
186        assert_eq!(evs[0].sequence_nr, 1);
187        assert_eq!(evs[1].sequence_nr, 2);
188    }
189
190    #[tokio::test]
191    async fn current_variant_matches_live() {
192        let j = Arc::new(InMemoryJournal::default());
193        j.write_messages(vec![repr("a", 1, &[])]).await.unwrap();
194        let q = SimpleReadJournal::new(j);
195        let live = q.events_by_persistence_id("a", 1, 99).await.unwrap();
196        let snap = q.current_events_by_persistence_id("a", 1, 99).await.unwrap();
197        assert_eq!(live.len(), snap.len());
198    }
199
200    #[tokio::test]
201    async fn offset_sequence_round_trips() {
202        assert_eq!(Offset::Sequence(7).as_sequence(), Some(7));
203        assert_eq!(Offset::NoOffset.as_sequence(), Some(0));
204        assert_eq!(Offset::TimeBased(123).as_sequence(), None);
205    }
206
207    #[tokio::test]
208    async fn events_by_tag_returns_tagged_events_across_pids() {
209        let j = Arc::new(InMemoryJournal::default());
210        j.write_messages(vec![
211            repr("a", 1, &["red"]),
212            repr("a", 2, &["blue"]),
213            repr("b", 1, &["red", "hot"]),
214            repr("b", 2, &["green"]),
215        ])
216        .await
217        .unwrap();
218        let q = SimpleReadJournal::new(j);
219        let red = q.events_by_tag("red", Offset::NoOffset).await.unwrap();
220        assert_eq!(red.len(), 2);
221        let blue = q.events_by_tag("blue", Offset::NoOffset).await.unwrap();
222        assert_eq!(blue.len(), 1);
223        let none = q.events_by_tag("nope", Offset::NoOffset).await.unwrap();
224        assert!(none.is_empty());
225    }
226
227    #[tokio::test]
228    async fn events_by_tag_respects_offset() {
229        let j = Arc::new(InMemoryJournal::default());
230        j.write_messages(vec![repr("a", 1, &["t"]), repr("a", 2, &["t"]), repr("a", 3, &["t"])])
231            .await
232            .unwrap();
233        let q = SimpleReadJournal::new(j);
234        let from2 = q.events_by_tag("t", Offset::Sequence(2)).await.unwrap();
235        assert_eq!(from2.len(), 2);
236        assert_eq!(from2[0].sequence_nr, 2);
237    }
238
239    #[tokio::test]
240    async fn all_persistence_ids_lists_distinct_writers() {
241        let j = Arc::new(InMemoryJournal::default());
242        j.write_messages(vec![repr("a", 1, &[]), repr("b", 1, &[]), repr("a", 2, &[])]).await.unwrap();
243        let q = SimpleReadJournal::new(j);
244        let mut ids = q.all_persistence_ids().await.unwrap();
245        ids.sort();
246        assert_eq!(ids, vec!["a".to_string(), "b".to_string()]);
247    }
248}