revolt_database/amqp/
amqp.rs

1use std::collections::HashSet;
2
3use crate::events::rabbit::*;
4use crate::User;
5use amqprs::channel::{BasicPublishArguments, ExchangeDeclareArguments};
6use amqprs::connection::OpenConnectionArguments;
7use amqprs::{channel::Channel, connection::Connection, error::Error as AMQPError};
8use amqprs::{BasicProperties, FieldTable};
9use revolt_models::v0::PushNotification;
10use revolt_presence::filter_online;
11
12use serde_json::to_string;
13
14#[derive(Clone)]
15pub struct AMQP {
16    #[allow(unused)]
17    connection: Connection,
18    channel: Channel,
19}
20
21impl AMQP {
22    pub fn new(connection: Connection, channel: Channel) -> AMQP {
23        AMQP {
24            connection,
25            channel,
26        }
27    }
28
29    pub async fn new_auto() -> AMQP {
30        let config = revolt_config::config().await;
31
32        let connection = Connection::open(&OpenConnectionArguments::new(
33            &config.rabbit.host,
34            config.rabbit.port,
35            &config.rabbit.username,
36            &config.rabbit.password,
37        ))
38        .await
39        .expect("Failed to connect to RabbitMQ");
40
41        let channel = connection
42            .open_channel(None)
43            .await
44            .expect("Failed to open RabbitMQ channel");
45
46        channel
47            .exchange_declare(
48                ExchangeDeclareArguments::new(&config.pushd.exchange, "direct")
49                    .durable(true)
50                    .finish(),
51            )
52            .await
53            .expect("Failed to declare exchange");
54
55        AMQP::new(connection, channel)
56    }
57
58    pub async fn friend_request_accepted(
59        &self,
60        accepted_request_user: &User,
61        sent_request_user: &User,
62    ) -> Result<(), AMQPError> {
63        let config = revolt_config::config().await;
64        let payload = FRAcceptedPayload {
65            accepted_user: accepted_request_user.to_owned(),
66            user: sent_request_user.id.clone(),
67        };
68        let payload = to_string(&payload).unwrap();
69
70        debug!(
71            "Sending friend request accept payload on channel {}: {}",
72            config.pushd.get_fr_accepted_routing_key(),
73            payload
74        );
75        self.channel
76            .basic_publish(
77                BasicProperties::default()
78                    .with_content_type("application/json")
79                    .with_persistence(true)
80                    .finish(),
81                payload.into(),
82                BasicPublishArguments::new(
83                    &config.pushd.exchange,
84                    &config.pushd.get_fr_accepted_routing_key(),
85                ),
86            )
87            .await
88    }
89
90    pub async fn friend_request_received(
91        &self,
92        received_request_user: &User,
93        sent_request_user: &User,
94    ) -> Result<(), AMQPError> {
95        let config = revolt_config::config().await;
96        let payload = FRReceivedPayload {
97            from_user: sent_request_user.to_owned(),
98            user: received_request_user.id.clone(),
99        };
100        let payload = to_string(&payload).unwrap();
101
102        debug!(
103            "Sending friend request received payload on channel {}: {}",
104            config.pushd.get_fr_received_routing_key(),
105            payload
106        );
107
108        self.channel
109            .basic_publish(
110                BasicProperties::default()
111                    .with_content_type("application/json")
112                    .with_persistence(true)
113                    .finish(),
114                payload.into(),
115                BasicPublishArguments::new(
116                    &config.pushd.exchange,
117                    &config.pushd.get_fr_received_routing_key(),
118                ),
119            )
120            .await
121    }
122
123    pub async fn generic_message(
124        &self,
125        user: &User,
126        title: String,
127        body: String,
128        icon: Option<String>,
129    ) -> Result<(), AMQPError> {
130        let config = revolt_config::config().await;
131        let payload = GenericPayload {
132            title,
133            body,
134            icon,
135            user: user.to_owned(),
136        };
137        let payload = to_string(&payload).unwrap();
138
139        debug!(
140            "Sending generic payload on channel {}: {}",
141            config.pushd.get_generic_routing_key(),
142            payload
143        );
144
145        self.channel
146            .basic_publish(
147                BasicProperties::default()
148                    .with_content_type("application/json")
149                    .with_persistence(true)
150                    .finish(),
151                payload.into(),
152                BasicPublishArguments::new(
153                    &config.pushd.exchange,
154                    &config.pushd.get_generic_routing_key(),
155                ),
156            )
157            .await
158    }
159
160    pub async fn message_sent(
161        &self,
162        recipients: Vec<String>,
163        payload: PushNotification,
164    ) -> Result<(), AMQPError> {
165        if recipients.is_empty() {
166            return Ok(());
167        }
168
169        let config = revolt_config::config().await;
170
171        let online_ids = filter_online(&recipients).await;
172        let recipients = (&recipients.into_iter().collect::<HashSet<String>>() - &online_ids)
173            .into_iter()
174            .collect::<Vec<String>>();
175
176        let payload = MessageSentPayload {
177            notification: payload,
178            users: recipients,
179        };
180        let payload = to_string(&payload).unwrap();
181
182        debug!(
183            "Sending message payload on channel {}: {}",
184            config.pushd.get_message_routing_key(),
185            payload
186        );
187
188        self.channel
189            .basic_publish(
190                BasicProperties::default()
191                    .with_content_type("application/json")
192                    .with_persistence(true)
193                    .finish(),
194                payload.into(),
195                BasicPublishArguments::new(
196                    &config.pushd.exchange,
197                    &config.pushd.get_message_routing_key(),
198                ),
199            )
200            .await
201    }
202
203    pub async fn mass_mention_message_sent(
204        &self,
205        server_id: String,
206        payload: Vec<PushNotification>,
207    ) -> Result<(), AMQPError> {
208        let config = revolt_config::config().await;
209
210        let payload = MassMessageSentPayload {
211            notifications: payload,
212            server_id,
213        };
214        let payload = to_string(&payload).unwrap();
215
216        let routing_key = config.pushd.get_mass_mention_routing_key();
217
218        debug!(
219            "Sending mass mention payload on channel {}: {}",
220            routing_key, payload
221        );
222
223        self.channel
224            .basic_publish(
225                BasicProperties::default()
226                    .with_content_type("application/json")
227                    .with_persistence(true)
228                    .finish(),
229                payload.into(),
230                BasicPublishArguments::new(&config.pushd.exchange, routing_key.as_str()),
231            )
232            .await
233    }
234
235    pub async fn ack_message(
236        &self,
237        user_id: String,
238        channel_id: String,
239        message_id: String,
240    ) -> Result<(), AMQPError> {
241        let config = revolt_config::config().await;
242
243        let payload = AckPayload {
244            user_id: user_id.clone(),
245            channel_id: channel_id.clone(),
246            message_id,
247        };
248        let payload = to_string(&payload).unwrap();
249
250        info!(
251            "Sending ack payload on channel {}: {}",
252            config.pushd.ack_queue, payload
253        );
254
255        let mut headers = FieldTable::new();
256        headers.insert(
257            "x-deduplication-header".try_into().unwrap(),
258            format!("{}-{}", &user_id, &channel_id).into(),
259        );
260
261        self.channel
262            .basic_publish(
263                BasicProperties::default()
264                    .with_content_type("application/json")
265                    .with_persistence(true)
266                    //.with_headers(headers)
267                    .finish(),
268                payload.into(),
269                BasicPublishArguments::new(&config.pushd.exchange, &config.pushd.ack_queue),
270            )
271            .await
272    }
273
274    /// # DM Call Update
275    /// Used to send an update about a DM call, eg. start or end of a call.
276    /// Recipients can be used to narrow the scope of recipients, otherwise all recipients will be notified.
277    /// `ended` refers to the ringing period, not necessarily the call itself.
278    pub async fn dm_call_updated(
279        &self,
280        initiator_id: &str,
281        channel_id: &str,
282        started_at: Option<&str>,
283        ended: bool,
284        recipients: Option<Vec<String>>,
285    ) -> Result<(), AMQPError> {
286        let config = revolt_config::config().await;
287
288        let payload = InternalDmCallPayload {
289            payload: DmCallPayload {
290                initiator_id: initiator_id.to_string(),
291                channel_id: channel_id.to_string(),
292                started_at: started_at.map(|f| f.to_string()),
293                ended,
294            },
295            recipients,
296        };
297        let payload = to_string(&payload).unwrap();
298
299        debug!(
300            "Sending dm call update payload on channel {}: {}",
301            config.pushd.get_dm_call_routing_key(),
302            payload
303        );
304
305        self.channel
306            .basic_publish(
307                BasicProperties::default()
308                    .with_content_type("application/json")
309                    .with_persistence(true)
310                    .finish(),
311                payload.into(),
312                BasicPublishArguments::new(
313                    &config.pushd.exchange,
314                    &config.pushd.get_dm_call_routing_key(),
315                ),
316            )
317            .await
318    }
319}