revolt_database/models/messages/ops/
mongodb.rs1use bson::{to_bson, Document};
2use futures::try_join;
3use mongodb::options::FindOptions;
4use revolt_models::v0::MessageSort;
5use revolt_result::Result;
6
7use crate::{
8 AppendMessage, DocumentId, FieldsMessage, IntoDocumentPath, Message, MessageQuery,
9 MessageTimePeriod, MongoDb, PartialMessage,
10};
11
12use super::AbstractMessages;
13
14static COL: &str = "messages";
15
16#[async_trait]
17impl AbstractMessages for MongoDb {
18 async fn insert_message(&self, message: &Message) -> Result<()> {
20 query!(self, insert_one, COL, &message).map(|_| ())
21 }
22
23 async fn fetch_message(&self, id: &str) -> Result<Message> {
25 query!(self, find_one_by_id, COL, id)?.ok_or_else(|| create_error!(NotFound))
26 }
27
28 async fn fetch_messages(&self, query: MessageQuery) -> Result<Vec<Message>> {
30 let mut filter = doc! {};
31
32 if let Some(channel) = query.filter.channel {
34 filter.insert("channel", channel);
35 }
36
37 if let Some(author) = query.filter.author {
38 filter.insert("author", author);
39 }
40
41 let is_search_query = if let Some(query) = query.filter.query {
42 filter.insert(
43 "$text",
44 doc! {
45 "$search": query
46 },
47 );
48
49 true
50 } else {
51 false
52 };
53
54 if let Some(pinned) = query.filter.pinned {
55 filter.insert("pinned", pinned);
56 };
57
58 let limit = query.limit.unwrap_or(50);
60
61 match query.time_period {
63 MessageTimePeriod::Relative { nearby } => {
64 let mut older_message_filter = filter.clone();
66 let mut newer_message_filter = filter;
67
68 older_message_filter.insert(
69 "_id",
70 doc! {
71 "$lt": &nearby
72 },
73 );
74
75 newer_message_filter.insert(
76 "_id",
77 doc! {
78 "$gte": &nearby
79 },
80 );
81
82 let (a, b) = try_join!(
84 self.find_with_options::<_, Message>(
85 COL,
86 newer_message_filter,
87 FindOptions::builder()
88 .limit(limit / 2 + 1)
89 .sort(doc! {
90 "_id": 1_i32
91 })
92 .build(),
93 ),
94 self.find_with_options::<_, Message>(
95 COL,
96 older_message_filter,
97 FindOptions::builder()
98 .limit(limit / 2 + 1)
99 .sort(doc! {
100 "_id": -1_i32
101 })
102 .build(),
103 )
104 )
105 .map_err(|_| create_database_error!("find", COL))?;
106
107 Ok([a, b].concat())
108 }
109 MessageTimePeriod::Absolute {
110 before,
111 after,
112 sort,
113 } => {
114 if let Some(doc) = match (before, after) {
116 (Some(before), Some(after)) => Some(doc! {
117 "$lt": before,
118 "$gt": after
119 }),
120 (Some(before), _) => Some(doc! {
121 "$lt": before
122 }),
123 (_, Some(after)) => Some(doc! {
124 "$gt": after
125 }),
126 _ => None,
127 } {
128 filter.insert("_id", doc);
129 }
130
131 self.find_with_options(
133 COL,
134 filter,
135 FindOptions::builder()
136 .limit(limit)
137 .sort(match sort.unwrap_or(MessageSort::Latest) {
138 MessageSort::Relevance => {
140 if is_search_query {
141 doc! {
142 "score": {
143 "$meta": "textScore"
144 }
145 }
146 } else {
147 doc! {
148 "_id": -1_i32
149 }
150 }
151 }
152 MessageSort::Latest => doc! {
154 "_id": -1_i32
155 },
156 MessageSort::Oldest => doc! {
158 "_id": 1_i32
159 },
160 })
161 .build(),
162 )
163 .await
164 .map_err(|_| create_database_error!("find", COL))
165 }
166 }
167 }
168
169 async fn fetch_messages_by_id(&self, ids: &[String]) -> Result<Vec<Message>> {
171 self.find_with_options(
172 COL,
173 doc! {
174 "_id": {
175 "$in": ids
176 }
177 },
178 None,
179 )
180 .await
181 .map_err(|_| create_database_error!("find", COL))
182 }
183
184 async fn update_message(
186 &self,
187 id: &str,
188 message: &PartialMessage,
189 remove: Vec<FieldsMessage>,
190 ) -> Result<()> {
191 query!(
192 self,
193 update_one_by_id,
194 COL,
195 id,
196 message,
197 remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
198 None
199 )
200 .map(|_| ())
201 }
202
203 async fn append_message(&self, id: &str, append: &AppendMessage) -> Result<()> {
205 let mut query = doc! {};
206
207 if let Some(embeds) = &append.embeds {
208 if !embeds.is_empty() {
209 query.insert(
210 "$push",
211 doc! {
212 "embeds": {
213 "$each": to_bson(embeds)
214 .map_err(|_| create_database_error!("to_bson", "embeds"))?
215 }
216 },
217 );
218 }
219 }
220
221 if query.is_empty() {
222 return Ok(());
223 }
224
225 self.col::<Document>(COL)
226 .update_one(
227 doc! {
228 "_id": id
229 },
230 query,
231 )
232 .await
233 .map(|_| ())
234 .map_err(|_| create_database_error!("update_one", COL))
235 }
236
237 async fn add_reaction(&self, id: &str, emoji: &str, user: &str) -> Result<()> {
239 self.col::<Document>(COL)
240 .update_one(
241 doc! {
242 "_id": id
243 },
244 doc! {
245 "$addToSet": {
246 format!("reactions.{emoji}"): user
247 }
248 },
249 )
250 .await
251 .map(|_| ())
252 .map_err(|_| create_database_error!("update_one", COL))
253 }
254
255 async fn remove_reaction(&self, id: &str, emoji: &str, user: &str) -> Result<()> {
257 self.col::<Document>(COL)
258 .update_one(
259 doc! {
260 "_id": id
261 },
262 doc! {
263 "$pull": {
264 format!("reactions.{emoji}"): user
265 }
266 },
267 )
268 .await
269 .map(|_| ())
270 .map_err(|_| create_database_error!("update_one", COL))
271 }
272
273 async fn clear_reaction(&self, id: &str, emoji: &str) -> Result<()> {
275 self.col::<Document>(COL)
276 .update_one(
277 doc! {
278 "_id": id
279 },
280 doc! {
281 "$unset": {
282 format!("reactions.{emoji}"): 1
283 }
284 },
285 )
286 .await
287 .map(|_| ())
288 .map_err(|_| create_database_error!("update_one", COL))
289 }
290
291 async fn delete_message(&self, id: &str) -> Result<()> {
293 query!(self, delete_one_by_id, COL, id).map(|_| ())
294 }
295
296 async fn delete_messages(&self, channel: &str, ids: &[String]) -> Result<()> {
298 self.col::<Document>(COL)
299 .delete_many(doc! {
300 "channel": channel,
301 "_id": {
302 "$in": ids
303 }
304 })
305 .await
306 .map(|_| ())
307 .map_err(|_| create_database_error!("delete_many", COL))
308 }
309}
310
311impl IntoDocumentPath for FieldsMessage {
312 fn as_path(&self) -> Option<&'static str> {
313 Some(match self {
314 FieldsMessage::Pinned => "pinned",
315 })
316 }
317}
318
319impl MongoDb {
320 pub async fn delete_bulk_messages(&self, projection: Document) -> Result<()> {
321 let mut for_attachments = projection.clone();
322 for_attachments.insert(
323 "attachments",
324 doc! {
325 "$exists": 1_i32
326 },
327 );
328
329 let message_ids_with_attachments = self
331 .find_with_options::<_, DocumentId>(
332 COL,
333 for_attachments,
334 FindOptions::builder()
335 .projection(doc! { "_id": 1_i32 })
336 .build(),
337 )
338 .await
339 .map_err(|_| create_database_error!("find_many", "attachments"))?
340 .into_iter()
341 .map(|x| x.id)
342 .collect::<Vec<String>>();
343
344 if !message_ids_with_attachments.is_empty() {
346 self.col::<Document>("attachments")
347 .update_many(
348 doc! {
349 "message_id": {
350 "$in": message_ids_with_attachments
351 }
352 },
353 doc! {
354 "$set": {
355 "deleted": true
356 }
357 },
358 )
359 .await
360 .map_err(|_| create_database_error!("update_many", "attachments"))?;
361 }
362
363 self.col::<Document>(COL)
365 .delete_many(projection)
366 .await
367 .map(|_| ())
368 .map_err(|_| create_database_error!("delete_many", COL))
369 }
370}