Skip to main content

atomr_persistence_sql/
query.rs

1//! `ReadJournal` adapter that reuses the SQL journal for event-by-id /
2//! event-by-tag queries.
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use atomr_persistence::{Journal, JournalError};
8use atomr_persistence_query::{EventEnvelope, ReadJournal};
9
10use crate::journal::SqlJournal;
11
12pub struct SqlReadJournal {
13    journal: Arc<SqlJournal>,
14}
15
16impl SqlReadJournal {
17    pub fn new(journal: Arc<SqlJournal>) -> Self {
18        Self { journal }
19    }
20
21    pub async fn events_by_tag(
22        &self,
23        tag: &str,
24        from_offset: u64,
25        max: u64,
26    ) -> Result<Vec<EventEnvelope>, JournalError> {
27        let reprs = self.journal.events_by_tag(tag, from_offset, max).await?;
28        Ok(reprs.into_iter().map(Into::into).collect())
29    }
30}
31
32/// Clamp a `u128` nanosecond timestamp into the `i64` column domain. Values
33/// beyond `i64::MAX` saturate (effectively "now / always").
34fn clamp_nanos(v: u128) -> i64 {
35    if v > i64::MAX as u128 {
36        i64::MAX
37    } else {
38        v as i64
39    }
40}
41
42#[async_trait]
43impl ReadJournal for SqlReadJournal {
44    async fn events_by_persistence_id(
45        &self,
46        persistence_id: &str,
47        from: u64,
48        to: u64,
49    ) -> Result<Vec<EventEnvelope>, JournalError> {
50        let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
51        Ok(reprs.into_iter().map(Into::into).collect())
52    }
53
54    /// FR-8 — true system-time as-of, backed by the `system_time` column.
55    async fn events_as_of(
56        &self,
57        pid: &str,
58        system_time_nanos: u128,
59    ) -> Result<Vec<EventEnvelope>, JournalError> {
60        let reprs = self.journal.replay_as_of(pid, clamp_nanos(system_time_nanos)).await?;
61        Ok(reprs.into_iter().map(Into::into).collect())
62    }
63
64    /// FR-8 — true bitemporal slice, backed by `system_time` + `valid_time`.
65    async fn events_valid_as_of(
66        &self,
67        pid: &str,
68        valid_time_nanos: u128,
69        system_time_nanos: u128,
70    ) -> Result<Vec<EventEnvelope>, JournalError> {
71        let reprs = self
72            .journal
73            .replay_valid_as_of(pid, clamp_nanos(valid_time_nanos), clamp_nanos(system_time_nanos))
74            .await?;
75        Ok(reprs.into_iter().map(Into::into).collect())
76    }
77}