revolt_database/models/server_members/ops/
mongodb.rs1use bson::Document;
2use futures::StreamExt;
3use iso8601_timestamp::Timestamp;
4use mongodb::options::ReadConcern;
5use revolt_result::Result;
6
7use crate::{FieldsMember, Member, MemberCompositeKey, PartialMember};
8use crate::{IntoDocumentPath, MongoDb};
9
10use super::{AbstractServerMembers, ChunkedServerMembersGenerator};
11
12static COL: &str = "server_members";
13
14#[async_trait]
15impl AbstractServerMembers for MongoDb {
16 async fn insert_or_merge_member(&self, member: &Member) -> Result<Option<Member>> {
18 let existing: Result<Option<Document>> = query!(
19 self,
20 find_one,
21 COL,
22 doc! {
23 "_id.server": &member.id.server,
24 "_id.user": &member.id.user,
25 "pending_deletion_at": {"$exists": true}
26 }
27 );
28 if existing.is_ok_and(|x| x.is_some()) {
30 self.col::<Member>(COL)
31 .find_one_and_update(
32 doc! {
33 "_id.server": &member.id.server,
34 "_id.user": &member.id.user,
35 },
36 doc! {
37 "$set": {
38 "joined_at": member.joined_at.duration_since(Timestamp::UNIX_EPOCH).whole_seconds(),
39 },
40 "$unset": {
41 "pending_deletion_at": ""
42 }
43 },
44 )
45 .return_document(mongodb::options::ReturnDocument::After)
46 .await
47 .map_err(|_| create_database_error!("update_one", COL))
48 } else {
49 query!(self, insert_one, COL, &member).map(|_| ())?;
50 Ok(None)
51 }
52 }
53
54 async fn fetch_member(&self, server_id: &str, user_id: &str) -> Result<Member> {
56 query!(
57 self,
58 find_one,
59 COL,
60 doc! {
61 "_id.server": server_id,
62 "_id.user": user_id,
63 "pending_deletion_at": {"$exists": false}
64 }
65 )?
66 .ok_or_else(|| create_error!(NotFound))
67 }
68
69 async fn fetch_all_members(&self, server_id: &str) -> Result<Vec<Member>> {
71 Ok(self
72 .col::<Member>(COL)
73 .find(doc! {
74 "_id.server": server_id,
75 "pending_deletion_at": {"$exists": false}
76 })
77 .await
78 .map_err(|_| create_database_error!("find", COL))?
79 .filter_map(|s| async {
80 if cfg!(debug_assertions) {
81 Some(s.unwrap())
82 } else {
83 s.ok()
84 }
85 })
86 .collect()
87 .await)
88 }
89
90 async fn fetch_all_members_chunked(
93 &self,
94 server_id: &str,
95 ) -> Result<ChunkedServerMembersGenerator> {
96 let config = revolt_config::config().await;
97
98 let mut session = self
99 .start_session()
100 .await
101 .map_err(|_| create_database_error!("start_session", COL))?;
102
103 session
104 .start_transaction()
105 .read_concern(ReadConcern::snapshot())
106 .await
107 .map_err(|_| create_database_error!("start_transaction", COL))?;
108
109 let cursor = self
110 .col::<Member>(COL)
111 .find(doc! {
112 "_id.server": server_id
113 })
114 .session(&mut session)
115 .batch_size(config.pushd.mass_mention_chunk_size as u32)
116 .await
117 .map_err(|_| create_database_error!("find", COL))?;
118
119 Ok(ChunkedServerMembersGenerator::new_mongo(session, cursor))
120 }
121
122 async fn fetch_all_members_with_roles(
123 &self,
124 server_id: &str,
125 roles: &[String],
126 ) -> Result<Vec<Member>> {
127 Ok(self
128 .col::<Member>(COL)
129 .find(doc! {
130 "_id.server": server_id,
131 "roles": {"$in": roles}
132 })
133 .await
134 .map_err(|_| create_database_error!("find", COL))?
135 .filter_map(|s| async {
136 if cfg!(debug_assertions) {
137 Some(s.unwrap())
138 } else {
139 s.ok()
140 }
141 })
142 .collect()
143 .await)
144 }
145
146 async fn fetch_all_members_with_roles_chunked(
147 &self,
148 server_id: &str,
149 roles: &[String],
150 ) -> Result<ChunkedServerMembersGenerator> {
151 let config = revolt_config::config().await;
152
153 let mut session = self
154 .start_session()
155 .await
156 .map_err(|_| create_database_error!("start_session", COL))?;
157
158 session
159 .start_transaction()
160 .read_concern(ReadConcern::snapshot())
161 .await
162 .map_err(|_| create_database_error!("start_transaction", COL))?;
163
164 let cursor = self
165 .col::<Member>(COL)
166 .find(doc! {
167 "_id.server": server_id,
168 "roles": {"$in": roles}
169 })
170 .session(&mut session)
171 .batch_size(config.pushd.mass_mention_chunk_size as u32)
172 .await
173 .map_err(|_| create_database_error!("find", COL))?;
174
175 return Ok(ChunkedServerMembersGenerator::new_mongo(session, cursor));
176 }
177
178 async fn fetch_all_memberships(&self, user_id: &str) -> Result<Vec<Member>> {
180 Ok(self
181 .col::<Member>(COL)
182 .find(doc! {
183 "_id.user": user_id,
184 "pending_deletion_at": {"$exists": false}
185 })
186 .await
187 .map_err(|_| create_database_error!("find", COL))?
188 .filter_map(|s| async {
189 if cfg!(debug_assertions) {
190 Some(s.unwrap())
191 } else {
192 s.ok()
193 }
194 })
195 .collect()
196 .await)
197 }
198
199 async fn fetch_members(&self, server_id: &str, ids: &[String]) -> Result<Vec<Member>> {
201 Ok(self
202 .col::<Member>(COL)
203 .find(doc! {
204 "_id.server": server_id,
205 "pending_deletion_at": {"$exists": false},
206 "_id.user": {
207 "$in": ids
208 }
209 })
210 .await
211 .map_err(|_| create_database_error!("find", COL))?
212 .filter_map(|s| async {
213 if cfg!(debug_assertions) {
214 Some(s.unwrap())
215 } else {
216 s.ok()
217 }
218 })
219 .collect()
220 .await)
221 }
222
223 async fn fetch_member_count(&self, server_id: &str) -> Result<usize> {
225 self.col::<Member>(COL)
226 .count_documents(doc! {
227 "_id.server": server_id,
228 "pending_deletion_at": {"$exists": false}
229 })
230 .await
231 .map(|c| c as usize)
232 .map_err(|_| create_database_error!("count_documents", COL))
233 }
234
235 async fn fetch_server_count(&self, user_id: &str) -> Result<usize> {
237 self.col::<Member>(COL)
238 .count_documents(doc! {
239 "_id.user": user_id,
240 "pending_deletion_at": {"$exists": false}
241 })
242 .await
243 .map(|c| c as usize)
244 .map_err(|_| create_database_error!("count_documents", COL))
245 }
246
247 async fn update_member(
249 &self,
250 id: &MemberCompositeKey,
251 partial: &PartialMember,
252 remove: Vec<FieldsMember>,
253 ) -> Result<()> {
254 query!(
255 self,
256 update_one,
257 COL,
258 doc! {
259 "_id.server": &id.server,
260 "_id.user": &id.user
261 },
262 partial,
263 remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
264 None
265 )
266 .map(|_| ())
267 }
268
269 async fn soft_delete_member(&self, id: &MemberCompositeKey) -> Result<()> {
273 let member = self.fetch_member(&id.server, &id.user).await;
274 if let Ok(member) = member {
275 if member.in_timeout() {
276 self.col::<Document>(COL)
277 .update_many(
278 doc! {
279 "_id.server": &id.server,
280 "_id.user": &id.user,
281 },
282 doc! {
283 "$set": {"pending_deletion_at": format!("{}", member.timeout.unwrap().format())},
284 "$unset": {
285 "joined_at": "",
286 "avatar": "",
287 "nickname": "",
288 "roles": ""
289 }
290 },
291 )
292 .await
293 .map(|_| ())
294 .map_err(|_| create_database_error!("update_many", COL))
295 } else {
296 self.force_delete_member(id).await
297 }
298 } else {
299 Err(create_database_error!("fetch_member", COL))
300 }
301 }
302
303 async fn force_delete_member(&self, id: &MemberCompositeKey) -> Result<()> {
305 query!(
306 self,
307 delete_one,
308 COL,
309 doc! {
310 "_id.server": &id.server,
311 "_id.user": &id.user
312 }
313 )
314 .map(|_| ())
315 }
316
317 async fn remove_dangling_members(&self) -> Result<()> {
318 let now = Timestamp::now_utc();
319 let date = bson::to_bson(&now).expect("Failed to serialize timestamp");
320
321 self.col::<Document>(COL)
322 .delete_many(doc! {
323 "pending_deletion_at": {"$lt": date}
324 })
325 .await
326 .map(|_| ())
327 .map_err(|_| create_database_error!("count_documents", COL))
328 }
329}
330
331impl IntoDocumentPath for FieldsMember {
332 fn as_path(&self) -> Option<&'static str> {
333 Some(match self {
334 FieldsMember::JoinedAt => "joined_at",
335 FieldsMember::Avatar => "avatar",
336 FieldsMember::Nickname => "nickname",
337 FieldsMember::Roles => "roles",
338 FieldsMember::Timeout => "timeout",
339 FieldsMember::CanPublish => "can_publish",
340 FieldsMember::CanReceive => "can_receive",
341 })
342 }
343}