Skip to main content

atomr_persistence_mongodb/
snapshot.rs

//! MongoDB `SnapshotStore` implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
7use futures::TryStreamExt;
8use mongodb::bson::doc;
9use mongodb::options::{FindOptions, IndexOptions};
10use mongodb::{Client, Collection, IndexModel};
11
12use crate::config::MongoConfig;
13use crate::documents::SnapshotDoc;
14
15pub struct MongoSnapshotStore {
16    client: Client,
17    cfg: MongoConfig,
18}
19
20impl MongoSnapshotStore {
21    pub async fn connect(cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
22        let client = Client::with_uri_str(&cfg.url).await.map_err(JournalError::backend)?;
23        let me = Self { client, cfg };
24        me.ensure_indexes().await?;
25        Ok(Arc::new(me))
26    }
27
28    pub async fn from_client(client: Client, cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
29        let me = Self { client, cfg };
30        me.ensure_indexes().await?;
31        Ok(Arc::new(me))
32    }
33
34    pub fn config(&self) -> &MongoConfig {
35        &self.cfg
36    }
37
38    fn collection(&self) -> Collection<SnapshotDoc> {
39        self.client.database(&self.cfg.database).collection::<SnapshotDoc>(&self.cfg.snapshot_collection)
40    }
41
42    async fn ensure_indexes(&self) -> Result<(), JournalError> {
43        let model = IndexModel::builder()
44            .keys(doc! { "persistence_id": 1, "sequence_nr": 1 })
45            .options(IndexOptions::builder().unique(true).build())
46            .build();
47        self.collection().create_index(model).await.map_err(JournalError::backend)?;
48        Ok(())
49    }
50}
51
52#[async_trait]
53impl SnapshotStore for MongoSnapshotStore {
54    async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
55        let now = chrono::Utc::now().timestamp_millis();
56        let doc = SnapshotDoc::from_meta(&meta, payload, now);
57        let _ = self.collection().insert_one(doc).await;
58    }
59
60    async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
61        let opts = FindOptions::builder().sort(doc! { "sequence_nr": -1 }).limit(1i64).build();
62        let mut cur = self
63            .collection()
64            .find(doc! { "persistence_id": persistence_id })
65            .with_options(opts)
66            .await
67            .ok()?;
68        let doc = cur.try_next().await.ok().flatten()?;
69        Some(doc.into_parts())
70    }
71
72    async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
73        let _ = self
74            .collection()
75            .delete_many(doc! {
76                "persistence_id": persistence_id,
77                "sequence_nr": { "$lte": to_sequence_nr as i64 },
78            })
79            .await;
80    }
81}