wread_mongodb/
crud_repository.rs

1use async_trait::async_trait;
2use futures::stream::StreamExt;
3use log::trace;
4use mongodb::bson::oid::ObjectId;
5use mongodb::bson::{doc, Bson::Document as BsonDocument, Document};
6use mongodb::error::Error;
7use mongodb::options::FindOptions;
8use mongodb::Database;
9use mongodb::{bson, options::AggregateOptions};
10use serde::{de::DeserializeOwned, Serialize};
11
12use mongodb::{
13    options::{
14        DeleteOptions, FindOneAndReplaceOptions, FindOneAndUpdateOptions, ReplaceOptions,
15        UpdateModifications, UpdateOptions,
16    },
17    results::{DeleteResult, InsertOneResult, UpdateResult},
18};
19use std::borrow::Borrow;
20
21#[async_trait]
22pub trait CrudRepository {
23    fn collection_name(&self) -> String;
24    fn database(&self) -> Box<&Database>;
25
26    async fn find_one<T>(&self, filter_document: Document) -> Result<Option<T>, Error>
27    where
28        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
29    {
30        let coll = self.database().collection(&self.collection_name());
31        coll.find_one(filter_document, None).await
32    }
33
34    async fn find_by_id<T>(&self, id: &ObjectId) -> Result<Option<T>, Error>
35    where
36        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
37    {
38        let filter_document = doc! {"_id":  id};
39        self.find_one(filter_document).await
40    }
41
42    async fn find_by_string_id<T>(&self, id: &str) -> Result<Option<T>, Error>
43    where
44        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
45    {
46        let filter_document = doc! {"_id": id};
47        self.find_one(filter_document).await
48    }
49
50    async fn find_one_by_string_field<T>(&self, name: &str, value: &str) -> Result<Option<T>, Error>
51    where
52        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
53    {
54        let filter_document = doc! {name: value};
55        self.find_one(filter_document).await
56    }
57
58    async fn find_by_string_field<T>(&self, name: &str, value: &str) -> Result<Vec<T>, Error>
59    where
60        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
61    {
62        let filter_document = doc! {name: value};
63        self.find_simple(filter_document).await
64    }
65
66    async fn find_all<T>(&self) -> Result<Vec<T>, Error>
67    where
68        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
69    {
70        self.find(None, None).await
71    }
72
73    async fn find_simple<T>(&self, filter_document: Document) -> Result<Vec<T>, Error>
74    where
75        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
76    {
77        trace!("find");
78        self.find(filter_document, None).await
79    }
80
81    async fn find_with_sort<T>(
82        &self,
83        filter_document: Document,
84        sort_document: Document,
85    ) -> Result<Vec<T>, Error>
86    where
87        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
88    {
89        let sort_option = get_sort_find_option(Some(sort_document));
90        self.find(filter_document, sort_option).await
91    }
92
93    async fn find<T>(
94        &self,
95        filter_document: impl Into<Option<Document>> + Send + 'async_trait,
96        options: impl Into<Option<FindOptions>> + Send + 'async_trait,
97    ) -> Result<Vec<T>, Error>
98    where
99        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
100    {
101        let coll = self.database().collection(&self.collection_name());
102
103        let mut cursor = coll.find(filter_document, options).await?;
104        let mut items = Vec::<T>::new();
105        while let Some(result) = cursor.next().await {
106            match result {
107                Ok(document) => {
108                    items.push(document);
109                }
110                Err(err) => {
111                    return Err(err);
112                }
113            }
114        }
115
116        Ok(items)
117    }
118
119    async fn count_documents(
120        &self,
121        filter: impl Into<Option<Document>> + Send + 'async_trait,
122    ) -> Result<u64, Error> {
123        let coll = &self
124            .database()
125            .collection::<Document>(&self.collection_name());
126        coll.count_documents(filter, None).await
127    }
128
129    async fn aggregate<T>(
130        &self,
131        pipeline: impl IntoIterator<Item = Document> + Send + 'async_trait,
132        options: impl Into<Option<AggregateOptions>> + Send + 'async_trait,
133    ) -> Result<Vec<T>, Error>
134    where
135        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
136    {
137        let coll = self
138            .database()
139            .collection::<Document>(&self.collection_name());
140        let mut cursor = coll.aggregate(pipeline, options).await?;
141        let mut items = Vec::<T>::new();
142
143        while let Some(result) = cursor.next().await {
144            match result {
145                Ok(document) => {
146                    let item = bson::from_bson::<T>(BsonDocument(document))?;
147                    items.push(item);
148                }
149                Err(err) => {
150                    return Err(err);
151                }
152            }
153        }
154
155        Ok(items)
156    }
157
158    fn get_sort_find_option(&self, sort_document_option: Option<Document>) -> Option<FindOptions> {
159        if let Some(sort_document) = sort_document_option {
160            Some(FindOptions::builder().sort(sort_document).build())
161        } else {
162            None
163        }
164    }
165
166    async fn _find_one_by_field<T>(
167        &self,
168        field_name: String,
169        value: String,
170    ) -> Result<Option<T>, Error>
171    where
172        for<'a> T: DeserializeOwned + Unpin + Send + Sync,
173    {
174        self.find_one(doc! {field_name: value}).await
175    }
176
177    async fn add<T>(
178        &self,
179        item: impl Borrow<T> + Send + Sync + 'async_trait,
180    ) -> Result<InsertOneResult, Error>
181    where
182        T: Serialize + Send + Sync,
183    {
184        let coll = self.database().collection(&self.collection_name());
185        coll.insert_one(item, None).await
186    }
187
188    async fn update_one(
189        &self,
190        query: Document,
191        update: impl Into<UpdateModifications> + Send + Sync + 'async_trait,
192        options: impl Into<Option<UpdateOptions>> + Send + Sync + 'async_trait,
193    ) -> Result<UpdateResult, Error> {
194        let coll = self.database().collection::<Document>(&self.collection_name());
195        coll.update_one(query, update, options).await
196    }
197
198    async fn find_one_and_update<T>(
199        &self,
200        filter: Document,
201        update: impl Into<UpdateModifications> + Send + Sync + 'async_trait,
202        options: impl Into<Option<FindOneAndUpdateOptions>> + Send + Sync + 'async_trait,
203    ) -> Result<Option<T>, Error>
204    where
205        for<'a> T: DeserializeOwned + Send + Sync,
206    {
207        let coll = self.database().collection(&self.collection_name());
208        coll.find_one_and_update(filter, update, options).await
209    }
210
211    async fn find_one_and_replace<T>(
212        &self,
213        filter: Document,
214        replacement: impl Borrow<T> + Send + Sync + 'async_trait,
215        options: impl Into<Option<FindOneAndReplaceOptions>> + Send + Sync + 'async_trait,
216    ) -> Result<Option<T>, Error>
217    where
218        T: Serialize + DeserializeOwned + Send + Sync,
219    {
220        let coll = self.database().collection(&self.collection_name());
221        coll.find_one_and_replace(filter, replacement, options)
222            .await
223    }
224
225    async fn replace_one<T>(
226        &self,
227        query: Document,
228        replacement: impl Borrow<T> + Send + Sync + 'async_trait,
229        options: impl Into<Option<ReplaceOptions>> + Send + Sync + 'async_trait,
230    ) -> Result<UpdateResult, Error>
231    where
232        for<'a> T: Serialize + Send + Sync,
233    {
234        let coll = self.database().collection(&self.collection_name());
235        coll.replace_one(query, replacement, options).await
236    }
237
238    async fn delete_one(
239        &self,
240        query: Document,
241        options: impl Into<Option<DeleteOptions>> + Send + Sync + 'async_trait,
242    ) -> Result<DeleteResult, Error> {
243        let coll = self.database().collection::<Document>(&self.collection_name());
244        coll.delete_one(query, options).await
245    }
246}
247
248pub async fn find_one<T>(
249    filter_document: Document,
250    collection_name: &str,
251    db: &Database,
252) -> Result<Option<T>, Error>
253where
254    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
255{
256    trace!("find_one");
257    let coll = db.collection(collection_name);
258    coll.find_one(filter_document, None).await
259}
260
261pub async fn find_by_id<T>(
262    id: &ObjectId,
263    collection_name: &str,
264    db: &Database,
265) -> Result<Option<T>, Error>
266where
267    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
268{
269    trace!("find_by_id");
270    let filter_document = doc! {"_id":  id};
271    find_one(filter_document, &collection_name, &db).await
272}
273
274pub async fn find_by_string_id<T>(
275    id: &str,
276    collection_name: &str,
277    db: &Database,
278) -> Result<Option<T>, Error>
279where
280    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
281{
282    trace!("find_by_string_id");
283    let filter_document = doc! {"_id": id};
284    find_one(filter_document, &collection_name, &db).await
285}
286
287pub async fn find_one_by_string_field<T>(
288    name: &str,
289    value: &str,
290    collection_name: &str,
291    db: &Database,
292) -> Result<Option<T>, Error>
293where
294    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
295{
296    trace!("find_by_string_id");
297    let filter_document = doc! {name: value};
298    find_one(filter_document, &collection_name, &db).await
299}
300
301pub async fn find_by_string_field<T>(
302    name: &str,
303    value: &str,
304    collection_name: &str,
305    db: &Database,
306) -> Result<Vec<T>, Error>
307where
308    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
309{
310    trace!("find_by_string_field");
311    let filter_document = doc! {name: value};
312    find_simple(filter_document, &collection_name, &db).await
313}
314
315pub async fn find_all<T>(collection_name: &str, db: &Database) -> Result<Vec<T>, Error>
316where
317    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
318{
319    trace!("find_all");
320    find(None, None, collection_name, db).await
321}
322
323pub async fn find_simple<T>(
324    filter_document: Document,
325    collection_name: &str,
326    db: &Database,
327) -> Result<Vec<T>, Error>
328where
329    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
330{
331    trace!("find");
332    find(filter_document, None, collection_name, db).await
333}
334
335pub async fn find_with_sort<T>(
336    filter_document: Document,
337    sort_document: Document,
338    collection_name: &str,
339    db: &Database,
340) -> Result<Vec<T>, Error>
341where
342    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
343{
344    trace!("find_with_sort");
345    let sort_option = get_sort_find_option(Some(sort_document));
346    find(filter_document, sort_option, collection_name, db).await
347}
348
349pub async fn find<T>(
350    filter_document: impl Into<Option<Document>>,
351    options: impl Into<Option<FindOptions>>,
352    collection_name: &str,
353    db: &Database,
354) -> Result<Vec<T>, Error>
355where
356    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
357{
358    trace!("find");
359    let coll = db.collection(collection_name);
360
361    let mut cursor = coll.find(filter_document, options).await?;
362    let mut items = Vec::<T>::new();
363    while let Some(result) = cursor.next().await {
364        match result {
365            Ok(document) => {
366                items.push(document);
367            }
368            Err(err) => {
369                return Err(err);
370            }
371        }
372    }
373
374    Ok(items)
375}
376
377pub async fn count_documents(
378    filter: impl Into<Option<Document>>,
379    collection_name: &str,
380    db: &Database,
381) -> Result<u64, Error> {
382    trace!("count_documents");
383    let coll = db.collection::<Document>(collection_name);
384    coll.count_documents(filter, None).await
385}
386
387pub async fn aggregate<T>(
388    pipeline: impl IntoIterator<Item = Document>,
389    options: impl Into<Option<AggregateOptions>>,
390    collection_name: &str,
391    db: &Database,
392) -> Result<Vec<T>, Error>
393where
394    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
395{
396    trace!("aggregate");
397    let coll = db.collection::<Document>(collection_name);
398    let mut cursor = coll.aggregate(pipeline, options).await?;
399    let mut items = Vec::<T>::new();
400
401    while let Some(result) = cursor.next().await {
402        match result {
403            Ok(document) => {
404                let item = bson::from_bson::<T>(BsonDocument(document))?;
405                items.push(item);
406            }
407            Err(err) => {
408                return Err(err);
409            }
410        }
411    }
412
413    Ok(items)
414}
415
416fn get_sort_find_option(sort_document_option: Option<Document>) -> Option<FindOptions> {
417    if let Some(sort_document) = sort_document_option {
418        Some(FindOptions::builder().sort(sort_document).build())
419    } else {
420        None
421    }
422}
423
424pub async fn _find_one_by_field<T>(
425    field_name: String,
426    value: String,
427    collection_name: &str,
428    db: &Database,
429) -> Result<Option<T>, Error>
430where
431    for<'a> T: DeserializeOwned + Unpin + Send + Sync,
432{
433    self::find_one(doc! {field_name: value}, collection_name, db).await
434}
435
436pub async fn add<T>(
437    item: impl Borrow<T>,
438    collection_name: &str,
439    db: &Database,
440) -> Result<InsertOneResult, Error>
441where
442    T: Serialize,
443{
444    let coll = db.collection(collection_name);
445    coll.insert_one(item, None).await
446}
447
448pub async fn update_one(
449    query: Document,
450    update: impl Into<UpdateModifications>,
451    options: impl Into<Option<UpdateOptions>>,
452    collection_name: &str,
453    db: &Database,
454) -> Result<UpdateResult, Error> {
455    let coll = db.collection::<Document>(collection_name);
456    coll.update_one(query, update, options).await
457}
458
459pub async fn find_one_and_update<T>(
460    filter: Document,
461    update: impl Into<UpdateModifications>,
462    options: impl Into<Option<FindOneAndUpdateOptions>>,
463    collection_name: &str,
464    db: &Database,
465) -> Result<Option<T>, Error>
466where
467    for<'a> T: DeserializeOwned,
468{
469    let coll = db.collection(collection_name);
470    coll.find_one_and_update(filter, update, options).await
471}
472
473pub async fn find_one_and_replace<T>(
474    filter: Document,
475    replacement: impl Borrow<T>,
476    options: impl Into<Option<FindOneAndReplaceOptions>>,
477    collection_name: &str,
478    db: &Database,
479) -> Result<Option<T>, Error>
480where
481    T: Serialize + DeserializeOwned,
482{
483    let coll = db.collection(collection_name);
484    coll.find_one_and_replace(filter, replacement, options)
485        .await
486}
487
488pub async fn replace_one<T>(
489    query: Document,
490    replacement: impl Borrow<T>,
491    options: impl Into<Option<ReplaceOptions>>,
492    collection_name: &str,
493    db: &Database,
494) -> Result<UpdateResult, Error>
495where
496    for<'a> T: Serialize,
497{
498    let coll = db.collection(collection_name);
499    coll.replace_one(query, replacement, options).await
500}
501
502pub async fn delete_one(
503    query: Document,
504    options: impl Into<Option<DeleteOptions>>,
505    collection_name: &str,
506    db: &Database,
507) -> Result<DeleteResult, Error> {
508    let coll = db.collection::<Document>(collection_name);
509    coll.delete_one(query, options).await
510}