mongodm/
repository.rs

//! Repositories are abstractions over a specific mongo collection for a given `Model`.
2
3use crate::{CollectionConfig, Model};
4use async_trait::async_trait;
5use mongodb::bson::oid::ObjectId;
6use mongodb::bson::{doc, from_document, to_bson, Document};
7use mongodb::error::Result;
8use mongodb::options::*;
9use serde::Deserialize;
10use std::borrow::Borrow;
11use std::ops::Deref;
12
13/// Represents an individual update operation for the `bulk_update` function.
14#[derive(Debug)]
15pub struct BulkUpdate {
16    pub query: Document,
17    pub update: Document,
18    pub options: Option<UpdateOptions>,
19}
20
21/// Result of a `bulk_update` operation.
22#[derive(Debug, Deserialize)]
23pub struct BulkUpdateResult {
24    #[serde(rename = "n")]
25    pub nb_affected: u64,
26    #[serde(rename = "nModified")]
27    pub nb_modified: u64,
28    #[serde(default)]
29    pub upserted: Vec<BulkUpdateUpsertResult>,
30}
31
32/// Individual update result of a `bulk_update` operation.
33/// Contains the generated id in case of an upsert.
34#[derive(Debug, Deserialize)]
35pub struct BulkUpdateUpsertResult {
36    pub index: u64,
37    #[serde(alias = "_id")]
38    pub id: ObjectId,
39}
40
41/// Associate a `mongodb::Collection` and a specific `Model`.
42///
43/// This type can safely be copied and passed around because `std::sync::Arc` is used internally.
44/// Underlying `mongodb::Collection` can be retrieved at anytime with `Repository::get_underlying`.
45#[derive(Debug)]
46pub struct Repository<M: Model> {
47    db: mongodb::Database, // FIXME: temporary keep reference to database object for `bulk_update` operation
48    coll: mongodb::Collection<M>,
49}
50
51impl<M: Model> Deref for Repository<M> {
52    type Target = mongodb::Collection<M>;
53    fn deref(&self) -> &mongodb::Collection<M> {
54        &self.coll
55    }
56}
57
58impl<M: Model> Clone for Repository<M> {
59    fn clone(&self) -> Self {
60        Self {
61            db: self.db.clone(),
62            coll: self.coll.clone_with_type(),
63        }
64    }
65}
66
67impl<M: Model> Repository<M> {
68    /// Create a new repository from the given mongo client.
69    pub fn new(db: mongodb::Database) -> Self {
70        let coll = if let Some(options) = M::CollConf::collection_options() {
71            db.collection_with_options(M::CollConf::collection_name(), options)
72        } else {
73            db.collection(M::CollConf::collection_name())
74        };
75
76        Self { db, coll }
77    }
78
79    /// Create a new repository with associated collection options (override `Model::coll_options`).
80    pub fn new_with_options(db: mongodb::Database, options: CollectionOptions) -> Self {
81        let coll = db.collection_with_options(M::CollConf::collection_name(), options);
82        Self { db, coll }
83    }
84
85    /// Returns associated `M::collection_name`.
86    pub fn collection_name(&self) -> &'static str {
87        M::CollConf::collection_name()
88    }
89
90    /// Returns underlying `mongodb::Collection`.
91    pub fn get_underlying(&self) -> mongodb::Collection<M> {
92        self.coll.clone_with_type()
93    }
94
95    /// Convert this repository to use another `Model`. Only compiles if both `Model::CollConf` are identicals.
96    ///
97    /// # Example
98    ///
99    /// ```ignore
100    /// # async fn demo() -> Result<(), mongodb::error::Error> {
101    /// # use mongodm::mongo::{Client, options::ClientOptions};
102    /// # let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
103    /// # let client = Client::with_options(client_options)?;
104    /// # let db = client.database("mongodm_wayk_demo");
105    /// use mongodm::{ToRepository, Model, CollectionConfig};
106    /// use mongodm::mongo::bson::doc;
107    /// use mongodm::f;
108    /// use serde::{Serialize, Deserialize};
109    ///
110    /// struct UserCollConf;
111    ///
112    /// impl CollectionConfig for UserCollConf {
113    ///     fn collection_name() -> &'static str {
114    ///         "cast_model"
115    ///     }
116    /// }
117    ///
118    /// // Latest schema currently in use
119    /// #[derive(Serialize, Deserialize)]
120    /// struct User {
121    ///     username: String,
122    ///     last_seen: i64,
123    /// }
124    ///
125    /// impl Model for User {
126    ///     type CollConf = UserCollConf;
127    /// }
128    ///
129    /// // Old schema
130    /// #[derive(Serialize, Deserialize)]
131    /// struct UserV1 {
132    ///     name: String,
133    ///     ls: i64,
134    /// }
135    ///
136    /// // Versionned version of our `User`
137    /// #[derive(Serialize, Deserialize)]
138    /// #[serde(untagged)]
139    /// enum UserVersionned {
140    ///     Last(User),
141    ///     V1(UserV1),
142    /// }
143    ///
144    /// impl Model for UserVersionned {
145    ///     type CollConf = UserCollConf; // same as the non-versionned version
146    /// }
147    ///
148    /// // We have some repository for `User`
149    /// let repo = db.repository::<User>();
150    ///
151    /// # let coll = repo.get_underlying();
152    /// # coll.drop(None).await?;
153    /// # coll.insert_one(doc!{ f!(name in UserV1): "Bernard", f!(ls in UserV1): 1500 }, None).await?;
154    /// // Assume the following document is stored: { "name": "Bernard", "ls": 1500 }
155    ///
156    /// // Following query should fails because the schema doesn't match
157    /// let err = repo.find_one(doc!{ f!(name in UserV1): "Bernard" }, None).await.err().unwrap();
158    /// assert_eq!(err.to_string(), "missing field `username`"); // serde deserialization error
159    ///
160    /// // We can get a repository for `UserVersionned` from our `Repository<User>`
161    /// // because `User::CollConf` == `UserVersionned::CollConf`
162    /// let repo_versionned = repo.cast_model::<UserVersionned>();
163    ///
164    /// // Our versionned model should match with the document
165    /// let ret = repo_versionned.find_one(doc!{ f!(name in UserV1): "Bernard" }, None).await?;
166    /// match ret {
167    ///     Some(UserVersionned::V1(UserV1 { name, ls: 1500 })) if name == "Bernard" => { /* success */ }
168    ///     _ => panic!("Expected document was missing"),
169    /// }
170    ///
171    /// # Ok(())
172    /// # }
173    /// # let mut rt = tokio::runtime::Runtime::new().unwrap();
174    /// # rt.block_on(demo());
175    /// ```
176    ///
177    /// Following code will fail to compile because `CollectionConfig` doesn't match.
178    ///
179    /// ```compile_fail
180    /// # async fn demo() -> Result<(), mongodb::error::Error> {
181    /// # use mongodm::mongo::{Client, options::ClientOptions};
182    /// # let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
183    /// # let client = Client::with_options(client_options)?;
184    /// # let db = client.database("mongodm_wayk_demo");
185    /// use mongodm::{ToRepository, Model, CollectionConfig};
186    /// use serde::{Serialize, Deserialize};
187    ///
188    /// struct ACollConf;
189    ///
190    /// impl CollectionConfig for ACollConf {
191    ///     fn collection_name() -> &'static str { "a" }
192    /// }
193    ///
194    /// #[derive(Serialize, Deserialize)]
195    /// struct A;
196    ///
197    /// impl Model for A {
198    ///     type CollConf = ACollConf;
199    /// }
200    ///
201    /// struct BCollConf;
202    ///
203    /// impl CollectionConfig for BCollConf {
204    ///     fn collection_name() -> &'static str { "B" }
205    /// }
206    ///
207    /// #[derive(Serialize, Deserialize)]
208    /// struct B;
209    ///
210    /// impl Model for B {
211    ///     type CollConf = BCollConf;
212    /// }
213    ///
214    /// // Doesn't compile because `A` and `B` doesn't share the same `CollectionConfig`.
215    /// db.repository::<A>().cast_model::<B>();
216    /// # Ok(())
217    /// # }
218    /// # let mut rt = tokio::runtime::Runtime::new().unwrap();
219    /// # rt.block_on(demo());
220    /// ```
221    pub fn cast_model<OtherModel>(self) -> Repository<OtherModel>
222    where
223        OtherModel: Model<CollConf = M::CollConf>,
224    {
225        Repository {
226            db: self.db,
227            coll: self.coll.clone_with_type(),
228        }
229    }
230
231    /// Apply multiple update operations in bulk.
232    ///
233    /// This will be removed once support for bulk update is added to the official driver.
234    /// [see](https://jira.mongodb.org/browse/RUST-531) for tracking progress on this feature in the official driver.
235    ///
236    /// # Example
237    ///
238    /// ```no_run
239    /// # use serde::{Serialize, Deserialize};
240    /// # #[derive(Serialize, Deserialize)]
241    /// # struct User {
242    /// #     name: String,
243    /// #     age: i64,
244    /// # }
245    /// # impl Model for User {
246    /// #     type CollConf = UserCollConf;
247    /// # }
248    /// # struct UserCollConf;
249    /// # impl CollectionConfig for UserCollConf {
250    /// #     fn collection_name() -> &'static str { "user" }
251    /// # }
252    /// use mongodm::prelude::*;
253    /// /* ... */
254    /// # async fn demo(_db: mongodb::Database) {
255    /// let db: mongodb::Database; /* exists */
256    /// # db = _db;
257    /// let repository = db.repository::<User>();
258    /// /* ... */
259    /// let bulk_update_res = repository
260    ///     .bulk_update(&vec![
261    ///         &BulkUpdate {
262    ///             query: doc! { f!(name in User): "Dane" },
263    ///             update: doc! { Set: { f!(age in User): 12 } },
264    ///             options: None,
265    ///         },
266    ///         &BulkUpdate {
267    ///             query: doc! { f!(name in User): "David" },
268    ///             update: doc! { Set: { f!(age in User): 30 } },
269    ///             options: None,
270    ///         },
271    ///     ])
272    ///     .await
273    ///     .unwrap();
274    /// assert_eq!(bulk_update_res.nb_affected, 2);
275    /// assert_eq!(bulk_update_res.nb_modified, 2);
276    /// # }
277    /// ```
278    pub async fn bulk_update<V, U>(&self, updates: V) -> Result<BulkUpdateResult>
279    where
280        V: Borrow<Vec<U>> + Send + Sync,
281        U: Borrow<BulkUpdate> + Send + Sync,
282    {
283        self.coll.bulk_update(&self.db, updates).await
284    }
285}
286
287/// MongODM-provided utilities functions on `mongodb::Collection<M>`.
288#[async_trait]
289pub trait CollectionExt {
290    /// Apply multiple update operations in bulk.
291    ///
292    /// This will be removed once support for bulk update is added to the official driver.
293    /// [see](https://jira.mongodb.org/browse/RUST-531) for tracking progress on this feature in the official driver.
294    ///
295    /// # Example
296    ///
297    /// ```no_run
298    /// # use serde::{Serialize, Deserialize};
299    /// # #[derive(Serialize, Deserialize)]
300    /// # struct User {
301    /// #     name: String,
302    /// #     age: i64,
303    /// # }
304    /// use mongodm::prelude::*;
305    /// /* ... */
306    /// # async fn demo(_db: mongodb::Database) {
307    /// let db: mongodb::Database; /* exists */
308    /// # db = _db;
309    /// let collection = db.collection::<User>("user");
310    /// /* ... */
311    /// let bulk_update_res = collection
312    ///     .bulk_update(&db, &vec![
313    ///         &BulkUpdate {
314    ///             query: doc! { f!(name in User): "Dane" },
315    ///             update: doc! { Set: { f!(age in User): 12 } },
316    ///             options: None,
317    ///         },
318    ///         &BulkUpdate {
319    ///             query: doc! { f!(name in User): "David" },
320    ///             update: doc! { Set: { f!(age in User): 30 } },
321    ///             options: None,
322    ///         },
323    ///     ])
324    ///     .await
325    ///     .unwrap();
326    /// assert_eq!(bulk_update_res.nb_affected, 2);
327    /// assert_eq!(bulk_update_res.nb_modified, 2);
328    /// # }
329    /// ```
330    async fn bulk_update<V, U>(
331        &self,
332        db: &mongodb::Database,
333        updates: V,
334    ) -> Result<BulkUpdateResult>
335    where
336        V: 'async_trait + Send + Sync + Borrow<Vec<U>>,
337        U: 'async_trait + Send + Sync + Borrow<BulkUpdate>;
338}
339
340#[async_trait]
341impl<M: Send + Sync> CollectionExt for mongodb::Collection<M> {
342    async fn bulk_update<V, U>(
343        &self,
344        db: &mongodb::Database,
345        updates: V,
346    ) -> Result<BulkUpdateResult>
347    where
348        V: 'async_trait + Send + Sync + Borrow<Vec<U>>,
349        U: 'async_trait + Send + Sync + Borrow<BulkUpdate>,
350    {
351        let updates = updates.borrow();
352        let mut update_docs = Vec::with_capacity(updates.len());
353        for u in updates {
354            let u = u.borrow();
355            let mut doc = doc! {
356                "q": &u.query,
357                "u": &u.update,
358                "multi": false,
359            };
360            if let Some(options) = &u.options {
361                if let Some(ref upsert) = options.upsert {
362                    doc.insert("upsert", upsert);
363                }
364                if let Some(ref collation) = options.collation {
365                    doc.insert("collation", to_bson(collation)?);
366                }
367                if let Some(ref array_filters) = options.array_filters {
368                    doc.insert("arrayFilters", array_filters);
369                }
370                if let Some(ref hint) = options.hint {
371                    doc.insert("hint", to_bson(hint)?);
372                }
373            }
374            update_docs.push(doc);
375        }
376        let mut command = doc! {
377            "update": self.name(),
378            "updates": update_docs,
379        };
380        if let Some(ref write_concern) = self.write_concern() {
381            command.insert("writeConcern", to_bson(write_concern)?);
382        }
383        let res = db.run_command(command).await?;
384        Ok(from_document(res)?)
385    }
386}