revolt_database/models/users/ops/
mongodb.rs1use ::mongodb::options::{Collation, CollationStrength, FindOneOptions, FindOptions};
2use authifier::models::Session;
3use futures::StreamExt;
4use iso8601_timestamp::Timestamp;
5use revolt_result::Result;
6
7use crate::DocumentId;
8use crate::IntoDocumentPath;
9use crate::MongoDb;
10use crate::{FieldsUser, PartialUser, RelationshipStatus, User};
11
12use super::AbstractUsers;
13
/// Name of the collection that the user operations in this file read from and write to.
14static COL: &str = "users";
15
16#[async_trait]
17impl AbstractUsers for MongoDb {
18 async fn insert_user(&self, user: &User) -> Result<()> {
20 query!(self, insert_one, COL, &user).map(|_| ())
21 }
22
23 async fn fetch_user(&self, id: &str) -> Result<User> {
25 query!(self, find_one_by_id, COL, id)?.ok_or_else(|| create_error!(NotFound))
26 }
27
28 async fn fetch_user_by_username(&self, username: &str, discriminator: &str) -> Result<User> {
30 query!(
31 self,
32 find_one_with_options,
33 COL,
34 doc! {
35 "username": username,
36 "discriminator": discriminator
37 },
38 FindOneOptions::builder()
39 .collation(
40 Collation::builder()
41 .locale("en")
42 .strength(CollationStrength::Secondary)
43 .build(),
44 )
45 .build()
46 )?
47 .ok_or_else(|| create_error!(NotFound))
48 }
49
50 async fn fetch_session_by_token(&self, token: &str) -> Result<Session> {
52 self.col::<Session>("sessions")
53 .find_one(doc! {
54 "token": token
55 })
56 .await
57 .map_err(|_| create_database_error!("find_one", "sessions"))?
58 .ok_or_else(|| create_error!(InvalidSession))
59 }
60
61 async fn fetch_users<'a>(&self, ids: &'a [String]) -> Result<Vec<User>> {
63 Ok(self
64 .col::<User>(COL)
65 .find(doc! {
66 "_id": {
67 "$in": ids
68 }
69 })
70 .await
71 .map_err(|_| create_database_error!("find", COL))?
72 .filter_map(|s| async {
73 if cfg!(debug_assertions) {
74 Some(s.unwrap())
75 } else {
76 s.ok()
77 }
78 })
79 .collect()
80 .await)
81 }
82
83 async fn fetch_discriminators_in_use(&self, username: &str) -> Result<Vec<String>> {
85 #[derive(Deserialize)]
86 struct UserDocument {
87 discriminator: String,
88 }
89
90 Ok(self
91 .col::<UserDocument>(COL)
92 .find(doc! {
93 "username": username
94 })
95 .with_options(
96 FindOptions::builder()
97 .collation(
98 Collation::builder()
99 .locale("en")
100 .strength(CollationStrength::Secondary)
101 .build(),
102 )
103 .projection(doc! { "_id": 0, "discriminator": 1 })
104 .build(),
105 )
106 .await
107 .map_err(|_| create_database_error!("find", COL))?
108 .filter_map(|s| async { s.ok() })
109 .collect::<Vec<UserDocument>>()
110 .await
111 .into_iter()
112 .map(|user| user.discriminator)
113 .collect::<Vec<String>>())
114 }
115
116 async fn fetch_mutual_user_ids(&self, user_a: &str, user_b: &str) -> Result<Vec<String>> {
118 Ok(self
119 .col::<DocumentId>(COL)
120 .find(doc! {
121 "$and": [
122 { "relations": { "$elemMatch": { "_id": &user_a, "status": "Friend" } } },
123 { "relations": { "$elemMatch": { "_id": &user_b, "status": "Friend" } } }
124 ]
125 })
126 .with_options(FindOptions::builder().projection(doc! { "_id": 1 }).build())
127 .await
128 .map_err(|_| create_database_error!("find", COL))?
129 .filter_map(|s| async { s.ok() })
130 .map(|user| user.id)
131 .collect()
132 .await)
133 }
134
135 async fn fetch_mutual_channel_ids(&self, user_a: &str, user_b: &str) -> Result<Vec<String>> {
137 Ok(self
138 .col::<DocumentId>("channels")
139 .find(doc! {
140 "channel_type": {
141 "$in": ["Group", "DirectMessage"]
142 },
143 "recipients": {
144 "$all": [ user_a, user_b ]
145 }
146 })
147 .with_options(FindOptions::builder().projection(doc! { "_id": 1 }).build())
148 .await
149 .map_err(|_| create_database_error!("find", "channels"))?
150 .filter_map(|s| async { s.ok() })
151 .map(|user| user.id)
152 .collect()
153 .await)
154 }
155
156 async fn fetch_mutual_server_ids(&self, user_a: &str, user_b: &str) -> Result<Vec<String>> {
158 Ok(self
159 .col::<DocumentId>("server_members")
160 .aggregate(vec![
161 doc! {
162 "$match": {
163 "_id.user": user_a
164 }
165 },
166 doc! {
167 "$lookup": {
168 "from": "server_members",
169 "as": "members",
170 "let": {
171 "server": "$_id.server"
172 },
173 "pipeline": [
174 {
175 "$match": {
176 "$expr": {
177 "$and": [
178 { "$eq": [ "$_id.user", user_b ] },
179 { "$eq": [ "$_id.server", "$$server" ] }
180 ]
181 }
182 }
183 }
184 ]
185 }
186 },
187 doc! {
188 "$match": {
189 "members": {
190 "$size": 1_i32
191 }
192 }
193 },
194 doc! {
195 "$project": {
196 "_id": "$_id.server"
197 }
198 },
199 ])
200 .await
201 .map_err(|_| create_database_error!("aggregate", "server_members"))?
202 .filter_map(|s| async { s.ok() })
203 .filter_map(|doc| async move { doc.get_str("_id").map(|id| id.to_string()).ok() })
204 .collect()
205 .await)
206 }
207
208 async fn update_user(
210 &self,
211 id: &str,
212 partial: &PartialUser,
213 remove: Vec<FieldsUser>,
214 ) -> Result<()> {
215 if remove.contains(&FieldsUser::StatusText) && partial.status.is_some() {
216 let _: Result<()> = query!(
218 self,
219 update_one_by_id,
220 COL,
221 id,
222 PartialUser {
223 ..Default::default()
224 },
225 remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
226 None
227 )
228 .map(|_| ());
229
230 query!(self, update_one_by_id, COL, id, partial, vec![], None).map(|_| ())
231 } else {
232 query!(
233 self,
234 update_one_by_id,
235 COL,
236 id,
237 partial,
238 remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
239 None
240 )
241 .map(|_| ())
242 }
243 }
244
245 async fn set_relationship(
249 &self,
250 user_id: &str,
251 target_id: &str,
252 relationship: &RelationshipStatus,
253 ) -> Result<()> {
254 if let RelationshipStatus::None = relationship {
255 return self.pull_relationship(user_id, target_id).await;
256 }
257
258 self.col::<User>(COL)
259 .update_one(
260 doc! {
261 "_id": user_id
262 },
263 vec![doc! {
264 "$set": {
265 "relations": {
266 "$concatArrays": [
267 {
268 "$ifNull": [
269 {
270 "$filter": {
271 "input": "$relations",
272 "cond": {
273 "$ne": [
274 "$$this._id",
275 target_id
276 ]
277 }
278 }
279 },
280 []
281 ]
282 },
283 [
284 {
285 "_id": target_id,
286 "status": format!("{relationship:?}")
287 }
288 ]
289 ]
290 }
291 }
292 }],
293 )
294 .await
295 .map(|_| ())
296 .map_err(|_| create_database_error!("update_one", "user"))
297 }
298
299 async fn pull_relationship(&self, user_id: &str, target_id: &str) -> Result<()> {
301 self.col::<User>(COL)
302 .update_one(
303 doc! {
304 "_id": user_id
305 },
306 doc! {
307 "$pull": {
308 "relations": {
309 "_id": target_id
310 }
311 }
312 },
313 )
314 .await
315 .map(|_| ())
316 .map_err(|_| create_database_error!("update_one", COL))
317 }
318
319 async fn delete_user(&self, id: &str) -> Result<()> {
321 query!(self, delete_one_by_id, COL, id).map(|_| ())
322 }
323
324 async fn remove_push_subscription_by_session_id(&self, session_id: &str) -> Result<()> {
326 self.col::<User>("sessions")
327 .update_one(
328 doc! {
329 "_id": session_id
330 },
331 doc! {
332 "$unset": {
333 "subscription": 1
334 }
335 },
336 )
337 .await
338 .map(|_| ())
339 .map_err(|_| create_database_error!("update_one", "sessions"))
340 }
341
342 async fn update_session_last_seen(&self, session_id: &str, when: Timestamp) -> Result<()> {
343 let formatted: &str = &when.format();
344
345 self.col::<Session>("sessions")
346 .update_one(
347 doc! {
348 "_id": session_id
349 },
350 doc! {
351 "$set": {
352 "last_seen": formatted
353 }
354 },
355 )
356 .await
357 .map(|_| ())
358 .map_err(|_| create_database_error!("update_one", "sessions"))
359 }
360}
361
362impl IntoDocumentPath for FieldsUser {
363 fn as_path(&self) -> Option<&'static str> {
364 Some(match self {
365 FieldsUser::Avatar => "avatar",
366 FieldsUser::ProfileBackground => "profile.background",
367 FieldsUser::ProfileContent => "profile.content",
368 FieldsUser::StatusPresence => "status.presence",
369 FieldsUser::StatusText => "status.text",
370 FieldsUser::DisplayName => "display_name",
371 FieldsUser::Suspension => "suspended_until",
372 FieldsUser::None => "none",
373 })
374 }
375}