atomr_persistence_sql/
query.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use atomr_persistence::{Journal, JournalError};
8use atomr_persistence_query::{EventEnvelope, ReadJournal};
9
10use crate::journal::SqlJournal;
11
12pub struct SqlReadJournal {
13 journal: Arc<SqlJournal>,
14}
15
16impl SqlReadJournal {
17 pub fn new(journal: Arc<SqlJournal>) -> Self {
18 Self { journal }
19 }
20
21 pub async fn events_by_tag(
22 &self,
23 tag: &str,
24 from_offset: u64,
25 max: u64,
26 ) -> Result<Vec<EventEnvelope>, JournalError> {
27 let reprs = self.journal.events_by_tag(tag, from_offset, max).await?;
28 Ok(reprs.into_iter().map(Into::into).collect())
29 }
30}
31
/// Narrows a `u128` value to `i64`, saturating at `i64::MAX`.
///
/// Used in this file to turn the `u128` nanosecond timestamps accepted by the
/// query methods into the `i64` values passed on to the journal. Inputs that
/// fit in an `i64` are returned unchanged; anything larger yields `i64::MAX`.
fn clamp_nanos(v: u128) -> i64 {
    // `try_from` fails exactly when `v > i64::MAX`, so falling back to
    // `i64::MAX` saturates without a potentially-truncating `as` cast.
    i64::try_from(v).unwrap_or(i64::MAX)
}
41
42#[async_trait]
43impl ReadJournal for SqlReadJournal {
44 async fn events_by_persistence_id(
45 &self,
46 persistence_id: &str,
47 from: u64,
48 to: u64,
49 ) -> Result<Vec<EventEnvelope>, JournalError> {
50 let reprs = self.journal.replay_messages(persistence_id, from, to, u64::MAX).await?;
51 Ok(reprs.into_iter().map(Into::into).collect())
52 }
53
54 async fn events_as_of(
56 &self,
57 pid: &str,
58 system_time_nanos: u128,
59 ) -> Result<Vec<EventEnvelope>, JournalError> {
60 let reprs = self.journal.replay_as_of(pid, clamp_nanos(system_time_nanos)).await?;
61 Ok(reprs.into_iter().map(Into::into).collect())
62 }
63
64 async fn events_valid_as_of(
66 &self,
67 pid: &str,
68 valid_time_nanos: u128,
69 system_time_nanos: u128,
70 ) -> Result<Vec<EventEnvelope>, JournalError> {
71 let reprs = self
72 .journal
73 .replay_valid_as_of(pid, clamp_nanos(valid_time_nanos), clamp_nanos(system_time_nanos))
74 .await?;
75 Ok(reprs.into_iter().map(Into::into).collect())
76 }
77}