mongodm/repository.rs
//! Repositories are an abstraction over a specific MongoDB collection for a given `Model`.
2
3use crate::{CollectionConfig, Model};
4use async_trait::async_trait;
5use mongodb::bson::oid::ObjectId;
6use mongodb::bson::{doc, from_document, to_bson, Document};
7use mongodb::error::Result;
8use mongodb::options::*;
9use serde::Deserialize;
10use std::borrow::Borrow;
11use std::ops::Deref;
12
13/// Represents an individual update operation for the `bulk_update` function.
14#[derive(Debug)]
15pub struct BulkUpdate {
16 pub query: Document,
17 pub update: Document,
18 pub options: Option<UpdateOptions>,
19}
20
21/// Result of a `bulk_update` operation.
22#[derive(Debug, Deserialize)]
23pub struct BulkUpdateResult {
24 #[serde(rename = "n")]
25 pub nb_affected: u64,
26 #[serde(rename = "nModified")]
27 pub nb_modified: u64,
28 #[serde(default)]
29 pub upserted: Vec<BulkUpdateUpsertResult>,
30}
31
32/// Individual update result of a `bulk_update` operation.
33/// Contains the generated id in case of an upsert.
34#[derive(Debug, Deserialize)]
35pub struct BulkUpdateUpsertResult {
36 pub index: u64,
37 #[serde(alias = "_id")]
38 pub id: ObjectId,
39}
40
41/// Associate a `mongodb::Collection` and a specific `Model`.
42///
43/// This type can safely be copied and passed around because `std::sync::Arc` is used internally.
44/// Underlying `mongodb::Collection` can be retrieved at anytime with `Repository::get_underlying`.
45#[derive(Debug)]
46pub struct Repository<M: Model> {
47 db: mongodb::Database, // FIXME: temporary keep reference to database object for `bulk_update` operation
48 coll: mongodb::Collection<M>,
49}
50
51impl<M: Model> Deref for Repository<M> {
52 type Target = mongodb::Collection<M>;
53 fn deref(&self) -> &mongodb::Collection<M> {
54 &self.coll
55 }
56}
57
58impl<M: Model> Clone for Repository<M> {
59 fn clone(&self) -> Self {
60 Self {
61 db: self.db.clone(),
62 coll: self.coll.clone_with_type(),
63 }
64 }
65}
66
67impl<M: Model> Repository<M> {
68 /// Create a new repository from the given mongo client.
69 pub fn new(db: mongodb::Database) -> Self {
70 let coll = if let Some(options) = M::CollConf::collection_options() {
71 db.collection_with_options(M::CollConf::collection_name(), options)
72 } else {
73 db.collection(M::CollConf::collection_name())
74 };
75
76 Self { db, coll }
77 }
78
79 /// Create a new repository with associated collection options (override `Model::coll_options`).
80 pub fn new_with_options(db: mongodb::Database, options: CollectionOptions) -> Self {
81 let coll = db.collection_with_options(M::CollConf::collection_name(), options);
82 Self { db, coll }
83 }
84
85 /// Returns associated `M::collection_name`.
86 pub fn collection_name(&self) -> &'static str {
87 M::CollConf::collection_name()
88 }
89
90 /// Returns underlying `mongodb::Collection`.
91 pub fn get_underlying(&self) -> mongodb::Collection<M> {
92 self.coll.clone_with_type()
93 }
94
95 /// Convert this repository to use another `Model`. Only compiles if both `Model::CollConf` are identicals.
96 ///
97 /// # Example
98 ///
99 /// ```ignore
100 /// # async fn demo() -> Result<(), mongodb::error::Error> {
101 /// # use mongodm::mongo::{Client, options::ClientOptions};
102 /// # let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
103 /// # let client = Client::with_options(client_options)?;
104 /// # let db = client.database("mongodm_wayk_demo");
105 /// use mongodm::{ToRepository, Model, CollectionConfig};
106 /// use mongodm::mongo::bson::doc;
107 /// use mongodm::f;
108 /// use serde::{Serialize, Deserialize};
109 ///
110 /// struct UserCollConf;
111 ///
112 /// impl CollectionConfig for UserCollConf {
113 /// fn collection_name() -> &'static str {
114 /// "cast_model"
115 /// }
116 /// }
117 ///
118 /// // Latest schema currently in use
119 /// #[derive(Serialize, Deserialize)]
120 /// struct User {
121 /// username: String,
122 /// last_seen: i64,
123 /// }
124 ///
125 /// impl Model for User {
126 /// type CollConf = UserCollConf;
127 /// }
128 ///
129 /// // Old schema
130 /// #[derive(Serialize, Deserialize)]
131 /// struct UserV1 {
132 /// name: String,
133 /// ls: i64,
134 /// }
135 ///
136 /// // Versionned version of our `User`
137 /// #[derive(Serialize, Deserialize)]
138 /// #[serde(untagged)]
139 /// enum UserVersionned {
140 /// Last(User),
141 /// V1(UserV1),
142 /// }
143 ///
144 /// impl Model for UserVersionned {
145 /// type CollConf = UserCollConf; // same as the non-versionned version
146 /// }
147 ///
148 /// // We have some repository for `User`
149 /// let repo = db.repository::<User>();
150 ///
151 /// # let coll = repo.get_underlying();
152 /// # coll.drop(None).await?;
153 /// # coll.insert_one(doc!{ f!(name in UserV1): "Bernard", f!(ls in UserV1): 1500 }, None).await?;
154 /// // Assume the following document is stored: { "name": "Bernard", "ls": 1500 }
155 ///
156 /// // Following query should fails because the schema doesn't match
157 /// let err = repo.find_one(doc!{ f!(name in UserV1): "Bernard" }, None).await.err().unwrap();
158 /// assert_eq!(err.to_string(), "missing field `username`"); // serde deserialization error
159 ///
160 /// // We can get a repository for `UserVersionned` from our `Repository<User>`
161 /// // because `User::CollConf` == `UserVersionned::CollConf`
162 /// let repo_versionned = repo.cast_model::<UserVersionned>();
163 ///
164 /// // Our versionned model should match with the document
165 /// let ret = repo_versionned.find_one(doc!{ f!(name in UserV1): "Bernard" }, None).await?;
166 /// match ret {
167 /// Some(UserVersionned::V1(UserV1 { name, ls: 1500 })) if name == "Bernard" => { /* success */ }
168 /// _ => panic!("Expected document was missing"),
169 /// }
170 ///
171 /// # Ok(())
172 /// # }
173 /// # let mut rt = tokio::runtime::Runtime::new().unwrap();
174 /// # rt.block_on(demo());
175 /// ```
176 ///
177 /// Following code will fail to compile because `CollectionConfig` doesn't match.
178 ///
179 /// ```compile_fail
180 /// # async fn demo() -> Result<(), mongodb::error::Error> {
181 /// # use mongodm::mongo::{Client, options::ClientOptions};
182 /// # let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
183 /// # let client = Client::with_options(client_options)?;
184 /// # let db = client.database("mongodm_wayk_demo");
185 /// use mongodm::{ToRepository, Model, CollectionConfig};
186 /// use serde::{Serialize, Deserialize};
187 ///
188 /// struct ACollConf;
189 ///
190 /// impl CollectionConfig for ACollConf {
191 /// fn collection_name() -> &'static str { "a" }
192 /// }
193 ///
194 /// #[derive(Serialize, Deserialize)]
195 /// struct A;
196 ///
197 /// impl Model for A {
198 /// type CollConf = ACollConf;
199 /// }
200 ///
201 /// struct BCollConf;
202 ///
203 /// impl CollectionConfig for BCollConf {
204 /// fn collection_name() -> &'static str { "B" }
205 /// }
206 ///
207 /// #[derive(Serialize, Deserialize)]
208 /// struct B;
209 ///
210 /// impl Model for B {
211 /// type CollConf = BCollConf;
212 /// }
213 ///
214 /// // Doesn't compile because `A` and `B` doesn't share the same `CollectionConfig`.
215 /// db.repository::<A>().cast_model::<B>();
216 /// # Ok(())
217 /// # }
218 /// # let mut rt = tokio::runtime::Runtime::new().unwrap();
219 /// # rt.block_on(demo());
220 /// ```
221 pub fn cast_model<OtherModel>(self) -> Repository<OtherModel>
222 where
223 OtherModel: Model<CollConf = M::CollConf>,
224 {
225 Repository {
226 db: self.db,
227 coll: self.coll.clone_with_type(),
228 }
229 }
230
231 /// Apply multiple update operations in bulk.
232 ///
233 /// This will be removed once support for bulk update is added to the official driver.
234 /// [see](https://jira.mongodb.org/browse/RUST-531) for tracking progress on this feature in the official driver.
235 ///
236 /// # Example
237 ///
238 /// ```no_run
239 /// # use serde::{Serialize, Deserialize};
240 /// # #[derive(Serialize, Deserialize)]
241 /// # struct User {
242 /// # name: String,
243 /// # age: i64,
244 /// # }
245 /// # impl Model for User {
246 /// # type CollConf = UserCollConf;
247 /// # }
248 /// # struct UserCollConf;
249 /// # impl CollectionConfig for UserCollConf {
250 /// # fn collection_name() -> &'static str { "user" }
251 /// # }
252 /// use mongodm::prelude::*;
253 /// /* ... */
254 /// # async fn demo(_db: mongodb::Database) {
255 /// let db: mongodb::Database; /* exists */
256 /// # db = _db;
257 /// let repository = db.repository::<User>();
258 /// /* ... */
259 /// let bulk_update_res = repository
260 /// .bulk_update(&vec![
261 /// &BulkUpdate {
262 /// query: doc! { f!(name in User): "Dane" },
263 /// update: doc! { Set: { f!(age in User): 12 } },
264 /// options: None,
265 /// },
266 /// &BulkUpdate {
267 /// query: doc! { f!(name in User): "David" },
268 /// update: doc! { Set: { f!(age in User): 30 } },
269 /// options: None,
270 /// },
271 /// ])
272 /// .await
273 /// .unwrap();
274 /// assert_eq!(bulk_update_res.nb_affected, 2);
275 /// assert_eq!(bulk_update_res.nb_modified, 2);
276 /// # }
277 /// ```
278 pub async fn bulk_update<V, U>(&self, updates: V) -> Result<BulkUpdateResult>
279 where
280 V: Borrow<Vec<U>> + Send + Sync,
281 U: Borrow<BulkUpdate> + Send + Sync,
282 {
283 self.coll.bulk_update(&self.db, updates).await
284 }
285}
286
287/// MongODM-provided utilities functions on `mongodb::Collection<M>`.
288#[async_trait]
289pub trait CollectionExt {
290 /// Apply multiple update operations in bulk.
291 ///
292 /// This will be removed once support for bulk update is added to the official driver.
293 /// [see](https://jira.mongodb.org/browse/RUST-531) for tracking progress on this feature in the official driver.
294 ///
295 /// # Example
296 ///
297 /// ```no_run
298 /// # use serde::{Serialize, Deserialize};
299 /// # #[derive(Serialize, Deserialize)]
300 /// # struct User {
301 /// # name: String,
302 /// # age: i64,
303 /// # }
304 /// use mongodm::prelude::*;
305 /// /* ... */
306 /// # async fn demo(_db: mongodb::Database) {
307 /// let db: mongodb::Database; /* exists */
308 /// # db = _db;
309 /// let collection = db.collection::<User>("user");
310 /// /* ... */
311 /// let bulk_update_res = collection
312 /// .bulk_update(&db, &vec![
313 /// &BulkUpdate {
314 /// query: doc! { f!(name in User): "Dane" },
315 /// update: doc! { Set: { f!(age in User): 12 } },
316 /// options: None,
317 /// },
318 /// &BulkUpdate {
319 /// query: doc! { f!(name in User): "David" },
320 /// update: doc! { Set: { f!(age in User): 30 } },
321 /// options: None,
322 /// },
323 /// ])
324 /// .await
325 /// .unwrap();
326 /// assert_eq!(bulk_update_res.nb_affected, 2);
327 /// assert_eq!(bulk_update_res.nb_modified, 2);
328 /// # }
329 /// ```
330 async fn bulk_update<V, U>(
331 &self,
332 db: &mongodb::Database,
333 updates: V,
334 ) -> Result<BulkUpdateResult>
335 where
336 V: 'async_trait + Send + Sync + Borrow<Vec<U>>,
337 U: 'async_trait + Send + Sync + Borrow<BulkUpdate>;
338}
339
340#[async_trait]
341impl<M: Send + Sync> CollectionExt for mongodb::Collection<M> {
342 async fn bulk_update<V, U>(
343 &self,
344 db: &mongodb::Database,
345 updates: V,
346 ) -> Result<BulkUpdateResult>
347 where
348 V: 'async_trait + Send + Sync + Borrow<Vec<U>>,
349 U: 'async_trait + Send + Sync + Borrow<BulkUpdate>,
350 {
351 let updates = updates.borrow();
352 let mut update_docs = Vec::with_capacity(updates.len());
353 for u in updates {
354 let u = u.borrow();
355 let mut doc = doc! {
356 "q": &u.query,
357 "u": &u.update,
358 "multi": false,
359 };
360 if let Some(options) = &u.options {
361 if let Some(ref upsert) = options.upsert {
362 doc.insert("upsert", upsert);
363 }
364 if let Some(ref collation) = options.collation {
365 doc.insert("collation", to_bson(collation)?);
366 }
367 if let Some(ref array_filters) = options.array_filters {
368 doc.insert("arrayFilters", array_filters);
369 }
370 if let Some(ref hint) = options.hint {
371 doc.insert("hint", to_bson(hint)?);
372 }
373 }
374 update_docs.push(doc);
375 }
376 let mut command = doc! {
377 "update": self.name(),
378 "updates": update_docs,
379 };
380 if let Some(ref write_concern) = self.write_concern() {
381 command.insert("writeConcern", to_bson(write_concern)?);
382 }
383 let res = db.run_command(command).await?;
384 Ok(from_document(res)?)
385 }
386}