integrationos_domain/algebra/
store.rs

1use crate::IntegrationOSError;
2use crate::Store;
3use bson::doc;
4use futures::TryStreamExt;
5use mongodb::bson::Document;
6use mongodb::options::CountOptions;
7use mongodb::{Collection, Database};
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10
11#[derive(Debug, Clone)]
12pub struct MongoStore<T: Serialize + DeserializeOwned + Unpin + Sync> {
13    pub collection: Collection<T>,
14}
15
16impl<T: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static> MongoStore<T> {
17    pub async fn new(database: &Database, store: &Store) -> Result<Self, IntegrationOSError> {
18        let collection = database.collection::<T>(store.to_string().as_str());
19        Ok(Self { collection })
20    }
21
22    pub async fn aggregate(
23        &self,
24        pipeline: Vec<Document>,
25    ) -> Result<Vec<Document>, IntegrationOSError> {
26        let cursor = self.collection.aggregate(pipeline, None).await?;
27        let results = cursor.try_collect().await?;
28        Ok(results)
29    }
30
31    pub async fn get_one(&self, filter: Document) -> Result<Option<T>, IntegrationOSError> {
32        Ok(self.collection.find_one(filter, None).await?)
33    }
34
35    pub async fn get_one_by_id(&self, id: &str) -> Result<Option<T>, IntegrationOSError> {
36        let filter = doc! { "_id": id };
37
38        Ok(self.collection.find_one(filter, None).await?)
39    }
40
41    /// Get all records from the collection
42    ///
43    /// Use this method with caution, as it can be very slow for large collections.
44    pub async fn get_all(&self) -> Result<Vec<T>, IntegrationOSError> {
45        let cursor = self.collection.find(None, None).await?;
46        let records = cursor.try_collect().await?;
47
48        Ok(records)
49    }
50
51    pub async fn get_many(
52        &self,
53        filter: Option<Document>,
54        selection: Option<Document>,
55        sort: Option<Document>,
56        limit: Option<u64>,
57        skip: Option<u64>,
58    ) -> Result<Vec<T>, IntegrationOSError> {
59        let mut filter_options = mongodb::options::FindOptions::default();
60        filter_options.sort = sort;
61        filter_options.projection = selection;
62        filter_options.limit = limit.map(|l| l as i64);
63        filter_options.skip = skip;
64
65        if filter_options.sort.is_none() {
66            filter_options.sort = Some(doc! { "createdAt": -1 });
67        }
68
69        let cursor = self.collection.find(filter, filter_options).await?;
70        let records = cursor.try_collect().await?;
71
72        Ok(records)
73    }
74
75    pub async fn create_one(&self, data: &T) -> Result<(), IntegrationOSError> {
76        self.collection.insert_one(data, None).await?;
77
78        Ok(())
79    }
80
81    pub async fn create_many(&self, data: &[T]) -> Result<(), IntegrationOSError> {
82        self.collection.insert_many(data, None).await?;
83
84        Ok(())
85    }
86
87    pub async fn update_one(&self, id: &str, data: Document) -> Result<(), IntegrationOSError> {
88        let filter = doc! { "_id": id };
89
90        self.collection.update_one(filter, data, None).await?;
91        Ok(())
92    }
93
94    pub async fn update_many(
95        &self,
96        filter: Document,
97        data: Document,
98    ) -> Result<(), IntegrationOSError> {
99        self.collection.update_many(filter, data, None).await?;
100
101        Ok(())
102    }
103
104    pub async fn update_many_with_aggregation_pipeline(
105        &self,
106        filter: Document,
107        data: &[Document],
108    ) -> Result<(), IntegrationOSError> {
109        self.collection
110            .update_many(filter, data.to_vec(), None)
111            .await?;
112
113        Ok(())
114    }
115
116    pub async fn count(
117        &self,
118        filter: Document,
119        limit: Option<u64>,
120    ) -> Result<u64, IntegrationOSError> {
121        Ok(self
122            .collection
123            .count_documents(filter, CountOptions::builder().limit(limit).build())
124            .await?)
125    }
126}