atomr_persistence_mongodb/
snapshot.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
7use futures::TryStreamExt;
8use mongodb::bson::doc;
9use mongodb::options::{FindOptions, IndexOptions};
10use mongodb::{Client, Collection, IndexModel};
11
12use crate::config::MongoConfig;
13use crate::documents::SnapshotDoc;
14
15pub struct MongoSnapshotStore {
16 client: Client,
17 cfg: MongoConfig,
18}
19
20impl MongoSnapshotStore {
21 pub async fn connect(cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
22 let client = Client::with_uri_str(&cfg.url).await.map_err(JournalError::backend)?;
23 let me = Self { client, cfg };
24 me.ensure_indexes().await?;
25 Ok(Arc::new(me))
26 }
27
28 pub async fn from_client(client: Client, cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
29 let me = Self { client, cfg };
30 me.ensure_indexes().await?;
31 Ok(Arc::new(me))
32 }
33
34 pub fn config(&self) -> &MongoConfig {
35 &self.cfg
36 }
37
38 fn collection(&self) -> Collection<SnapshotDoc> {
39 self.client.database(&self.cfg.database).collection::<SnapshotDoc>(&self.cfg.snapshot_collection)
40 }
41
42 async fn ensure_indexes(&self) -> Result<(), JournalError> {
43 let model = IndexModel::builder()
44 .keys(doc! { "persistence_id": 1, "sequence_nr": 1 })
45 .options(IndexOptions::builder().unique(true).build())
46 .build();
47 self.collection().create_index(model).await.map_err(JournalError::backend)?;
48 Ok(())
49 }
50}
51
52#[async_trait]
53impl SnapshotStore for MongoSnapshotStore {
54 async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
55 let now = chrono::Utc::now().timestamp_millis();
56 let doc = SnapshotDoc::from_meta(&meta, payload, now);
57 let _ = self.collection().insert_one(doc).await;
58 }
59
60 async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
61 let opts = FindOptions::builder().sort(doc! { "sequence_nr": -1 }).limit(1i64).build();
62 let mut cur = self
63 .collection()
64 .find(doc! { "persistence_id": persistence_id })
65 .with_options(opts)
66 .await
67 .ok()?;
68 let doc = cur.try_next().await.ok().flatten()?;
69 Some(doc.into_parts())
70 }
71
72 async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
73 let _ = self
74 .collection()
75 .delete_many(doc! {
76 "persistence_id": persistence_id,
77 "sequence_nr": { "$lte": to_sequence_nr as i64 },
78 })
79 .await;
80 }
81}