integrationos_domain/algebra/
store.rs1use crate::IntegrationOSError;
2use crate::Store;
3use bson::doc;
4use futures::TryStreamExt;
5use mongodb::bson::Document;
6use mongodb::options::CountOptions;
7use mongodb::{Collection, Database};
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10
11#[derive(Debug, Clone)]
12pub struct MongoStore<T: Serialize + DeserializeOwned + Unpin + Sync> {
13 pub collection: Collection<T>,
14}
15
16impl<T: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static> MongoStore<T> {
17 pub async fn new(database: &Database, store: &Store) -> Result<Self, IntegrationOSError> {
18 let collection = database.collection::<T>(store.to_string().as_str());
19 Ok(Self { collection })
20 }
21
22 pub async fn aggregate(
23 &self,
24 pipeline: Vec<Document>,
25 ) -> Result<Vec<Document>, IntegrationOSError> {
26 let cursor = self.collection.aggregate(pipeline, None).await?;
27 let results = cursor.try_collect().await?;
28 Ok(results)
29 }
30
31 pub async fn get_one(&self, filter: Document) -> Result<Option<T>, IntegrationOSError> {
32 Ok(self.collection.find_one(filter, None).await?)
33 }
34
35 pub async fn get_one_by_id(&self, id: &str) -> Result<Option<T>, IntegrationOSError> {
36 let filter = doc! { "_id": id };
37
38 Ok(self.collection.find_one(filter, None).await?)
39 }
40
41 pub async fn get_all(&self) -> Result<Vec<T>, IntegrationOSError> {
45 let cursor = self.collection.find(None, None).await?;
46 let records = cursor.try_collect().await?;
47
48 Ok(records)
49 }
50
51 pub async fn get_many(
52 &self,
53 filter: Option<Document>,
54 selection: Option<Document>,
55 sort: Option<Document>,
56 limit: Option<u64>,
57 skip: Option<u64>,
58 ) -> Result<Vec<T>, IntegrationOSError> {
59 let mut filter_options = mongodb::options::FindOptions::default();
60 filter_options.sort = sort;
61 filter_options.projection = selection;
62 filter_options.limit = limit.map(|l| l as i64);
63 filter_options.skip = skip;
64
65 if filter_options.sort.is_none() {
66 filter_options.sort = Some(doc! { "createdAt": -1 });
67 }
68
69 let cursor = self.collection.find(filter, filter_options).await?;
70 let records = cursor.try_collect().await?;
71
72 Ok(records)
73 }
74
75 pub async fn create_one(&self, data: &T) -> Result<(), IntegrationOSError> {
76 self.collection.insert_one(data, None).await?;
77
78 Ok(())
79 }
80
81 pub async fn create_many(&self, data: &[T]) -> Result<(), IntegrationOSError> {
82 self.collection.insert_many(data, None).await?;
83
84 Ok(())
85 }
86
87 pub async fn update_one(&self, id: &str, data: Document) -> Result<(), IntegrationOSError> {
88 let filter = doc! { "_id": id };
89
90 self.collection.update_one(filter, data, None).await?;
91 Ok(())
92 }
93
94 pub async fn update_many(
95 &self,
96 filter: Document,
97 data: Document,
98 ) -> Result<(), IntegrationOSError> {
99 self.collection.update_many(filter, data, None).await?;
100
101 Ok(())
102 }
103
104 pub async fn update_many_with_aggregation_pipeline(
105 &self,
106 filter: Document,
107 data: &[Document],
108 ) -> Result<(), IntegrationOSError> {
109 self.collection
110 .update_many(filter, data.to_vec(), None)
111 .await?;
112
113 Ok(())
114 }
115
116 pub async fn count(
117 &self,
118 filter: Document,
119 limit: Option<u64>,
120 ) -> Result<u64, IntegrationOSError> {
121 Ok(self
122 .collection
123 .count_documents(filter, CountOptions::builder().limit(limit).build())
124 .await?)
125 }
126}