Skip to main content

atomr_persistence_query/
lib.rs

1//! atomr-persistence-query.
2//!
3//! Phase 11 of `docs/full-port-plan.md` extends the read-journal
4//! surface to match upstream: `events_by_persistence_id`,
5//! `events_by_tag`, `current_*` variants, `all_persistence_ids`, and
6//! a typed [`Offset`] type.
7
8use async_trait::async_trait;
9use atomr_persistence::{Journal, JournalError, PersistentRepr};
10
11/// Typed read-journal offset. The in-memory backend uses `Sequence`
12/// numbers; a SQL backend might emit `TimeBased` UUIDs. `NoOffset`
13/// means "from the start."
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15#[non_exhaustive]
16#[derive(Default)]
17pub enum Offset {
18    #[default]
19    NoOffset,
20    Sequence(u64),
21    TimeBased(u128),
22}
23
24impl Offset {
25    pub fn as_sequence(self) -> Option<u64> {
26        match self {
27            Self::NoOffset => Some(0),
28            Self::Sequence(n) => Some(n),
29            Self::TimeBased(_) => None,
30        }
31    }
32}
33
34#[derive(Debug, Clone)]
35pub struct EventEnvelope {
36    pub persistence_id: String,
37    pub sequence_nr: u64,
38    pub payload: Vec<u8>,
39    pub offset: u64,
40    pub tags: Vec<String>,
41}
42
43impl From<PersistentRepr> for EventEnvelope {
44    fn from(r: PersistentRepr) -> Self {
45        Self {
46            persistence_id: r.persistence_id,
47            sequence_nr: r.sequence_nr,
48            payload: r.payload,
49            offset: r.sequence_nr,
50            tags: r.tags,
51        }
52    }
53}
54
55/// Read-journal surface. `current_*` variants take a snapshot at call
56/// time; the non-current variants are tail-following (live) — backends
57/// that only support snapshots return the snapshot and let callers
58/// re-poll.
59#[async_trait]
60pub trait ReadJournal: Send + Sync + 'static {
61    /// Replay events for a single persistence id, sequence-number
62    /// bounded.
63    async fn events_by_persistence_id(
64        &self,
65        persistence_id: &str,
66        from_sequence_nr: u64,
67        to_sequence_nr: u64,
68    ) -> Result<Vec<EventEnvelope>, JournalError>;
69
70    /// Snapshot variant of [`Self::events_by_persistence_id`] —
71    /// default impl is identical (in-memory journals don't tail).
72    async fn current_events_by_persistence_id(
73        &self,
74        persistence_id: &str,
75        from: u64,
76        to: u64,
77    ) -> Result<Vec<EventEnvelope>, JournalError> {
78        self.events_by_persistence_id(persistence_id, from, to).await
79    }
80
81    /// All events with a given tag, returned in offset order.
82    /// Default impl is empty so backends without tag indexing don't
83    /// silently mis-behave.
84    async fn events_by_tag(&self, _tag: &str, _offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
85        Ok(Vec::new())
86    }
87
88    async fn current_events_by_tag(
89        &self,
90        tag: &str,
91        offset: Offset,
92    ) -> Result<Vec<EventEnvelope>, JournalError> {
93        self.events_by_tag(tag, offset).await
94    }
95
96    /// Distinct persistence ids known to the backend. Default impl
97    /// returns empty (backends without an id index opt in).
98    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
99        Ok(Vec::new())
100    }
101
102    async fn current_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
103        self.all_persistence_ids().await
104    }
105}
106
107pub struct SimpleReadJournal<J: Journal> {
108    journal: std::sync::Arc<J>,
109}
110
111impl<J: Journal> SimpleReadJournal<J> {
112    pub fn new(journal: std::sync::Arc<J>) -> Self {
113        Self { journal }
114    }
115}
116
117#[async_trait]
118impl<J: Journal> ReadJournal for SimpleReadJournal<J> {
119    async fn events_by_persistence_id(
120        &self,
121        persistence_id: &str,
122        from: u64,
123        to: u64,
124    ) -> Result<Vec<EventEnvelope>, JournalError> {
125        let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
126        Ok(reprs.into_iter().map(Into::into).collect())
127    }
128
129    async fn events_by_tag(&self, tag: &str, offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
130        let from_seq = offset.as_sequence().unwrap_or(0);
131        // Prefer the backend's indexed query when available.
132        let backend_results = self.journal.events_by_tag(tag, from_seq, u64::MAX).await?;
133        if !backend_results.is_empty() {
134            return Ok(backend_results.into_iter().map(Into::into).collect());
135        }
136        // Fall back to scanning per-pid when the backend hasn't
137        // implemented its own tag index.
138        let ids = self.journal.all_persistence_ids().await?;
139        let mut out = Vec::new();
140        for id in ids {
141            let reprs = self.journal.replay_messages(&id, from_seq, u64::MAX, u64::MAX).await?;
142            for r in reprs {
143                if r.tags.iter().any(|t| t == tag) {
144                    out.push(r.into());
145                }
146            }
147        }
148        Ok(out)
149    }
150
151    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
152        self.journal.all_persistence_ids().await
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use atomr_persistence::{InMemoryJournal, Journal, PersistentRepr};
160    use std::sync::Arc;
161
162    fn repr(pid: &str, seq: u64, tags: &[&str]) -> PersistentRepr {
163        PersistentRepr {
164            persistence_id: pid.into(),
165            sequence_nr: seq,
166            payload: vec![seq as u8],
167            manifest: "evt".into(),
168            writer_uuid: "w".into(),
169            deleted: false,
170            tags: tags.iter().map(|s| s.to_string()).collect(),
171        }
172    }
173
174    #[tokio::test]
175    async fn events_by_persistence_id_replays_range() {
176        let j = Arc::new(InMemoryJournal::default());
177        j.write_messages(vec![repr("a", 1, &[]), repr("a", 2, &[]), repr("a", 3, &[])]).await.unwrap();
178        let q = SimpleReadJournal::new(j);
179        let evs = q.events_by_persistence_id("a", 1, 2).await.unwrap();
180        assert_eq!(evs.len(), 2);
181        assert_eq!(evs[0].sequence_nr, 1);
182        assert_eq!(evs[1].sequence_nr, 2);
183    }
184
185    #[tokio::test]
186    async fn current_variant_matches_live() {
187        let j = Arc::new(InMemoryJournal::default());
188        j.write_messages(vec![repr("a", 1, &[])]).await.unwrap();
189        let q = SimpleReadJournal::new(j);
190        let live = q.events_by_persistence_id("a", 1, 99).await.unwrap();
191        let snap = q.current_events_by_persistence_id("a", 1, 99).await.unwrap();
192        assert_eq!(live.len(), snap.len());
193    }
194
195    #[tokio::test]
196    async fn offset_sequence_round_trips() {
197        assert_eq!(Offset::Sequence(7).as_sequence(), Some(7));
198        assert_eq!(Offset::NoOffset.as_sequence(), Some(0));
199        assert_eq!(Offset::TimeBased(123).as_sequence(), None);
200    }
201
202    #[tokio::test]
203    async fn events_by_tag_returns_tagged_events_across_pids() {
204        let j = Arc::new(InMemoryJournal::default());
205        j.write_messages(vec![
206            repr("a", 1, &["red"]),
207            repr("a", 2, &["blue"]),
208            repr("b", 1, &["red", "hot"]),
209            repr("b", 2, &["green"]),
210        ])
211        .await
212        .unwrap();
213        let q = SimpleReadJournal::new(j);
214        let red = q.events_by_tag("red", Offset::NoOffset).await.unwrap();
215        assert_eq!(red.len(), 2);
216        let blue = q.events_by_tag("blue", Offset::NoOffset).await.unwrap();
217        assert_eq!(blue.len(), 1);
218        let none = q.events_by_tag("nope", Offset::NoOffset).await.unwrap();
219        assert!(none.is_empty());
220    }
221
222    #[tokio::test]
223    async fn events_by_tag_respects_offset() {
224        let j = Arc::new(InMemoryJournal::default());
225        j.write_messages(vec![repr("a", 1, &["t"]), repr("a", 2, &["t"]), repr("a", 3, &["t"])])
226            .await
227            .unwrap();
228        let q = SimpleReadJournal::new(j);
229        let from2 = q.events_by_tag("t", Offset::Sequence(2)).await.unwrap();
230        assert_eq!(from2.len(), 2);
231        assert_eq!(from2[0].sequence_nr, 2);
232    }
233
234    #[tokio::test]
235    async fn all_persistence_ids_lists_distinct_writers() {
236        let j = Arc::new(InMemoryJournal::default());
237        j.write_messages(vec![repr("a", 1, &[]), repr("b", 1, &[]), repr("a", 2, &[])]).await.unwrap();
238        let q = SimpleReadJournal::new(j);
239        let mut ids = q.all_persistence_ids().await.unwrap();
240        ids.sort();
241        assert_eq!(ids, vec!["a".to_string(), "b".to_string()]);
242    }
243}