cyaxon_authifier/database/
mongo.rs1use bson::{to_document, DateTime, Document};
2use chrono::{Duration, Utc};
3use futures::stream::TryStreamExt;
4use mongodb::options::{Collation, CollationStrength, FindOneOptions, UpdateOptions};
5use std::{ops::Deref, str::FromStr};
6use ulid::Ulid;
7
8use crate::{
9 models::{Account, Invite, MFATicket, Session},
10 Error, Result, Success,
11};
12
13use super::{definition::AbstractDatabase, Migration};
14
15#[derive(Clone)]
16pub struct MongoDb(pub mongodb::Database);
17
18impl Deref for MongoDb {
19 type Target = mongodb::Database;
20
21 fn deref(&self) -> &Self::Target {
22 &self.0
23 }
24}
25
26#[async_trait]
27impl AbstractDatabase for MongoDb {
28 async fn run_migration(&self, migration: Migration) -> Success {
30 match migration {
31 #[cfg(debug_assertions)]
32 Migration::WipeAll => {
33 self.drop(None).await.unwrap();
35 }
36 Migration::M2022_06_03EnsureUpToSpec => {
37 if self
38 .collection::<Document>("mfa_tickets")
39 .list_index_names()
40 .await
41 .unwrap_or_default()
42 .contains(&"token".to_owned())
43 {
44 return Ok(());
45 }
46
47 let list = self.list_collection_names(None).await.unwrap();
49 let collections = ["accounts", "sessions", "invites", "mfa_tickets"];
50
51 for name in collections {
52 if !list.contains(&name.to_string()) {
53 self.create_collection(name, None).await.unwrap();
54 }
55 }
56
57 let col = self.collection::<Document>("accounts");
59 col.drop_indexes(None).await.unwrap();
60
61 self.run_command(
62 doc! {
63 "createIndexes": "accounts",
64 "indexes": [
65 {
66 "key": {
67 "email": 1
68 },
69 "name": "email",
70 "unique": true,
71 "collation": {
72 "locale": "en",
73 "strength": 2
74 }
75 },
76 {
77 "key": {
78 "email_normalized": 1
79 },
80 "name": "email_normalized",
81 "unique": true,
82 "collation": {
83 "locale": "en",
84 "strength": 2
85 }
86 },
87 {
88 "key": {
89 "verification.token": 1
90 },
91 "name": "email_verification"
92 },
93 {
94 "key": {
95 "password_reset.token": 1
96 },
97 "name": "password_reset"
98 }
99 ]
100 },
101 None,
102 )
103 .await
104 .unwrap();
105
106 let col = self.collection::<Document>("sessions");
108 col.drop_indexes(None).await.unwrap();
109
110 self.run_command(
111 doc! {
112 "createIndexes": "sessions",
113 "indexes": [
114 {
115 "key": {
116 "token": 1
117 },
118 "name": "token",
119 "unique": true
120 },
121 {
122 "key": {
123 "user_id": 1
124 },
125 "name": "user_id"
126 }
127 ]
128 },
129 None,
130 )
131 .await
132 .unwrap();
133
134 let col = self.collection::<Document>("mfa_tickets");
136 col.drop_indexes(None).await.unwrap();
137
138 self.run_command(
139 doc! {
140 "createIndexes": "mfa_tickets",
141 "indexes": [
142 {
143 "key": {
144 "token": 1
145 },
146 "name": "token",
147 "unique": true
148 }
149 ]
150 },
151 None,
152 )
153 .await
154 .unwrap();
155 }
156 Migration::M2022_06_09AddIndexForDeletion => {
157 if self
158 .collection::<Document>("accounts")
159 .list_index_names()
160 .await
161 .expect("list of index names")
162 .contains(&"account_deletion".to_owned())
163 {
164 return Ok(());
165 }
166
167 self.run_command(
168 doc! {
169 "createIndexes": "accounts",
170 "indexes": [
171 {
172 "key": {
173 "deletion.token": 1
174 },
175 "name": "account_deletion"
176 }
177 ]
178 },
179 None,
180 )
181 .await
182 .unwrap();
183 }
184 }
185
186 Ok(())
187 }
188
189 async fn find_account(&self, id: &str) -> Result<Account> {
191 self.collection("accounts")
192 .find_one(
193 doc! {
194 "_id": id
195 },
196 None,
197 )
198 .await
199 .map_err(|_| Error::DatabaseError {
200 operation: "find_one",
201 with: "account",
202 })?
203 .ok_or(Error::UnknownUser)
204 }
205
206 async fn find_account_by_normalized_email(
208 &self,
209 normalized_email: &str,
210 ) -> Result<Option<Account>> {
211 self.collection("accounts")
212 .find_one(
213 doc! {
214 "email_normalized": normalized_email
215 },
216 FindOneOptions::builder()
217 .collation(
218 Collation::builder()
219 .locale("en")
220 .strength(CollationStrength::Secondary)
221 .build(),
222 )
223 .build(),
224 )
225 .await
226 .map_err(|_| Error::DatabaseError {
227 operation: "find_one",
228 with: "account",
229 })
230 }
231
232 async fn find_account_with_email_verification(&self, token: &str) -> Result<Account> {
234 self.collection("accounts")
235 .find_one(
236 doc! {
237 "verification.token": token,
238 "verification.expiry": {
239 "$gte": DateTime::now().to_rfc3339_string()
240 }
241 },
242 None,
243 )
244 .await
245 .map_err(|_| Error::DatabaseError {
246 operation: "find_one",
247 with: "account",
248 })?
249 .ok_or(Error::InvalidToken)
250 }
251
252 async fn find_account_with_password_reset(&self, token: &str) -> Result<Account> {
254 self.collection("accounts")
255 .find_one(
256 doc! {
257 "password_reset.token": token,
258 "password_reset.expiry": {
259 "$gte": DateTime::now().to_rfc3339_string()
260 }
261 },
262 None,
263 )
264 .await
265 .map_err(|_| Error::DatabaseError {
266 operation: "find_one",
267 with: "account",
268 })?
269 .ok_or(Error::InvalidToken)
270 }
271
272 async fn find_account_with_deletion_token(&self, token: &str) -> Result<Account> {
274 self.collection("accounts")
275 .find_one(
276 doc! {
277 "deletion.token": token,
278 "deletion.expiry": {
279 "$gte": DateTime::now().to_rfc3339_string()
280 }
281 },
282 None,
283 )
284 .await
285 .map_err(|_| Error::DatabaseError {
286 operation: "find_one",
287 with: "account",
288 })?
289 .ok_or(Error::InvalidToken)
290 }
291
292 async fn find_invite(&self, id: &str) -> Result<Invite> {
294 self.collection("invites")
295 .find_one(
296 doc! {
297 "_id": id
298 },
299 None,
300 )
301 .await
302 .map_err(|_| Error::DatabaseError {
303 operation: "find_one",
304 with: "invite",
305 })?
306 .ok_or(Error::InvalidInvite)
307 }
308
309 async fn find_session(&self, id: &str) -> Result<Session> {
311 self.collection("sessions")
312 .find_one(
313 doc! {
314 "_id": id
315 },
316 None,
317 )
318 .await
319 .map_err(|_| Error::DatabaseError {
320 operation: "find_one",
321 with: "session",
322 })?
323 .ok_or(Error::UnknownUser)
324 }
325
326 async fn find_sessions(&self, user_id: &str) -> Result<Vec<Session>> {
328 self.collection::<Session>("sessions")
329 .find(
330 doc! {
331 "user_id": user_id
332 },
333 None,
334 )
335 .await
336 .map_err(|_| Error::DatabaseError {
337 operation: "find",
338 with: "sessions",
339 })?
340 .try_collect()
341 .await
342 .map_err(|_| Error::DatabaseError {
343 operation: "collect",
344 with: "sessions",
345 })
346 }
347
348 async fn find_sessions_with_subscription(&self, user_ids: &[String]) -> Result<Vec<Session>> {
350 self.collection::<Session>("sessions")
351 .find(
352 doc! {
353 "user_id": {
354 "$in": user_ids
355 },
356 "subscription": {
357 "$exists": true
358 }
359 },
360 None,
361 )
362 .await
363 .map_err(|_| Error::DatabaseError {
364 operation: "find",
365 with: "sessions",
366 })?
367 .try_collect()
368 .await
369 .map_err(|_| Error::DatabaseError {
370 operation: "collect",
371 with: "sessions",
372 })
373 }
374
375 async fn find_session_by_token(&self, token: &str) -> Result<Option<Session>> {
377 self.collection("sessions")
378 .find_one(
379 doc! {
380 "token": token
381 },
382 None,
383 )
384 .await
385 .map_err(|_| Error::DatabaseError {
386 operation: "find_one",
387 with: "session",
388 })?
389 .ok_or(Error::UnknownUser)
390 }
391
392 async fn find_ticket_by_token(&self, token: &str) -> Result<Option<MFATicket>> {
396 let ticket: MFATicket = self
397 .collection("mfa_tickets")
398 .find_one(
399 doc! {
400 "token": token
401 },
402 None,
403 )
404 .await
405 .map_err(|_| Error::DatabaseError {
406 operation: "find_one",
407 with: "mfa_ticket",
408 })?
409 .ok_or(Error::InvalidToken)?;
410
411 if let Ok(ulid) = Ulid::from_str(&ticket.id) {
412 if (ulid.datetime() + Duration::minutes(1)) > Utc::now() {
413 Ok(Some(ticket))
414 } else {
415 Err(Error::InvalidToken)
416 }
417 } else {
418 Err(Error::InvalidToken)
419 }
420 }
421
422 async fn save_account(&self, account: &Account) -> Success {
424 self.collection::<Account>("accounts")
425 .update_one(
426 doc! {
427 "_id": &account.id
428 },
429 doc! {
430 "$set": to_document(account).map_err(|_| Error::DatabaseError {
431 operation: "to_document",
432 with: "account",
433 })?
434 },
435 UpdateOptions::builder().upsert(true).build(),
436 )
437 .await
438 .map_err(|_| Error::DatabaseError {
439 operation: "upsert_one",
440 with: "account",
441 })
442 .map(|_| ())
443 }
444
445 async fn save_session(&self, session: &Session) -> Success {
447 self.collection::<Session>("sessions")
448 .update_one(
449 doc! {
450 "_id": &session.id
451 },
452 doc! {
453 "$set": to_document(session).map_err(|_| Error::DatabaseError {
454 operation: "to_document",
455 with: "session",
456 })?,
457 },
458 UpdateOptions::builder().upsert(true).build(),
459 )
460 .await
461 .map_err(|_| Error::DatabaseError {
462 operation: "upsert_one",
463 with: "session",
464 })
465 .map(|_| ())
466 }
467
468 async fn save_invite(&self, invite: &Invite) -> Success {
470 self.collection::<Invite>("invites")
471 .update_one(
472 doc! {
473 "_id": &invite.id
474 },
475 doc! {
476 "$set": to_document(invite).map_err(|_| Error::DatabaseError {
477 operation: "to_document",
478 with: "invite",
479 })?,
480 },
481 UpdateOptions::builder().upsert(true).build(),
482 )
483 .await
484 .map_err(|_| Error::DatabaseError {
485 operation: "upsert_one",
486 with: "invite",
487 })
488 .map(|_| ())
489 }
490
491 async fn save_ticket(&self, ticket: &MFATicket) -> Success {
493 self.collection::<MFATicket>("mfa_tickets")
494 .update_one(
495 doc! {
496 "_id": &ticket.id
497 },
498 doc! {
499 "$set": to_document(ticket).map_err(|_| Error::DatabaseError {
500 operation: "to_document",
501 with: "ticket",
502 })?,
503 },
504 UpdateOptions::builder().upsert(true).build(),
505 )
506 .await
507 .map_err(|_| Error::DatabaseError {
508 operation: "upsert_one",
509 with: "ticket",
510 })
511 .map(|_| ())
512 }
513
514 async fn delete_session(&self, id: &str) -> Success {
516 self.collection::<Session>("sessions")
517 .delete_one(
518 doc! {
519 "_id": id
520 },
521 None,
522 )
523 .await
524 .map_err(|_| Error::DatabaseError {
525 operation: "delete_one",
526 with: "session",
527 })
528 .map(|_| ())
529 }
530
531 async fn delete_all_sessions(&self, user_id: &str, ignore: Option<String>) -> Success {
533 let mut query = doc! {
534 "user_id": user_id
535 };
536
537 if let Some(id) = ignore {
538 query.insert(
539 "_id",
540 doc! {
541 "$ne": id
542 },
543 );
544 }
545
546 self.collection::<Session>("sessions")
547 .delete_many(query, None)
548 .await
549 .map_err(|_| Error::DatabaseError {
550 operation: "delete_one",
551 with: "session",
552 })
553 .map(|_| ())
554 }
555
556 async fn delete_ticket(&self, id: &str) -> Success {
558 self.collection::<MFATicket>("mfa_tickets")
559 .delete_one(
560 doc! {
561 "_id": id
562 },
563 None,
564 )
565 .await
566 .map_err(|_| Error::DatabaseError {
567 operation: "delete_one",
568 with: "mfa_ticket",
569 })
570 .map(|_| ())
571 }
572}