shared/infrastructure/database/mongodb/
adapter.rs1use async_trait::async_trait;
2use futures::TryStreamExt;
3use mongodb::{Collection, options::ReturnDocument};
4use serde::{Serialize, de::DeserializeOwned};
5use std::fmt::Debug;
6
7use crate::error::CoreError;
8
9use crate::domain::database::DatabaseAdapter;
10use crate::domain::query::{IntoBsonDocument, IntoDbFilter, QueryBuilder};
11use crate::infrastructure::database::bindings::traits::MongoInsert;
12
13#[derive(Debug, Clone)]
14pub struct MongodbAdapter<T: Send + Sync + Debug> {
15 collection: Collection<T>,
17}
18
19impl<T: Send + Sync + Debug> MongodbAdapter<T> {
20 pub fn new(client: &mongodb::Client, cnx: &str, name: &str) -> Self {
21 MongodbAdapter {
22 collection: client.database(cnx).collection::<T>(name),
24 }
25 }
26}
27
28#[async_trait]
29impl<T> DatabaseAdapter<T> for MongodbAdapter<T>
30where
31 T: Debug
32 + Send
33 + Sync
34 + Serialize
35 + DeserializeOwned
36 + 'static
37 + MongoInsert
38 + IntoBsonDocument,
39{
40 async fn insert(&self, data: T) -> Result<String, CoreError> {
41 let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
42 let doc = data.into_bson_document().unwrap();
43
44 let operation = collection.insert_one(doc);
45
46 let doc = operation.await?;
47
48 let id = doc.inserted_id.as_object_id().ok_or({
49 CoreError::Validation(crate::error::ValidationError::Malformed {
50 field: crate::error::CredentialField::ObjectId,
51 })
52 })?;
53 Ok(id.to_string())
54 }
55 async fn insert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
56 let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
57 let docs: Vec<mongodb::bson::Document> = data
58 .into_iter()
59 .map(|d| d.into_bson_document().unwrap())
60 .collect();
61 let operation = collection.insert_many(docs);
62
63 let doc = operation.await?;
64
65 let ids = doc
66 .inserted_ids
67 .into_values()
68 .map(|v| {
69 v.as_object_id().map(|id| id.to_string()).ok_or({
70 CoreError::Validation(crate::error::ValidationError::Malformed {
71 field: crate::error::CredentialField::ObjectId,
72 })
73 })
74 })
75 .collect::<Result<Vec<String>, _>>()?;
76
77 Ok(ids)
78 }
79
80 async fn upsert(&self, data: T) -> Result<String, CoreError> {
81 let doc = data.into_bson_document().unwrap();
82
83 let filter: mongodb::bson::Document = T::uniques()
84 .iter()
85 .filter_map(|&key| {
86 let clean_key = key.trim_matches('"');
87 doc.get(clean_key)
88 .map(|val| (clean_key.to_string(), val.clone()))
89 })
90 .collect();
91 let update = mongodb::bson::doc! { "$set": doc };
92
93 let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
94 let result = collection
95 .find_one_and_update(filter, update)
96 .return_document(ReturnDocument::After)
97 .upsert(true)
98 .await?
99 .ok_or(CoreError::Internal(crate::error::InternalError::Database(
100 "upsert failed".into(),
101 )))?;
102
103 let id = result
104 .get("_id")
105 .and_then(|id| id.as_object_id())
106 .ok_or(CoreError::Internal(crate::error::InternalError::Database(
107 "missing id".into(),
108 )))?;
109
110 Ok(id.to_string())
111 }
112
113 async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
114 let mut ids = Vec::with_capacity(data.len());
115 for item in data {
116 let id = self.upsert(item).await?;
117 ids.push(id);
118 }
119 Ok(ids)
120 }
121
122 async fn find_all(&self, query: QueryBuilder) -> Result<Vec<T>, CoreError> {
168 let doc = query.into_mongo_filter();
169
170 self.collection
171 .find(doc)
172 .await?
173 .try_collect()
174 .await
175 .map_err(Into::into)
176 }
177
178 async fn find_one(&self, query: QueryBuilder) -> Result<Option<T>, CoreError> {
179 let doc = query.into_mongo_filter();
180
181 self.collection.find_one(doc).await.map_err(Into::into)
182 }
183
184 async fn find_one_and_update(
185 &self,
186 filter: QueryBuilder,
187 update: QueryBuilder,
188 ) -> Result<Option<T>, CoreError> {
189 let filter = filter.into_mongo_filter();
190 let update = update.into_mongo_update();
191
192 self.collection
193 .find_one_and_update(filter, update)
194 .return_document(ReturnDocument::After)
196 .await
197 .map_err(Into::into)
198 }
199
200 async fn update_many(
201 &self,
202 filter: QueryBuilder,
203 update: QueryBuilder,
204 ) -> Result<(), CoreError> {
205 let filter = filter.into_mongo_filter();
206 let update = update.into_mongo_update();
207
208 self.collection.update_many(filter, update).await?;
209 Ok(())
210 }
211
212 async fn delete_one(&self, query: QueryBuilder) -> Result<(), CoreError> {
213 let filter = query.into_mongo_filter();
214
215 self.collection.delete_one(filter).await?;
216 Ok(())
217 }
218
219 async fn delete_many(&self, query: QueryBuilder) -> Result<(), CoreError> {
220 let filter = query.into_mongo_filter();
221
222 self.collection.delete_many(filter).await?;
223 Ok(())
224 }
225}