atomr_persistence_mongodb/
journal.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_persistence::{Journal, JournalError, PersistentRepr};
7use futures::TryStreamExt;
8use mongodb::bson::doc;
9use mongodb::options::{FindOptions, IndexOptions};
10use mongodb::{Client, Collection, IndexModel};
11
12use crate::config::MongoConfig;
13use crate::documents::EventDoc;
14
15pub struct MongoJournal {
16 client: Client,
17 cfg: MongoConfig,
18}
19
20impl MongoJournal {
21 pub async fn connect(cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
22 let client = Client::with_uri_str(&cfg.url).await.map_err(JournalError::backend)?;
23 let me = Self { client, cfg };
24 me.ensure_indexes().await?;
25 Ok(Arc::new(me))
26 }
27
28 pub async fn from_client(client: Client, cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
29 let me = Self { client, cfg };
30 me.ensure_indexes().await?;
31 Ok(Arc::new(me))
32 }
33
34 pub fn config(&self) -> &MongoConfig {
35 &self.cfg
36 }
37
38 fn collection(&self) -> Collection<EventDoc> {
39 self.client.database(&self.cfg.database).collection::<EventDoc>(&self.cfg.journal_collection)
40 }
41
42 async fn ensure_indexes(&self) -> Result<(), JournalError> {
43 let model = IndexModel::builder()
44 .keys(doc! { "persistence_id": 1, "sequence_nr": 1 })
45 .options(IndexOptions::builder().unique(true).build())
46 .build();
47 self.collection().create_index(model).await.map_err(JournalError::backend)?;
48 let tag_model = IndexModel::builder().keys(doc! { "tags": 1, "sequence_nr": 1 }).build();
49 self.collection().create_index(tag_model).await.map_err(JournalError::backend)?;
50 Ok(())
51 }
52
53 async fn current_max(&self, pid: &str) -> Result<i64, JournalError> {
54 let opts = FindOptions::builder().sort(doc! { "sequence_nr": -1 }).limit(1i64).build();
55 let mut cur = self
56 .collection()
57 .find(doc! { "persistence_id": pid })
58 .with_options(opts)
59 .await
60 .map_err(JournalError::backend)?;
61 Ok(cur.try_next().await.map_err(JournalError::backend)?.map(|d| d.sequence_nr).unwrap_or(0))
62 }
63}
64
65#[async_trait]
66impl Journal for MongoJournal {
67 async fn write_messages(&self, messages: Vec<PersistentRepr>) -> Result<(), JournalError> {
68 if messages.is_empty() {
69 return Ok(());
70 }
71 let mut by_pid: std::collections::BTreeMap<String, Vec<PersistentRepr>> =
72 std::collections::BTreeMap::new();
73 for m in messages {
74 by_pid.entry(m.persistence_id.clone()).or_default().push(m);
75 }
76 let col = self.collection();
77 let now = chrono::Utc::now().timestamp_millis();
78 for (pid, batch) in by_pid {
79 let mut expected = self.current_max(&pid).await? as u64 + 1;
80 let mut docs = Vec::with_capacity(batch.len());
81 for msg in batch {
82 if msg.sequence_nr != expected {
83 return Err(JournalError::SequenceOutOfOrder { expected, got: msg.sequence_nr });
84 }
85 expected += 1;
86 docs.push(EventDoc::from_repr(&msg, now));
87 }
88 col.insert_many(docs).await.map_err(JournalError::backend)?;
89 }
90 Ok(())
91 }
92
93 async fn delete_messages_to(
94 &self,
95 persistence_id: &str,
96 to_sequence_nr: u64,
97 ) -> Result<(), JournalError> {
98 self.collection()
99 .update_many(
100 doc! { "persistence_id": persistence_id, "sequence_nr": { "$lte": to_sequence_nr as i64 } },
101 doc! { "$set": { "deleted": true } },
102 )
103 .await
104 .map_err(JournalError::backend)?;
105 Ok(())
106 }
107
108 async fn replay_messages(
109 &self,
110 persistence_id: &str,
111 from: u64,
112 to: u64,
113 max: u64,
114 ) -> Result<Vec<PersistentRepr>, JournalError> {
115 if max == 0 {
116 return Ok(Vec::new());
117 }
118 let limit = if max > i64::MAX as u64 { i64::MAX } else { max as i64 };
119 let opts = FindOptions::builder().sort(doc! { "sequence_nr": 1 }).limit(limit).build();
120 let mut cur = self
121 .collection()
122 .find(doc! {
123 "persistence_id": persistence_id,
124 "sequence_nr": { "$gte": from as i64, "$lte": clamp(to) },
125 "deleted": false,
126 })
127 .with_options(opts)
128 .await
129 .map_err(JournalError::backend)?;
130 let mut out = Vec::new();
131 while let Some(doc) = cur.try_next().await.map_err(JournalError::backend)? {
132 out.push(doc.into_repr());
133 }
134 Ok(out)
135 }
136
137 async fn highest_sequence_nr(&self, persistence_id: &str, _from: u64) -> Result<u64, JournalError> {
138 Ok(self.current_max(persistence_id).await? as u64)
139 }
140
141 async fn events_by_tag(
142 &self,
143 tag: &str,
144 from_offset: u64,
145 max: u64,
146 ) -> Result<Vec<PersistentRepr>, JournalError> {
147 if max == 0 {
148 return Ok(Vec::new());
149 }
150 let limit = if max > i64::MAX as u64 { i64::MAX } else { max as i64 };
151 let opts =
152 FindOptions::builder().sort(doc! { "persistence_id": 1, "sequence_nr": 1 }).limit(limit).build();
153 let mut cur = self
154 .collection()
155 .find(doc! {
156 "tags": tag,
157 "sequence_nr": { "$gte": from_offset as i64 },
158 "deleted": false,
159 })
160 .with_options(opts)
161 .await
162 .map_err(JournalError::backend)?;
163 let mut out = Vec::new();
164 while let Some(doc) = cur.try_next().await.map_err(JournalError::backend)? {
165 out.push(doc.into_repr());
166 }
167 Ok(out)
168 }
169}
170
171fn clamp(v: u64) -> i64 {
172 if v > i64::MAX as u64 {
173 i64::MAX
174 } else {
175 v as i64
176 }
177}