cyaxon_authifier/database/
mongo.rs

1use bson::{to_document, DateTime, Document};
2use chrono::{Duration, Utc};
3use futures::stream::TryStreamExt;
4use mongodb::options::{Collation, CollationStrength, FindOneOptions, UpdateOptions};
5use std::{ops::Deref, str::FromStr};
6use ulid::Ulid;
7
8use crate::{
9    models::{Account, Invite, MFATicket, Session},
10    Error, Result, Success,
11};
12
13use super::{definition::AbstractDatabase, Migration};
14
/// MongoDB-backed database handle.
///
/// Thin newtype around [`mongodb::Database`]; the wrapped handle is public
/// and is what every `AbstractDatabase` method in this file operates on.
#[derive(Clone)]
pub struct MongoDb(pub mongodb::Database);
17
18impl Deref for MongoDb {
19    type Target = mongodb::Database;
20
21    fn deref(&self) -> &Self::Target {
22        &self.0
23    }
24}
25
26#[async_trait]
27impl AbstractDatabase for MongoDb {
28    /// Run a database migration
29    async fn run_migration(&self, migration: Migration) -> Success {
30        match migration {
31            #[cfg(debug_assertions)]
32            Migration::WipeAll => {
33                // Drop the entire database
34                self.drop(None).await.unwrap();
35            }
36            Migration::M2022_06_03EnsureUpToSpec => {
37                if self
38                    .collection::<Document>("mfa_tickets")
39                    .list_index_names()
40                    .await
41                    .unwrap_or_default()
42                    .contains(&"token".to_owned())
43                {
44                    return Ok(());
45                }
46
47                // Make sure all collections exist
48                let list = self.list_collection_names(None).await.unwrap();
49                let collections = ["accounts", "sessions", "invites", "mfa_tickets"];
50
51                for name in collections {
52                    if !list.contains(&name.to_string()) {
53                        self.create_collection(name, None).await.unwrap();
54                    }
55                }
56
57                // Setup index for `accounts`
58                let col = self.collection::<Document>("accounts");
59                col.drop_indexes(None).await.unwrap();
60
61                self.run_command(
62                    doc! {
63                        "createIndexes": "accounts",
64                        "indexes": [
65                            {
66                                "key": {
67                                    "email": 1
68                                },
69                                "name": "email",
70                                "unique": true,
71                                "collation": {
72                                    "locale": "en",
73                                    "strength": 2
74                                }
75                            },
76                            {
77                                "key": {
78                                    "email_normalized": 1
79                                },
80                                "name": "email_normalized",
81                                "unique": true,
82                                "collation": {
83                                    "locale": "en",
84                                    "strength": 2
85                                }
86                            },
87                            {
88                                "key": {
89                                    "verification.token": 1
90                                },
91                                "name": "email_verification"
92                            },
93                            {
94                                "key": {
95                                    "password_reset.token": 1
96                                },
97                                "name": "password_reset"
98                            }
99                        ]
100                    },
101                    None,
102                )
103                .await
104                .unwrap();
105
106                // Setup index for `sessions`
107                let col = self.collection::<Document>("sessions");
108                col.drop_indexes(None).await.unwrap();
109
110                self.run_command(
111                    doc! {
112                        "createIndexes": "sessions",
113                        "indexes": [
114                            {
115                                "key": {
116                                    "token": 1
117                                },
118                                "name": "token",
119                                "unique": true
120                            },
121                            {
122                                "key": {
123                                    "user_id": 1
124                                },
125                                "name": "user_id"
126                            }
127                        ]
128                    },
129                    None,
130                )
131                .await
132                .unwrap();
133
134                // Setup index for `mfa_tickets`
135                let col = self.collection::<Document>("mfa_tickets");
136                col.drop_indexes(None).await.unwrap();
137
138                self.run_command(
139                    doc! {
140                        "createIndexes": "mfa_tickets",
141                        "indexes": [
142                            {
143                                "key": {
144                                    "token": 1
145                                },
146                                "name": "token",
147                                "unique": true
148                            }
149                        ]
150                    },
151                    None,
152                )
153                .await
154                .unwrap();
155            }
156            Migration::M2022_06_09AddIndexForDeletion => {
157                if self
158                    .collection::<Document>("accounts")
159                    .list_index_names()
160                    .await
161                    .expect("list of index names")
162                    .contains(&"account_deletion".to_owned())
163                {
164                    return Ok(());
165                }
166
167                self.run_command(
168                    doc! {
169                        "createIndexes": "accounts",
170                        "indexes": [
171                            {
172                                "key": {
173                                    "deletion.token": 1
174                                },
175                                "name": "account_deletion"
176                            }
177                        ]
178                    },
179                    None,
180                )
181                .await
182                .unwrap();
183            }
184        }
185
186        Ok(())
187    }
188
189    /// Find account by id
190    async fn find_account(&self, id: &str) -> Result<Account> {
191        self.collection("accounts")
192            .find_one(
193                doc! {
194                    "_id": id
195                },
196                None,
197            )
198            .await
199            .map_err(|_| Error::DatabaseError {
200                operation: "find_one",
201                with: "account",
202            })?
203            .ok_or(Error::UnknownUser)
204    }
205
206    /// Find account by normalized email
207    async fn find_account_by_normalized_email(
208        &self,
209        normalized_email: &str,
210    ) -> Result<Option<Account>> {
211        self.collection("accounts")
212            .find_one(
213                doc! {
214                    "email_normalized": normalized_email
215                },
216                FindOneOptions::builder()
217                    .collation(
218                        Collation::builder()
219                            .locale("en")
220                            .strength(CollationStrength::Secondary)
221                            .build(),
222                    )
223                    .build(),
224            )
225            .await
226            .map_err(|_| Error::DatabaseError {
227                operation: "find_one",
228                with: "account",
229            })
230    }
231
232    /// Find account with active pending email verification
233    async fn find_account_with_email_verification(&self, token: &str) -> Result<Account> {
234        self.collection("accounts")
235            .find_one(
236                doc! {
237                    "verification.token": token,
238                    "verification.expiry": {
239                        "$gte": DateTime::now().to_rfc3339_string()
240                    }
241                },
242                None,
243            )
244            .await
245            .map_err(|_| Error::DatabaseError {
246                operation: "find_one",
247                with: "account",
248            })?
249            .ok_or(Error::InvalidToken)
250    }
251
252    /// Find account with active password reset
253    async fn find_account_with_password_reset(&self, token: &str) -> Result<Account> {
254        self.collection("accounts")
255            .find_one(
256                doc! {
257                    "password_reset.token": token,
258                    "password_reset.expiry": {
259                        "$gte": DateTime::now().to_rfc3339_string()
260                    }
261                },
262                None,
263            )
264            .await
265            .map_err(|_| Error::DatabaseError {
266                operation: "find_one",
267                with: "account",
268            })?
269            .ok_or(Error::InvalidToken)
270    }
271
272    /// Find account with active deletion token
273    async fn find_account_with_deletion_token(&self, token: &str) -> Result<Account> {
274        self.collection("accounts")
275            .find_one(
276                doc! {
277                    "deletion.token": token,
278                    "deletion.expiry": {
279                        "$gte": DateTime::now().to_rfc3339_string()
280                    }
281                },
282                None,
283            )
284            .await
285            .map_err(|_| Error::DatabaseError {
286                operation: "find_one",
287                with: "account",
288            })?
289            .ok_or(Error::InvalidToken)
290    }
291
292    /// Find invite by id
293    async fn find_invite(&self, id: &str) -> Result<Invite> {
294        self.collection("invites")
295            .find_one(
296                doc! {
297                    "_id": id
298                },
299                None,
300            )
301            .await
302            .map_err(|_| Error::DatabaseError {
303                operation: "find_one",
304                with: "invite",
305            })?
306            .ok_or(Error::InvalidInvite)
307    }
308
309    /// Find session by id
310    async fn find_session(&self, id: &str) -> Result<Session> {
311        self.collection("sessions")
312            .find_one(
313                doc! {
314                    "_id": id
315                },
316                None,
317            )
318            .await
319            .map_err(|_| Error::DatabaseError {
320                operation: "find_one",
321                with: "session",
322            })?
323            .ok_or(Error::UnknownUser)
324    }
325
326    /// Find sessions by user id
327    async fn find_sessions(&self, user_id: &str) -> Result<Vec<Session>> {
328        self.collection::<Session>("sessions")
329            .find(
330                doc! {
331                    "user_id": user_id
332                },
333                None,
334            )
335            .await
336            .map_err(|_| Error::DatabaseError {
337                operation: "find",
338                with: "sessions",
339            })?
340            .try_collect()
341            .await
342            .map_err(|_| Error::DatabaseError {
343                operation: "collect",
344                with: "sessions",
345            })
346    }
347
348    /// Find sessions by user ids
349    async fn find_sessions_with_subscription(&self, user_ids: &[String]) -> Result<Vec<Session>> {
350        self.collection::<Session>("sessions")
351            .find(
352                doc! {
353                    "user_id": {
354                        "$in": user_ids
355                    },
356                    "subscription": {
357                        "$exists": true
358                    }
359                },
360                None,
361            )
362            .await
363            .map_err(|_| Error::DatabaseError {
364                operation: "find",
365                with: "sessions",
366            })?
367            .try_collect()
368            .await
369            .map_err(|_| Error::DatabaseError {
370                operation: "collect",
371                with: "sessions",
372            })
373    }
374
375    /// Find session by token
376    async fn find_session_by_token(&self, token: &str) -> Result<Option<Session>> {
377        self.collection("sessions")
378            .find_one(
379                doc! {
380                    "token": token
381                },
382                None,
383            )
384            .await
385            .map_err(|_| Error::DatabaseError {
386                operation: "find_one",
387                with: "session",
388            })?
389            .ok_or(Error::UnknownUser)
390    }
391
392    /// Find ticket by token
393    /// <br>
394    /// Ticket is only valid for 1 minute
395    async fn find_ticket_by_token(&self, token: &str) -> Result<Option<MFATicket>> {
396        let ticket: MFATicket = self
397            .collection("mfa_tickets")
398            .find_one(
399                doc! {
400                    "token": token
401                },
402                None,
403            )
404            .await
405            .map_err(|_| Error::DatabaseError {
406                operation: "find_one",
407                with: "mfa_ticket",
408            })?
409            .ok_or(Error::InvalidToken)?;
410
411        if let Ok(ulid) = Ulid::from_str(&ticket.id) {
412            if (ulid.datetime() + Duration::minutes(1)) > Utc::now() {
413                Ok(Some(ticket))
414            } else {
415                Err(Error::InvalidToken)
416            }
417        } else {
418            Err(Error::InvalidToken)
419        }
420    }
421
422    // Save account
423    async fn save_account(&self, account: &Account) -> Success {
424        self.collection::<Account>("accounts")
425            .update_one(
426                doc! {
427                    "_id": &account.id
428                },
429                doc! {
430                    "$set": to_document(account).map_err(|_| Error::DatabaseError {
431                        operation: "to_document",
432                        with: "account",
433                    })?
434                },
435                UpdateOptions::builder().upsert(true).build(),
436            )
437            .await
438            .map_err(|_| Error::DatabaseError {
439                operation: "upsert_one",
440                with: "account",
441            })
442            .map(|_| ())
443    }
444
445    /// Save session
446    async fn save_session(&self, session: &Session) -> Success {
447        self.collection::<Session>("sessions")
448            .update_one(
449                doc! {
450                    "_id": &session.id
451                },
452                doc! {
453                    "$set": to_document(session).map_err(|_| Error::DatabaseError {
454                        operation: "to_document",
455                        with: "session",
456                    })?,
457                },
458                UpdateOptions::builder().upsert(true).build(),
459            )
460            .await
461            .map_err(|_| Error::DatabaseError {
462                operation: "upsert_one",
463                with: "session",
464            })
465            .map(|_| ())
466    }
467
468    /// Save invite
469    async fn save_invite(&self, invite: &Invite) -> Success {
470        self.collection::<Invite>("invites")
471            .update_one(
472                doc! {
473                    "_id": &invite.id
474                },
475                doc! {
476                    "$set": to_document(invite).map_err(|_| Error::DatabaseError {
477                        operation: "to_document",
478                        with: "invite",
479                    })?,
480                },
481                UpdateOptions::builder().upsert(true).build(),
482            )
483            .await
484            .map_err(|_| Error::DatabaseError {
485                operation: "upsert_one",
486                with: "invite",
487            })
488            .map(|_| ())
489    }
490
491    /// Save ticket
492    async fn save_ticket(&self, ticket: &MFATicket) -> Success {
493        self.collection::<MFATicket>("mfa_tickets")
494            .update_one(
495                doc! {
496                    "_id": &ticket.id
497                },
498                doc! {
499                    "$set": to_document(ticket).map_err(|_| Error::DatabaseError {
500                        operation: "to_document",
501                        with: "ticket",
502                    })?,
503                },
504                UpdateOptions::builder().upsert(true).build(),
505            )
506            .await
507            .map_err(|_| Error::DatabaseError {
508                operation: "upsert_one",
509                with: "ticket",
510            })
511            .map(|_| ())
512    }
513
514    /// Delete session
515    async fn delete_session(&self, id: &str) -> Success {
516        self.collection::<Session>("sessions")
517            .delete_one(
518                doc! {
519                    "_id": id
520                },
521                None,
522            )
523            .await
524            .map_err(|_| Error::DatabaseError {
525                operation: "delete_one",
526                with: "session",
527            })
528            .map(|_| ())
529    }
530
531    /// Delete session
532    async fn delete_all_sessions(&self, user_id: &str, ignore: Option<String>) -> Success {
533        let mut query = doc! {
534            "user_id": user_id
535        };
536
537        if let Some(id) = ignore {
538            query.insert(
539                "_id",
540                doc! {
541                    "$ne": id
542                },
543            );
544        }
545
546        self.collection::<Session>("sessions")
547            .delete_many(query, None)
548            .await
549            .map_err(|_| Error::DatabaseError {
550                operation: "delete_one",
551                with: "session",
552            })
553            .map(|_| ())
554    }
555
556    /// Delete ticket
557    async fn delete_ticket(&self, id: &str) -> Success {
558        self.collection::<MFATicket>("mfa_tickets")
559            .delete_one(
560                doc! {
561                    "_id": id
562                },
563                None,
564            )
565            .await
566            .map_err(|_| Error::DatabaseError {
567                operation: "delete_one",
568                with: "mfa_ticket",
569            })
570            .map(|_| ())
571    }
572}