1use crate::column::ColumnAttr;
2use crate::event::Boot;
3use futures_util::StreamExt;
4use log::error;
5use mongodb::action::EstimatedDocumentCount;
6use mongodb::bson::{doc, to_document, Document};
7use mongodb::bson::{Bson, DateTime};
8use mongodb::error::{Error, Result};
9use mongodb::options::{CountOptions, IndexOptions};
10use mongodb::results::InsertOneResult;
11use mongodb::{bson, ClientSession, Collection, Database, IndexModel};
12use serde::de::DeserializeOwned;
13use serde::Serialize;
14use std::collections::HashMap;
15use std::fmt::Debug;
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19pub type MongodbResult<T> = Result<T>;
20
21#[derive(Debug, Default, Clone)]
22struct QueryBuilder {
23 pub r#where: Vec<Document>,
24 pub all: bool,
25 pub upsert: bool,
26 pub select: Option<Document>,
27 pub sort: Document,
28 pub skip: u32,
29 pub limit: u32,
30 pub visible_fields: Vec<String>,
31}
32#[derive(Debug, Clone, Serialize)]
33pub struct Model<'a, M>
34where
35 M: Boot,
36{
37 inner: Box<M>,
38 #[serde(skip_serializing)]
39 req: Option<M::Req>,
40 #[serde(skip)]
41 db: Database,
42 #[serde(skip)]
43 collection_name: &'a str,
44 #[serde(skip)]
45 add_times: bool,
46 #[serde(skip)]
47 columns: HashMap<&'a str, ColumnAttr>,
48 #[serde(skip)]
49 query_builder: QueryBuilder,
50}
51
52impl<'a, T: 'a + Boot> Deref for Model<'a, T> {
53 type Target = T;
54
55 fn deref(&self) -> &Self::Target {
56 &self.inner
57 }
58}
59
60impl<'a, T: 'a + Boot> DerefMut for Model<'a, T> {
61 fn deref_mut(&mut self) -> &mut Self::Target {
62 &mut self.inner
63 }
64}
65
66impl<'a, M> Model<'a, M>
67where
68 M: Boot,
69 M: Default,
70 M: Serialize,
71 M: DeserializeOwned,
72 M: Send,
73 M: Sync,
74 M: Unpin,
75{
76 pub fn new(
77 db: &Database,
78 collection_name: &'a str,
79 columns: &'a str,
80 add_times: bool,
81 ) -> Model<'a, M> {
82 let columns = serde_json::from_str(columns).unwrap();
83
84 let model = Model {
85 inner: Box::<M>::default(),
86 req: None,
87 db: db.clone(),
88 collection_name,
89 columns,
90 add_times,
91 query_builder: Default::default(),
92 };
93
94 model
95 }
96
97 pub fn set_request(mut self, req: M::Req) -> Model<'a, M> {
99 self.req = Some(req);
100 self
101 }
102
103 pub fn add_columns(&mut self, names: Vec<&'a str>) {
105 for name in names {
106 self.columns.insert(
107 name,
108 ColumnAttr {
109 asc: false,
110 desc: false,
111 unique: false,
112 sphere2d: false,
113 text: None,
114 hidden: false,
115 name: Some(name.to_string()),
116 },
117 );
118 }
119 }
120
121 pub fn collection_name(&self) -> &'a str {
123 self.collection_name
124 }
125
126 pub fn collection(&self) -> Collection<M> {
128 self.db.collection::<M>(self.collection_name)
129 }
130 pub fn set_collection(mut self, name: &'a str) -> Model<'a, M> {
132 self.collection_name = name;
133 self
134 }
135
136 pub async fn register_indexes(&self) {
143 let coll = self.db.collection::<M>(self.collection_name);
144 let previous_indexes = coll.list_indexes().await;
145 let mut attrs = vec![];
146 for (name, attr) in &self.columns {
147 if attr.is_index() {
148 attrs.push(name)
149 }
150 }
151
152 let mut keys_to_remove = Vec::new();
153 if previous_indexes.is_ok() {
154 let foreach_future = previous_indexes.unwrap().for_each(|pr| {
155 match pr {
156 Ok(index_model) => {
157 index_model.keys.iter().for_each(|key| {
158 if key.0 != "_id" {
159 if let Some(pos) = attrs.iter().position(|k| k == &key.0) {
160 attrs.remove(pos);
162 } else if let Some(rw) = &index_model.options {
163 match rw.default_language {
165 None => keys_to_remove.push(rw.name.clone()),
166 Some(_) => match &rw.name {
167 None => keys_to_remove.push(rw.name.clone()),
168 Some(name) => {
169 if let Some(pos) =
170 attrs.iter().position(|k| k == &name)
171 {
172 attrs.remove(pos);
173 } else {
174 keys_to_remove.push(rw.name.clone())
175 }
176 }
177 },
178 }
179 }
180 }
181 });
182 }
183 Err(error) => {
184 error!("Can't unpack index model {error}");
185 }
186 }
187 futures::future::ready(())
188 });
189 foreach_future.await;
190 }
191
192 let attrs = attrs
193 .iter()
194 .map(|name| {
195 let key = name.to_string();
196 let attr = &self.columns.get(key.as_str()).unwrap();
197
198 if let Some(lang) = &attr.text {
199 let opts = IndexOptions::builder()
200 .unique(attr.unique)
201 .name(key.clone())
202 .default_language(lang.to_string())
203 .build();
204 IndexModel::builder()
205 .keys(doc! {
206 key : "text"
207 })
208 .options(opts)
209 .build()
210 } else if attr.sphere2d {
211 let opts = IndexOptions::builder().unique(attr.unique).build();
212 IndexModel::builder()
213 .keys(doc! { key: "2dsphere" })
214 .options(opts)
215 .build()
216 } else {
217 let sort = if attr.desc { -1 } else { 1 };
218 let opts = IndexOptions::builder().unique(attr.unique).build();
219
220 IndexModel::builder()
221 .keys(doc! {
222 key : sort
223 })
224 .options(opts)
225 .build()
226 }
227 })
228 .collect::<Vec<IndexModel>>();
229
230 for name in keys_to_remove {
231 let key = name.as_ref().unwrap();
232 let _ = coll.drop_index(key).await;
233 }
234 if !attrs.is_empty() {
235 let result = coll.create_indexes(attrs).await;
236 if let Err(error) = result {
237 error!("Can't create indexes : {:?}", error);
238 }
239 }
240 }
241
242 pub fn reset(mut self) -> Model<'a, M> {
244 self.query_builder = Default::default();
245 self
246 }
247 pub fn r#where(mut self, data: Document) -> Model<'a, M> {
249 self.query_builder.r#where.push(data);
250 self
251 }
252 pub fn skip(mut self, count: u32) -> Model<'a, M> {
254 self.query_builder.skip = count;
255 self
256 }
257 pub async fn distinct(&self, name: &str) -> Result<Vec<Bson>> {
259 let whr = &self.query_builder.r#where;
260 let filter = if whr.is_empty() {
261 doc! {}
262 } else {
263 doc! {"$and":whr}
264 };
265 let collection = self.db.collection::<Document>(self.collection_name);
266 collection.distinct(&name, filter).await
267 }
268 pub fn limit(mut self, count: u32) -> Model<'a, M> {
270 self.query_builder.limit = count;
271 self
272 }
273 pub fn sort(mut self, data: Document) -> Model<'a, M> {
275 self.query_builder.sort = data;
276 self
277 }
278 pub fn all(mut self) -> Model<'a, M> {
280 self.query_builder.all = true;
281 self
282 }
283 pub fn select(mut self, data: Document) -> Model<'a, M> {
285 self.query_builder.select = Some(data);
286 self
287 }
288 pub fn visible(mut self, data: Vec<&str>) -> Model<'a, M> {
290 self.query_builder.visible_fields = data.iter().map(|a| a.to_string()).collect();
291 self
292 }
293 pub fn upsert(mut self) -> Model<'a, M> {
295 self.query_builder.upsert = true;
296 self
297 }
298
299 pub async fn count_documents(self) -> Result<u64> {
301 let whr = &self.query_builder.r#where;
302 let collection = self.db.collection::<Document>(self.collection_name);
303 let filter = if whr.is_empty() {
304 doc! {}
305 } else {
306 doc! { "$and": whr }
307 };
308
309 let options = CountOptions::builder()
310 .skip(if self.query_builder.skip > 0 {
311 Some(self.query_builder.skip as u64)
312 } else {
313 None
314 })
315 .limit(if self.query_builder.limit > 0 {
316 Some(self.query_builder.limit as u64)
317 } else {
318 None
319 })
320 .build();
321
322 collection
323 .count_documents(filter)
324 .with_options(options)
325 .await
326 }
327
328 pub async fn create(&self, session: Option<&mut ClientSession>) -> Result<InsertOneResult> {
336 let mut data = self.inner_to_doc()?;
337 if data.get_object_id("_id").is_err() {
338 data.remove("_id");
339 }
340 if self.add_times {
341 if !data.contains_key("updated_at") || !data.get_datetime("updated_at").is_ok() {
342 data.insert("updated_at", DateTime::now());
343 }
344 if !data.contains_key("created_at") || !data.get_datetime("created_at").is_ok() {
345 data.insert("created_at", DateTime::now());
346 }
347 }
348 match session {
349 None => {
350 let r = self
351 .db
352 .collection(self.collection_name)
353 .insert_one(data.clone())
354 .await;
355 if r.is_ok() {
356 self.finish(&self.req, "create", Document::new(), data, None)
357 .await;
358 }
359 r
360 }
361 Some(s) => {
362 let r = self
363 .db
364 .collection(self.collection_name)
365 .insert_one(data.clone())
366 .session(&mut *s)
367 .await;
368 if r.is_ok() {
369 self.finish(&self.req, "create", Document::new(), data, Some(s))
370 .await;
371 }
372 r
373 }
374 }
375 }
376
377 pub async fn create_doc(
379 &self,
380 data: Document,
381 session: Option<&mut ClientSession>,
382 ) -> Result<InsertOneResult> {
383 let mut data = data;
384
385 if self.add_times {
386 if !data.contains_key("updated_at") || !data.get_datetime("updated_at").is_ok() {
387 data.insert("updated_at", DateTime::now());
388 }
389 if !data.contains_key("created_at") || !data.get_datetime("created_at").is_ok() {
390 data.insert("created_at", DateTime::now());
391 }
392 }
393 match session {
394 None => {
395 let r = self
396 .db
397 .collection(self.collection_name)
398 .insert_one(data.clone())
399 .await;
400 if r.is_ok() {
401 self.finish(&self.req, "create", Document::new(), data, None)
402 .await;
403 }
404 r
405 }
406 Some(s) => {
407 let r = self
408 .db
409 .collection(self.collection_name)
410 .insert_one(data.clone())
411 .session(&mut *s)
412 .await;
413 if r.is_ok() {
414 self.finish(&self.req, "create", Document::new(), data, Some(s))
415 .await;
416 }
417 r
418 }
419 }
420 }
421
422 pub async fn update(
433 &self,
434 data: Document,
435 session: Option<&mut ClientSession>,
436 ) -> Result<Document> {
437 let mut data = data;
438 let mut is_opt = false;
439 for (a, _) in data.iter() {
440 if a.starts_with("$") {
441 is_opt = true;
442 }
443 }
444
445 self.rename_field(&mut data, is_opt);
446 if !is_opt {
447 data = doc! {"$set":data};
448 }
449 if self.add_times {
450 if !data.contains_key("$set") {
451 data.insert("$set", doc! {});
452 }
453 let set = data.get_mut("$set").unwrap().as_document_mut().unwrap();
454 set.insert("updated_at", DateTime::now());
455 }
456
457 if self.query_builder.upsert {
458 if self.add_times {
459 if !data.contains_key("$setOnInsert") {
460 data.insert("$setOnInsert", doc! {});
461 }
462 let set = data
463 .get_mut("$setOnInsert")
464 .unwrap()
465 .as_document_mut()
466 .unwrap();
467 set.insert("created_at", DateTime::now());
468 }
469 }
470 let whr = &self.query_builder.r#where;
471 if whr.is_empty() {
472 return Err(Error::from(std::io::Error::new(
473 std::io::ErrorKind::InvalidInput,
474 "where not set.",
475 )));
476 }
477 let filter = doc! {"$and":whr};
478
479 match session {
480 None => {
481 let r = self.db.collection::<Document>(self.collection_name);
482
483 if self.query_builder.all {
484 let r = r
485 .update_many(filter, data.clone())
486 .upsert(self.query_builder.upsert)
487 .await;
488 match r {
489 Ok(old) => {
490 let res = doc! {"modified_count":old.modified_count.to_string()};
491 self.finish(&self.req, "update_many", res.clone(), data, None)
492 .await;
493 Ok(res)
494 }
495 Err(e) => Err(e),
496 }
497 } else {
498 let r = r
499 .find_one_and_update(filter, data.clone())
500 .upsert(self.query_builder.upsert)
501 .sort(self.query_builder.sort.clone())
502 .await;
503 match r {
504 Ok(old) => {
505 let res = old.unwrap_or(Document::new());
506 self.finish(&self.req, "update", res.clone(), data, None)
507 .await;
508 Ok(res)
509 }
510 Err(e) => Err(e),
511 }
512 }
513 }
514 Some(s) => {
515 let r = self.db.collection::<Document>(self.collection_name);
516 if self.query_builder.all {
517 let r = r
518 .update_many(filter, data.clone())
519 .upsert(self.query_builder.upsert)
520 .session(&mut *s)
521 .await;
522 match r {
523 Ok(old) => {
524 let res = doc! {"modified_count":old.modified_count.to_string()};
525 self.finish(&self.req, "update_many", res.clone(), data, Some(s))
526 .await;
527 Ok(res)
528 }
529 Err(e) => Err(e),
530 }
531 } else {
532 let r = r
533 .find_one_and_update(filter, data.clone())
534 .upsert(self.query_builder.upsert)
535 .sort(self.query_builder.sort.clone())
536 .session(&mut *s)
537 .await;
538 match r {
539 Ok(old) => {
540 let res = old.unwrap_or(Document::new());
541 self.finish(&self.req, "update", res.clone(), data, Some(s))
542 .await;
543 Ok(res)
544 }
545 Err(e) => Err(e),
546 }
547 }
548 }
549 }
550 }
551
552 pub async fn delete(&self, session: Option<&mut ClientSession>) -> Result<Document> {
560 let whr = &self.query_builder.r#where;
561 if whr.is_empty() {
562 return Err(Error::from(std::io::Error::new(
563 std::io::ErrorKind::InvalidInput,
564 "where not set.",
565 )));
566 }
567 let filter = doc! {"$and":whr};
568
569 match session {
570 None => {
571 let r = self.db.collection::<Document>(self.collection_name);
572 if self.query_builder.all {
573 let r = r.delete_many(filter).await;
574 match r {
575 Ok(old) => {
576 let res = doc! {"deleted_count":old.deleted_count.to_string()};
577 self.finish(&self.req, "delete_many", res.clone(), doc! {}, None)
578 .await;
579 Ok(res)
580 }
581 Err(e) => Err(e),
582 }
583 } else {
584 let r = r
585 .find_one_and_delete(filter)
586 .sort(self.query_builder.sort.clone())
587 .await;
588 match r {
589 Ok(old) => {
590 let res = old.unwrap_or(Document::new());
591 self.finish(&self.req, "delete", res.clone(), doc! {}, None)
592 .await;
593 Ok(res)
594 }
595 Err(e) => Err(e),
596 }
597 }
598 }
599 Some(s) => {
600 let r = self.db.collection::<Document>(self.collection_name);
601 if self.query_builder.all {
602 let r = r.delete_many(filter).session(&mut *s).await;
603 match r {
604 Ok(old) => {
605 let res = doc! {"deleted_count":old.deleted_count.to_string()};
606 self.finish(&self.req, "delete_many", res.clone(), doc! {}, Some(s))
607 .await;
608 Ok(res)
609 }
610 Err(e) => Err(e),
611 }
612 } else {
613 let r = r
614 .find_one_and_delete(filter)
615 .sort(self.query_builder.sort.clone())
616 .session(&mut *s)
617 .await;
618 match r {
619 Ok(old) => {
620 let res = old.unwrap_or(Document::new());
621 self.finish(&self.req, "delete", res.clone(), doc! {}, Some(s))
622 .await;
623 Ok(res)
624 }
625 Err(e) => Err(e),
626 }
627 }
628 }
629 }
630 }
631
632 pub async fn get(&self, session: Option<&mut ClientSession>) -> Result<Vec<M>> {
641 let whr = &self.query_builder.r#where;
642 let filter = if whr.is_empty() {
643 doc! {}
644 } else {
645 doc! {"$and":whr}
646 };
647 let hidden_fields = self.hidden_fields();
648 let collection = self.db.collection::<Document>(self.collection_name);
649 let mut find = collection.find(filter);
650 find = find.sort(self.query_builder.sort.clone());
651
652 if self.query_builder.skip > 0 {
653 find = find.skip(self.query_builder.skip as u64);
654 }
655 if self.query_builder.limit > 0 {
656 find = find.limit(self.query_builder.limit as i64);
657 }
658 if let Some(select) = self.query_builder.select.clone() {
659 find = find.projection(select);
660 }
661
662 let mut r = vec![];
663 match session {
664 None => {
665 let mut cursor = find.await?;
666 while let Some(d) = cursor.next().await {
667 r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
668 }
669 Ok(r)
670 }
671 Some(s) => {
672 let mut cursor = find.session(&mut *s).await?;
673 while let Some(d) = cursor.next(&mut *s).await {
674 r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
675 }
676 Ok(r)
677 }
678 }
679 }
680
681 pub async fn first(&mut self, session: Option<&mut ClientSession>) -> Result<Option<M>> {
683 self.query_builder.limit = 1;
684 let r = self.get(session).await?;
685 for item in r {
686 return Ok(Some(item));
687 }
688 Ok(None)
689 }
690
691 pub async fn aggregate(
693 &mut self,
694 pipeline: impl IntoIterator<Item = Document>,
695 session: Option<&mut ClientSession>,
696 ) -> Result<Vec<M>> {
697 let collection = self.db.collection::<Document>(self.collection_name);
698 let res = collection.aggregate(pipeline);
699 let hidden_fields = self.hidden_fields();
700 let mut r = vec![];
701 match session {
702 None => {
703 let mut cursor = res.await?;
704 while let Some(d) = cursor.next().await {
705 r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
706 }
707 Ok(r)
708 }
709 Some(s) => {
710 let mut cursor = res.session(&mut *s).await?;
711 while let Some(d) = cursor.next(&mut *s).await {
712 r.push(self.clear(self.cast(d?, &self.req), &hidden_fields))
713 }
714 Ok(r)
715 }
716 }
717 }
718
719 pub async fn get_doc(&self, session: Option<&mut ClientSession>) -> Result<Vec<Document>> {
721 let whr = &self.query_builder.r#where;
722 let filter = if whr.is_empty() {
723 doc! {}
724 } else {
725 doc! {"$and":whr}
726 };
727 let collection = self.db.collection::<Document>(self.collection_name);
728 let mut find = collection.find(filter);
729 find = find.sort(self.query_builder.sort.clone());
730
731 if self.query_builder.skip > 0 {
732 find = find.skip(self.query_builder.skip as u64);
733 }
734 if self.query_builder.limit > 0 {
735 find = find.limit(self.query_builder.limit as i64);
736 }
737 if let Some(select) = self.query_builder.select.clone() {
738 find = find.projection(select);
739 }
740
741 let mut r = vec![];
742 match session {
743 None => {
744 let mut cursor = find.await?;
745 while let Some(d) = cursor.next().await {
746 r.push(self.cast(d?, &self.req))
747 }
748 Ok(r)
749 }
750 Some(s) => {
751 let mut cursor = find.session(&mut *s).await?;
752 while let Some(d) = cursor.next(&mut *s).await {
753 r.push(self.cast(d?, &self.req))
754 }
755 Ok(r)
756 }
757 }
758 }
759
760 pub async fn first_doc(
762 &mut self,
763 session: Option<&mut ClientSession>,
764 ) -> Result<Option<Document>> {
765 self.query_builder.limit = 1;
766 let r = self.get_doc(session).await?;
767 for item in r {
768 return Ok(Some(item));
769 }
770 Ok(None)
771 }
772
773 pub async fn aggregate_doc(
775 &mut self,
776 pipeline: impl IntoIterator<Item = Document>,
777 session: Option<&mut ClientSession>,
778 ) -> Result<Vec<Document>> {
779 let collection = self.db.collection::<Document>(self.collection_name);
780 let res = collection.aggregate(pipeline);
781 let mut r = vec![];
782 match session {
783 None => {
784 let mut cursor = res.await?;
785 while let Some(d) = cursor.next().await {
786 r.push(self.cast(d?, &self.req))
787 }
788 Ok(r)
789 }
790 Some(s) => {
791 let mut cursor = res.session(&mut *s).await?;
792 while let Some(d) = cursor.next(&mut *s).await {
793 r.push(self.cast(d?, &self.req))
794 }
795 Ok(r)
796 }
797 }
798 }
799
800 fn hidden_fields(&self) -> Vec<String> {
801 let mut r = vec![];
802 for (name, attr) in &self.columns {
803 if attr.hidden && !self.query_builder.visible_fields.contains(&name.to_string()) {
804 r.push(name.to_string())
805 }
806 }
807 r
808 }
809 fn clear(&self, data: Document, hidden_fields: &Vec<String>) -> M {
810 let data = data;
811 let mut default = to_document(&M::default()).unwrap();
812 for (name, attr) in &self.columns {
813 if hidden_fields.contains(&name.to_string()) {
814 continue;
815 }
816 let rename = match attr.name.clone() {
817 None => name.to_string(),
818 Some(a) => a,
819 };
820 if data.contains_key(&rename) {
821 default.insert(name.to_string(), data.get(&rename).unwrap());
822 }
823 }
824
825 bson::from_document(default).unwrap()
826 }
827}
828
829impl<'a, M> Model<'a, M>
830where
831 M: Boot,
832 M: Default,
833 M: Serialize,
834{
835 pub fn take_inner(&mut self) -> M {
838 std::mem::take(&mut *self.inner)
839 }
840
841 pub fn inner_ref(&self) -> &M {
842 self.inner.as_ref()
843 }
844
845 pub fn inner_mut(&mut self) -> &mut M {
846 self.inner.as_mut()
847 }
848
849 pub fn inner_to_doc(&self) -> MongodbResult<Document> {
850 let mut re = to_document(&self.inner)?;
851 self.rename_field(&mut re, false);
852 Ok(re)
853 }
854
855 fn rename_field(&self, doc: &mut Document, is_opt: bool) {
856 for (name, attr) in &self.columns {
857 if let Some(a) = &attr.name {
858 if is_opt {
859 for (_, d) in doc.iter_mut() {
860 let i = d.as_document_mut().unwrap();
861 match i.get(name) {
862 None => {}
863 Some(b) => {
864 i.insert(a.clone(), b.clone());
865 i.remove(name);
866 }
867 }
868 }
869 } else {
870 match doc.get(name) {
871 None => {}
872 Some(b) => {
873 doc.insert(a.clone(), b.clone());
874 doc.remove(name);
875 }
876 }
877 }
878 }
879 }
880 }
881
882 pub fn fill(mut self, inner: M) -> Model<'a, M> {
883 *self.inner = inner;
884 self
885 }
886}