spark_orm/
model.rs

1#![allow(dead_code)]
2
3pub mod observer;
4pub mod util;
5
6use crate::futures::StreamExt;
7use crate::macros::{error, trace};
8use crate::model::observer::Observer;
9use crate::model::util::ModelTimestamps;
10use crate::Spark;
11use mongodb::bson::{doc, to_document, Document};
12use mongodb::error::Result;
13use mongodb::options::{
14    DeleteOptions, DropIndexOptions, FindOneOptions, FindOptions, InsertOneOptions,
15    ListIndexesOptions, UpdateOptions,
16};
17use mongodb::results::UpdateResult;
18use mongodb::{Collection, Cursor, Database, IndexModel};
19use serde::de::DeserializeOwned;
20use serde::Serialize;
21use std::fmt::Debug;
22use std::ops::{Deref, DerefMut};
23use std::sync::Arc;
24use std::time::Duration;
25
26// TODO: this must move to types module
27type Id = mongodb::bson::Bson;
28pub type MongodbResult<T> = Result<T>;
29
30#[derive(Debug, Serialize)]
31pub struct Model<'a, M> {
32    inner: Box<M>,
33    #[serde(skip)]
34    db: Arc<Database>,
35    #[serde(skip)]
36    collection_name: &'a str,
37    #[serde(skip)]
38    collection: Collection<M>,
39}
40
41impl<'a, T: 'a> Deref for Model<'a, T> {
42    type Target = T;
43
44    fn deref(&self) -> &Self::Target {
45        &self.inner
46    }
47}
48
49impl<'a, T: 'a> DerefMut for Model<'a, T> {
50    fn deref_mut(&mut self) -> &mut Self::Target {
51        &mut self.inner
52    }
53}
54
55impl<'a, M> Model<'a, M>
56where
57    M: Default,
58    M: Serialize,
59    M: DeserializeOwned,
60    M: Send,
61    M: Sync,
62    M: Unpin,
63    M: Debug,
64    M: ModelTimestamps,
65    M: Observer<M>,
66{
67    /// makes a model and stores the data and collection_name to creating collection object
68    /// to store data into it
69    ///
70    /// # Arguments
71    ///
72    /// * `db`: you cna pass None , in this way model created by global spark connection , or you can pass your own database
73    /// * `collection_name`:  it's collection name that we use in create collection object
74    ///
75    /// returns: Model<M>
76    ///
77    /// # Examples
78    ///
79    /// ```
80    /// struct User{
81    ///     name: String
82    /// }
83    /// let db = ...;
84    /// let user_model = Model::<User>::new(Arc::clone(db) , "users");
85    /// ```
86    pub fn new(db: Option<&Arc<Database>>, collection_name: &'a str) -> Model<'a, M> {
87        if let Some(database) = db {
88            let collection = database.collection::<M>(collection_name);
89            return Model {
90                inner: Box::<M>::default(),
91                db: database.clone(),
92                collection_name,
93                collection,
94            };
95        }
96        // it panics if it's not initialized before use
97        let database = Spark::get_db();
98        let collection = database.collection::<M>(collection_name);
99        Model {
100            inner: Box::<M>::default(),
101            db: database,
102            collection_name,
103            collection,
104        }
105    }
106
107    /// saves the change , if the inner has some _id then it's update the existing unless
108    /// it's create  new document
109    pub async fn save(
110        &mut self,
111        options: impl Into<Option<InsertOneOptions>>,
112    ) -> MongodbResult<Id> {
113        self.inner.updated_at();
114        let mut converted = to_document(&self.inner)?;
115        if let Some(id) = converted.get("_id") {
116            let owned_id = id.to_owned();
117            let upsert = self
118                .collection
119                .update_one(
120                    doc! {
121                        "_id" : id
122                    },
123                    doc! { "$set": &converted},
124                    None,
125                )
126                .await?;
127            if upsert.modified_count >= 1 {
128                // dispatch call
129                // this must be pinned to handle recursive async call
130                Box::pin(M::updated(self)).await?;
131
132                return Ok(owned_id);
133            };
134        }
135        converted.remove("_id");
136        self.inner.created_at();
137
138        let re = self.collection.insert_one(&*self.inner, options).await?;
139
140        // dispatch observer
141        // this must be pinned to handle recursive async call
142        Box::pin(M::created(self)).await?;
143
144        Ok(re.inserted_id)
145    }
146    pub async fn find_one(
147        &mut self,
148        doc: impl Into<Document>,
149        options: impl Into<Option<FindOneOptions>>,
150    ) -> MongodbResult<Option<&mut Self>> {
151        let result = self.collection.find_one(Some(doc.into()), options).await?;
152        match result {
153            Some(inner) => {
154                self.fill(inner);
155                Ok(Some(self))
156            }
157            None => Ok(None),
158        }
159    }
160
161    /// this is raw update , and you can pass document or your model
162    /// # Examples
163    /// ## with the raw doc
164    ///  ```
165    ///  let user_model = User::new_model(Some(&db));
166    ///     let updated = user_model.update(
167    ///         doc! {
168    ///             "name": "Hossein",
169    ///         },
170    ///         doc! {
171    ///            "$set": {
172    ///                 "name": "Hossein 33"
173    ///             }
174    ///         },
175    ///         None,
176    ///     ).await.unwrap();
177    /// ```
178    /// ## with the model
179    /// let user_model = User::new_model(Some(&db));
180    ///     let mut sample_user = User::default();
181    ///     sample_user.name = "Hossein 33".to_string();
182    ///     let updated = user_model.update(
183    ///         &sample_user,
184    ///        doc! {
185    ///            "$set": {
186    ///                "name": "Hossein 3355"
187    ///            }
188    ///        },
189    ///        None,
190    ///    ).await.unwrap();
191    ///
192    /// ## with_model_instance
193    ///     let mut user_model = User::new_model(Some(&db));
194    ///    user_model.name = "Hossein 3355".to_string();
195    ///    user_model.age = 58;
196    ///    let updated = user_model.update(
197    ///        &user_model,
198    ///        doc! {
199    ///            "$set": {
200    ///                "name": "Hossein 325"
201    ///            }
202    ///        },
203    ///        None,
204    ///    ).await.unwrap();
205    ///
206    /// NOTE : updated observer doesn't execute in this method
207    ///
208    pub async fn update(
209        &self,
210        query: impl Into<Document>,
211        doc: impl Into<Document>,
212        options: impl Into<Option<UpdateOptions>>,
213    ) -> MongodbResult<UpdateResult> {
214        self.collection
215            .update_one(query.into(), doc.into(), options)
216            .await
217    }
218
219    pub async fn find(
220        &self,
221        filter: impl Into<Document>,
222        options: impl Into<Option<FindOptions>>,
223    ) -> MongodbResult<Cursor<M>> {
224        self.collection.find(Some(filter.into()), options).await
225    }
226
227    pub async fn find_and_collect(
228        &self,
229        filter: impl Into<Document>,
230        options: impl Into<Option<FindOptions>>,
231    ) -> MongodbResult<Vec<MongodbResult<M>>> {
232        // TODO write this in other functions
233        let converted = filter.into();
234        let doc = if converted.is_empty() {
235            None
236        } else {
237            Some(converted)
238        };
239
240        let future = self.collection.find(doc, options).await?;
241        Ok(future.collect().await)
242    }
243
244    pub fn register_attributes(&self, attributes: Vec<&str>) {
245        let mut attrs = attributes
246            .iter()
247            .map(|attr| attr.to_string())
248            .collect::<Vec<String>>();
249        let max_time_to_drop = Some(Duration::from_secs(5));
250        let (tx, _) = tokio::sync::oneshot::channel();
251        let db = self.db.clone();
252        let coll_name = self.collection_name.to_owned();
253        trace!("Spawn task to register indexes");
254        let register_attrs = async move {
255            let coll = db.collection::<M>(&coll_name);
256            let previous_indexes = coll
257                .list_indexes(Some(
258                    ListIndexesOptions::builder()
259                        .max_time(max_time_to_drop)
260                        .build(),
261                ))
262                .await;
263
264            let mut keys_to_remove = Vec::new();
265
266            if previous_indexes.is_ok() {
267                let foreach_future = previous_indexes.unwrap().for_each(|pr| {
268                    match pr {
269                        Ok(index_model) => {
270                            index_model.keys.iter().for_each(|key| {
271                                if key.0 != "_id" {
272                                    if let Some(pos) = attrs.iter().position(|k| k == key.0) {
273                                        // means attribute exists in struct and database and not need to create it
274                                        attrs.remove(pos);
275                                    } else if let Some(rw) = &index_model.options {
276                                        // means the attribute must remove because not exists in struct
277                                        keys_to_remove.push(rw.name.clone())
278                                    }
279                                }
280                            });
281                        }
282                        Err(error) => {
283                            error!("Can't unpack index model {error}");
284                        }
285                    }
286                    futures::future::ready(())
287                });
288                foreach_future.await;
289            }
290            let attrs = attrs
291                .iter()
292                .map(|attr| {
293                    let key = attr.to_string();
294                    IndexModel::builder()
295                        .keys(doc! {
296                            key : 1
297                        })
298                        .build()
299                })
300                .collect::<Vec<IndexModel>>();
301
302            for name in keys_to_remove {
303                let key = name.as_ref().unwrap();
304                let _ = coll
305                    .drop_index(
306                        key,
307                        Some(
308                            DropIndexOptions::builder()
309                                .max_time(max_time_to_drop)
310                                .build(),
311                        ),
312                    )
313                    .await;
314            }
315            if !attrs.is_empty() {
316                let result = coll.create_indexes(attrs, None).await;
317                if let Err(error) = result {
318                    error!("Can't create indexes : {:?}", error);
319                }
320            }
321        };
322
323        let task = tokio::spawn(register_attrs);
324
325        let wait_for_complete = async move {
326            let _ = task.await;
327            let _ = tx.send(());
328        };
329
330        tokio::task::spawn(wait_for_complete);
331    }
332
333    pub async fn delete(
334        &mut self,
335        query: impl Into<Document>,
336        options: impl Into<Option<DeleteOptions>>,
337    ) -> MongodbResult<u64> {
338        let re = self
339            .collection
340            .delete_one(query.into(), options)
341            .await?
342            .deleted_count;
343
344        // dispatch observer
345        // this must be pinned to handle recursive async call
346        M::deleted(self).await?;
347
348        Ok(re)
349    }
350
351    pub fn fill(&mut self, inner: M) {
352        *self.inner = inner;
353    }
354}
355
356impl<'a, M> Model<'a, M>
357where
358    M: Default,
359    M: Serialize,
360{
361    /// this method takes the inner and gives you ownership of inner then
362    /// replace it with default value
363    pub fn take_inner(&mut self) -> M {
364        std::mem::take(&mut *self.inner)
365    }
366
367    pub fn inner_ref(&self) -> &M {
368        self.inner.as_ref()
369    }
370
371    pub fn inner_mut(&mut self) -> &mut M {
372        self.inner.as_mut()
373    }
374
375    pub fn inner_to_doc(&self) -> MongodbResult<Document> {
376        let re = to_document(&self.inner)?;
377        Ok(re)
378    }
379}
380
381// converts
382
383impl<'a, M> From<Model<'a, M>> for Document
384where
385    M: Serialize,
386{
387    fn from(value: Model<M>) -> Self {
388        mongodb::bson::to_document(&value.inner).unwrap()
389    }
390}
391
392impl<'a, M> From<&Model<'a, M>> for Document
393where
394    M: Serialize,
395{
396    fn from(value: &Model<'a, M>) -> Self {
397        mongodb::bson::to_document(&value.inner).unwrap()
398    }
399}
400
401impl<'a, M> From<&mut Model<'a, M>> for Document
402where
403    M: Serialize,
404{
405    fn from(value: &mut Model<'a, M>) -> Self {
406        mongodb::bson::to_document(&value.inner).unwrap()
407    }
408}