integrationos-domain 8.0.0

Shared library for IntegrationOS
Documentation
use crate::IntegrationOSError;
use crate::Store;
use bson::doc;
use futures::TryStreamExt;
use mongodb::bson::Document;
use mongodb::options::CountOptions;
use mongodb::{Collection, Database};
use serde::de::DeserializeOwned;
use serde::Serialize;

#[derive(Debug, Clone)]
pub struct MongoStore<T: Serialize + DeserializeOwned + Unpin + Sync> {
    pub collection: Collection<T>,
}

impl<T: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static> MongoStore<T> {
    pub async fn new(database: &Database, store: &Store) -> Result<Self, IntegrationOSError> {
        let collection = database.collection::<T>(store.to_string().as_str());
        Ok(Self { collection })
    }

    pub async fn aggregate(
        &self,
        pipeline: Vec<Document>,
    ) -> Result<Vec<Document>, IntegrationOSError> {
        let cursor = self.collection.aggregate(pipeline, None).await?;
        let results = cursor.try_collect().await?;
        Ok(results)
    }

    pub async fn get_one(&self, filter: Document) -> Result<Option<T>, IntegrationOSError> {
        Ok(self.collection.find_one(filter, None).await?)
    }

    pub async fn get_one_by_id(&self, id: &str) -> Result<Option<T>, IntegrationOSError> {
        let filter = doc! { "_id": id };

        Ok(self.collection.find_one(filter, None).await?)
    }

    /// Get all records from the collection
    ///
    /// Use this method with caution, as it can be very slow for large collections.
    pub async fn get_all(&self) -> Result<Vec<T>, IntegrationOSError> {
        let cursor = self.collection.find(None, None).await?;
        let records = cursor.try_collect().await?;

        Ok(records)
    }

    pub async fn get_many(
        &self,
        filter: Option<Document>,
        selection: Option<Document>,
        sort: Option<Document>,
        limit: Option<u64>,
        skip: Option<u64>,
    ) -> Result<Vec<T>, IntegrationOSError> {
        let mut filter_options = mongodb::options::FindOptions::default();
        filter_options.sort = sort;
        filter_options.projection = selection;
        filter_options.limit = limit.map(|l| l as i64);
        filter_options.skip = skip;

        if filter_options.sort.is_none() {
            filter_options.sort = Some(doc! { "createdAt": -1 });
        }

        let cursor = self.collection.find(filter, filter_options).await?;
        let records = cursor.try_collect().await?;

        Ok(records)
    }

    pub async fn create_one(&self, data: &T) -> Result<(), IntegrationOSError> {
        self.collection.insert_one(data, None).await?;

        Ok(())
    }

    pub async fn create_many(&self, data: &[T]) -> Result<(), IntegrationOSError> {
        self.collection.insert_many(data, None).await?;

        Ok(())
    }

    pub async fn update_one(&self, id: &str, data: Document) -> Result<(), IntegrationOSError> {
        let filter = doc! { "_id": id };

        self.collection.update_one(filter, data, None).await?;
        Ok(())
    }

    pub async fn update_many(
        &self,
        filter: Document,
        data: Document,
    ) -> Result<(), IntegrationOSError> {
        self.collection.update_many(filter, data, None).await?;

        Ok(())
    }

    pub async fn update_many_with_aggregation_pipeline(
        &self,
        filter: Document,
        data: &[Document],
    ) -> Result<(), IntegrationOSError> {
        self.collection
            .update_many(filter, data.to_vec(), None)
            .await?;

        Ok(())
    }

    pub async fn count(
        &self,
        filter: Document,
        limit: Option<u64>,
    ) -> Result<u64, IntegrationOSError> {
        Ok(self
            .collection
            .count_documents(filter, CountOptions::builder().limit(limit).build())
            .await?)
    }
}