Skip to main content

atomr_persistence_mongodb/
journal.rs

1//! MongoDB `Journal` implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_persistence::{Journal, JournalError, PersistentRepr};
7use futures::TryStreamExt;
8use mongodb::bson::doc;
9use mongodb::options::{FindOptions, IndexOptions};
10use mongodb::{Client, Collection, IndexModel};
11
12use crate::config::MongoConfig;
13use crate::documents::EventDoc;
14
15pub struct MongoJournal {
16    client: Client,
17    cfg: MongoConfig,
18}
19
20impl MongoJournal {
21    pub async fn connect(cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
22        let client = Client::with_uri_str(&cfg.url).await.map_err(JournalError::backend)?;
23        let me = Self { client, cfg };
24        me.ensure_indexes().await?;
25        Ok(Arc::new(me))
26    }
27
28    pub async fn from_client(client: Client, cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
29        let me = Self { client, cfg };
30        me.ensure_indexes().await?;
31        Ok(Arc::new(me))
32    }
33
34    pub fn config(&self) -> &MongoConfig {
35        &self.cfg
36    }
37
38    fn collection(&self) -> Collection<EventDoc> {
39        self.client.database(&self.cfg.database).collection::<EventDoc>(&self.cfg.journal_collection)
40    }
41
42    async fn ensure_indexes(&self) -> Result<(), JournalError> {
43        let model = IndexModel::builder()
44            .keys(doc! { "persistence_id": 1, "sequence_nr": 1 })
45            .options(IndexOptions::builder().unique(true).build())
46            .build();
47        self.collection().create_index(model).await.map_err(JournalError::backend)?;
48        let tag_model = IndexModel::builder().keys(doc! { "tags": 1, "sequence_nr": 1 }).build();
49        self.collection().create_index(tag_model).await.map_err(JournalError::backend)?;
50        Ok(())
51    }
52
53    async fn current_max(&self, pid: &str) -> Result<i64, JournalError> {
54        let opts = FindOptions::builder().sort(doc! { "sequence_nr": -1 }).limit(1i64).build();
55        let mut cur = self
56            .collection()
57            .find(doc! { "persistence_id": pid })
58            .with_options(opts)
59            .await
60            .map_err(JournalError::backend)?;
61        Ok(cur.try_next().await.map_err(JournalError::backend)?.map(|d| d.sequence_nr).unwrap_or(0))
62    }
63}
64
65#[async_trait]
66impl Journal for MongoJournal {
67    async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError> {
68        if messages.is_empty() {
69            return Ok(());
70        }
71        let mut by_pid: std::collections::BTreeMap<String, Vec<PersistentRepr>> =
72            std::collections::BTreeMap::new();
73        for m in messages {
74            by_pid.entry(m.persistence_id.clone()).or_default().push(m);
75        }
76        let col = self.collection();
77        let now = chrono::Utc::now().timestamp_millis();
78        for (pid, batch) in by_pid {
79            let mut expected = self.current_max(&pid).await? as u64 + 1;
80            let mut docs = Vec::with_capacity(batch.len());
81            for msg in batch {
82                if msg.sequence_nr != expected {
83                    return Err(JournalError::SequenceOutOfOrder { expected, got: msg.sequence_nr });
84                }
85                expected += 1;
86                docs.push(EventDoc::from_repr(&msg, now));
87            }
88            col.insert_many(docs).await.map_err(JournalError::backend)?;
89        }
90        Ok(())
91    }
92
93    async fn delete_messages_to(
94        &self,
95        persistence_id: &str,
96        to_sequence_nr: u64,
97    ) -> Result<(), JournalError> {
98        self.collection()
99            .update_many(
100                doc! { "persistence_id": persistence_id, "sequence_nr": { "$lte": to_sequence_nr as i64 } },
101                doc! { "$set": { "deleted": true } },
102            )
103            .await
104            .map_err(JournalError::backend)?;
105        Ok(())
106    }
107
108    async fn replay_messages(
109        &self,
110        persistence_id: &str,
111        from: u64,
112        to: u64,
113        max: u64,
114    ) -> Result<Vec<PersistentRepr>, JournalError> {
115        if max == 0 {
116            return Ok(Vec::new());
117        }
118        let limit = if max > i64::MAX as u64 { i64::MAX } else { max as i64 };
119        let opts = FindOptions::builder().sort(doc! { "sequence_nr": 1 }).limit(limit).build();
120        let mut cur = self
121            .collection()
122            .find(doc! {
123                "persistence_id": persistence_id,
124                "sequence_nr": { "$gte": from as i64, "$lte": clamp(to) },
125                "deleted": false,
126            })
127            .with_options(opts)
128            .await
129            .map_err(JournalError::backend)?;
130        let mut out = Vec::new();
131        while let Some(doc) = cur.try_next().await.map_err(JournalError::backend)? {
132            out.push(doc.into_repr());
133        }
134        Ok(out)
135    }
136
137    async fn highest_sequence_nr(&self, persistence_id: &str, _from: u64) -> Result<u64, JournalError> {
138        Ok(self.current_max(persistence_id).await? as u64)
139    }
140
141    async fn events_by_tag(
142        &self,
143        tag: &str,
144        from_offset: u64,
145        max: u64,
146    ) -> Result<Vec<PersistentRepr>, JournalError> {
147        if max == 0 {
148            return Ok(Vec::new());
149        }
150        let limit = if max > i64::MAX as u64 { i64::MAX } else { max as i64 };
151        let opts =
152            FindOptions::builder().sort(doc! { "persistence_id": 1, "sequence_nr": 1 }).limit(limit).build();
153        let mut cur = self
154            .collection()
155            .find(doc! {
156                "tags": tag,
157                "sequence_nr": { "$gte": from_offset as i64 },
158                "deleted": false,
159            })
160            .with_options(opts)
161            .await
162            .map_err(JournalError::backend)?;
163        let mut out = Vec::new();
164        while let Some(doc) = cur.try_next().await.map_err(JournalError::backend)? {
165            out.push(doc.into_repr());
166        }
167        Ok(out)
168    }
169}
170
171fn clamp(v: u64) -> i64 {
172    if v > i64::MAX as u64 {
173        i64::MAX
174    } else {
175        v as i64
176    }
177}