Skip to main content

atomr_persistence_query/
lib.rs

1//! atomr-persistence-query. akka.net: `Akka.Persistence.Query`.
2//!
3//! Phase 11 of `docs/full-port-plan.md` extends the read-journal
4//! surface to match upstream: `events_by_persistence_id`,
5//! `events_by_tag`, `current_*` variants, `all_persistence_ids`, and
6//! a typed [`Offset`] type.
7
8use async_trait::async_trait;
9use atomr_persistence::{Journal, JournalError, PersistentRepr};
10
/// Position marker for read-journal queries.
///
/// The in-memory backend works with [`Offset::Sequence`] numbers, while
/// a SQL backend might emit [`Offset::TimeBased`] UUIDs.
/// [`Offset::NoOffset`] (the default) means "from the start."
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Offset {
    /// No position given: read from the start.
    #[default]
    NoOffset,
    /// A sequence-number position.
    Sequence(u64),
    /// A time-based position (e.g. a UUID) held as a 128-bit integer.
    TimeBased(u128),
}

impl Offset {
    /// Converts the offset into a plain sequence number when it has one.
    ///
    /// Returns `Some(0)` for [`Offset::NoOffset`], `Some(n)` for
    /// [`Offset::Sequence`]`(n)`, and `None` for [`Offset::TimeBased`],
    /// which has no sequence-number equivalent.
    pub fn as_sequence(self) -> Option<u64> {
        match self {
            Self::TimeBased(_) => None,
            Self::Sequence(seq) => Some(seq),
            Self::NoOffset => Some(0),
        }
    }
}
33
/// A single event as handed back by read-journal queries.
#[derive(Clone, Debug)]
pub struct EventEnvelope {
    /// Id of the persistent entity the event belongs to.
    pub persistence_id: String,
    /// Sequence number of the event within its persistence id.
    pub sequence_nr: u64,
    /// Event payload bytes.
    pub payload: Vec<u8>,
    /// Query offset of the event; the `From<PersistentRepr>` conversion
    /// fills this with the event's sequence number.
    pub offset: u64,
    /// Tags attached to the event.
    pub tags: Vec<String>,
}
42
43impl From<PersistentRepr> for EventEnvelope {
44    fn from(r: PersistentRepr) -> Self {
45        Self {
46            persistence_id: r.persistence_id,
47            sequence_nr: r.sequence_nr,
48            payload: r.payload,
49            offset: r.sequence_nr,
50            tags: r.tags,
51        }
52    }
53}
54
55/// Read-journal surface. `current_*` variants take a snapshot at call
56/// time; the non-current variants are tail-following (live) — backends
57/// that only support snapshots return the snapshot and let callers
58/// re-poll.
59#[async_trait]
60pub trait ReadJournal: Send + Sync + 'static {
61    /// Replay events for a single persistence id, sequence-number
62    /// bounded.
63    async fn events_by_persistence_id(
64        &self,
65        persistence_id: &str,
66        from_sequence_nr: u64,
67        to_sequence_nr: u64,
68    ) -> Result<Vec<EventEnvelope>, JournalError>;
69
70    /// Snapshot variant of [`Self::events_by_persistence_id`] —
71    /// default impl is identical (in-memory journals don't tail).
72    async fn current_events_by_persistence_id(
73        &self,
74        persistence_id: &str,
75        from: u64,
76        to: u64,
77    ) -> Result<Vec<EventEnvelope>, JournalError> {
78        self.events_by_persistence_id(persistence_id, from, to).await
79    }
80
81    /// All events with a given tag, returned in offset order.
82    /// Default impl is empty so backends without tag indexing don't
83    /// silently mis-behave.
84    async fn events_by_tag(&self, _tag: &str, _offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
85        Ok(Vec::new())
86    }
87
88    async fn current_events_by_tag(
89        &self,
90        tag: &str,
91        offset: Offset,
92    ) -> Result<Vec<EventEnvelope>, JournalError> {
93        self.events_by_tag(tag, offset).await
94    }
95
96    /// Distinct persistence ids known to the backend. Default impl
97    /// returns empty (backends without an id index opt in).
98    async fn all_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
99        Ok(Vec::new())
100    }
101
102    async fn current_persistence_ids(&self) -> Result<Vec<String>, JournalError> {
103        self.all_persistence_ids().await
104    }
105}
106
107pub struct SimpleReadJournal<J: Journal> {
108    journal: std::sync::Arc<J>,
109}
110
111impl<J: Journal> SimpleReadJournal<J> {
112    pub fn new(journal: std::sync::Arc<J>) -> Self {
113        Self { journal }
114    }
115}
116
117#[async_trait]
118impl<J: Journal> ReadJournal for SimpleReadJournal<J> {
119    async fn events_by_persistence_id(
120        &self,
121        persistence_id: &str,
122        from: u64,
123        to: u64,
124    ) -> Result<Vec<EventEnvelope>, JournalError> {
125        let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
126        Ok(reprs.into_iter().map(Into::into).collect())
127    }
128
129    async fn events_by_tag(&self, tag: &str, offset: Offset) -> Result<Vec<EventEnvelope>, JournalError> {
130        let from_seq = offset.as_sequence().unwrap_or(0);
131        // For backends that don't have a tag index, we have to fall
132        // back to scanning known persistence ids. The Journal trait
133        // doesn't expose enumeration, so we ask the backend for the
134        // list via a downcast-free path: we use `current_persistence_ids`
135        // (default impl returns empty for in-memory). Production
136        // backends override `events_by_tag` directly with an indexed
137        // query.
138        let ids = self.current_persistence_ids().await?;
139        let mut out = Vec::new();
140        for id in ids {
141            let reprs = self.journal.replay_messages(&id, from_seq, u64::MAX, u64::MAX).await?;
142            for r in reprs {
143                if r.tags.iter().any(|t| t == tag) {
144                    out.push(r.into());
145                }
146            }
147        }
148        Ok(out)
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_persistence::{InMemoryJournal, Journal, PersistentRepr};
    use std::sync::Arc;

    /// Builds a non-deleted journal record for `pid` at sequence number
    /// `seq`, carrying `tags` and a one-byte payload derived from `seq`.
    fn repr(pid: &str, seq: u64, tags: &[&str]) -> PersistentRepr {
        PersistentRepr {
            persistence_id: pid.into(),
            sequence_nr: seq,
            // Truncating cast is fine here: tests only use tiny sequence numbers.
            payload: vec![seq as u8],
            manifest: "evt".into(),
            writer_uuid: "w".into(),
            deleted: false,
            tags: tags.iter().map(|s| s.to_string()).collect(),
        }
    }

    /// Querying sequence numbers 1..=2 out of three stored events
    /// yields exactly those two, in order.
    #[tokio::test]
    async fn events_by_persistence_id_replays_range() {
        let j = Arc::new(InMemoryJournal::default());
        j.write_messages(vec![repr("a", 1, &[]), repr("a", 2, &[]), repr("a", 3, &[])]).await.unwrap();
        let q = SimpleReadJournal::new(j);
        let evs = q.events_by_persistence_id("a", 1, 2).await.unwrap();
        assert_eq!(evs.len(), 2);
        assert_eq!(evs[0].sequence_nr, 1);
        assert_eq!(evs[1].sequence_nr, 2);
    }

    /// The snapshot (`current_*`) query returns as many events as the
    /// live query for the same range.
    #[tokio::test]
    async fn current_variant_matches_live() {
        let j = Arc::new(InMemoryJournal::default());
        j.write_messages(vec![repr("a", 1, &[])]).await.unwrap();
        let q = SimpleReadJournal::new(j);
        let live = q.events_by_persistence_id("a", 1, 99).await.unwrap();
        let snap = q.current_events_by_persistence_id("a", 1, 99).await.unwrap();
        assert_eq!(live.len(), snap.len());
    }

    /// `Offset::as_sequence` maps each variant as documented. This is a
    /// pure synchronous check, so it does not need an async runtime.
    #[test]
    fn offset_sequence_round_trips() {
        assert_eq!(Offset::Sequence(7).as_sequence(), Some(7));
        assert_eq!(Offset::NoOffset.as_sequence(), Some(0));
        assert_eq!(Offset::TimeBased(123).as_sequence(), None);
    }
}