revolt_database/models/server_members/ops/
mongodb.rs

1use bson::Document;
2use futures::StreamExt;
3use iso8601_timestamp::Timestamp;
4use mongodb::options::ReadConcern;
5use revolt_result::Result;
6
7use crate::{FieldsMember, Member, MemberCompositeKey, PartialMember};
8use crate::{IntoDocumentPath, MongoDb};
9
10use super::{AbstractServerMembers, ChunkedServerMembersGenerator};
11
/// Name of the MongoDB collection that every query in this file targets.
static COL: &str = "server_members";
13
14#[async_trait]
15impl AbstractServerMembers for MongoDb {
16    /// Insert a new server member (or use the existing member if one is found)
17    async fn insert_or_merge_member(&self, member: &Member) -> Result<Option<Member>> {
18        let existing: Result<Option<Document>> = query!(
19            self,
20            find_one,
21            COL,
22            doc! {
23                "_id.server": &member.id.server,
24                "_id.user": &member.id.user,
25                "pending_deletion_at": {"$exists": true}
26            }
27        );
28        // Update the existing record if it exist, otherwise make a new record
29        if existing.is_ok_and(|x| x.is_some()) {
30            self.col::<Member>(COL)
31                .find_one_and_update(
32                    doc! {
33                        "_id.server": &member.id.server,
34                        "_id.user": &member.id.user,
35                    },
36                    doc! {
37                        "$set": {
38                            "joined_at": member.joined_at.duration_since(Timestamp::UNIX_EPOCH).whole_seconds(),
39                        },
40                        "$unset": {
41                            "pending_deletion_at": ""
42                        }
43                    },
44                )
45                .return_document(mongodb::options::ReturnDocument::After)
46                .await
47                .map_err(|_| create_database_error!("update_one", COL))
48        } else {
49            query!(self, insert_one, COL, &member).map(|_| ())?;
50            Ok(None)
51        }
52    }
53
54    /// Fetch a server member by their id
55    async fn fetch_member(&self, server_id: &str, user_id: &str) -> Result<Member> {
56        query!(
57            self,
58            find_one,
59            COL,
60            doc! {
61                "_id.server": server_id,
62                "_id.user": user_id,
63                "pending_deletion_at": {"$exists": false}
64            }
65        )?
66        .ok_or_else(|| create_error!(NotFound))
67    }
68
69    /// Fetch all members in a server
70    async fn fetch_all_members(&self, server_id: &str) -> Result<Vec<Member>> {
71        Ok(self
72            .col::<Member>(COL)
73            .find(doc! {
74                "_id.server": server_id,
75                "pending_deletion_at": {"$exists": false}
76            })
77            .await
78            .map_err(|_| create_database_error!("find", COL))?
79            .filter_map(|s| async {
80                if cfg!(debug_assertions) {
81                    Some(s.unwrap())
82                } else {
83                    s.ok()
84                }
85            })
86            .collect()
87            .await)
88    }
89
90    /// Fetch all members in a server as a generator.
91    /// Uses config key pushd.mass_mention_chunk_size as the batch size.
92    async fn fetch_all_members_chunked(
93        &self,
94        server_id: &str,
95    ) -> Result<ChunkedServerMembersGenerator> {
96        let config = revolt_config::config().await;
97
98        let mut session = self
99            .start_session()
100            .await
101            .map_err(|_| create_database_error!("start_session", COL))?;
102
103        session
104            .start_transaction()
105            .read_concern(ReadConcern::snapshot())
106            .await
107            .map_err(|_| create_database_error!("start_transaction", COL))?;
108
109        let cursor = self
110            .col::<Member>(COL)
111            .find(doc! {
112                "_id.server": server_id
113            })
114            .session(&mut session)
115            .batch_size(config.pushd.mass_mention_chunk_size as u32)
116            .await
117            .map_err(|_| create_database_error!("find", COL))?;
118
119        Ok(ChunkedServerMembersGenerator::new_mongo(session, cursor))
120    }
121
122    async fn fetch_all_members_with_roles(
123        &self,
124        server_id: &str,
125        roles: &[String],
126    ) -> Result<Vec<Member>> {
127        Ok(self
128            .col::<Member>(COL)
129            .find(doc! {
130                "_id.server": server_id,
131                "roles": {"$in": roles}
132            })
133            .await
134            .map_err(|_| create_database_error!("find", COL))?
135            .filter_map(|s| async {
136                if cfg!(debug_assertions) {
137                    Some(s.unwrap())
138                } else {
139                    s.ok()
140                }
141            })
142            .collect()
143            .await)
144    }
145
146    async fn fetch_all_members_with_roles_chunked(
147        &self,
148        server_id: &str,
149        roles: &[String],
150    ) -> Result<ChunkedServerMembersGenerator> {
151        let config = revolt_config::config().await;
152
153        let mut session = self
154            .start_session()
155            .await
156            .map_err(|_| create_database_error!("start_session", COL))?;
157
158        session
159            .start_transaction()
160            .read_concern(ReadConcern::snapshot())
161            .await
162            .map_err(|_| create_database_error!("start_transaction", COL))?;
163
164        let cursor = self
165            .col::<Member>(COL)
166            .find(doc! {
167                "_id.server": server_id,
168                "roles": {"$in": roles}
169            })
170            .session(&mut session)
171            .batch_size(config.pushd.mass_mention_chunk_size as u32)
172            .await
173            .map_err(|_| create_database_error!("find", COL))?;
174
175        return Ok(ChunkedServerMembersGenerator::new_mongo(session, cursor));
176    }
177
178    /// Fetch all memberships for a user
179    async fn fetch_all_memberships(&self, user_id: &str) -> Result<Vec<Member>> {
180        Ok(self
181            .col::<Member>(COL)
182            .find(doc! {
183                "_id.user": user_id,
184                "pending_deletion_at": {"$exists": false}
185            })
186            .await
187            .map_err(|_| create_database_error!("find", COL))?
188            .filter_map(|s| async {
189                if cfg!(debug_assertions) {
190                    Some(s.unwrap())
191                } else {
192                    s.ok()
193                }
194            })
195            .collect()
196            .await)
197    }
198
199    /// Fetch multiple members by their ids
200    async fn fetch_members(&self, server_id: &str, ids: &[String]) -> Result<Vec<Member>> {
201        Ok(self
202            .col::<Member>(COL)
203            .find(doc! {
204                "_id.server": server_id,
205                "pending_deletion_at": {"$exists": false},
206                "_id.user": {
207                    "$in": ids
208                }
209            })
210            .await
211            .map_err(|_| create_database_error!("find", COL))?
212            .filter_map(|s| async {
213                if cfg!(debug_assertions) {
214                    Some(s.unwrap())
215                } else {
216                    s.ok()
217                }
218            })
219            .collect()
220            .await)
221    }
222
223    /// Fetch member count of a server
224    async fn fetch_member_count(&self, server_id: &str) -> Result<usize> {
225        self.col::<Member>(COL)
226            .count_documents(doc! {
227                "_id.server": server_id,
228                "pending_deletion_at": {"$exists": false}
229            })
230            .await
231            .map(|c| c as usize)
232            .map_err(|_| create_database_error!("count_documents", COL))
233    }
234
235    /// Fetch server count of a user
236    async fn fetch_server_count(&self, user_id: &str) -> Result<usize> {
237        self.col::<Member>(COL)
238            .count_documents(doc! {
239                "_id.user": user_id,
240                "pending_deletion_at": {"$exists": false}
241            })
242            .await
243            .map(|c| c as usize)
244            .map_err(|_| create_database_error!("count_documents", COL))
245    }
246
247    /// Update information for a server member
248    async fn update_member(
249        &self,
250        id: &MemberCompositeKey,
251        partial: &PartialMember,
252        remove: Vec<FieldsMember>,
253    ) -> Result<()> {
254        query!(
255            self,
256            update_one,
257            COL,
258            doc! {
259                "_id.server": &id.server,
260                "_id.user": &id.user
261            },
262            partial,
263            remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
264            None
265        )
266        .map(|_| ())
267    }
268
269    /// Marks a member for deletion.
270    /// This will remove the record if the user has no pending actions (eg. timeout),
271    /// otherwise will slate the record for deletion by revolt_crond once the actions expire.
272    async fn soft_delete_member(&self, id: &MemberCompositeKey) -> Result<()> {
273        let member = self.fetch_member(&id.server, &id.user).await;
274        if let Ok(member) = member {
275            if member.in_timeout() {
276                self.col::<Document>(COL)
277                    .update_many(
278                        doc! {
279                            "_id.server": &id.server,
280                            "_id.user": &id.user,
281                        },
282                        doc! {
283                            "$set": {"pending_deletion_at": format!("{}", member.timeout.unwrap().format())},
284                            "$unset": {
285                                "joined_at": "",
286                                "avatar": "",
287                                "nickname": "",
288                                "roles": ""
289                            }
290                        },
291                    )
292                    .await
293                    .map(|_| ())
294                    .map_err(|_| create_database_error!("update_many", COL))
295            } else {
296                self.force_delete_member(id).await
297            }
298        } else {
299            Err(create_database_error!("fetch_member", COL))
300        }
301    }
302
303    /// Delete a server member by their id
304    async fn force_delete_member(&self, id: &MemberCompositeKey) -> Result<()> {
305        query!(
306            self,
307            delete_one,
308            COL,
309            doc! {
310                "_id.server": &id.server,
311                "_id.user": &id.user
312            }
313        )
314        .map(|_| ())
315    }
316
317    async fn remove_dangling_members(&self) -> Result<()> {
318        let now = Timestamp::now_utc();
319        let date = bson::to_bson(&now).expect("Failed to serialize timestamp");
320
321        self.col::<Document>(COL)
322            .delete_many(doc! {
323                "pending_deletion_at": {"$lt": date}
324            })
325            .await
326            .map(|_| ())
327            .map_err(|_| create_database_error!("count_documents", COL))
328    }
329}
330
331impl IntoDocumentPath for FieldsMember {
332    fn as_path(&self) -> Option<&'static str> {
333        Some(match self {
334            FieldsMember::JoinedAt => "joined_at",
335            FieldsMember::Avatar => "avatar",
336            FieldsMember::Nickname => "nickname",
337            FieldsMember::Roles => "roles",
338            FieldsMember::Timeout => "timeout",
339            FieldsMember::CanPublish => "can_publish",
340            FieldsMember::CanReceive => "can_receive",
341        })
342    }
343}