1use crate::column::ColumnAttr;
2use crate::event::Boot;
3use crate::query_builder::QueryBuilder;
4use futures_util::StreamExt;
5use log::error;
6use mongodb::action::{EstimatedDocumentCount, Find};
7use mongodb::bson::{doc, to_document, Document};
8use mongodb::bson::{Bson, DateTime};
9use mongodb::error::{Error, Result};
10use mongodb::options::{CountOptions, IndexOptions};
11use mongodb::results::{InsertManyResult, InsertOneResult};
12use mongodb::{bson, ClientSession, Collection, Cursor, Database, IndexModel, SessionCursor};
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use std::collections::HashMap;
16use std::fmt::Debug;
17use std::ops::{Deref, DerefMut};
18use std::sync::Arc;
19
20pub type MongodbResult<T> = Result<T>;
21
22#[derive(Debug, Clone, Serialize)]
23pub struct Model<'a, M>
24where
25 M: Boot,
26{
27 inner: Box<M>,
28 #[serde(skip_serializing)]
29 req: Option<M::Req>,
30 #[serde(skip)]
31 db: Database,
32 #[serde(skip)]
33 collection_name: &'a str,
34 #[serde(skip)]
35 add_times: bool,
36 #[serde(skip)]
37 columns: HashMap<&'a str, ColumnAttr>,
38 #[serde(skip)]
39 query_builder: QueryBuilder,
40}
41
42impl<'a, T: 'a + Boot> Deref for Model<'a, T> {
43 type Target = T;
44
45 fn deref(&self) -> &Self::Target {
46 &self.inner
47 }
48}
49
50impl<'a, T: 'a + Boot> DerefMut for Model<'a, T> {
51 fn deref_mut(&mut self) -> &mut Self::Target {
52 &mut self.inner
53 }
54}
55
56impl<'a, M> Model<'a, M>
57where
58 M: Boot,
59 M: Default,
60 M: Serialize,
61 M: DeserializeOwned,
62 M: Send,
63 M: Sync,
64 M: Unpin,
65{
66 pub fn new(
67 db: &Database,
68 collection_name: &'a str,
69 columns: &'a str,
70 add_times: bool,
71 ) -> Model<'a, M> {
72 let columns = serde_json::from_str(columns).unwrap();
73
74 let model = Model {
75 inner: Box::<M>::default(),
76 req: None,
77 db: db.clone(),
78 collection_name,
79 columns,
80 add_times,
81 query_builder: Default::default(),
82 };
83
84 model
85 }
86
87 pub fn set_request(mut self, req: M::Req) -> Model<'a, M> {
89 self.req = Some(req);
90 self
91 }
92
93 pub fn add_columns(&mut self, names: Vec<&'a str>) {
95 for name in names {
96 self.columns.insert(
97 name,
98 ColumnAttr {
99 asc: false,
100 desc: false,
101 unique: false,
102 sphere2d: false,
103 text: None,
104 hidden: false,
105 name: Some(name.to_string()),
106 },
107 );
108 }
109 }
110
111 pub fn collection_name(&self) -> &'a str {
113 self.collection_name
114 }
115
116 pub fn collection(&self) -> Collection<M> {
118 self.db.collection::<M>(self.collection_name)
119 }
120 pub fn set_collection(mut self, name: &'a str) -> Model<'a, M> {
122 self.collection_name = name;
123 self
124 }
125
126 pub async fn register_indexes(&self) {
133 let coll = self.db.collection::<M>(self.collection_name);
134 let previous_indexes = coll.list_indexes().await;
135 let mut attrs = vec![];
136 for (name, attr) in &self.columns {
137 if attr.is_index() {
138 attrs.push(name)
139 }
140 }
141
142 let mut keys_to_remove = Vec::new();
143 if previous_indexes.is_ok() {
144 let foreach_future = previous_indexes.unwrap().for_each(|pr| {
145 match pr {
146 Ok(index_model) => {
147 index_model.keys.iter().for_each(|key| {
148 if key.0 != "_id" {
149 if let Some(pos) = attrs.iter().position(|k| k == &key.0) {
150 attrs.remove(pos);
152 } else if let Some(rw) = &index_model.options {
153 match rw.default_language {
155 None => keys_to_remove.push(rw.name.clone()),
156 Some(_) => match &rw.name {
157 None => keys_to_remove.push(rw.name.clone()),
158 Some(name) => {
159 if let Some(pos) =
160 attrs.iter().position(|k| k == &name)
161 {
162 attrs.remove(pos);
163 } else {
164 keys_to_remove.push(rw.name.clone())
165 }
166 }
167 },
168 }
169 }
170 }
171 });
172 }
173 Err(error) => {
174 error!("Can't unpack index model {error}");
175 }
176 }
177 futures::future::ready(())
178 });
179 foreach_future.await;
180 }
181
182 let attrs = attrs
183 .iter()
184 .map(|name| {
185 let key = name.to_string();
186 let attr = &self.columns.get(key.as_str()).unwrap();
187
188 if let Some(lang) = &attr.text {
189 let opts = IndexOptions::builder()
190 .unique(attr.unique)
191 .name(key.clone())
192 .default_language(lang.to_string())
193 .build();
194 IndexModel::builder()
195 .keys(doc! {
196 key : "text"
197 })
198 .options(opts)
199 .build()
200 } else if attr.sphere2d {
201 let opts = IndexOptions::builder().unique(attr.unique).build();
202 IndexModel::builder()
203 .keys(doc! { key: "2dsphere" })
204 .options(opts)
205 .build()
206 } else {
207 let sort = if attr.desc { -1 } else { 1 };
208 let opts = IndexOptions::builder().unique(attr.unique).build();
209
210 IndexModel::builder()
211 .keys(doc! {
212 key : sort
213 })
214 .options(opts)
215 .build()
216 }
217 })
218 .collect::<Vec<IndexModel>>();
219
220 for name in keys_to_remove {
221 let key = name.as_ref().unwrap();
222 let _ = coll.drop_index(key).await;
223 }
224 if !attrs.is_empty() {
225 let result = coll.create_indexes(attrs).await;
226 if let Err(error) = result {
227 error!("Can't create indexes : {:?}", error);
228 }
229 }
230 }
231
232 pub fn reset(mut self) -> Model<'a, M> {
234 self.query_builder = Default::default();
235 self
236 }
237 pub fn r#where(mut self, data: Document) -> Model<'a, M> {
239 self.query_builder.r#where.push(data);
240 self
241 }
242 pub fn skip(mut self, count: u32) -> Model<'a, M> {
244 self.query_builder.skip = count;
245 self
246 }
247 pub async fn distinct(&self, name: &str) -> Result<Vec<Bson>> {
249 let whr = &self.query_builder.r#where;
250 let filter = if whr.is_empty() {
251 doc! {}
252 } else {
253 doc! {"$and":whr}
254 };
255 let collection = self.db.collection::<Document>(self.collection_name);
256 collection.distinct(&name, filter).await
257 }
258 pub fn limit(mut self, count: u32) -> Model<'a, M> {
260 self.query_builder.limit = count;
261 self
262 }
263 pub fn batch_size(mut self, value: u32) -> Model<'a, M> {
265 self.query_builder.batch_size = value;
266 self
267 }
268 pub fn sort(mut self, data: Document) -> Model<'a, M> {
270 self.query_builder.sort = data;
271 self
272 }
273 pub fn all(mut self) -> Model<'a, M> {
275 self.query_builder.all = true;
276 self
277 }
278 pub fn select(mut self, data: Document) -> Model<'a, M> {
280 self.query_builder.select = Some(data);
281 self
282 }
283 pub fn visible(mut self, data: Vec<&str>) -> Model<'a, M> {
285 self.query_builder.visible_fields = data.iter().map(|a| a.to_string()).collect();
286 self
287 }
288 pub fn upsert(mut self) -> Model<'a, M> {
290 self.query_builder.upsert = true;
291 self
292 }
293
294 fn hidden_fields(&self) -> Vec<String> {
295 let mut r = vec![];
296 for (name, attr) in &self.columns {
297 if attr.hidden
298 && !self
299 .query_builder
300 .visible_fields
301 .contains(&name.to_string())
302 {
303 r.push(name.to_string())
304 }
305 }
306 r
307 }
308 fn clear(&self, data: Document, hidden_fields: &Vec<String>) -> M {
309 let data = data;
310 let mut default = to_document(&M::default()).unwrap();
311 for (name, attr) in &self.columns {
312 if hidden_fields.contains(&name.to_string()) {
313 continue;
314 }
315 let rename = match attr.name.clone() {
316 None => name.to_string(),
317 Some(a) => a,
318 };
319 if data.contains_key(&rename) {
320 default.insert(name.to_string(), data.get(&rename).unwrap());
321 }
322 }
323
324 bson::from_document(default).unwrap()
325 }
326}
327
328impl<'a, M> Model<'a, M>
329where
330 M: Boot,
331 M: Default,
332 M: Serialize,
333{
334 pub fn take_inner(&mut self) -> M {
337 std::mem::take(&mut *self.inner)
338 }
339
340 pub fn inner_ref(&self) -> &M {
341 self.inner.as_ref()
342 }
343
344 pub fn inner_mut(&mut self) -> &mut M {
345 self.inner.as_mut()
346 }
347
348 pub fn inner_to_doc(&self) -> MongodbResult<Document> {
349 let mut re = to_document(&self.inner)?;
350 self.rename_field(&mut re, false);
351 Ok(re)
352 }
353
354 fn rename_field(&self, doc: &mut Document, is_opt: bool) {
355 for (name, attr) in &self.columns {
356 if let Some(a) = &attr.name {
357 if is_opt {
358 for (_, d) in doc.iter_mut() {
359 let i = d.as_document_mut().unwrap();
360 match i.get(name) {
361 None => {}
362 Some(b) => {
363 i.insert(a.clone(), b.clone());
364 i.remove(name);
365 }
366 }
367 }
368 } else {
369 match doc.get(name) {
370 None => {}
371 Some(b) => {
372 doc.insert(a.clone(), b.clone());
373 doc.remove(name);
374 }
375 }
376 }
377 }
378 }
379 }
380
381 pub fn fill(mut self, inner: M) -> Model<'a, M> {
382 *self.inner = inner;
383 self
384 }
385}
386
387impl<'a, M> Model<'a, M>
388where
389 M: Boot,
390 M: Default,
391 M: Serialize,
392 M: DeserializeOwned,
393 M: Send,
394 M: Sync,
395 M: Unpin,
396{
397 pub async fn count_documents(self) -> Result<u64> {
399 let whr = &self.query_builder.r#where;
400 let collection = self.db.collection::<Document>(self.collection_name);
401 let filter = if whr.is_empty() {
402 doc! {}
403 } else {
404 doc! { "$and": whr }
405 };
406
407 let options = CountOptions::builder()
408 .skip(if self.query_builder.skip > 0 {
409 Some(self.query_builder.skip as u64)
410 } else {
411 None
412 })
413 .limit(if self.query_builder.limit > 0 {
414 Some(self.query_builder.limit as u64)
415 } else {
416 None
417 })
418 .build();
419
420 collection
421 .count_documents(filter)
422 .with_options(options)
423 .await
424 }
425
426 pub async fn count_documents_with_session(self, session: &mut ClientSession) -> Result<u64> {
428 let whr = &self.query_builder.r#where;
429 let collection = self.db.collection::<Document>(self.collection_name);
430 let filter = if whr.is_empty() {
431 doc! {}
432 } else {
433 doc! { "$and": whr }
434 };
435
436 let options = CountOptions::builder()
437 .skip(if self.query_builder.skip > 0 {
438 Some(self.query_builder.skip as u64)
439 } else {
440 None
441 })
442 .limit(if self.query_builder.limit > 0 {
443 Some(self.query_builder.limit as u64)
444 } else {
445 None
446 })
447 .build();
448
449 collection
450 .count_documents(filter)
451 .with_options(options)
452 .session(session)
453 .await
454 }
455
456 fn add_times_to_data(&self, data: Document) -> Document {
457 let mut data = data;
458 if data.get_object_id("_id").is_err() {
459 data.remove("_id");
460 }
461 if self.add_times {
462 if !data.contains_key("updated_at") || !data.get_datetime("updated_at").is_ok() {
463 data.insert("updated_at", DateTime::now());
464 }
465 if !data.contains_key("created_at") || !data.get_datetime("created_at").is_ok() {
466 data.insert("created_at", DateTime::now());
467 }
468 }
469 data
470 }
471 pub async fn create(&self) -> Result<InsertOneResult> {
476 let mut data = self.add_times_to_data(self.inner_to_doc()?);
477
478 match self
479 .db
480 .collection(self.collection_name)
481 .insert_one(data.clone())
482 .await{
483 Ok(r) => {
484 data.insert("_id",r.inserted_id.clone());
485 self.finish(&self.req, "create", Document::new(), data, None)
486 .await;
487 Ok(r)
488 }
489 Err(e) => {Err(e)}
490 }
491 }
492
493 pub async fn create_with_session(
501 &self,
502 session: &mut ClientSession,
503 ) -> Result<InsertOneResult> {
504 let mut data = self.add_times_to_data(self.inner_to_doc()?);
505 match self
506 .db
507 .collection(self.collection_name)
508 .insert_one(data.clone())
509 .session(&mut *session)
510 .await{
511 Ok(r) => {
512 data.insert("_id",r.inserted_id.clone());
513 self.finish(&self.req, "create", Document::new(), data, Some(session))
514 .await;
515 Ok(r)
516 }
517 Err(e) => {Err(e)}
518 }
519 }
520
521 pub async fn create_doc(&self, data: Document) -> Result<InsertOneResult> {
523 let mut data = self.add_times_to_data(data);
524
525 match self
526 .db
527 .collection(self.collection_name)
528 .insert_one(data.clone())
529 .await{
530 Ok(r) => {
531 data.insert("_id",r.inserted_id.clone());
532 self.finish(&self.req, "create", Document::new(), data, None)
533 .await;
534 Ok(r)
535 }
536 Err(e) => {Err(e)}
537 }
538 }
539
540 pub async fn create_doc_with_session(
542 &self,
543 data: Document,
544 session: &mut ClientSession,
545 ) -> Result<InsertOneResult> {
546 let mut data = self.add_times_to_data(data);
547
548 match self
549 .db
550 .collection(self.collection_name)
551 .insert_one(data.clone())
552 .session(&mut *session)
553 .await{
554 Ok(r) => {
555 data.insert("_id",r.inserted_id.clone());
556 self.finish(&self.req, "create", Document::new(), data, Some(session))
557 .await;
558 Ok(r)
559 }
560 Err(e) => {Err(e)}
561 }
562 }
563
564 pub async fn create_many_doc(&self, data: Vec<Document>) -> Result<InsertManyResult> {
566 let mut d=vec![];
567 for item in data {
568 d.push(self.add_times_to_data(item));
569 }
570
571 match self
572 .db
573 .collection(self.collection_name)
574 .insert_many(d)
575 .await{
576 Ok(r) => {
577 let inserted_ids: HashMap<String, Bson> = r.inserted_ids.clone()
578 .into_iter()
579 .map(|(k, v)| (k.to_string(), v))
580 .collect();
581 self.finish(&self.req, "create_many", Document::new(), doc! {"_ids": Bson::Document(Document::from_iter(inserted_ids))}, None)
582 .await;
583 Ok(r)
584 }
585 Err(e) => {Err(e)}
586 }
587 }
588 pub async fn create_many_doc_with_session(&self, data: Vec<Document>,session: &mut ClientSession,) -> Result<InsertManyResult> {
590 let mut d=vec![];
591 for item in data {
592 d.push(self.add_times_to_data(item));
593 }
594
595 match self
596 .db
597 .collection(self.collection_name)
598 .insert_many(d)
599 .session(&mut *session)
600 .await{
601 Ok(r) => {
602 let inserted_ids: HashMap<String, Bson> = r.inserted_ids.clone()
603 .into_iter()
604 .map(|(k, v)| (k.to_string(), v))
605 .collect();
606 self.finish(&self.req, "create_many", Document::new(),
607 doc! {"_ids": Bson::Document(Document::from_iter(inserted_ids))},
608 Some(session))
609 .await;
610 Ok(r)
611 }
612 Err(e) => {Err(e)}
613 }
614 }
615 fn prepare_update(&self, data: Document) -> Result<(Document, Document)> {
616 let mut data = data;
617 let mut is_opt = false;
618 for (a, _) in data.iter() {
619 if a.starts_with("$") {
620 is_opt = true;
621 }
622 }
623
624 self.rename_field(&mut data, is_opt);
625 if !is_opt {
626 data = doc! {"$set":data};
627 }
628 if self.add_times {
629 if !data.contains_key("$set") {
630 data.insert("$set", doc! {});
631 }
632 let set = data.get_mut("$set").unwrap().as_document_mut().unwrap();
633 set.insert("updated_at", DateTime::now());
634 }
635
636 if self.query_builder.upsert {
637 if self.add_times {
638 if !data.contains_key("$setOnInsert") {
639 data.insert("$setOnInsert", doc! {});
640 }
641 let set = data
642 .get_mut("$setOnInsert")
643 .unwrap()
644 .as_document_mut()
645 .unwrap();
646 set.insert("created_at", DateTime::now());
647 }
648 }
649 let whr = &self.query_builder.r#where;
650 if whr.is_empty() {
651 return Err(Error::from(std::io::Error::new(
652 std::io::ErrorKind::InvalidInput,
653 "where not set.",
654 )));
655 }
656 let filter = doc! {"$and":whr};
657 Ok((data, filter))
658 }
659 pub async fn update(&self, data: Document) -> Result<Document> {
669 let (data, filter) = self.prepare_update(data)?;
670
671 let r = self.db.collection::<Document>(self.collection_name);
672
673 if self.query_builder.all {
674 let r = r
675 .update_many(filter, data.clone())
676 .upsert(self.query_builder.upsert)
677 .await;
678 match r {
679 Ok(old) => {
680 let res = doc! {"modified_count":old.modified_count.to_string()};
681 self.finish(&self.req, "update_many", res.clone(), data, None)
682 .await;
683 Ok(res)
684 }
685 Err(e) => Err(e),
686 }
687 } else {
688 let r = r
689 .find_one_and_update(filter, data.clone())
690 .upsert(self.query_builder.upsert)
691 .sort(self.query_builder.sort.clone())
692 .await;
693 match r {
694 Ok(old) => {
695 let res = old.unwrap_or(Document::new());
696 self.finish(&self.req, "update", res.clone(), data, None)
697 .await;
698 Ok(res)
699 }
700 Err(e) => Err(e),
701 }
702 }
703 }
704
705 pub async fn update_with_session(
716 &self,
717 data: Document,
718 session: &mut ClientSession,
719 ) -> Result<Document> {
720 let (data, filter) = self.prepare_update(data)?;
721
722 let r = self.db.collection::<Document>(self.collection_name);
723 if self.query_builder.all {
724 let r = r
725 .update_many(filter, data.clone())
726 .upsert(self.query_builder.upsert)
727 .session(&mut *session)
728 .await;
729 match r {
730 Ok(old) => {
731 let res = doc! {"modified_count":old.modified_count.to_string()};
732 self.finish(&self.req, "update_many", res.clone(), data, Some(session))
733 .await;
734 Ok(res)
735 }
736 Err(e) => Err(e),
737 }
738 } else {
739 let r = r
740 .find_one_and_update(filter, data.clone())
741 .upsert(self.query_builder.upsert)
742 .sort(self.query_builder.sort.clone())
743 .session(&mut *session)
744 .await;
745 match r {
746 Ok(old) => {
747 let res = old.unwrap_or(Document::new());
748 self.finish(&self.req, "update", res.clone(), data, Some(session))
749 .await;
750 Ok(res)
751 }
752 Err(e) => Err(e),
753 }
754 }
755 }
756
757 pub async fn delete(&self) -> Result<Document> {
763 let whr = &self.query_builder.r#where;
764 if whr.is_empty() {
765 return Err(Error::from(std::io::Error::new(
766 std::io::ErrorKind::InvalidInput,
767 "where not set.",
768 )));
769 }
770 let filter = doc! {"$and":whr};
771
772 let r = self.db.collection::<Document>(self.collection_name);
773 if self.query_builder.all {
774 let r = r.delete_many(filter).await;
775 match r {
776 Ok(old) => {
777 let res = doc! {"deleted_count":old.deleted_count.to_string()};
778 self.finish(&self.req, "delete_many", res.clone(), doc! {}, None)
779 .await;
780 Ok(res)
781 }
782 Err(e) => Err(e),
783 }
784 } else {
785 let r = r
786 .find_one_and_delete(filter)
787 .sort(self.query_builder.sort.clone())
788 .await;
789 match r {
790 Ok(old) => {
791 let res = old.unwrap_or(Document::new());
792 self.finish(&self.req, "delete", res.clone(), doc! {}, None)
793 .await;
794 Ok(res)
795 }
796 Err(e) => Err(e),
797 }
798 }
799 }
800
801 pub async fn delete_with_session(&self, session: &mut ClientSession) -> Result<Document> {
809 let whr = &self.query_builder.r#where;
810 if whr.is_empty() {
811 return Err(Error::from(std::io::Error::new(
812 std::io::ErrorKind::InvalidInput,
813 "where not set.",
814 )));
815 }
816 let filter = doc! {"$and":whr};
817
818 let r = self.db.collection::<Document>(self.collection_name);
819 if self.query_builder.all {
820 let r = r.delete_many(filter).session(&mut *session).await;
821 match r {
822 Ok(old) => {
823 let res = doc! {"deleted_count":old.deleted_count.to_string()};
824 self.finish(
825 &self.req,
826 "delete_many",
827 res.clone(),
828 doc! {},
829 Some(session),
830 )
831 .await;
832 Ok(res)
833 }
834 Err(e) => Err(e),
835 }
836 } else {
837 let r = r
838 .find_one_and_delete(filter)
839 .sort(self.query_builder.sort.clone())
840 .session(&mut *session)
841 .await;
842 match r {
843 Ok(old) => {
844 let res = old.unwrap_or(Document::new());
845 self.finish(&self.req, "delete", res.clone(), doc! {}, Some(session))
846 .await;
847 Ok(res)
848 }
849 Err(e) => Err(e),
850 }
851 }
852 }
853 fn prepare_get(&self) -> (Document, Vec<String>) {
854 let whr = &self.query_builder.r#where;
855 let filter = if whr.is_empty() {
856 doc! {}
857 } else {
858 doc! {"$and":whr}
859 };
860 let hidden_fields = self.hidden_fields();
861 (filter, hidden_fields)
862 }
863
864 fn prepare_find<'b>(&self, mut find: Find<'b, Document>) -> Find<'b, Document> {
865 find = find.sort(self.query_builder.sort.clone());
866
867 if self.query_builder.skip > 0 {
868 find = find.skip(self.query_builder.skip as u64);
869 }
870 if self.query_builder.limit > 0 {
871 find = find.limit(self.query_builder.limit as i64);
872 }
873 if self.query_builder.batch_size > 0 {
874 find = find.batch_size(self.query_builder.batch_size);
875 }
876 if let Some(select) = self.query_builder.select.clone() {
877 find = find.projection(select);
878 }
879 find
880 }
881
882 pub async fn get(&self) -> Result<Vec<M>> {
889 let (filter, hidden_fields) = self.prepare_get();
890 let collection = self.db.collection::<Document>(self.collection_name);
891 let mut find = collection.find(filter);
892 find = self.prepare_find(find);
893
894 let mut r = vec![];
895 let mut cursor = find.await?;
896 while let Some(d) = cursor.next().await {
897 r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
898 }
899 Ok(r)
900 }
901
902 pub async fn get_with_session(&self, session: &mut ClientSession) -> Result<Vec<M>> {
911 let (filter, hidden_fields) = self.prepare_get();
912 let collection = self.db.collection::<Document>(self.collection_name);
913 let mut find = collection.find(filter);
914 find = self.prepare_find(find);
915
916 let mut r = vec![];
917 let mut cursor = find.session(&mut *session).await?;
918 while let Some(d) = cursor.next(&mut *session).await {
919 r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
920 }
921 Ok(r)
922 }
923
924 pub async fn first(&mut self) -> Result<Option<M>> {
926 self.query_builder.limit = 1;
927 let r = self.get().await?;
928 for item in r {
929 return Ok(Some(item));
930 }
931 Ok(None)
932 }
933 pub async fn first_with_session(&mut self, session: &mut ClientSession) -> Result<Option<M>> {
935 self.query_builder.limit = 1;
936 let r = self.get_with_session(session).await?;
937 for item in r {
938 return Ok(Some(item));
939 }
940 Ok(None)
941 }
942
943 pub async fn aggregate(
945 &mut self,
946 pipeline: impl IntoIterator<Item = Document>,
947 ) -> Result<Vec<M>> {
948 let collection = self.db.collection::<Document>(self.collection_name);
949 let res = collection.aggregate(pipeline);
950 let hidden_fields = self.hidden_fields();
951 let mut r = vec![];
952 let mut cursor = res.await?;
953 while let Some(d) = cursor.next().await {
954 r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
955 }
956 Ok(r)
957 }
958
959 pub async fn aggregate_with_session(
961 &mut self,
962 pipeline: impl IntoIterator<Item = Document>,
963 session: &mut ClientSession,
964 ) -> Result<Vec<M>> {
965 let collection = self.db.collection::<Document>(self.collection_name);
966 let res = collection.aggregate(pipeline);
967 let hidden_fields = self.hidden_fields();
968 let mut r = vec![];
969 let mut cursor = res.session(&mut *session).await?;
970 while let Some(d) = cursor.next(&mut *session).await {
971 r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
972 }
973 Ok(r)
974 }
975
976 pub async fn get_doc(&self) -> Result<Vec<Document>> {
983 let (filter, _) = self.prepare_get();
984 let collection = self.db.collection::<Document>(self.collection_name);
985 let mut find = collection.find(filter);
986 find = self.prepare_find(find);
987
988 let mut r = vec![];
989 let mut cursor = find.await?;
990 while let Some(d) = cursor.next().await {
991 r.push(self.cast(d?, &self.req))
992 }
993 Ok(r)
994 }
995
996 pub async fn get_doc_with_session(&self, session: &mut ClientSession) -> Result<Vec<Document>> {
1005 let (filter, _) = self.prepare_get();
1006 let collection = self.db.collection::<Document>(self.collection_name);
1007 let mut find = collection.find(filter);
1008 find = self.prepare_find(find);
1009
1010 let mut r = vec![];
1011 let mut cursor = find.session(&mut *session).await?;
1012 while let Some(d) = cursor.next(&mut *session).await {
1013 r.push(self.cast(d?, &self.req))
1014 }
1015 Ok(r)
1016 }
1017
1018 pub async fn first_doc(&mut self) -> Result<Option<Document>> {
1020 self.query_builder.limit = 1;
1021 let r = self.get_doc().await?;
1022 for item in r {
1023 return Ok(Some(item));
1024 }
1025 Ok(None)
1026 }
1027 pub async fn first_doc_with_session(
1029 &mut self,
1030 session: &mut ClientSession,
1031 ) -> Result<Option<Document>> {
1032 self.query_builder.limit = 1;
1033 let r = self.get_doc_with_session(session).await?;
1034 for item in r {
1035 return Ok(Some(item));
1036 }
1037 Ok(None)
1038 }
1039
1040 pub async fn aggregate_doc(
1042 &mut self,
1043 pipeline: impl IntoIterator<Item = Document>,
1044 ) -> Result<Vec<Document>> {
1045 let collection = self.db.collection::<Document>(self.collection_name);
1046 let res = collection.aggregate(pipeline);
1047 let mut r = vec![];
1048 let mut cursor = res.await?;
1049 while let Some(d) = cursor.next().await {
1050 r.push(self.cast(d?, &self.req))
1051 }
1052 Ok(r)
1053 }
1054
1055 pub async fn aggregate_doc_with_session(
1057 &mut self,
1058 pipeline: impl IntoIterator<Item = Document>,
1059 session: &mut ClientSession,
1060 ) -> Result<Vec<Document>> {
1061 let collection = self.db.collection::<Document>(self.collection_name);
1062 let res = collection.aggregate(pipeline);
1063 let mut r = vec![];
1064 let mut cursor = res.session(&mut *session).await?;
1065 while let Some(d) = cursor.next(&mut *session).await {
1066 r.push(self.cast(d?, &self.req))
1067 }
1068 Ok(r)
1069 }
1070
1071 pub async fn cursor(&self) -> Result<Cursor<Document>> {
1086 let (filter, _) = self.prepare_get();
1087 let collection = self.db.collection::<Document>(self.collection_name);
1088 let mut find = collection.find(filter);
1089 find = self.prepare_find(find).no_cursor_timeout(true);
1090 let cursor = find.await?;
1091 Ok(cursor)
1092 }
1093 pub async fn cursor_with_session(
1094 &self,
1095 session: &mut ClientSession,
1096 ) -> Result<SessionCursor<Document>> {
1097 let (filter, _) = self.prepare_get();
1098 let collection = self.db.collection::<Document>(self.collection_name);
1099 let mut find = collection.find(filter);
1100 find = self.prepare_find(find).no_cursor_timeout(true);
1101 let cursor = find.session(session).await?;
1102 Ok(cursor)
1103 }
1104}