mongodb_ro/
model.rs

1use crate::column::ColumnAttr;
2use crate::event::Boot;
3use crate::query_builder::QueryBuilder;
4use futures_util::StreamExt;
5use log::error;
6use mongodb::action::{EstimatedDocumentCount, Find};
7use mongodb::bson::{doc, to_document, Document};
8use mongodb::bson::{Bson, DateTime};
9use mongodb::error::{Error, Result};
10use mongodb::options::{CountOptions, IndexOptions};
11use mongodb::results::{InsertManyResult, InsertOneResult};
12use mongodb::{bson, ClientSession, Collection, Cursor, Database, IndexModel, SessionCursor};
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use std::collections::HashMap;
16use std::fmt::Debug;
17use std::ops::{Deref, DerefMut};
18use std::sync::Arc;
19
20pub type MongodbResult<T> = Result<T>;
21
22#[derive(Debug, Clone, Serialize)]
23pub struct Model<'a, M>
24where
25    M: Boot,
26{
27    inner: Box<M>,
28    #[serde(skip_serializing)]
29    req: Option<M::Req>,
30    #[serde(skip)]
31    db: Database,
32    #[serde(skip)]
33    collection_name: &'a str,
34    #[serde(skip)]
35    add_times: bool,
36    #[serde(skip)]
37    columns: HashMap<&'a str, ColumnAttr>,
38    #[serde(skip)]
39    query_builder: QueryBuilder,
40}
41
42impl<'a, T: 'a + Boot> Deref for Model<'a, T> {
43    type Target = T;
44
45    fn deref(&self) -> &Self::Target {
46        &self.inner
47    }
48}
49
50impl<'a, T: 'a + Boot> DerefMut for Model<'a, T> {
51    fn deref_mut(&mut self) -> &mut Self::Target {
52        &mut self.inner
53    }
54}
55
56impl<'a, M> Model<'a, M>
57where
58    M: Boot,
59    M: Default,
60    M: Serialize,
61    M: DeserializeOwned,
62    M: Send,
63    M: Sync,
64    M: Unpin,
65{
66    pub fn new(
67        db: &Database,
68        collection_name: &'a str,
69        columns: &'a str,
70        add_times: bool,
71    ) -> Model<'a, M> {
72        let columns = serde_json::from_str(columns).unwrap();
73
74        let model = Model {
75            inner: Box::<M>::default(),
76            req: None,
77            db: db.clone(),
78            collection_name,
79            columns,
80            add_times,
81            query_builder: Default::default(),
82        };
83
84        model
85    }
86
87    /// Set Request to model
88    pub fn set_request(mut self, req: M::Req) -> Model<'a, M> {
89        self.req = Some(req);
90        self
91    }
92
93    /// add lazy column to model
94    pub fn add_columns(&mut self, names: Vec<&'a str>) {
95        for name in names {
96            self.columns.insert(
97                name,
98                ColumnAttr {
99                    asc: false,
100                    desc: false,
101                    unique: false,
102                    sphere2d: false,
103                    text: None,
104                    hidden: false,
105                    name: Some(name.to_string()),
106                },
107            );
108        }
109    }
110
111    /// Gets the collection name
112    pub fn collection_name(&self) -> &'a str {
113        self.collection_name
114    }
115
116    /// Gets a handle to the MongoDB collection
117    pub fn collection(&self) -> Collection<M> {
118        self.db.collection::<M>(self.collection_name)
119    }
120    /// Changes the collection name for this model
121    pub fn set_collection(mut self, name: &'a str) -> Model<'a, M> {
122        self.collection_name = name;
123        self
124    }
125
126    /// Registers indexes based on column attributes
127    ///
128    /// This will:
129    /// 1. Check existing indexes
130    /// 2. Remove indexes for fields that no longer exist in the model
131    /// 3. Create new indexes for fields marked as indexes in column attributes
132    pub async fn register_indexes(&self) {
133        let coll = self.db.collection::<M>(self.collection_name);
134        let previous_indexes = coll.list_indexes().await;
135        let mut attrs = vec![];
136        for (name, attr) in &self.columns {
137            if attr.is_index() {
138                attrs.push(name)
139            }
140        }
141
142        let mut keys_to_remove = Vec::new();
143        if previous_indexes.is_ok() {
144            let foreach_future = previous_indexes.unwrap().for_each(|pr| {
145                match pr {
146                    Ok(index_model) => {
147                        index_model.keys.iter().for_each(|key| {
148                            if key.0 != "_id" {
149                                if let Some(pos) = attrs.iter().position(|k| k == &key.0) {
150                                    // means attribute exists in struct and database and not need to create it
151                                    attrs.remove(pos);
152                                } else if let Some(rw) = &index_model.options {
153                                    // means the attribute must remove because not exists in struct
154                                    match rw.default_language {
155                                        None => keys_to_remove.push(rw.name.clone()),
156                                        Some(_) => match &rw.name {
157                                            None => keys_to_remove.push(rw.name.clone()),
158                                            Some(name) => {
159                                                if let Some(pos) =
160                                                    attrs.iter().position(|k| k == &name)
161                                                {
162                                                    attrs.remove(pos);
163                                                } else {
164                                                    keys_to_remove.push(rw.name.clone())
165                                                }
166                                            }
167                                        },
168                                    }
169                                }
170                            }
171                        });
172                    }
173                    Err(error) => {
174                        error!("Can't unpack index model {error}");
175                    }
176                }
177                futures::future::ready(())
178            });
179            foreach_future.await;
180        }
181
182        let attrs = attrs
183            .iter()
184            .map(|name| {
185                let key = name.to_string();
186                let attr = &self.columns.get(key.as_str()).unwrap();
187
188                if let Some(lang) = &attr.text {
189                    let opts = IndexOptions::builder()
190                        .unique(attr.unique)
191                        .name(key.clone())
192                        .default_language(lang.to_string())
193                        .build();
194                    IndexModel::builder()
195                        .keys(doc! {
196                            key : "text"
197                        })
198                        .options(opts)
199                        .build()
200                } else if attr.sphere2d {
201                    let opts = IndexOptions::builder().unique(attr.unique).build();
202                    IndexModel::builder()
203                        .keys(doc! { key: "2dsphere" })
204                        .options(opts)
205                        .build()
206                } else {
207                    let sort = if attr.desc { -1 } else { 1 };
208                    let opts = IndexOptions::builder().unique(attr.unique).build();
209
210                    IndexModel::builder()
211                        .keys(doc! {
212                            key : sort
213                        })
214                        .options(opts)
215                        .build()
216                }
217            })
218            .collect::<Vec<IndexModel>>();
219
220        for name in keys_to_remove {
221            let key = name.as_ref().unwrap();
222            let _ = coll.drop_index(key).await;
223        }
224        if !attrs.is_empty() {
225            let result = coll.create_indexes(attrs).await;
226            if let Err(error) = result {
227                error!("Can't create indexes : {:?}", error);
228            }
229        }
230    }
231
232    /// Reset all filters
233    pub fn reset(mut self) -> Model<'a, M> {
234        self.query_builder = Default::default();
235        self
236    }
237    /// Adds a filter condition to the query
238    pub fn r#where(mut self, data: Document) -> Model<'a, M> {
239        self.query_builder.r#where.push(data);
240        self
241    }
242    /// Sets the number of documents to skip
243    pub fn skip(mut self, count: u32) -> Model<'a, M> {
244        self.query_builder.skip = count;
245        self
246    }
247    /// Gets distinct values for a field
248    pub async fn distinct(&self, name: &str) -> Result<Vec<Bson>> {
249        let whr = &self.query_builder.r#where;
250        let filter = if whr.is_empty() {
251            doc! {}
252        } else {
253            doc! {"$and":whr}
254        };
255        let collection = self.db.collection::<Document>(self.collection_name);
256        collection.distinct(&name, filter).await
257    }
258    /// Sets the maximum number of documents to return
259    pub fn limit(mut self, count: u32) -> Model<'a, M> {
260        self.query_builder.limit = count;
261        self
262    }
263    /// The number of documents the server should return per cursor batch.
264    pub fn batch_size(mut self, value: u32) -> Model<'a, M> {
265        self.query_builder.batch_size = value;
266        self
267    }
268    /// Sets the sort order
269    pub fn sort(mut self, data: Document) -> Model<'a, M> {
270        self.query_builder.sort = data;
271        self
272    }
273    /// Sets whether to affect all matching documents (for update/delete)
274    pub fn all(mut self) -> Model<'a, M> {
275        self.query_builder.all = true;
276        self
277    }
278    /// Sets the projection (field selection)
279    pub fn select(mut self, data: Document) -> Model<'a, M> {
280        self.query_builder.select = Some(data);
281        self
282    }
283    /// Sets which fields should be visible (overrides hidden fields)
284    pub fn visible(mut self, data: Vec<&str>) -> Model<'a, M> {
285        self.query_builder.visible_fields = data.iter().map(|a| a.to_string()).collect();
286        self
287    }
288    /// Sets whether to upsert on update
289    pub fn upsert(mut self) -> Model<'a, M> {
290        self.query_builder.upsert = true;
291        self
292    }
293
294    fn hidden_fields(&self) -> Vec<String> {
295        let mut r = vec![];
296        for (name, attr) in &self.columns {
297            if attr.hidden
298                && !self
299                    .query_builder
300                    .visible_fields
301                    .contains(&name.to_string())
302            {
303                r.push(name.to_string())
304            }
305        }
306        r
307    }
308    fn clear(&self, data: Document, hidden_fields: &Vec<String>) -> M {
309        let data = data;
310        let mut default = to_document(&M::default()).unwrap();
311        for (name, attr) in &self.columns {
312            if hidden_fields.contains(&name.to_string()) {
313                continue;
314            }
315            let rename = match attr.name.clone() {
316                None => name.to_string(),
317                Some(a) => a,
318            };
319            if data.contains_key(&rename) {
320                default.insert(name.to_string(), data.get(&rename).unwrap());
321            }
322        }
323
324        bson::from_document(default).unwrap()
325    }
326}
327
328impl<'a, M> Model<'a, M>
329where
330    M: Boot,
331    M: Default,
332    M: Serialize,
333{
334    /// this method takes the inner and gives you ownership of inner then
335    /// replace it with default value
336    pub fn take_inner(&mut self) -> M {
337        std::mem::take(&mut *self.inner)
338    }
339
340    pub fn inner_ref(&self) -> &M {
341        self.inner.as_ref()
342    }
343
344    pub fn inner_mut(&mut self) -> &mut M {
345        self.inner.as_mut()
346    }
347
348    pub fn inner_to_doc(&self) -> MongodbResult<Document> {
349        let mut re = to_document(&self.inner)?;
350        self.rename_field(&mut re, false);
351        Ok(re)
352    }
353
354    fn rename_field(&self, doc: &mut Document, is_opt: bool) {
355        for (name, attr) in &self.columns {
356            if let Some(a) = &attr.name {
357                if is_opt {
358                    for (_, d) in doc.iter_mut() {
359                        let i = d.as_document_mut().unwrap();
360                        match i.get(name) {
361                            None => {}
362                            Some(b) => {
363                                i.insert(a.clone(), b.clone());
364                                i.remove(name);
365                            }
366                        }
367                    }
368                } else {
369                    match doc.get(name) {
370                        None => {}
371                        Some(b) => {
372                            doc.insert(a.clone(), b.clone());
373                            doc.remove(name);
374                        }
375                    }
376                }
377            }
378        }
379    }
380
381    pub fn fill(mut self, inner: M) -> Model<'a, M> {
382        *self.inner = inner;
383        self
384    }
385}
386
387impl<'a, M> Model<'a, M>
388where
389    M: Boot,
390    M: Default,
391    M: Serialize,
392    M: DeserializeOwned,
393    M: Send,
394    M: Sync,
395    M: Unpin,
396{
397    /// Get Documents count with filters
398    pub async fn count_documents(self) -> Result<u64> {
399        let whr = &self.query_builder.r#where;
400        let collection = self.db.collection::<Document>(self.collection_name);
401        let filter = if whr.is_empty() {
402            doc! {}
403        } else {
404            doc! { "$and": whr }
405        };
406
407        let options = CountOptions::builder()
408            .skip(if self.query_builder.skip > 0 {
409                Some(self.query_builder.skip as u64)
410            } else {
411                None
412            })
413            .limit(if self.query_builder.limit > 0 {
414                Some(self.query_builder.limit as u64)
415            } else {
416                None
417            })
418            .build();
419
420        collection
421            .count_documents(filter)
422            .with_options(options)
423            .await
424    }
425
426    /// Get Documents count with filters and session
427    pub async fn count_documents_with_session(self, session: &mut ClientSession) -> Result<u64> {
428        let whr = &self.query_builder.r#where;
429        let collection = self.db.collection::<Document>(self.collection_name);
430        let filter = if whr.is_empty() {
431            doc! {}
432        } else {
433            doc! { "$and": whr }
434        };
435
436        let options = CountOptions::builder()
437            .skip(if self.query_builder.skip > 0 {
438                Some(self.query_builder.skip as u64)
439            } else {
440                None
441            })
442            .limit(if self.query_builder.limit > 0 {
443                Some(self.query_builder.limit as u64)
444            } else {
445                None
446            })
447            .build();
448
449        collection
450            .count_documents(filter)
451            .with_options(options)
452            .session(session)
453            .await
454    }
455
456    fn add_times_to_data(&self, data: Document) -> Document {
457        let mut data = data;
458        if data.get_object_id("_id").is_err() {
459            data.remove("_id");
460        }
461        if self.add_times {
462            if !data.contains_key("updated_at") || !data.get_datetime("updated_at").is_ok() {
463                data.insert("updated_at", DateTime::now());
464            }
465            if !data.contains_key("created_at") || !data.get_datetime("created_at").is_ok() {
466                data.insert("created_at", DateTime::now());
467            }
468        }
469        data
470    }
471    /// Creates a new document in the collection
472    ///
473    /// # Notes
474    /// - Automatically adds timestamps if configured
475    pub async fn create(&self) -> Result<InsertOneResult> {
476        let mut data = self.add_times_to_data(self.inner_to_doc()?);
477
478        match self
479            .db
480            .collection(self.collection_name)
481            .insert_one(data.clone())
482            .await{
483            Ok(r) => {
484                data.insert("_id",r.inserted_id.clone());
485                self.finish(&self.req, "create", Document::new(), data, None)
486                    .await;
487                Ok(r)
488            }
489            Err(e) => {Err(e)}
490        }
491    }
492
493    /// Creates a new document in the collection wit session
494    ///
495    /// # Arguments
496    /// * `session` -  MongoDB transaction session
497    ///
498    /// # Notes
499    /// - Automatically adds timestamps if configured
500    pub async fn create_with_session(
501        &self,
502        session: &mut ClientSession,
503    ) -> Result<InsertOneResult> {
504        let mut data = self.add_times_to_data(self.inner_to_doc()?);
505        match self
506            .db
507            .collection(self.collection_name)
508            .insert_one(data.clone())
509            .session(&mut *session)
510            .await{
511            Ok(r) => {
512                data.insert("_id",r.inserted_id.clone());
513                self.finish(&self.req, "create", Document::new(), data, Some(session))
514                    .await;
515                Ok(r)
516            }
517            Err(e) => {Err(e)}
518        }
519    }
520
521    /// Creates a new document from raw BSON
522    pub async fn create_doc(&self, data: Document) -> Result<InsertOneResult> {
523        let mut data = self.add_times_to_data(data);
524
525        match self
526            .db
527            .collection(self.collection_name)
528            .insert_one(data.clone())
529            .await{
530            Ok(r) => {
531                data.insert("_id",r.inserted_id.clone());
532                self.finish(&self.req, "create", Document::new(), data, None)
533                    .await;
534                Ok(r)
535            }
536            Err(e) => {Err(e)}
537        }
538    }
539
540    /// Creates a new document from raw BSON with session
541    pub async fn create_doc_with_session(
542        &self,
543        data: Document,
544        session: &mut ClientSession,
545    ) -> Result<InsertOneResult> {
546        let mut data = self.add_times_to_data(data);
547
548        match self
549            .db
550            .collection(self.collection_name)
551            .insert_one(data.clone())
552            .session(&mut *session)
553            .await{
554            Ok(r) => {
555                data.insert("_id",r.inserted_id.clone());
556                self.finish(&self.req, "create", Document::new(), data, Some(session))
557                    .await;
558                Ok(r)
559            }
560            Err(e) => {Err(e)}
561        }
562    }
563
564    /// Creates many document from raw BSON
565    pub async fn create_many_doc(&self, data: Vec<Document>) -> Result<InsertManyResult> {
566        let mut d=vec![];
567        for item in data {
568            d.push(self.add_times_to_data(item));
569        }
570
571        match self
572            .db
573            .collection(self.collection_name)
574            .insert_many(d)
575            .await{
576            Ok(r) => {
577                let inserted_ids: HashMap<String, Bson> = r.inserted_ids.clone()
578                    .into_iter()
579                    .map(|(k, v)| (k.to_string(), v))
580                    .collect();
581                self.finish(&self.req, "create_many", Document::new(), doc! {"_ids": Bson::Document(Document::from_iter(inserted_ids))}, None)
582                    .await;
583                Ok(r)
584            }
585            Err(e) => {Err(e)}
586        }
587    }
588    /// Creates many document from raw BSON with session
589    pub async fn create_many_doc_with_session(&self, data: Vec<Document>,session: &mut ClientSession,) -> Result<InsertManyResult> {
590        let mut d=vec![];
591        for item in data {
592            d.push(self.add_times_to_data(item));
593        }
594
595        match self
596            .db
597            .collection(self.collection_name)
598            .insert_many(d)
599            .session(&mut *session)
600            .await{
601            Ok(r) => {
602                let inserted_ids: HashMap<String, Bson> = r.inserted_ids.clone()
603                    .into_iter()
604                    .map(|(k, v)| (k.to_string(), v))
605                    .collect();
606                self.finish(&self.req, "create_many", Document::new(),
607                            doc! {"_ids": Bson::Document(Document::from_iter(inserted_ids))},
608                            Some(session))
609                    .await;
610                Ok(r)
611            }
612            Err(e) => {Err(e)}
613        }
614    }
615    fn prepare_update(&self, data: Document) -> Result<(Document, Document)> {
616        let mut data = data;
617        let mut is_opt = false;
618        for (a, _) in data.iter() {
619            if a.starts_with("$") {
620                is_opt = true;
621            }
622        }
623
624        self.rename_field(&mut data, is_opt);
625        if !is_opt {
626            data = doc! {"$set":data};
627        }
628        if self.add_times {
629            if !data.contains_key("$set") {
630                data.insert("$set", doc! {});
631            }
632            let set = data.get_mut("$set").unwrap().as_document_mut().unwrap();
633            set.insert("updated_at", DateTime::now());
634        }
635
636        if self.query_builder.upsert {
637            if self.add_times {
638                if !data.contains_key("$setOnInsert") {
639                    data.insert("$setOnInsert", doc! {});
640                }
641                let set = data
642                    .get_mut("$setOnInsert")
643                    .unwrap()
644                    .as_document_mut()
645                    .unwrap();
646                set.insert("created_at", DateTime::now());
647            }
648        }
649        let whr = &self.query_builder.r#where;
650        if whr.is_empty() {
651            return Err(Error::from(std::io::Error::new(
652                std::io::ErrorKind::InvalidInput,
653                "where not set.",
654            )));
655        }
656        let filter = doc! {"$and":whr};
657        Ok((data, filter))
658    }
659    /// Updates documents in the collection
660    ///
661    /// # Arguments
662    /// * `data` - Update operations
663    ///
664    /// # Notes
665    /// - Automatically adds updated_at timestamp if configured
666    /// - Handles both single and multi-document updates based on `all()` setting
667    /// - Supports upsert if configured
668    pub async fn update(&self, data: Document) -> Result<Document> {
669        let (data, filter) = self.prepare_update(data)?;
670
671        let r = self.db.collection::<Document>(self.collection_name);
672
673        if self.query_builder.all {
674            let r = r
675                .update_many(filter, data.clone())
676                .upsert(self.query_builder.upsert)
677                .await;
678            match r {
679                Ok(old) => {
680                    let res = doc! {"modified_count":old.modified_count.to_string()};
681                    self.finish(&self.req, "update_many", res.clone(), data, None)
682                        .await;
683                    Ok(res)
684                }
685                Err(e) => Err(e),
686            }
687        } else {
688            let r = r
689                .find_one_and_update(filter, data.clone())
690                .upsert(self.query_builder.upsert)
691                .sort(self.query_builder.sort.clone())
692                .await;
693            match r {
694                Ok(old) => {
695                    let res = old.unwrap_or(Document::new());
696                    self.finish(&self.req, "update", res.clone(), data, None)
697                        .await;
698                    Ok(res)
699                }
700                Err(e) => Err(e),
701            }
702        }
703    }
704
705    /// Updates documents in the collection with session
706    ///
707    /// # Arguments
708    /// * `data` - Update operations
709    /// * `session` - MongoDB transaction session
710    ///
711    /// # Notes
712    /// - Automatically adds updated_at timestamp if configured
713    /// - Handles both single and multi-document updates based on `all()` setting
714    /// - Supports upsert if configured
715    pub async fn update_with_session(
716        &self,
717        data: Document,
718        session: &mut ClientSession,
719    ) -> Result<Document> {
720        let (data, filter) = self.prepare_update(data)?;
721
722        let r = self.db.collection::<Document>(self.collection_name);
723        if self.query_builder.all {
724            let r = r
725                .update_many(filter, data.clone())
726                .upsert(self.query_builder.upsert)
727                .session(&mut *session)
728                .await;
729            match r {
730                Ok(old) => {
731                    let res = doc! {"modified_count":old.modified_count.to_string()};
732                    self.finish(&self.req, "update_many", res.clone(), data, Some(session))
733                        .await;
734                    Ok(res)
735                }
736                Err(e) => Err(e),
737            }
738        } else {
739            let r = r
740                .find_one_and_update(filter, data.clone())
741                .upsert(self.query_builder.upsert)
742                .sort(self.query_builder.sort.clone())
743                .session(&mut *session)
744                .await;
745            match r {
746                Ok(old) => {
747                    let res = old.unwrap_or(Document::new());
748                    self.finish(&self.req, "update", res.clone(), data, Some(session))
749                        .await;
750                    Ok(res)
751                }
752                Err(e) => Err(e),
753            }
754        }
755    }
756
757    /// Deletes documents from the collection
758    ///
759    ///
760    /// # Notes
761    /// - Handles both single and multi-document deletes based on `all()` setting
762    pub async fn delete(&self) -> Result<Document> {
763        let whr = &self.query_builder.r#where;
764        if whr.is_empty() {
765            return Err(Error::from(std::io::Error::new(
766                std::io::ErrorKind::InvalidInput,
767                "where not set.",
768            )));
769        }
770        let filter = doc! {"$and":whr};
771
772        let r = self.db.collection::<Document>(self.collection_name);
773        if self.query_builder.all {
774            let r = r.delete_many(filter).await;
775            match r {
776                Ok(old) => {
777                    let res = doc! {"deleted_count":old.deleted_count.to_string()};
778                    self.finish(&self.req, "delete_many", res.clone(), doc! {}, None)
779                        .await;
780                    Ok(res)
781                }
782                Err(e) => Err(e),
783            }
784        } else {
785            let r = r
786                .find_one_and_delete(filter)
787                .sort(self.query_builder.sort.clone())
788                .await;
789            match r {
790                Ok(old) => {
791                    let res = old.unwrap_or(Document::new());
792                    self.finish(&self.req, "delete", res.clone(), doc! {}, None)
793                        .await;
794                    Ok(res)
795                }
796                Err(e) => Err(e),
797            }
798        }
799    }
800
801    /// Deletes documents from the collection with session
802    ///
803    /// # Arguments
804    /// * `session` - Optional MongoDB transaction session
805    ///
806    /// # Notes
807    /// - Handles both single and multi-document deletes based on `all()` setting
808    pub async fn delete_with_session(&self, session: &mut ClientSession) -> Result<Document> {
809        let whr = &self.query_builder.r#where;
810        if whr.is_empty() {
811            return Err(Error::from(std::io::Error::new(
812                std::io::ErrorKind::InvalidInput,
813                "where not set.",
814            )));
815        }
816        let filter = doc! {"$and":whr};
817
818        let r = self.db.collection::<Document>(self.collection_name);
819        if self.query_builder.all {
820            let r = r.delete_many(filter).session(&mut *session).await;
821            match r {
822                Ok(old) => {
823                    let res = doc! {"deleted_count":old.deleted_count.to_string()};
824                    self.finish(
825                        &self.req,
826                        "delete_many",
827                        res.clone(),
828                        doc! {},
829                        Some(session),
830                    )
831                    .await;
832                    Ok(res)
833                }
834                Err(e) => Err(e),
835            }
836        } else {
837            let r = r
838                .find_one_and_delete(filter)
839                .sort(self.query_builder.sort.clone())
840                .session(&mut *session)
841                .await;
842            match r {
843                Ok(old) => {
844                    let res = old.unwrap_or(Document::new());
845                    self.finish(&self.req, "delete", res.clone(), doc! {}, Some(session))
846                        .await;
847                    Ok(res)
848                }
849                Err(e) => Err(e),
850            }
851        }
852    }
853    fn prepare_get(&self) -> (Document, Vec<String>) {
854        let whr = &self.query_builder.r#where;
855        let filter = if whr.is_empty() {
856            doc! {}
857        } else {
858            doc! {"$and":whr}
859        };
860        let hidden_fields = self.hidden_fields();
861        (filter, hidden_fields)
862    }
863
864    fn prepare_find<'b>(&self, mut find: Find<'b, Document>) -> Find<'b, Document> {
865        find = find.sort(self.query_builder.sort.clone());
866
867        if self.query_builder.skip > 0 {
868            find = find.skip(self.query_builder.skip as u64);
869        }
870        if self.query_builder.limit > 0 {
871            find = find.limit(self.query_builder.limit as i64);
872        }
873        if self.query_builder.batch_size > 0 {
874            find = find.batch_size(self.query_builder.batch_size);
875        }
876        if let Some(select) = self.query_builder.select.clone() {
877            find = find.projection(select);
878        }
879        find
880    }
881
882    /// Queries documents from the collection
883    ///
884    ///
885    /// # Notes
886    /// - Respects skip/limit/sort/select settings
887    /// - Filters out hidden fields unless explicitly made visible
888    pub async fn get(&self) -> Result<Vec<M>> {
889        let (filter, hidden_fields) = self.prepare_get();
890        let collection = self.db.collection::<Document>(self.collection_name);
891        let mut find = collection.find(filter);
892        find = self.prepare_find(find);
893
894        let mut r = vec![];
895        let mut cursor = find.await?;
896        while let Some(d) = cursor.next().await {
897            r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
898        }
899        Ok(r)
900    }
901
902    /// Queries documents from the collection with session
903    ///
904    /// # Arguments
905    /// * `session` - Optional MongoDB transaction session
906    ///
907    /// # Notes
908    /// - Respects skip/limit/sort/select settings
909    /// - Filters out hidden fields unless explicitly made visible
910    pub async fn get_with_session(&self, session: &mut ClientSession) -> Result<Vec<M>> {
911        let (filter, hidden_fields) = self.prepare_get();
912        let collection = self.db.collection::<Document>(self.collection_name);
913        let mut find = collection.find(filter);
914        find = self.prepare_find(find);
915
916        let mut r = vec![];
917        let mut cursor = find.session(&mut *session).await?;
918        while let Some(d) = cursor.next(&mut *session).await {
919            r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
920        }
921        Ok(r)
922    }
923
924    /// Gets the first matching document
925    pub async fn first(&mut self) -> Result<Option<M>> {
926        self.query_builder.limit = 1;
927        let r = self.get().await?;
928        for item in r {
929            return Ok(Some(item));
930        }
931        Ok(None)
932    }
933    /// Gets the first matching document with session
934    pub async fn first_with_session(&mut self, session: &mut ClientSession) -> Result<Option<M>> {
935        self.query_builder.limit = 1;
936        let r = self.get_with_session(session).await?;
937        for item in r {
938            return Ok(Some(item));
939        }
940        Ok(None)
941    }
942
943    /// Runs an aggregation pipeline
944    pub async fn aggregate(
945        &mut self,
946        pipeline: impl IntoIterator<Item = Document>,
947    ) -> Result<Vec<M>> {
948        let collection = self.db.collection::<Document>(self.collection_name);
949        let res = collection.aggregate(pipeline);
950        let hidden_fields = self.hidden_fields();
951        let mut r = vec![];
952        let mut cursor = res.await?;
953        while let Some(d) = cursor.next().await {
954            r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
955        }
956        Ok(r)
957    }
958
959    /// Runs an aggregation pipeline with session
960    pub async fn aggregate_with_session(
961        &mut self,
962        pipeline: impl IntoIterator<Item = Document>,
963        session: &mut ClientSession,
964    ) -> Result<Vec<M>> {
965        let collection = self.db.collection::<Document>(self.collection_name);
966        let res = collection.aggregate(pipeline);
967        let hidden_fields = self.hidden_fields();
968        let mut r = vec![];
969        let mut cursor = res.session(&mut *session).await?;
970        while let Some(d) = cursor.next(&mut *session).await {
971            r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
972        }
973        Ok(r)
974    }
975
976    /// Queries documents from the collection
977    ///
978    ///
979    /// # Notes
980    /// - Respects skip/limit/sort/select settings
981    /// - Filters out hidden fields unless explicitly made visible
982    pub async fn get_doc(&self) -> Result<Vec<Document>> {
983        let (filter, _) = self.prepare_get();
984        let collection = self.db.collection::<Document>(self.collection_name);
985        let mut find = collection.find(filter);
986        find = self.prepare_find(find);
987
988        let mut r = vec![];
989        let mut cursor = find.await?;
990        while let Some(d) = cursor.next().await {
991            r.push(self.cast(d?, &self.req))
992        }
993        Ok(r)
994    }
995
996    /// Queries documents from the collection with session
997    ///
998    /// # Arguments
999    /// * `session` - Optional MongoDB transaction session
1000    ///
1001    /// # Notes
1002    /// - Respects skip/limit/sort/select settings
1003    /// - Filters out hidden fields unless explicitly made visible
1004    pub async fn get_doc_with_session(&self, session: &mut ClientSession) -> Result<Vec<Document>> {
1005        let (filter, _) = self.prepare_get();
1006        let collection = self.db.collection::<Document>(self.collection_name);
1007        let mut find = collection.find(filter);
1008        find = self.prepare_find(find);
1009
1010        let mut r = vec![];
1011        let mut cursor = find.session(&mut *session).await?;
1012        while let Some(d) = cursor.next(&mut *session).await {
1013            r.push(self.cast(d?, &self.req))
1014        }
1015        Ok(r)
1016    }
1017
1018    /// Gets the first matching document
1019    pub async fn first_doc(&mut self) -> Result<Option<Document>> {
1020        self.query_builder.limit = 1;
1021        let r = self.get_doc().await?;
1022        for item in r {
1023            return Ok(Some(item));
1024        }
1025        Ok(None)
1026    }
1027    /// Gets the first matching document with session
1028    pub async fn first_doc_with_session(
1029        &mut self,
1030        session: &mut ClientSession,
1031    ) -> Result<Option<Document>> {
1032        self.query_builder.limit = 1;
1033        let r = self.get_doc_with_session(session).await?;
1034        for item in r {
1035            return Ok(Some(item));
1036        }
1037        Ok(None)
1038    }
1039
1040    /// Runs an aggregation pipeline
1041    pub async fn aggregate_doc(
1042        &mut self,
1043        pipeline: impl IntoIterator<Item = Document>,
1044    ) -> Result<Vec<Document>> {
1045        let collection = self.db.collection::<Document>(self.collection_name);
1046        let res = collection.aggregate(pipeline);
1047        let mut r = vec![];
1048        let mut cursor = res.await?;
1049        while let Some(d) = cursor.next().await {
1050            r.push(self.cast(d?, &self.req))
1051        }
1052        Ok(r)
1053    }
1054
1055    /// Runs an aggregation pipeline with session
1056    pub async fn aggregate_doc_with_session(
1057        &mut self,
1058        pipeline: impl IntoIterator<Item = Document>,
1059        session: &mut ClientSession,
1060    ) -> Result<Vec<Document>> {
1061        let collection = self.db.collection::<Document>(self.collection_name);
1062        let res = collection.aggregate(pipeline);
1063        let mut r = vec![];
1064        let mut cursor = res.session(&mut *session).await?;
1065        while let Some(d) = cursor.next(&mut *session).await {
1066            r.push(self.cast(d?, &self.req))
1067        }
1068        Ok(r)
1069    }
1070
1071    /// Creates a cursor for iterating over documents in the collection.
1072    ///
1073    ///
1074    /// # Returns
1075    /// - `Result<Cursor<Document>>`: A MongoDB cursor that can be used to iterate over documents
1076    /// - Returns an error if the query execution fails
1077    ///
1078    /// # Example
1079    /// ```rust
1080    /// let cursor = User::new_model(db).cursor().await?;
1081    /// while let Some(doc) = cursor.next().await {
1082    ///     // process document
1083    /// }
1084    /// ```
1085    pub async fn cursor(&self) -> Result<Cursor<Document>> {
1086        let (filter, _) = self.prepare_get();
1087        let collection = self.db.collection::<Document>(self.collection_name);
1088        let mut find = collection.find(filter);
1089        find = self.prepare_find(find).no_cursor_timeout(true);
1090        let cursor = find.await?;
1091        Ok(cursor)
1092    }
1093    pub async fn cursor_with_session(
1094        &self,
1095        session: &mut ClientSession,
1096    ) -> Result<SessionCursor<Document>> {
1097        let (filter, _) = self.prepare_get();
1098        let collection = self.db.collection::<Document>(self.collection_name);
1099        let mut find = collection.find(filter);
1100        find = self.prepare_find(find).no_cursor_timeout(true);
1101        let cursor = find.session(session).await?;
1102        Ok(cursor)
1103    }
1104}