revolt_database/models/channel_unreads/ops/
mongodb.rs

1use bson::Document;
2use mongodb::options::FindOneAndUpdateOptions;
3use mongodb::options::ReturnDocument;
4use mongodb::options::UpdateOptions;
5use revolt_result::Result;
6use ulid::Ulid;
7
8use crate::ChannelUnread;
9use crate::MongoDb;
10
11use super::AbstractChannelUnreads;
12
13static COL: &str = "channel_unreads";
14
15#[async_trait]
16impl AbstractChannelUnreads for MongoDb {
17    /// Acknowledge a message, and returns updated channel unread.
18    async fn acknowledge_message(
19        &self,
20        channel_id: &str,
21        user_id: &str,
22        message_id: &str,
23    ) -> Result<Option<ChannelUnread>> {
24        self.col::<ChannelUnread>(COL)
25            .find_one_and_update(
26                doc! {
27                    "_id.channel": channel_id,
28                    "_id.user": user_id,
29                },
30                doc! {
31                    "$pull": {
32                        "mentions": {
33                            "$lte": message_id
34                        }
35                    },
36                    "$set": {
37                        "last_id": message_id
38                    }
39                },
40            )
41            .with_options(
42                FindOneAndUpdateOptions::builder()
43                    .upsert(true)
44                    .return_document(ReturnDocument::After)
45                    .build(),
46            )
47            .await
48            .map_err(|_| create_database_error!("update_one", COL))
49    }
50
51    /// Acknowledge many channels.
52    async fn acknowledge_channels(&self, user_id: &str, channel_ids: &[String]) -> Result<()> {
53        let current_time = Ulid::new().to_string();
54
55        self.col::<Document>(COL)
56            .delete_many(doc! {
57                "_id.channel": {
58                    "$in": channel_ids
59                },
60                "_id.user": user_id
61            })
62            .await
63            .map_err(|_| create_database_error!("delete_many", COL))?;
64
65        self.col::<Document>(COL)
66            .insert_many(
67                channel_ids
68                    .iter()
69                    .map(|channel_id| {
70                        doc! {
71                            "_id": {
72                                "channel": channel_id,
73                                "user": user_id
74                            },
75                            "last_id": &current_time
76                        }
77                    })
78                    .collect::<Vec<Document>>(),
79            )
80            .await
81            .map(|_| ())
82            .map_err(|_| create_database_error!("update_many", COL))
83    }
84
85    /// Add a mention.
86    async fn add_mention_to_unread<'a>(
87        &self,
88        channel_id: &str,
89        user_id: &str,
90        message_ids: &[String],
91    ) -> Result<()> {
92        self.col::<Document>(COL)
93            .update_one(
94                doc! {
95                    "_id.channel": channel_id,
96                    "_id.user": user_id,
97                },
98                doc! {
99                    "$push": {
100                        "mentions": {
101                            "$each": message_ids
102                        }
103                    }
104                },
105            )
106            .with_options(UpdateOptions::builder().upsert(true).build())
107            .await
108            .map(|_| ())
109            .map_err(|_| create_database_error!("update_one", COL))
110    }
111
112    /// Add a mention to multiple users.
113    async fn add_mention_to_many_unreads<'a>(
114        &self,
115        channel_id: &str,
116        user_ids: &[String],
117        message_ids: &[String],
118    ) -> Result<()> {
119        self.col::<Document>(COL)
120            .update_many(
121                doc! {
122                    "_id.channel": channel_id,
123                    "_id.user": {
124                        "$in": user_ids
125                    },
126                },
127                doc! {
128                    "$push": {
129                        "mentions": {
130                            "$each": message_ids
131                        }
132                    }
133                },
134            )
135            .with_options(UpdateOptions::builder().upsert(true).build())
136            .await
137            .map(|_| ())
138            .map_err(|_| create_database_error!("update_many", COL))
139    }
140
141    /// Fetch all channel unreads for a user.
142    async fn fetch_unreads(&self, user_id: &str) -> Result<Vec<ChannelUnread>> {
143        query!(
144            self,
145            find,
146            COL,
147            doc! {
148                "_id.user": user_id
149            }
150        )
151    }
152
153    async fn fetch_unread_mentions(&self, user_id: &str) -> Result<Vec<ChannelUnread>> {
154        query! {
155            self,
156            find,
157            COL,
158            doc! {
159                "_id.user": user_id,
160                "mentions": {"$ne": null}
161            }
162        }
163    }
164
165    /// Fetch unread for a specific user in a channel.
166    async fn fetch_unread(&self, user_id: &str, channel_id: &str) -> Result<Option<ChannelUnread>> {
167        query!(
168            self,
169            find_one,
170            COL,
171            doc! {
172                "_id.user": user_id,
173                "_id.channel": channel_id
174            }
175        )
176    }
177}