Skip to main content

atomr_persistence_query/
lib.rs

1//! atomr-persistence-query.
2//!
3//! Phase 11 of `docs/full-port-plan.md` extends the read-journal
4//! surface to match upstream: `events_by_persistence_id`,
5//! `events_by_tag`, `current_*` variants, `all_persistence_ids`, and
6//! a typed [`Offset`] type.
7
8use async_trait::async_trait;
9use atomr_persistence::{Journal, JournalError, PersistentRepr};
10
11/// Typed read-journal offset. The in-memory backend uses `Sequence`
12/// numbers; a SQL backend might emit `TimeBased` UUIDs. `NoOffset`
13/// means "from the start."
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15#[non_exhaustive]
16#[derive(Default)]
17pub enum Offset {
18    #[default]
19    NoOffset,
20    Sequence(u64),
21    TimeBased(u128),
22}
23
24impl Offset {
25    pub fn as_sequence(self) -> Option<u64> {
26        match self {
27            Self::NoOffset => Some(0),
28            Self::Sequence(n) => Some(n),
29            Self::TimeBased(_) => None,
30        }
31    }
32}
33
34#[derive(Debug, Clone)]
35pub struct EventEnvelope {
36    pub persistence_id: String,
37    pub sequence_nr: u64,
38    pub payload: Vec<u8>,
39    pub offset: u64,
40    pub tags: Vec<String>,
41    /// Manifest tag the event was written with. Mirrors
42    /// [`PersistentRepr::manifest`]; used by readers that dispatch
43    /// decoding based on event-schema version.
44    pub manifest: String,
45}
46
47impl From<PersistentRepr> for EventEnvelope {
48    fn from(r: PersistentRepr) -> Self {
49        Self {
50            persistence_id: r.persistence_id,
51            sequence_nr: r.sequence_nr,
52            payload: r.payload,
53            offset: r.sequence_nr,
54            tags: r.tags,
55            manifest: r.manifest,
56        }
57    }
58}
59
60/// Read-journal surface. `current_*` variants take a snapshot at call
61/// time; the non-current variants are tail-following (live) — backends
62/// that only support snapshots return the snapshot and let callers
63/// re-poll.
64#[async_trait]
65pub trait ReadJournal: Send + Sync + 'static {
66    /// Replay events for a single persistence id, sequence-number
67    /// bounded.
68    async fn events_by_persistence_id(
69        &self,
70        persistence_id: &str,
71        from_sequence_nr: u64,
72        to_sequence_nr: u64,
73    ) -> Result<Vec<EventEnvelope>, JournalError>;
74
75    /// Snapshot variant of [`Self::events_by_persistence_id`] —
76    /// default impl is identical (in-memory journals don't tail).
77    async fn current_events_by_persistence_id(
78        &self,
79        persistence_id: &str,
80        from: u64,
81        to: u64,
82    ) -> Result<Vec<EventEnvelope>, JournalError> {
83        self.events_by_persistence_id(persistence_id, from, to).await
84    }
85
86    /// All events with a given tag, returned in offset order.
87    /// Default impl is empty so backends without tag indexing don't
88    /// silently mis-behave.
89    async fn events_by_tag(&self, _tag: &str, _offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
90        Ok(Vec::new())
91    }
92
93    async fn current_events_by_tag(
94        &self,
95        tag: &str,
96        offset: Offset,
97    ) -> Result<Vec<EventEnvelope>, JournalError> {
98        self.events_by_tag(tag, offset).await
99    }
100
101    /// Distinct persistence ids known to the backend. Default impl
102    /// returns empty (backends without an id index opt in).
103    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
104        Ok(Vec::new())
105    }
106
107    async fn current_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
108        self.all_persistence_ids().await
109    }
110
111    /// FR-8 — bitemporal *system-time* as-of query: return the events for
112    /// `pid` **as they were known to the system at** `system_time_nanos`,
113    /// excluding any later-recorded restatement (no lookahead).
114    ///
115    /// The default impl has no notion of system time, so it falls back to the
116    /// full current event list — only a backend that records a per-row
117    /// `system_time` (e.g. the SQL provider) can honor the cutoff. Override
118    /// to get a true as-of slice.
119    async fn events_as_of(
120        &self,
121        pid: &str,
122        _system_time_nanos: u128,
123    ) -> Result<Vec<EventEnvelope>, JournalError> {
124        self.current_events_by_persistence_id(pid, 0, u64::MAX).await
125    }
126
127    /// FR-8 — full bitemporal slice: events for `pid` whose *valid time* is at
128    /// or before `valid_time_nanos`, as known to the system at
129    /// `system_time_nanos`. Distinguishes a corrected value (recorded later)
130    /// from the value originally known at an earlier system time.
131    ///
132    /// Default impl ignores both time axes (returns the current list); only
133    /// the SQL backend provides the true bitemporal slice.
134    async fn events_valid_as_of(
135        &self,
136        pid: &str,
137        _valid_time_nanos: u128,
138        _system_time_nanos: u128,
139    ) -> Result<Vec<EventEnvelope>, JournalError> {
140        self.current_events_by_persistence_id(pid, 0, u64::MAX).await
141    }
142}
143
144pub struct SimpleReadJournal<J: Journal> {
145    journal: std::sync::Arc<J>,
146}
147
148impl<J: Journal> SimpleReadJournal<J> {
149    pub fn new(journal: std::sync::Arc<J>) -> Self {
150        Self { journal }
151    }
152}
153
154#[async_trait]
155impl<J: Journal> ReadJournal for SimpleReadJournal<J> {
156    async fn events_by_persistence_id(
157        &self,
158        persistence_id: &str,
159        from: u64,
160        to: u64,
161    ) -> Result<Vec<EventEnvelope>, JournalError> {
162        let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
163        Ok(reprs.into_iter().map(Into::into).collect())
164    }
165
166    async fn events_by_tag(&self, tag: &str, offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
167        let from_seq = offset.as_sequence().unwrap_or(0);
168        // Prefer the backend's indexed query when available.
169        let backend_results = self.journal.events_by_tag(tag, from_seq, u64::MAX).await?;
170        if !backend_results.is_empty() {
171            return Ok(backend_results.into_iter().map(Into::into).collect());
172        }
173        // Fall back to scanning per-pid when the backend hasn't
174        // implemented its own tag index.
175        let ids = self.journal.all_persistence_ids().await?;
176        let mut out = Vec::new();
177        for id in ids {
178            let reprs = self.journal.replay_messages(&id, from_seq, u64::MAX, u64::MAX).await?;
179            for r in reprs {
180                if r.tags.iter().any(|t| t == tag) {
181                    out.push(r.into());
182                }
183            }
184        }
185        Ok(out)
186    }
187
188    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
189        self.journal.all_persistence_ids().await
190    }
191}
192
193#[cfg(test)]
194mod tests {
195    use super::*;
196    use atomr_persistence::{InMemoryJournal, Journal, PersistentRepr};
197    use std::sync::Arc;
198
199    fn repr(pid: &str, seq: u64, tags: &[&str]) -> PersistentRepr {
200        PersistentRepr {
201            persistence_id: pid.into(),
202            sequence_nr: seq,
203            payload: vec![seq as u8],
204            manifest: "evt".into(),
205            writer_uuid: "w".into(),
206            deleted: false,
207            tags: tags.iter().map(|s| s.to_string()).collect(),
208        }
209    }
210
211    #[tokio::test]
212    async fn events_by_persistence_id_replays_range() {
213        let j = Arc::new(InMemoryJournal::default());
214        j.write_messages(vec![repr("a", 1, &[]), repr("a", 2, &[]), repr("a", 3, &[])]).await.unwrap();
215        let q = SimpleReadJournal::new(j);
216        let evs = q.events_by_persistence_id("a", 1, 2).await.unwrap();
217        assert_eq!(evs.len(), 2);
218        assert_eq!(evs[0].sequence_nr, 1);
219        assert_eq!(evs[1].sequence_nr, 2);
220    }
221
222    #[tokio::test]
223    async fn current_variant_matches_live() {
224        let j = Arc::new(InMemoryJournal::default());
225        j.write_messages(vec![repr("a", 1, &[])]).await.unwrap();
226        let q = SimpleReadJournal::new(j);
227        let live = q.events_by_persistence_id("a", 1, 99).await.unwrap();
228        let snap = q.current_events_by_persistence_id("a", 1, 99).await.unwrap();
229        assert_eq!(live.len(), snap.len());
230    }
231
232    #[tokio::test]
233    async fn offset_sequence_round_trips() {
234        assert_eq!(Offset::Sequence(7).as_sequence(), Some(7));
235        assert_eq!(Offset::NoOffset.as_sequence(), Some(0));
236        assert_eq!(Offset::TimeBased(123).as_sequence(), None);
237    }
238
239    #[tokio::test]
240    async fn events_by_tag_returns_tagged_events_across_pids() {
241        let j = Arc::new(InMemoryJournal::default());
242        j.write_messages(vec![
243            repr("a", 1, &["red"]),
244            repr("a", 2, &["blue"]),
245            repr("b", 1, &["red", "hot"]),
246            repr("b", 2, &["green"]),
247        ])
248        .await
249        .unwrap();
250        let q = SimpleReadJournal::new(j);
251        let red = q.events_by_tag("red", Offset::NoOffset).await.unwrap();
252        assert_eq!(red.len(), 2);
253        let blue = q.events_by_tag("blue", Offset::NoOffset).await.unwrap();
254        assert_eq!(blue.len(), 1);
255        let none = q.events_by_tag("nope", Offset::NoOffset).await.unwrap();
256        assert!(none.is_empty());
257    }
258
259    #[tokio::test]
260    async fn events_by_tag_respects_offset() {
261        let j = Arc::new(InMemoryJournal::default());
262        j.write_messages(vec![repr("a", 1, &["t"]), repr("a", 2, &["t"]), repr("a", 3, &["t"])])
263            .await
264            .unwrap();
265        let q = SimpleReadJournal::new(j);
266        let from2 = q.events_by_tag("t", Offset::Sequence(2)).await.unwrap();
267        assert_eq!(from2.len(), 2);
268        assert_eq!(from2[0].sequence_nr, 2);
269    }
270
271    #[tokio::test]
272    async fn events_as_of_default_returns_current_events() {
273        // The default (non-SQL) impl has no system_time, so as-of degrades to
274        // the full current list — documented behavior.
275        let j = Arc::new(InMemoryJournal::default());
276        j.write_messages(vec![repr("a", 1, &[]), repr("a", 2, &[])]).await.unwrap();
277        let q = SimpleReadJournal::new(j);
278        let asof = q.events_as_of("a", 12345).await.unwrap();
279        assert_eq!(asof.len(), 2);
280        let bit = q.events_valid_as_of("a", 1, 2).await.unwrap();
281        assert_eq!(bit.len(), 2);
282    }
283
284    #[tokio::test]
285    async fn all_persistence_ids_lists_distinct_writers() {
286        let j = Arc::new(InMemoryJournal::default());
287        j.write_messages(vec![repr("a", 1, &[]), repr("b", 1, &[]), repr("a", 2, &[])]).await.unwrap();
288        let q = SimpleReadJournal::new(j);
289        let mut ids = q.all_persistence_ids().await.unwrap();
290        ids.sort();
291        assert_eq!(ids, vec!["a".to_string(), "b".to_string()]);
292    }
293}