1use async_trait::async_trait;
2use futures::stream::StreamExt;
3use log::trace;
4use mongodb::bson::oid::ObjectId;
5use mongodb::bson::{doc, Bson::Document as BsonDocument, Document};
6use mongodb::error::Error;
7use mongodb::options::FindOptions;
8use mongodb::Database;
9use mongodb::{bson, options::AggregateOptions};
10use serde::{de::DeserializeOwned, Serialize};
11
12use mongodb::{
13 options::{
14 DeleteOptions, FindOneAndReplaceOptions, FindOneAndUpdateOptions, ReplaceOptions,
15 UpdateModifications, UpdateOptions,
16 },
17 results::{DeleteResult, InsertOneResult, UpdateResult},
18};
19use std::borrow::Borrow;
20
21#[async_trait]
22pub trait CrudRepository {
23 fn collection_name(&self) -> String;
24 fn database(&self) -> Box<&Database>;
25
26 async fn find_one<T>(&self, filter_document: Document) -> Result<Option<T>, Error>
27 where
28 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
29 {
30 let coll = self.database().collection(&self.collection_name());
31 coll.find_one(filter_document, None).await
32 }
33
34 async fn find_by_id<T>(&self, id: &ObjectId) -> Result<Option<T>, Error>
35 where
36 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
37 {
38 let filter_document = doc! {"_id": id};
39 self.find_one(filter_document).await
40 }
41
42 async fn find_by_string_id<T>(&self, id: &str) -> Result<Option<T>, Error>
43 where
44 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
45 {
46 let filter_document = doc! {"_id": id};
47 self.find_one(filter_document).await
48 }
49
50 async fn find_one_by_string_field<T>(&self, name: &str, value: &str) -> Result<Option<T>, Error>
51 where
52 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
53 {
54 let filter_document = doc! {name: value};
55 self.find_one(filter_document).await
56 }
57
58 async fn find_by_string_field<T>(&self, name: &str, value: &str) -> Result<Vec<T>, Error>
59 where
60 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
61 {
62 let filter_document = doc! {name: value};
63 self.find_simple(filter_document).await
64 }
65
66 async fn find_all<T>(&self) -> Result<Vec<T>, Error>
67 where
68 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
69 {
70 self.find(None, None).await
71 }
72
73 async fn find_simple<T>(&self, filter_document: Document) -> Result<Vec<T>, Error>
74 where
75 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
76 {
77 trace!("find");
78 self.find(filter_document, None).await
79 }
80
81 async fn find_with_sort<T>(
82 &self,
83 filter_document: Document,
84 sort_document: Document,
85 ) -> Result<Vec<T>, Error>
86 where
87 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
88 {
89 let sort_option = get_sort_find_option(Some(sort_document));
90 self.find(filter_document, sort_option).await
91 }
92
93 async fn find<T>(
94 &self,
95 filter_document: impl Into<Option<Document>> + Send + 'async_trait,
96 options: impl Into<Option<FindOptions>> + Send + 'async_trait,
97 ) -> Result<Vec<T>, Error>
98 where
99 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
100 {
101 let coll = self.database().collection(&self.collection_name());
102
103 let mut cursor = coll.find(filter_document, options).await?;
104 let mut items = Vec::<T>::new();
105 while let Some(result) = cursor.next().await {
106 match result {
107 Ok(document) => {
108 items.push(document);
109 }
110 Err(err) => {
111 return Err(err);
112 }
113 }
114 }
115
116 Ok(items)
117 }
118
119 async fn count_documents(
120 &self,
121 filter: impl Into<Option<Document>> + Send + 'async_trait,
122 ) -> Result<u64, Error> {
123 let coll = &self
124 .database()
125 .collection::<Document>(&self.collection_name());
126 coll.count_documents(filter, None).await
127 }
128
129 async fn aggregate<T>(
130 &self,
131 pipeline: impl IntoIterator<Item = Document> + Send + 'async_trait,
132 options: impl Into<Option<AggregateOptions>> + Send + 'async_trait,
133 ) -> Result<Vec<T>, Error>
134 where
135 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
136 {
137 let coll = self
138 .database()
139 .collection::<Document>(&self.collection_name());
140 let mut cursor = coll.aggregate(pipeline, options).await?;
141 let mut items = Vec::<T>::new();
142
143 while let Some(result) = cursor.next().await {
144 match result {
145 Ok(document) => {
146 let item = bson::from_bson::<T>(BsonDocument(document))?;
147 items.push(item);
148 }
149 Err(err) => {
150 return Err(err);
151 }
152 }
153 }
154
155 Ok(items)
156 }
157
158 fn get_sort_find_option(&self, sort_document_option: Option<Document>) -> Option<FindOptions> {
159 if let Some(sort_document) = sort_document_option {
160 Some(FindOptions::builder().sort(sort_document).build())
161 } else {
162 None
163 }
164 }
165
166 async fn _find_one_by_field<T>(
167 &self,
168 field_name: String,
169 value: String,
170 ) -> Result<Option<T>, Error>
171 where
172 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
173 {
174 self.find_one(doc! {field_name: value}).await
175 }
176
177 async fn add<T>(
178 &self,
179 item: impl Borrow<T> + Send + Sync + 'async_trait,
180 ) -> Result<InsertOneResult, Error>
181 where
182 T: Serialize + Send + Sync,
183 {
184 let coll = self.database().collection(&self.collection_name());
185 coll.insert_one(item, None).await
186 }
187
188 async fn update_one(
189 &self,
190 query: Document,
191 update: impl Into<UpdateModifications> + Send + Sync + 'async_trait,
192 options: impl Into<Option<UpdateOptions>> + Send + Sync + 'async_trait,
193 ) -> Result<UpdateResult, Error> {
194 let coll = self.database().collection::<Document>(&self.collection_name());
195 coll.update_one(query, update, options).await
196 }
197
198 async fn find_one_and_update<T>(
199 &self,
200 filter: Document,
201 update: impl Into<UpdateModifications> + Send + Sync + 'async_trait,
202 options: impl Into<Option<FindOneAndUpdateOptions>> + Send + Sync + 'async_trait,
203 ) -> Result<Option<T>, Error>
204 where
205 for<'a> T: DeserializeOwned + Send + Sync,
206 {
207 let coll = self.database().collection(&self.collection_name());
208 coll.find_one_and_update(filter, update, options).await
209 }
210
211 async fn find_one_and_replace<T>(
212 &self,
213 filter: Document,
214 replacement: impl Borrow<T> + Send + Sync + 'async_trait,
215 options: impl Into<Option<FindOneAndReplaceOptions>> + Send + Sync + 'async_trait,
216 ) -> Result<Option<T>, Error>
217 where
218 T: Serialize + DeserializeOwned + Send + Sync,
219 {
220 let coll = self.database().collection(&self.collection_name());
221 coll.find_one_and_replace(filter, replacement, options)
222 .await
223 }
224
225 async fn replace_one<T>(
226 &self,
227 query: Document,
228 replacement: impl Borrow<T> + Send + Sync + 'async_trait,
229 options: impl Into<Option<ReplaceOptions>> + Send + Sync + 'async_trait,
230 ) -> Result<UpdateResult, Error>
231 where
232 for<'a> T: Serialize + Send + Sync,
233 {
234 let coll = self.database().collection(&self.collection_name());
235 coll.replace_one(query, replacement, options).await
236 }
237
238 async fn delete_one(
239 &self,
240 query: Document,
241 options: impl Into<Option<DeleteOptions>> + Send + Sync + 'async_trait,
242 ) -> Result<DeleteResult, Error> {
243 let coll = self.database().collection::<Document>(&self.collection_name());
244 coll.delete_one(query, options).await
245 }
246}
247
248pub async fn find_one<T>(
249 filter_document: Document,
250 collection_name: &str,
251 db: &Database,
252) -> Result<Option<T>, Error>
253where
254 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
255{
256 trace!("find_one");
257 let coll = db.collection(collection_name);
258 coll.find_one(filter_document, None).await
259}
260
261pub async fn find_by_id<T>(
262 id: &ObjectId,
263 collection_name: &str,
264 db: &Database,
265) -> Result<Option<T>, Error>
266where
267 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
268{
269 trace!("find_by_id");
270 let filter_document = doc! {"_id": id};
271 find_one(filter_document, &collection_name, &db).await
272}
273
274pub async fn find_by_string_id<T>(
275 id: &str,
276 collection_name: &str,
277 db: &Database,
278) -> Result<Option<T>, Error>
279where
280 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
281{
282 trace!("find_by_string_id");
283 let filter_document = doc! {"_id": id};
284 find_one(filter_document, &collection_name, &db).await
285}
286
287pub async fn find_one_by_string_field<T>(
288 name: &str,
289 value: &str,
290 collection_name: &str,
291 db: &Database,
292) -> Result<Option<T>, Error>
293where
294 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
295{
296 trace!("find_by_string_id");
297 let filter_document = doc! {name: value};
298 find_one(filter_document, &collection_name, &db).await
299}
300
301pub async fn find_by_string_field<T>(
302 name: &str,
303 value: &str,
304 collection_name: &str,
305 db: &Database,
306) -> Result<Vec<T>, Error>
307where
308 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
309{
310 trace!("find_by_string_field");
311 let filter_document = doc! {name: value};
312 find_simple(filter_document, &collection_name, &db).await
313}
314
315pub async fn find_all<T>(collection_name: &str, db: &Database) -> Result<Vec<T>, Error>
316where
317 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
318{
319 trace!("find_all");
320 find(None, None, collection_name, db).await
321}
322
323pub async fn find_simple<T>(
324 filter_document: Document,
325 collection_name: &str,
326 db: &Database,
327) -> Result<Vec<T>, Error>
328where
329 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
330{
331 trace!("find");
332 find(filter_document, None, collection_name, db).await
333}
334
335pub async fn find_with_sort<T>(
336 filter_document: Document,
337 sort_document: Document,
338 collection_name: &str,
339 db: &Database,
340) -> Result<Vec<T>, Error>
341where
342 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
343{
344 trace!("find_with_sort");
345 let sort_option = get_sort_find_option(Some(sort_document));
346 find(filter_document, sort_option, collection_name, db).await
347}
348
349pub async fn find<T>(
350 filter_document: impl Into<Option<Document>>,
351 options: impl Into<Option<FindOptions>>,
352 collection_name: &str,
353 db: &Database,
354) -> Result<Vec<T>, Error>
355where
356 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
357{
358 trace!("find");
359 let coll = db.collection(collection_name);
360
361 let mut cursor = coll.find(filter_document, options).await?;
362 let mut items = Vec::<T>::new();
363 while let Some(result) = cursor.next().await {
364 match result {
365 Ok(document) => {
366 items.push(document);
367 }
368 Err(err) => {
369 return Err(err);
370 }
371 }
372 }
373
374 Ok(items)
375}
376
377pub async fn count_documents(
378 filter: impl Into<Option<Document>>,
379 collection_name: &str,
380 db: &Database,
381) -> Result<u64, Error> {
382 trace!("count_documents");
383 let coll = db.collection::<Document>(collection_name);
384 coll.count_documents(filter, None).await
385}
386
387pub async fn aggregate<T>(
388 pipeline: impl IntoIterator<Item = Document>,
389 options: impl Into<Option<AggregateOptions>>,
390 collection_name: &str,
391 db: &Database,
392) -> Result<Vec<T>, Error>
393where
394 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
395{
396 trace!("aggregate");
397 let coll = db.collection::<Document>(collection_name);
398 let mut cursor = coll.aggregate(pipeline, options).await?;
399 let mut items = Vec::<T>::new();
400
401 while let Some(result) = cursor.next().await {
402 match result {
403 Ok(document) => {
404 let item = bson::from_bson::<T>(BsonDocument(document))?;
405 items.push(item);
406 }
407 Err(err) => {
408 return Err(err);
409 }
410 }
411 }
412
413 Ok(items)
414}
415
416fn get_sort_find_option(sort_document_option: Option<Document>) -> Option<FindOptions> {
417 if let Some(sort_document) = sort_document_option {
418 Some(FindOptions::builder().sort(sort_document).build())
419 } else {
420 None
421 }
422}
423
424pub async fn _find_one_by_field<T>(
425 field_name: String,
426 value: String,
427 collection_name: &str,
428 db: &Database,
429) -> Result<Option<T>, Error>
430where
431 for<'a> T: DeserializeOwned + Unpin + Send + Sync,
432{
433 self::find_one(doc! {field_name: value}, collection_name, db).await
434}
435
436pub async fn add<T>(
437 item: impl Borrow<T>,
438 collection_name: &str,
439 db: &Database,
440) -> Result<InsertOneResult, Error>
441where
442 T: Serialize,
443{
444 let coll = db.collection(collection_name);
445 coll.insert_one(item, None).await
446}
447
448pub async fn update_one(
449 query: Document,
450 update: impl Into<UpdateModifications>,
451 options: impl Into<Option<UpdateOptions>>,
452 collection_name: &str,
453 db: &Database,
454) -> Result<UpdateResult, Error> {
455 let coll = db.collection::<Document>(collection_name);
456 coll.update_one(query, update, options).await
457}
458
459pub async fn find_one_and_update<T>(
460 filter: Document,
461 update: impl Into<UpdateModifications>,
462 options: impl Into<Option<FindOneAndUpdateOptions>>,
463 collection_name: &str,
464 db: &Database,
465) -> Result<Option<T>, Error>
466where
467 for<'a> T: DeserializeOwned,
468{
469 let coll = db.collection(collection_name);
470 coll.find_one_and_update(filter, update, options).await
471}
472
473pub async fn find_one_and_replace<T>(
474 filter: Document,
475 replacement: impl Borrow<T>,
476 options: impl Into<Option<FindOneAndReplaceOptions>>,
477 collection_name: &str,
478 db: &Database,
479) -> Result<Option<T>, Error>
480where
481 T: Serialize + DeserializeOwned,
482{
483 let coll = db.collection(collection_name);
484 coll.find_one_and_replace(filter, replacement, options)
485 .await
486}
487
488pub async fn replace_one<T>(
489 query: Document,
490 replacement: impl Borrow<T>,
491 options: impl Into<Option<ReplaceOptions>>,
492 collection_name: &str,
493 db: &Database,
494) -> Result<UpdateResult, Error>
495where
496 for<'a> T: Serialize,
497{
498 let coll = db.collection(collection_name);
499 coll.replace_one(query, replacement, options).await
500}
501
502pub async fn delete_one(
503 query: Document,
504 options: impl Into<Option<DeleteOptions>>,
505 collection_name: &str,
506 db: &Database,
507) -> Result<DeleteResult, Error> {
508 let coll = db.collection::<Document>(collection_name);
509 coll.delete_one(query, options).await
510}