osentities 2.0.0

Shared library for Pica
Documentation
use crate::PicaError;
use crate::Store;
use bson::doc;
use futures::TryStreamExt;
use mongodb::bson::Document;
use mongodb::options::CountOptions;
use mongodb::{Collection, Database};
use serde::de::DeserializeOwned;
use serde::Serialize;

#[derive(Debug, Clone)]
pub struct MongoStore<T: Serialize + DeserializeOwned + Unpin + Sync + Send + Sync> {
    pub collection: Collection<T>,
}

impl<T: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static> MongoStore<T> {
    pub async fn new(database: &Database, store: &Store) -> Result<Self, PicaError> {
        let collection = database.collection::<T>(store.to_string().as_str());
        Ok(Self { collection })
    }

    pub async fn aggregate(&self, pipeline: Vec<Document>) -> Result<Vec<Document>, PicaError> {
        let cursor = self.collection.aggregate(pipeline).await?;
        let results = cursor.try_collect().await?;
        Ok(results)
    }

    pub async fn get_one(&self, filter: Document) -> Result<Option<T>, PicaError> {
        Ok(self.collection.find_one(filter).await?)
    }

    pub async fn get_one_by_id(&self, id: &str) -> Result<Option<T>, PicaError> {
        let filter = doc! { "_id": id };

        Ok(self.collection.find_one(filter).await?)
    }

    pub async fn get_many(
        &self,
        filter: Option<Document>,
        selection: Option<Document>,
        sort: Option<Document>,
        limit: Option<u64>,
        skip: Option<u64>,
    ) -> Result<Vec<T>, PicaError> {
        let mut filter_options = mongodb::options::FindOptions::default();
        filter_options.sort = sort;
        filter_options.projection = selection;
        filter_options.limit = limit.map(|l| l as i64);
        filter_options.skip = skip;

        if filter_options.sort.is_none() {
            filter_options.sort = Some(doc! { "createdAt": -1 });
        }

        let cursor = self
            .collection
            .find(filter.unwrap_or_default())
            .with_options(filter_options)
            .await?;

        let records = cursor.try_collect().await?;

        Ok(records)
    }

    pub async fn create_one(&self, data: &T) -> Result<(), PicaError> {
        self.collection.insert_one(data).await?;

        Ok(())
    }

    pub async fn create_many(&self, data: &[T]) -> Result<(), PicaError> {
        self.collection.insert_many(data).await?;

        Ok(())
    }

    pub async fn update_one(&self, id: &str, data: Document) -> Result<(), PicaError> {
        let filter = doc! { "_id": id };

        self.collection.update_one(filter, data).await?;
        Ok(())
    }

    pub async fn update_many(&self, filter: Document, data: Document) -> Result<(), PicaError> {
        self.collection.update_many(filter, data).await?;

        Ok(())
    }

    pub async fn update_many_with_aggregation_pipeline(
        &self,
        filter: Document,
        data: &[Document],
    ) -> Result<(), PicaError> {
        self.collection.update_many(filter, data.to_vec()).await?;

        Ok(())
    }

    pub async fn count(&self, filter: Document, limit: Option<u64>) -> Result<u64, PicaError> {
        Ok(self
            .collection
            .count_documents(filter)
            .with_options(CountOptions::builder().limit(limit).build())
            .await?)
    }
}