atomr-persistence-mongodb 0.3.1

MongoDB journal and snapshot store for atomr — indexed collections, atomic multi-document inserts, BSON payloads.
Documentation
//! MongoDB `SnapshotStore` implementation.

use std::sync::Arc;

use async_trait::async_trait;
use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
use futures::TryStreamExt;
use mongodb::bson::doc;
use mongodb::options::{FindOptions, IndexOptions};
use mongodb::{Client, Collection, IndexModel};

use crate::config::MongoConfig;
use crate::documents::SnapshotDoc;

/// Snapshot store backed by a MongoDB collection.
///
/// Holds the driver [`Client`] together with the [`MongoConfig`] that names
/// the database and snapshot collection the store reads from and writes to.
pub struct MongoSnapshotStore {
    /// Driver handle used to reach the configured database.
    client: Client,
    /// Settings read by this store (`url`, `database`, `snapshot_collection`).
    cfg: MongoConfig,
}

impl MongoSnapshotStore {
    /// Opens a new driver client from `cfg.url` and builds a store on top of it.
    ///
    /// # Errors
    ///
    /// Returns a backend [`JournalError`] if the client cannot be created from
    /// the URL or if index creation fails.
    pub async fn connect(cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
        let client = Client::with_uri_str(&cfg.url)
            .await
            .map_err(JournalError::backend)?;
        // Same construct-then-index sequence as `from_client`, so share it.
        Self::from_client(client, cfg).await
    }

    /// Builds a store around an already-constructed `client`.
    ///
    /// The snapshot collection's index is ensured before the store is handed
    /// back to the caller.
    ///
    /// # Errors
    ///
    /// Returns a backend [`JournalError`] if index creation fails.
    pub async fn from_client(client: Client, cfg: MongoConfig) -> Result<Arc<Self>, JournalError> {
        let store = Self { client, cfg };
        store.ensure_indexes().await?;
        Ok(Arc::new(store))
    }

    /// The configuration this store was created with.
    pub fn config(&self) -> &MongoConfig {
        &self.cfg
    }

    /// Typed handle to the snapshot collection named by the configuration.
    fn collection(&self) -> Collection<SnapshotDoc> {
        let database = self.client.database(&self.cfg.database);
        database.collection(&self.cfg.snapshot_collection)
    }

    /// Creates the unique compound index on `(persistence_id, sequence_nr)`,
    /// both ascending.
    async fn ensure_indexes(&self) -> Result<(), JournalError> {
        let unique = IndexOptions::builder().unique(true).build();
        let keys = doc! { "persistence_id": 1, "sequence_nr": 1 };
        let index = IndexModel::builder().keys(keys).options(unique).build();

        self.collection()
            .create_index(index)
            .await
            .map(|_| ())
            .map_err(JournalError::backend)
    }
}

#[async_trait]
impl SnapshotStore for MongoSnapshotStore {
    /// Inserts a snapshot document built from `meta` and `payload`, stamped
    /// with the current UTC time in milliseconds.
    ///
    /// The method returns nothing, so a failed insert is discarded and the
    /// save is best-effort. NOTE(review): given the unique index created in
    /// `ensure_indexes`, a second save for the same
    /// `(persistence_id, sequence_nr)` is presumably rejected by the server
    /// and dropped here — confirm that is the intended contract.
    async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
        let now = chrono::Utc::now().timestamp_millis();
        let doc = SnapshotDoc::from_meta(&meta, payload, now);
        let _ = self.collection().insert_one(doc).await;
    }

    /// Returns the snapshot with the highest `sequence_nr` stored for
    /// `persistence_id`.
    ///
    /// Returns `None` when no document matches, and also when the query or
    /// cursor read fails, because errors are converted to `None`.
    async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
        // Newest first, and only the first document is needed.
        let opts = FindOptions::builder().sort(doc! { "sequence_nr": -1 }).limit(1i64).build();
        let mut cur = self
            .collection()
            .find(doc! { "persistence_id": persistence_id })
            .with_options(opts)
            .await
            .ok()?;
        let doc = cur.try_next().await.ok().flatten()?;
        Some(doc.into_parts())
    }

    /// Deletes every snapshot of `persistence_id` whose `sequence_nr` is less
    /// than or equal to `to_sequence_nr`.
    ///
    /// The method returns nothing, so a failed delete is discarded.
    async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
        // The bound is sent as a signed 64-bit integer. A plain `as i64` cast
        // wraps values above `i64::MAX` to negatives (`u64::MAX` becomes `-1`),
        // which would make `$lte` match nothing. Clamp first so an oversized
        // bound means "up to the largest representable sequence number".
        let upper_bound = to_sequence_nr.min(i64::MAX as u64) as i64;
        let _ = self
            .collection()
            .delete_many(doc! {
                "persistence_id": persistence_id,
                "sequence_nr": { "$lte": upper_bound },
            })
            .await;
    }
}