Skip to main content

shared/infrastructure/database/mongodb/
adapter.rs

1use async_trait::async_trait;
2use futures::TryStreamExt;
3use mongodb::{Collection, options::ReturnDocument};
4use serde::{Serialize, de::DeserializeOwned};
5use std::fmt::Debug;
6
7use crate::error::CoreError;
8
9use crate::domain::database::DatabaseAdapter;
10use crate::domain::query::{IntoBsonDocument, IntoDbFilter, QueryBuilder};
11use crate::infrastructure::database::bindings::traits::MongoInsert;
12
13#[derive(Debug, Clone)]
14pub struct MongodbAdapter<T: Send + Sync + Debug> {
15    // client: mongodb::Client,
16    collection: Collection<T>,
17}
18
19impl<T: Send + Sync + Debug> MongodbAdapter<T> {
20    pub fn new(client: &mongodb::Client, cnx: &str, name: &str) -> Self {
21        MongodbAdapter {
22            // client: client.clone(),
23            collection: client.database(cnx).collection::<T>(name),
24        }
25    }
26}
27
28#[async_trait]
29impl<T> DatabaseAdapter<T> for MongodbAdapter<T>
30where
31    T: Debug
32        + Send
33        + Sync
34        + Serialize
35        + DeserializeOwned
36        + 'static
37        + MongoInsert
38        + IntoBsonDocument,
39{
40    async fn insert(&self, data: T) -> Result<String, CoreError> {
41        let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
42        let doc = data.into_bson_document().unwrap();
43
44        let operation = collection.insert_one(doc);
45
46        let doc = operation.await?;
47
48        let id = doc.inserted_id.as_object_id().ok_or({
49            CoreError::Validation(crate::error::ValidationError::Malformed {
50                field: crate::error::CredentialField::ObjectId,
51            })
52        })?;
53        Ok(id.to_string())
54    }
55    async fn insert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
56        let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
57        let docs: Vec<mongodb::bson::Document> = data
58            .into_iter()
59            .map(|d| d.into_bson_document().unwrap())
60            .collect();
61        let operation = collection.insert_many(docs);
62
63        let doc = operation.await?;
64
65        let ids = doc
66            .inserted_ids
67            .into_values()
68            .map(|v| {
69                v.as_object_id().map(|id| id.to_string()).ok_or({
70                    CoreError::Validation(crate::error::ValidationError::Malformed {
71                        field: crate::error::CredentialField::ObjectId,
72                    })
73                })
74            })
75            .collect::<Result<Vec<String>, _>>()?;
76
77        Ok(ids)
78    }
79
80    async fn upsert(&self, data: T) -> Result<String, CoreError> {
81        let doc = data.into_bson_document().unwrap();
82
83        let filter: mongodb::bson::Document = T::uniques()
84            .iter()
85            .filter_map(|&key| {
86                let clean_key = key.trim_matches('"');
87                doc.get(clean_key)
88                    .map(|val| (clean_key.to_string(), val.clone()))
89            })
90            .collect();
91        let update = mongodb::bson::doc! { "$set": doc };
92
93        let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
94        let result = collection
95            .find_one_and_update(filter, update)
96            .return_document(ReturnDocument::After)
97            .upsert(true)
98            .await?
99            .ok_or(CoreError::Internal(crate::error::InternalError::Database(
100                "upsert failed".into(),
101            )))?;
102
103        let id = result
104            .get("_id")
105            .and_then(|id| id.as_object_id())
106            .ok_or(CoreError::Internal(crate::error::InternalError::Database(
107                "missing id".into(),
108            )))?;
109
110        Ok(id.to_string())
111    }
112
113    async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
114        let mut ids = Vec::with_capacity(data.len());
115        for item in data {
116            let id = self.upsert(item).await?;
117            ids.push(id);
118        }
119        Ok(ids)
120    }
121
122    // NOTE
123    // if mongodb is upgraded to version 8.x
124    // use this method
125    //
126    // async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, Error> {
127    //     let mut operations = vec![];
128    //
129    //     for item in &data {
130    //         let doc = mongodb::bson::to_document(item)?;
131    //
132    //         let filter: mongodb::bson::Document = T::uniques()
133    //             .iter()
134    //             .filter_map(|&key| doc.get(key).map(|val| (key.to_string(), val.clone())))
135    //             .collect();
136    //         let update = mongodb::bson::doc! { "$set": doc };
137    //
138    //         let update_model = mongodb::options::UpdateOneModel::builder()
139    //             .namespace(self.collection.namespace())
140    //             .filter(filter)
141    //             .update(update)
142    //             .upsert(true)
143    //             .build();
144    //         operations.push(mongodb::options::WriteModel::UpdateOne(update_model));
145    //     }
146    //
147    //     let result = self.client.bulk_write(operations).verbose_results().await?;
148    //
149    //     let ids: Vec<String> = result
150    //         .update_results
151    //         .into_values()
152    //         .map(|v| {
153    //             v.upserted_id
154    //                 .and_then(|id| id.as_object_id())
155    //                 .map(|id| id.to_string())
156    //                 .ok_or_else(|| {
157    //                     Error::Validation(crate::error::ValidationError::Malformed {
158    //                         field: crate::error::CredentialField::ObjectId,
159    //                     })
160    //                 })
161    //         })
162    //         .collect::<Result<Vec<String>, _>>()?;
163    //     Ok(ids)
164    // }
165
166    // -- FINDS
167    async fn find_all(&self, query: QueryBuilder) -> Result<Vec<T>, CoreError> {
168        let doc = query.into_mongo_filter();
169
170        self.collection
171            .find(doc)
172            .await?
173            .try_collect()
174            .await
175            .map_err(Into::into)
176    }
177
178    async fn find_one(&self, query: QueryBuilder) -> Result<Option<T>, CoreError> {
179        let doc = query.into_mongo_filter();
180
181        self.collection.find_one(doc).await.map_err(Into::into)
182    }
183
184    async fn find_one_and_update(
185        &self,
186        filter: QueryBuilder,
187        update: QueryBuilder,
188    ) -> Result<Option<T>, CoreError> {
189        let filter = filter.into_mongo_filter();
190        let update = update.into_mongo_update();
191
192        self.collection
193            .find_one_and_update(filter, update)
194            // .return_document(ReturnDocument::Before)
195            .return_document(ReturnDocument::After)
196            .await
197            .map_err(Into::into)
198    }
199
200    async fn update_many(
201        &self,
202        filter: QueryBuilder,
203        update: QueryBuilder,
204    ) -> Result<(), CoreError> {
205        let filter = filter.into_mongo_filter();
206        let update = update.into_mongo_update();
207
208        self.collection.update_many(filter, update).await?;
209        Ok(())
210    }
211
212    async fn delete_one(&self, query: QueryBuilder) -> Result<(), CoreError> {
213        let filter = query.into_mongo_filter();
214
215        self.collection.delete_one(filter).await?;
216        Ok(())
217    }
218
219    async fn delete_many(&self, query: QueryBuilder) -> Result<(), CoreError> {
220        let filter = query.into_mongo_filter();
221
222        self.collection.delete_many(filter).await?;
223        Ok(())
224    }
225}