revolt_database/models/users/ops/
mongodb.rs

1use ::mongodb::options::{Collation, CollationStrength, FindOneOptions, FindOptions};
2use authifier::models::Session;
3use futures::StreamExt;
4use iso8601_timestamp::Timestamp;
5use revolt_result::Result;
6
7use crate::DocumentId;
8use crate::IntoDocumentPath;
9use crate::MongoDb;
10use crate::{FieldsUser, PartialUser, RelationshipStatus, User};
11
12use super::AbstractUsers;
13
/// Name of the MongoDB collection that the user operations in this file target.
static COL: &str = "users";
15
16#[async_trait]
17impl AbstractUsers for MongoDb {
18    /// Insert a new user into the database
19    async fn insert_user(&self, user: &User) -> Result<()> {
20        query!(self, insert_one, COL, &user).map(|_| ())
21    }
22
23    /// Fetch a user from the database
24    async fn fetch_user(&self, id: &str) -> Result<User> {
25        query!(self, find_one_by_id, COL, id)?.ok_or_else(|| create_error!(NotFound))
26    }
27
28    /// Fetch a user from the database by their username
29    async fn fetch_user_by_username(&self, username: &str, discriminator: &str) -> Result<User> {
30        query!(
31            self,
32            find_one_with_options,
33            COL,
34            doc! {
35                "username": username,
36                "discriminator": discriminator
37            },
38            FindOneOptions::builder()
39                .collation(
40                    Collation::builder()
41                        .locale("en")
42                        .strength(CollationStrength::Secondary)
43                        .build(),
44                )
45                .build()
46        )?
47        .ok_or_else(|| create_error!(NotFound))
48    }
49
50    /// Fetch a session from the database by token
51    async fn fetch_session_by_token(&self, token: &str) -> Result<Session> {
52        self.col::<Session>("sessions")
53            .find_one(doc! {
54                "token": token
55            })
56            .await
57            .map_err(|_| create_database_error!("find_one", "sessions"))?
58            .ok_or_else(|| create_error!(InvalidSession))
59    }
60
61    /// Fetch multiple users by their ids
62    async fn fetch_users<'a>(&self, ids: &'a [String]) -> Result<Vec<User>> {
63        Ok(self
64            .col::<User>(COL)
65            .find(doc! {
66                "_id": {
67                    "$in": ids
68                }
69            })
70            .await
71            .map_err(|_| create_database_error!("find", COL))?
72            .filter_map(|s| async {
73                if cfg!(debug_assertions) {
74                    Some(s.unwrap())
75                } else {
76                    s.ok()
77                }
78            })
79            .collect()
80            .await)
81    }
82
83    /// Fetch all discriminators in use for a username
84    async fn fetch_discriminators_in_use(&self, username: &str) -> Result<Vec<String>> {
85        #[derive(Deserialize)]
86        struct UserDocument {
87            discriminator: String,
88        }
89
90        Ok(self
91            .col::<UserDocument>(COL)
92            .find(doc! {
93                "username": username
94            })
95            .with_options(
96                FindOptions::builder()
97                    .collation(
98                        Collation::builder()
99                            .locale("en")
100                            .strength(CollationStrength::Secondary)
101                            .build(),
102                    )
103                    .projection(doc! { "_id": 0, "discriminator": 1 })
104                    .build(),
105            )
106            .await
107            .map_err(|_| create_database_error!("find", COL))?
108            .filter_map(|s| async { s.ok() })
109            .collect::<Vec<UserDocument>>()
110            .await
111            .into_iter()
112            .map(|user| user.discriminator)
113            .collect::<Vec<String>>())
114    }
115
116    /// Fetch ids of users that both users are friends with
117    async fn fetch_mutual_user_ids(&self, user_a: &str, user_b: &str) -> Result<Vec<String>> {
118        Ok(self
119            .col::<DocumentId>(COL)
120            .find(doc! {
121                "$and": [
122                    { "relations": { "$elemMatch": { "_id": &user_a, "status": "Friend" } } },
123                    { "relations": { "$elemMatch": { "_id": &user_b, "status": "Friend" } } }
124                ]
125            })
126            .with_options(FindOptions::builder().projection(doc! { "_id": 1 }).build())
127            .await
128            .map_err(|_| create_database_error!("find", COL))?
129            .filter_map(|s| async { s.ok() })
130            .map(|user| user.id)
131            .collect()
132            .await)
133    }
134
135    /// Fetch ids of channels that both users are in
136    async fn fetch_mutual_channel_ids(&self, user_a: &str, user_b: &str) -> Result<Vec<String>> {
137        Ok(self
138            .col::<DocumentId>("channels")
139            .find(doc! {
140                "channel_type": {
141                    "$in": ["Group", "DirectMessage"]
142                },
143                "recipients": {
144                    "$all": [ user_a, user_b ]
145                }
146            })
147            .with_options(FindOptions::builder().projection(doc! { "_id": 1 }).build())
148            .await
149            .map_err(|_| create_database_error!("find", "channels"))?
150            .filter_map(|s| async { s.ok() })
151            .map(|user| user.id)
152            .collect()
153            .await)
154    }
155
156    /// Fetch ids of servers that both users share
157    async fn fetch_mutual_server_ids(&self, user_a: &str, user_b: &str) -> Result<Vec<String>> {
158        Ok(self
159            .col::<DocumentId>("server_members")
160            .aggregate(vec![
161                doc! {
162                    "$match": {
163                        "_id.user": user_a
164                    }
165                },
166                doc! {
167                    "$lookup": {
168                        "from": "server_members",
169                        "as": "members",
170                        "let": {
171                            "server": "$_id.server"
172                        },
173                        "pipeline": [
174                            {
175                                "$match": {
176                                    "$expr": {
177                                        "$and": [
178                                            { "$eq": [ "$_id.user", user_b ] },
179                                            { "$eq": [ "$_id.server", "$$server" ] }
180                                        ]
181                                    }
182                                }
183                            }
184                        ]
185                    }
186                },
187                doc! {
188                    "$match": {
189                        "members": {
190                            "$size": 1_i32
191                        }
192                    }
193                },
194                doc! {
195                    "$project": {
196                        "_id": "$_id.server"
197                    }
198                },
199            ])
200            .await
201            .map_err(|_| create_database_error!("aggregate", "server_members"))?
202            .filter_map(|s| async { s.ok() })
203            .filter_map(|doc| async move { doc.get_str("_id").map(|id| id.to_string()).ok() })
204            .collect()
205            .await)
206    }
207
208    /// Update a user by their id given some data
209    async fn update_user(
210        &self,
211        id: &str,
212        partial: &PartialUser,
213        remove: Vec<FieldsUser>,
214    ) -> Result<()> {
215        if remove.contains(&FieldsUser::StatusText) && partial.status.is_some() {
216            // stupid-ass workaround to fix mongo conflicting the same item
217            let _: Result<()> = query!(
218                self,
219                update_one_by_id,
220                COL,
221                id,
222                PartialUser {
223                    ..Default::default()
224                },
225                remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
226                None
227            )
228            .map(|_| ());
229
230            query!(self, update_one_by_id, COL, id, partial, vec![], None).map(|_| ())
231        } else {
232            query!(
233                self,
234                update_one_by_id,
235                COL,
236                id,
237                partial,
238                remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
239                None
240            )
241            .map(|_| ())
242        }
243    }
244
245    /// Set relationship with another user
246    ///
247    /// This should use pull_relationship if relationship is None.
248    async fn set_relationship(
249        &self,
250        user_id: &str,
251        target_id: &str,
252        relationship: &RelationshipStatus,
253    ) -> Result<()> {
254        if let RelationshipStatus::None = relationship {
255            return self.pull_relationship(user_id, target_id).await;
256        }
257
258        self.col::<User>(COL)
259            .update_one(
260                doc! {
261                    "_id": user_id
262                },
263                vec![doc! {
264                    "$set": {
265                        "relations": {
266                            "$concatArrays": [
267                                {
268                                    "$ifNull": [
269                                        {
270                                            "$filter": {
271                                                "input": "$relations",
272                                                "cond": {
273                                                    "$ne": [
274                                                        "$$this._id",
275                                                        target_id
276                                                    ]
277                                                }
278                                            }
279                                        },
280                                        []
281                                    ]
282                                },
283                                [
284                                    {
285                                        "_id": target_id,
286                                        "status": format!("{relationship:?}")
287                                    }
288                                ]
289                            ]
290                        }
291                    }
292                }],
293            )
294            .await
295            .map(|_| ())
296            .map_err(|_| create_database_error!("update_one", "user"))
297    }
298
299    /// Remove relationship with another user
300    async fn pull_relationship(&self, user_id: &str, target_id: &str) -> Result<()> {
301        self.col::<User>(COL)
302            .update_one(
303                doc! {
304                    "_id": user_id
305                },
306                doc! {
307                    "$pull": {
308                        "relations": {
309                            "_id": target_id
310                        }
311                    }
312                },
313            )
314            .await
315            .map(|_| ())
316            .map_err(|_| create_database_error!("update_one", COL))
317    }
318
319    /// Delete a user by their id
320    async fn delete_user(&self, id: &str) -> Result<()> {
321        query!(self, delete_one_by_id, COL, id).map(|_| ())
322    }
323
324    /// Remove push subscription for a session by session id (TODO: remove)
325    async fn remove_push_subscription_by_session_id(&self, session_id: &str) -> Result<()> {
326        self.col::<User>("sessions")
327            .update_one(
328                doc! {
329                    "_id": session_id
330                },
331                doc! {
332                    "$unset": {
333                        "subscription": 1
334                    }
335                },
336            )
337            .await
338            .map(|_| ())
339            .map_err(|_| create_database_error!("update_one", "sessions"))
340    }
341
342    async fn update_session_last_seen(&self, session_id: &str, when: Timestamp) -> Result<()> {
343        let formatted: &str = &when.format();
344
345        self.col::<Session>("sessions")
346            .update_one(
347                doc! {
348                    "_id": session_id
349                },
350                doc! {
351                    "$set": {
352                        "last_seen": formatted
353                    }
354                },
355            )
356            .await
357            .map(|_| ())
358            .map_err(|_| create_database_error!("update_one", "sessions"))
359    }
360}
361
362impl IntoDocumentPath for FieldsUser {
363    fn as_path(&self) -> Option<&'static str> {
364        Some(match self {
365            FieldsUser::Avatar => "avatar",
366            FieldsUser::ProfileBackground => "profile.background",
367            FieldsUser::ProfileContent => "profile.content",
368            FieldsUser::StatusPresence => "status.presence",
369            FieldsUser::StatusText => "status.text",
370            FieldsUser::DisplayName => "display_name",
371            FieldsUser::Suspension => "suspended_until",
372            FieldsUser::None => "none",
373        })
374    }
375}