1use crate::column::ColumnAttr;
2use crate::event::Boot;
3use futures_util::StreamExt;
4use log::error;
5use mongodb::bson::{doc, to_document, Document};
6use mongodb::bson::{Bson, DateTime};
7use mongodb::error::{Error, Result};
8use mongodb::options::IndexOptions;
9use mongodb::results::InsertOneResult;
10use mongodb::{bson, ClientSession, Collection, Database, IndexModel};
11use serde::de::DeserializeOwned;
12use serde::Serialize;
13use std::collections::HashMap;
14use std::fmt::Debug;
15use std::ops::{Deref, DerefMut};
16use std::sync::Arc;
17
/// Convenience alias for the `Result` type imported from `mongodb::error`.
pub type MongodbResult<T> = Result<T>;
19
/// Query-builder state accumulated on a `Model` before an operation runs.
#[derive(Debug, Default, Clone)]
struct Mongorm {
    /// Filter documents; operations combine them under `$and`.
    pub r#where: Vec<Document>,
    /// When `true`, `update`/`delete` use the `*_many` driver calls.
    pub all: bool,
    /// Forwarded as the `upsert` option of update operations.
    pub upsert: bool,
    /// Optional projection applied by the `find`-based readers.
    pub select: Option<Document>,
    /// Sort specification passed to find / find-one-and-* operations.
    pub sort: Document,
    /// Number of documents to skip; readers apply it only when greater than zero.
    pub skip: u32,
    /// Maximum number of documents; readers apply it only when greater than zero.
    pub limit: u32,
    /// Column names that stay visible even if the column is marked hidden.
    pub visible_fields: Vec<String>,
}
/// A record of type `M` bundled with the database handle, collection name,
/// column metadata and pending query state needed to persist or query it.
///
/// Only `inner` takes part in serialization; every other field is skipped.
#[derive(Debug, Clone, Serialize)]
pub struct Model<'a, M>
where
    M: Boot,
{
    /// The wrapped record.
    inner: Box<M>,
    /// Optional request context handed to the `finish` and `cast` hooks.
    #[serde(skip_serializing)]
    req: Option<M::Req>,
    /// Database handle used to obtain collections.
    #[serde(skip)]
    db: Database,
    /// Name of the collection operations are run against.
    #[serde(skip)]
    collection_name: &'a str,
    /// When `true`, `created_at` / `updated_at` are maintained on writes.
    #[serde(skip)]
    add_times: bool,
    /// Per-column attributes keyed by column name.
    #[serde(skip)]
    columns: HashMap<&'a str, ColumnAttr>,
    /// Accumulated query-builder state.
    #[serde(skip)]
    _mongo: Mongorm,
}
50
51impl<'a, T: 'a + Boot> Deref for Model<'a, T> {
52 type Target = T;
53
54 fn deref(&self) -> &Self::Target {
55 &self.inner
56 }
57}
58
59impl<'a, T: 'a + Boot> DerefMut for Model<'a, T> {
60 fn deref_mut(&mut self) -> &mut Self::Target {
61 &mut self.inner
62 }
63}
64
65impl<'a, M> Model<'a, M>
66where
67 M: Boot,
68 M: Default,
69 M: Serialize,
70 M: DeserializeOwned,
71 M: Send,
72 M: Sync,
73 M: Unpin,
74{
75 pub fn new(
76 db: &Database,
77 req: Option<M::Req>,
78 collection_name: &'a str,
79 columns: &'a str,
80 add_times: bool,
81 ) -> Model<'a, M> {
82 let columns = serde_json::from_str(columns).unwrap();
83
84 let model = Model {
85 inner: Box::<M>::default(),
86 req,
87 db: db.clone(),
88 collection_name,
89 columns,
90 add_times,
91 _mongo: Default::default(),
92 };
93
94 model
95 }
96
97 pub fn add_columns(&mut self, names:Vec<&'a str>){
99 for name in names {
100 self.columns.insert(name, ColumnAttr {
101 asc: false,
102 desc: false,
103 unique: false,
104 sphere2d: false,
105 text: None,
106 hidden: false,
107 name: Some(name.to_string()),
108 });
109 }
110 }
111
/// Returns the name of the collection this model currently targets.
pub fn collection_name(&self) -> &'a str {
    self.collection_name
}
116
/// Returns a collection handle typed to `M` for the model's collection name.
pub fn collection(&self) -> Collection<M> {
    self.db.collection::<M>(self.collection_name)
}
/// Builder-style setter: retargets the model at the collection `name`.
pub fn set_collection(mut self, name: &'a str) -> Model<'a, M> {
    self.collection_name = name;
    self
}
126
127 pub async fn register_indexes(&self) {
134 let coll = self.db.collection::<M>(self.collection_name);
135 let previous_indexes = coll.list_indexes().await;
136 let mut attrs = vec![];
137 for (name, attr) in &self.columns {
138 if attr.is_index() {
139 attrs.push(name)
140 }
141 }
142
143 let mut keys_to_remove = Vec::new();
144 if previous_indexes.is_ok() {
145 let foreach_future = previous_indexes.unwrap().for_each(|pr| {
146 match pr {
147 Ok(index_model) => {
148 index_model.keys.iter().for_each(|key| {
149 if key.0 != "_id" {
150 if let Some(pos) = attrs.iter().position(|k| k == &key.0) {
151 attrs.remove(pos);
153 } else if let Some(rw) = &index_model.options {
154 match rw.default_language {
156 None => keys_to_remove.push(rw.name.clone()),
157 Some(_) => match &rw.name {
158 None => keys_to_remove.push(rw.name.clone()),
159 Some(name) => {
160 if let Some(pos) =
161 attrs.iter().position(|k| k == &name)
162 {
163 attrs.remove(pos);
164 } else {
165 keys_to_remove.push(rw.name.clone())
166 }
167 }
168 },
169 }
170 }
171 }
172 });
173 }
174 Err(error) => {
175 error!("Can't unpack index model {error}");
176 }
177 }
178 futures::future::ready(())
179 });
180 foreach_future.await;
181 }
182
183 let attrs = attrs
184 .iter()
185 .map(|name| {
186 let key = name.to_string();
187 let attr = &self.columns.get(key.as_str()).unwrap();
188
189 if let Some(lang) = &attr.text {
190 let opts = IndexOptions::builder()
191 .unique(attr.unique)
192 .name(key.clone())
193 .default_language(lang.to_string())
194 .build();
195 IndexModel::builder()
196 .keys(doc! {
197 key : "text"
198 })
199 .options(opts)
200 .build()
201 } else if attr.sphere2d {
202 let opts = IndexOptions::builder().unique(attr.unique).build();
203 IndexModel::builder()
204 .keys(doc! { key: "2dsphere" })
205 .options(opts)
206 .build()
207 } else {
208 let sort = if attr.desc { -1 } else { 1 };
209 let opts = IndexOptions::builder().unique(attr.unique).build();
210
211 IndexModel::builder()
212 .keys(doc! {
213 key : sort
214 })
215 .options(opts)
216 .build()
217 }
218 })
219 .collect::<Vec<IndexModel>>();
220
221 for name in keys_to_remove {
222 let key = name.as_ref().unwrap();
223 let _ = coll.drop_index(key).await;
224 }
225 if !attrs.is_empty() {
226 let result = coll.create_indexes(attrs).await;
227 if let Err(error) = result {
228 error!("Can't create indexes : {:?}", error);
229 }
230 }
231 }
232
/// Builder-style reset: replaces all accumulated query state with its defaults.
pub fn reset(mut self) -> Model<'a, M> {
    self._mongo= Default::default();
    self
}
/// Builder-style: appends `data` to the list of filter documents.
pub fn r#where(mut self, data: Document) -> Model<'a, M> {
    self._mongo.r#where.push(data);
    self
}
/// Builder-style: stores the number of documents to skip.
pub fn skip(mut self, count: u32) -> Model<'a, M> {
    self._mongo.skip = count;
    self
}
248 pub async fn distinct(&self, name: &str) -> Result<Vec<Bson>> {
250 let whr = &self._mongo.r#where;
251 let filter = if whr.is_empty() {
252 doc! {}
253 } else {
254 doc! {"$and":whr}
255 };
256 let collection = self.db.collection::<Document>(self.collection_name);
257 collection.distinct(&name, filter).await
258 }
/// Builder-style: stores the maximum number of documents to return.
pub fn limit(mut self, count: u32) -> Model<'a, M> {
    self._mongo.limit = count;
    self
}
/// Builder-style: replaces the sort specification with `data`.
pub fn sort(mut self, data: Document) -> Model<'a, M> {
    self._mongo.sort = data;
    self
}
/// Builder-style: sets the `all` flag so the next `update`/`delete` uses the
/// `*_many` driver call instead of the find-one-and-* call.
pub fn all(mut self) -> Model<'a, M> {
    self._mongo.all = true;
    self
}
/// Builder-style: stores `data` as the projection for subsequent reads.
pub fn select(mut self, data: Document) -> Model<'a, M> {
    self._mongo.select = Some(data);
    self
}
279 pub fn visible(mut self, data: Vec<&str>) -> Model<'a, M> {
281 self._mongo.visible_fields = data.iter().map(|a| a.to_string()).collect();
282 self
283 }
/// Builder-style: sets the `upsert` flag forwarded to update operations.
pub fn upsert(mut self) -> Model<'a, M> {
    self._mongo.upsert = true;
    self
}
289
290 pub async fn create(&self, session: Option<&mut ClientSession>) -> Result<InsertOneResult> {
298 let mut data = self.inner_to_doc()?;
299 if data.get_object_id("_id").is_err() {
300 data.remove("_id");
301 }
302 if self.add_times {
303 if !data.contains_key("updated_at") || !data.get_datetime("updated_at").is_ok() {
304 data.insert("updated_at", DateTime::now());
305 }
306 if !data.contains_key("created_at") || !data.get_datetime("created_at").is_ok() {
307 data.insert("created_at", DateTime::now());
308 }
309 }
310 match session {
311 None => {
312 let r = self
313 .db
314 .collection(self.collection_name)
315 .insert_one(data.clone())
316 .await;
317 if r.is_ok() {
318 self.finish(&self.req, "create", Document::new(), data, None)
319 .await;
320 }
321 r
322 }
323 Some(s) => {
324 let r = self
325 .db
326 .collection(self.collection_name)
327 .insert_one(data.clone())
328 .session(&mut *s)
329 .await;
330 if r.is_ok() {
331 self.finish(&self.req, "create", Document::new(), data, Some(s))
332 .await;
333 }
334 r
335 }
336 }
337 }
338
339 pub async fn create_doc(
341 &self,
342 data: Document,
343 session: Option<&mut ClientSession>,
344 ) -> Result<InsertOneResult> {
345 let mut data = data;
346
347 if self.add_times {
348 if !data.contains_key("updated_at") || !data.get_datetime("updated_at").is_ok() {
349 data.insert("updated_at", DateTime::now());
350 }
351 if !data.contains_key("created_at") || !data.get_datetime("created_at").is_ok() {
352 data.insert("created_at", DateTime::now());
353 }
354 }
355 match session {
356 None => {
357 let r = self
358 .db
359 .collection(self.collection_name)
360 .insert_one(data.clone())
361 .await;
362 if r.is_ok() {
363 self.finish(&self.req, "create", Document::new(), data, None)
364 .await;
365 }
366 r
367 }
368 Some(s) => {
369 let r = self
370 .db
371 .collection(self.collection_name)
372 .insert_one(data.clone())
373 .session(&mut *s)
374 .await;
375 if r.is_ok() {
376 self.finish(&self.req, "create", Document::new(), data, Some(s))
377 .await;
378 }
379 r
380 }
381 }
382 }
383
384 pub async fn update(
395 &self,
396 data: Document,
397 session: Option<&mut ClientSession>,
398 ) -> Result<Document> {
399 let mut data = data;
400 let mut is_opt = false;
401 for (a, _) in data.iter() {
402 if a.starts_with("$") {
403 is_opt = true;
404 }
405 }
406
407 self.rename_field(&mut data, is_opt);
408 if !is_opt {
409 data = doc! {"$set":data};
410 }
411 if self.add_times {
412 if !data.contains_key("$set") {
413 data.insert("$set", doc! {});
414 }
415 let set = data.get_mut("$set").unwrap().as_document_mut().unwrap();
416 set.insert("updated_at", DateTime::now());
417 }
418
419 if self._mongo.upsert {
420 if self.add_times {
421 if !data.contains_key("$setOnInsert") {
422 data.insert("$setOnInsert", doc! {});
423 }
424 let set = data
425 .get_mut("$setOnInsert")
426 .unwrap()
427 .as_document_mut()
428 .unwrap();
429 set.insert("created_at", DateTime::now());
430 }
431 }
432 let whr = &self._mongo.r#where;
433 if whr.is_empty() {
434 return Err(Error::from(std::io::Error::new(
435 std::io::ErrorKind::InvalidInput,
436 "where not set.",
437 )));
438 }
439 let filter = doc! {"$and":whr};
440
441 match session {
442 None => {
443 let r = self.db.collection::<Document>(self.collection_name);
444
445 if self._mongo.all {
446 let r = r
447 .update_many(filter, data.clone())
448 .upsert(self._mongo.upsert)
449 .await;
450 match r {
451 Ok(old) => {
452 let res = doc! {"modified_count":old.modified_count.to_string()};
453 self.finish(&self.req, "update_many", res.clone(), data, None)
454 .await;
455 Ok(res)
456 }
457 Err(e) => Err(e),
458 }
459 } else {
460 let r = r
461 .find_one_and_update(filter, data.clone())
462 .upsert(self._mongo.upsert)
463 .sort(self._mongo.sort.clone())
464 .await;
465 match r {
466 Ok(old) => {
467 let res = old.unwrap_or(Document::new());
468 self.finish(&self.req, "update", res.clone(), data, None)
469 .await;
470 Ok(res)
471 }
472 Err(e) => Err(e),
473 }
474 }
475 }
476 Some(s) => {
477 let r = self.db.collection::<Document>(self.collection_name);
478 if self._mongo.all {
479 let r = r
480 .update_many(filter, data.clone())
481 .upsert(self._mongo.upsert)
482 .session(&mut *s)
483 .await;
484 match r {
485 Ok(old) => {
486 let res = doc! {"modified_count":old.modified_count.to_string()};
487 self.finish(&self.req, "update_many", res.clone(), data, Some(s))
488 .await;
489 Ok(res)
490 }
491 Err(e) => Err(e),
492 }
493 } else {
494 let r = r
495 .find_one_and_update(filter, data.clone())
496 .upsert(self._mongo.upsert)
497 .sort(self._mongo.sort.clone())
498 .session(&mut *s)
499 .await;
500 match r {
501 Ok(old) => {
502 let res = old.unwrap_or(Document::new());
503 self.finish(&self.req, "update", res.clone(), data, Some(s))
504 .await;
505 Ok(res)
506 }
507 Err(e) => Err(e),
508 }
509 }
510 }
511 }
512 }
513
514 pub async fn delete(&self, session: Option<&mut ClientSession>) -> Result<Document> {
522 let whr = &self._mongo.r#where;
523 if whr.is_empty() {
524 return Err(Error::from(std::io::Error::new(
525 std::io::ErrorKind::InvalidInput,
526 "where not set.",
527 )));
528 }
529 let filter = doc! {"$and":whr};
530
531 match session {
532 None => {
533 let r = self.db.collection::<Document>(self.collection_name);
534 if self._mongo.all {
535 let r = r.delete_many(filter).await;
536 match r {
537 Ok(old) => {
538 let res = doc! {"deleted_count":old.deleted_count.to_string()};
539 self.finish(&self.req, "delete_many", res.clone(), doc! {}, None)
540 .await;
541 Ok(res)
542 }
543 Err(e) => Err(e),
544 }
545 } else {
546 let r = r
547 .find_one_and_delete(filter)
548 .sort(self._mongo.sort.clone())
549 .await;
550 match r {
551 Ok(old) => {
552 let res = old.unwrap_or(Document::new());
553 self.finish(&self.req, "delete", res.clone(), doc! {}, None)
554 .await;
555 Ok(res)
556 }
557 Err(e) => Err(e),
558 }
559 }
560 }
561 Some(s) => {
562 let r = self.db.collection::<Document>(self.collection_name);
563 if self._mongo.all {
564 let r = r.delete_many(filter).session(&mut *s).await;
565 match r {
566 Ok(old) => {
567 let res = doc! {"deleted_count":old.deleted_count.to_string()};
568 self.finish(&self.req, "delete_many", res.clone(), doc! {}, Some(s))
569 .await;
570 Ok(res)
571 }
572 Err(e) => Err(e),
573 }
574 } else {
575 let r = r
576 .find_one_and_delete(filter)
577 .sort(self._mongo.sort.clone())
578 .session(&mut *s)
579 .await;
580 match r {
581 Ok(old) => {
582 let res = old.unwrap_or(Document::new());
583 self.finish(&self.req, "delete", res.clone(), doc! {}, Some(s))
584 .await;
585 Ok(res)
586 }
587 Err(e) => Err(e),
588 }
589 }
590 }
591 }
592 }
593
594 pub async fn get(&self, session: Option<&mut ClientSession>) -> Result<Vec<M>> {
603 let whr = &self._mongo.r#where;
604 let filter = if whr.is_empty() {
605 doc! {}
606 } else {
607 doc! {"$and":whr}
608 };
609 let hidden_fields = self.hidden_fields();
610 let collection = self.db.collection::<Document>(self.collection_name);
611 let mut find = collection.find(filter);
612 find = find.sort(self._mongo.sort.clone());
613
614 if self._mongo.skip > 0 {
615 find = find.skip(self._mongo.skip as u64);
616 }
617 if self._mongo.limit > 0 {
618 find = find.limit(self._mongo.limit as i64);
619 }
620 if let Some(select) = self._mongo.select.clone() {
621 find = find.projection(select);
622 }
623
624 let mut r = vec![];
625 match session {
626 None => {
627 let mut cursor = find.await?;
628 while let Some(d) = cursor.next().await {
629 r.push(self.clear(self.cast(d?,&self.req), &hidden_fields))
630 }
631 Ok(r)
632 }
633 Some(s) => {
634 let mut cursor = find.session(&mut *s).await?;
635 while let Some(d) = cursor.next(&mut *s).await {
636 r.push(self.clear(self.cast(d?,&self.req), &hidden_fields))
637 }
638 Ok(r)
639 }
640 }
641 }
642
643 pub async fn first(&mut self, session: Option<&mut ClientSession>) -> Result<Option<M>> {
645 self._mongo.limit = 1;
646 let r = self.get(session).await?;
647 for item in r {
648 return Ok(Some(item));
649 }
650 Ok(None)
651 }
652
653 pub async fn aggregate(
655 &mut self,
656 pipeline: impl IntoIterator<Item = Document>,
657 session: Option<&mut ClientSession>,
658 ) -> Result<Vec<M>> {
659 let collection = self.db.collection::<Document>(self.collection_name);
660 let res = collection.aggregate(pipeline);
661 let hidden_fields = self.hidden_fields();
662 let mut r = vec![];
663 match session {
664 None => {
665 let mut cursor = res.await?;
666 while let Some(d) = cursor.next().await {
667 r.push(self.clear(self.cast(d?,&self.req), &hidden_fields))
668 }
669 Ok(r)
670 }
671 Some(s) => {
672 let mut cursor = res.session(&mut *s).await?;
673 while let Some(d) = cursor.next(&mut *s).await {
674 r.push(self.clear(self.cast(d?,&self.req), &hidden_fields))
675 }
676 Ok(r)
677 }
678 }
679 }
680
681 pub async fn get_doc(&self, session: Option<&mut ClientSession>) -> Result<Vec<Document>> {
683 let whr = &self._mongo.r#where;
684 let filter = if whr.is_empty() {
685 doc! {}
686 } else {
687 doc! {"$and":whr}
688 };
689 let collection = self.db.collection::<Document>(self.collection_name);
690 let mut find = collection.find(filter);
691 find = find.sort(self._mongo.sort.clone());
692
693 if self._mongo.skip > 0 {
694 find = find.skip(self._mongo.skip as u64);
695 }
696 if self._mongo.limit > 0 {
697 find = find.limit(self._mongo.limit as i64);
698 }
699 if let Some(select) = self._mongo.select.clone() {
700 find = find.projection(select);
701 }
702
703 let mut r = vec![];
704 match session {
705 None => {
706 let mut cursor = find.await?;
707 while let Some(d) = cursor.next().await {
708 r.push(self.cast(d?,&self.req))
709 }
710 Ok(r)
711 }
712 Some(s) => {
713 let mut cursor = find.session(&mut *s).await?;
714 while let Some(d) = cursor.next(&mut *s).await {
715 r.push(self.cast(d?,&self.req))
716 }
717 Ok(r)
718 }
719 }
720 }
721
722 pub async fn first_doc(
724 &mut self,
725 session: Option<&mut ClientSession>,
726 ) -> Result<Option<Document>> {
727 self._mongo.limit = 1;
728 let r = self.get_doc(session).await?;
729 for item in r {
730 return Ok(Some(item));
731 }
732 Ok(None)
733 }
734
735 pub async fn aggregate_doc(
737 &mut self,
738 pipeline: impl IntoIterator<Item = Document>,
739 session: Option<&mut ClientSession>,
740 ) -> Result<Vec<Document>> {
741 let collection = self.db.collection::<Document>(self.collection_name);
742 let res = collection.aggregate(pipeline);
743 let mut r = vec![];
744 match session {
745 None => {
746 let mut cursor = res.await?;
747 while let Some(d) = cursor.next().await {
748 r.push(self.cast(d?,&self.req))
749 }
750 Ok(r)
751 }
752 Some(s) => {
753 let mut cursor = res.session(&mut *s).await?;
754 while let Some(d) = cursor.next(&mut *s).await {
755 r.push(self.cast(d?,&self.req))
756 }
757 Ok(r)
758 }
759 }
760 }
761
762 fn hidden_fields(&self) -> Vec<String> {
763 let mut r = vec![];
764 for (name, attr) in &self.columns {
765 if attr.hidden && !self._mongo.visible_fields.contains(&name.to_string()) {
766 r.push(name.to_string())
767 }
768 }
769 r
770 }
771 fn clear(&self, data: Document, hidden_fields: &Vec<String>) -> M {
772 let data = data;
773 let mut default = to_document(&M::default()).unwrap();
774 for (name, attr) in &self.columns {
775 if hidden_fields.contains(&name.to_string()) {
776 continue;
777 }
778 let rename = match attr.name.clone() {
779 None => name.to_string(),
780 Some(a) => a,
781 };
782 if data.contains_key(&rename) {
783 default.insert(name.to_string(), data.get(&rename).unwrap());
784 }
785 }
786
787 bson::from_document(default).unwrap()
788 }
789}
790
791impl<'a, M> Model<'a, M>
792where
793 M: Boot,
794 M: Default,
795 M: Serialize,
796{
/// Moves the wrapped record out, leaving `M::default()` in its place.
pub fn take_inner(&mut self) -> M {
    std::mem::take(&mut *self.inner)
}
802
/// Borrows the wrapped record.
pub fn inner_ref(&self) -> &M {
    self.inner.as_ref()
}
806
/// Mutably borrows the wrapped record.
pub fn inner_mut(&mut self) -> &mut M {
    self.inner.as_mut()
}
810
/// Serializes the wrapped record to a BSON document and applies the
/// configured column renames to its top-level keys.
///
/// # Errors
///
/// Returns an error if the record cannot be serialized.
pub fn inner_to_doc(&self) -> MongodbResult<Document> {
    let mut re = to_document(&self.inner)?;
    // `false`: the document is a plain record, not an operator document.
    self.rename_field(&mut re, false);
    Ok(re)
}
816
817 fn rename_field(&self, doc: &mut Document, is_opt: bool) {
818 for (name, attr) in &self.columns {
819 if let Some(a) = &attr.name {
820 if is_opt {
821 for (_, d) in doc.iter_mut() {
822 let i = d.as_document_mut().unwrap();
823 match i.get(name) {
824 None => {}
825 Some(b) => {
826 i.insert(a.clone(), b.clone());
827 i.remove(name);
828 }
829 }
830 }
831 } else {
832 match doc.get(name) {
833 None => {}
834 Some(b) => {
835 doc.insert(a.clone(), b.clone());
836 doc.remove(name);
837 }
838 }
839 }
840 }
841 }
842 }
843
/// Builder-style: replaces the wrapped record with `inner`.
pub fn fill(mut self, inner: M) -> Model<'a, M> {
    *self.inner = inner;
    self
}
848}