revolt_database/models/messages/ops/
mongodb.rs

1use bson::{to_bson, Document};
2use futures::try_join;
3use mongodb::options::FindOptions;
4use revolt_models::v0::MessageSort;
5use revolt_result::Result;
6
7use crate::{
8    AppendMessage, DocumentId, FieldsMessage, IntoDocumentPath, Message, MessageQuery,
9    MessageTimePeriod, MongoDb, PartialMessage,
10};
11
12use super::AbstractMessages;
13
14static COL: &str = "messages";
15
16#[async_trait]
17impl AbstractMessages for MongoDb {
18    /// Insert a new message into the database
19    async fn insert_message(&self, message: &Message) -> Result<()> {
20        query!(self, insert_one, COL, &message).map(|_| ())
21    }
22
23    /// Fetch a message by its id
24    async fn fetch_message(&self, id: &str) -> Result<Message> {
25        query!(self, find_one_by_id, COL, id)?.ok_or_else(|| create_error!(NotFound))
26    }
27
28    /// Fetch multiple messages by given query
29    async fn fetch_messages(&self, query: MessageQuery) -> Result<Vec<Message>> {
30        let mut filter = doc! {};
31
32        // 1. Apply message filters
33        if let Some(channel) = query.filter.channel {
34            filter.insert("channel", channel);
35        }
36
37        if let Some(author) = query.filter.author {
38            filter.insert("author", author);
39        }
40
41        let is_search_query = if let Some(query) = query.filter.query {
42            filter.insert(
43                "$text",
44                doc! {
45                    "$search": query
46                },
47            );
48
49            true
50        } else {
51            false
52        };
53
54        if let Some(pinned) = query.filter.pinned {
55            filter.insert("pinned", pinned);
56        };
57
58        // 2. Find query limit
59        let limit = query.limit.unwrap_or(50);
60
61        // 3. Apply message time period
62        match query.time_period {
63            MessageTimePeriod::Relative { nearby } => {
64                // 3.1. Prepare filters
65                let mut older_message_filter = filter.clone();
66                let mut newer_message_filter = filter;
67
68                older_message_filter.insert(
69                    "_id",
70                    doc! {
71                        "$lt": &nearby
72                    },
73                );
74
75                newer_message_filter.insert(
76                    "_id",
77                    doc! {
78                        "$gte": &nearby
79                    },
80                );
81
82                // 3.2. Execute in both directions
83                let (a, b) = try_join!(
84                    self.find_with_options::<_, Message>(
85                        COL,
86                        newer_message_filter,
87                        FindOptions::builder()
88                            .limit(limit / 2 + 1)
89                            .sort(doc! {
90                                "_id": 1_i32
91                            })
92                            .build(),
93                    ),
94                    self.find_with_options::<_, Message>(
95                        COL,
96                        older_message_filter,
97                        FindOptions::builder()
98                            .limit(limit / 2 + 1)
99                            .sort(doc! {
100                                "_id": -1_i32
101                            })
102                            .build(),
103                    )
104                )
105                .map_err(|_| create_database_error!("find", COL))?;
106
107                Ok([a, b].concat())
108            }
109            MessageTimePeriod::Absolute {
110                before,
111                after,
112                sort,
113            } => {
114                // 3.1. Apply message ID filter
115                if let Some(doc) = match (before, after) {
116                    (Some(before), Some(after)) => Some(doc! {
117                        "$lt": before,
118                        "$gt": after
119                    }),
120                    (Some(before), _) => Some(doc! {
121                        "$lt": before
122                    }),
123                    (_, Some(after)) => Some(doc! {
124                        "$gt": after
125                    }),
126                    _ => None,
127                } {
128                    filter.insert("_id", doc);
129                }
130
131                // 3.2. Execute with given message sort
132                self.find_with_options(
133                    COL,
134                    filter,
135                    FindOptions::builder()
136                        .limit(limit)
137                        .sort(match sort.unwrap_or(MessageSort::Latest) {
138                            // Sort by relevance, fallback to latest
139                            MessageSort::Relevance => {
140                                if is_search_query {
141                                    doc! {
142                                        "score": {
143                                            "$meta": "textScore"
144                                        }
145                                    }
146                                } else {
147                                    doc! {
148                                        "_id": -1_i32
149                                    }
150                                }
151                            }
152                            // Sort by latest first
153                            MessageSort::Latest => doc! {
154                                "_id": -1_i32
155                            },
156                            // Sort by oldest first
157                            MessageSort::Oldest => doc! {
158                                "_id": 1_i32
159                            },
160                        })
161                        .build(),
162                )
163                .await
164                .map_err(|_| create_database_error!("find", COL))
165            }
166        }
167    }
168
169    /// Fetch multiple messages by given IDs
170    async fn fetch_messages_by_id(&self, ids: &[String]) -> Result<Vec<Message>> {
171        self.find_with_options(
172            COL,
173            doc! {
174                "_id": {
175                    "$in": ids
176                }
177            },
178            None,
179        )
180        .await
181        .map_err(|_| create_database_error!("find", COL))
182    }
183
184    /// Update a given message with new information
185    async fn update_message(
186        &self,
187        id: &str,
188        message: &PartialMessage,
189        remove: Vec<FieldsMessage>,
190    ) -> Result<()> {
191        query!(
192            self,
193            update_one_by_id,
194            COL,
195            id,
196            message,
197            remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
198            None
199        )
200        .map(|_| ())
201    }
202
203    /// Append information to a given message
204    async fn append_message(&self, id: &str, append: &AppendMessage) -> Result<()> {
205        let mut query = doc! {};
206
207        if let Some(embeds) = &append.embeds {
208            if !embeds.is_empty() {
209                query.insert(
210                    "$push",
211                    doc! {
212                        "embeds": {
213                            "$each": to_bson(embeds)
214                                .map_err(|_| create_database_error!("to_bson", "embeds"))?
215                        }
216                    },
217                );
218            }
219        }
220
221        if query.is_empty() {
222            return Ok(());
223        }
224
225        self.col::<Document>(COL)
226            .update_one(
227                doc! {
228                    "_id": id
229                },
230                query,
231            )
232            .await
233            .map(|_| ())
234            .map_err(|_| create_database_error!("update_one", COL))
235    }
236
237    /// Add a new reaction to a message
238    async fn add_reaction(&self, id: &str, emoji: &str, user: &str) -> Result<()> {
239        self.col::<Document>(COL)
240            .update_one(
241                doc! {
242                    "_id": id
243                },
244                doc! {
245                    "$addToSet": {
246                        format!("reactions.{emoji}"): user
247                    }
248                },
249            )
250            .await
251            .map(|_| ())
252            .map_err(|_| create_database_error!("update_one", COL))
253    }
254
255    /// Remove a reaction from a message
256    async fn remove_reaction(&self, id: &str, emoji: &str, user: &str) -> Result<()> {
257        self.col::<Document>(COL)
258            .update_one(
259                doc! {
260                    "_id": id
261                },
262                doc! {
263                    "$pull": {
264                        format!("reactions.{emoji}"): user
265                    }
266                },
267            )
268            .await
269            .map(|_| ())
270            .map_err(|_| create_database_error!("update_one", COL))
271    }
272
273    /// Remove reaction from a message
274    async fn clear_reaction(&self, id: &str, emoji: &str) -> Result<()> {
275        self.col::<Document>(COL)
276            .update_one(
277                doc! {
278                    "_id": id
279                },
280                doc! {
281                    "$unset": {
282                        format!("reactions.{emoji}"): 1
283                    }
284                },
285            )
286            .await
287            .map(|_| ())
288            .map_err(|_| create_database_error!("update_one", COL))
289    }
290
291    /// Delete a message from the database by its id
292    async fn delete_message(&self, id: &str) -> Result<()> {
293        query!(self, delete_one_by_id, COL, id).map(|_| ())
294    }
295
296    /// Delete messages from a channel by their ids and corresponding channel id
297    async fn delete_messages(&self, channel: &str, ids: &[String]) -> Result<()> {
298        self.col::<Document>(COL)
299            .delete_many(doc! {
300                "channel": channel,
301                "_id": {
302                    "$in": ids
303                }
304            })
305            .await
306            .map(|_| ())
307            .map_err(|_| create_database_error!("delete_many", COL))
308    }
309}
310
311impl IntoDocumentPath for FieldsMessage {
312    fn as_path(&self) -> Option<&'static str> {
313        Some(match self {
314            FieldsMessage::Pinned => "pinned",
315        })
316    }
317}
318
319impl MongoDb {
320    pub async fn delete_bulk_messages(&self, projection: Document) -> Result<()> {
321        let mut for_attachments = projection.clone();
322        for_attachments.insert(
323            "attachments",
324            doc! {
325                "$exists": 1_i32
326            },
327        );
328
329        // Check if there are any attachments we need to delete.
330        let message_ids_with_attachments = self
331            .find_with_options::<_, DocumentId>(
332                COL,
333                for_attachments,
334                FindOptions::builder()
335                    .projection(doc! { "_id": 1_i32 })
336                    .build(),
337            )
338            .await
339            .map_err(|_| create_database_error!("find_many", "attachments"))?
340            .into_iter()
341            .map(|x| x.id)
342            .collect::<Vec<String>>();
343
344        // If we found any, mark them as deleted.
345        if !message_ids_with_attachments.is_empty() {
346            self.col::<Document>("attachments")
347                .update_many(
348                    doc! {
349                        "message_id": {
350                            "$in": message_ids_with_attachments
351                        }
352                    },
353                    doc! {
354                        "$set": {
355                            "deleted": true
356                        }
357                    },
358                )
359                .await
360                .map_err(|_| create_database_error!("update_many", "attachments"))?;
361        }
362
363        // And then delete said messages.
364        self.col::<Document>(COL)
365            .delete_many(projection)
366            .await
367            .map(|_| ())
368            .map_err(|_| create_database_error!("delete_many", COL))
369    }
370}