revolt_database/amqp/
amqp.rs

1use std::collections::HashSet;
2
3use crate::events::rabbit::*;
4use crate::User;
5use amqprs::channel::BasicPublishArguments;
6use amqprs::{channel::Channel, connection::Connection, error::Error as AMQPError};
7use amqprs::{BasicProperties, FieldTable};
8use revolt_models::v0::PushNotification;
9use revolt_presence::filter_online;
10
11use serde_json::to_string;
12
13#[derive(Clone)]
14pub struct AMQP {
15    #[allow(unused)]
16    connection: Connection,
17    channel: Channel,
18}
19
20impl AMQP {
21    pub fn new(connection: Connection, channel: Channel) -> AMQP {
22        AMQP {
23            connection,
24            channel,
25        }
26    }
27
28    pub async fn friend_request_accepted(
29        &self,
30        accepted_request_user: &User,
31        sent_request_user: &User,
32    ) -> Result<(), AMQPError> {
33        let config = revolt_config::config().await;
34        let payload = FRAcceptedPayload {
35            accepted_user: accepted_request_user.to_owned(),
36            user: sent_request_user.id.clone(),
37        };
38        let payload = to_string(&payload).unwrap();
39
40        debug!(
41            "Sending friend request accept payload on channel {}: {}",
42            config.pushd.get_fr_accepted_routing_key(),
43            payload
44        );
45        self.channel
46            .basic_publish(
47                BasicProperties::default()
48                    .with_content_type("application/json")
49                    .with_persistence(true)
50                    .finish(),
51                payload.into(),
52                BasicPublishArguments::new(
53                    &config.pushd.exchange,
54                    &config.pushd.get_fr_accepted_routing_key(),
55                ),
56            )
57            .await
58    }
59
60    pub async fn friend_request_received(
61        &self,
62        received_request_user: &User,
63        sent_request_user: &User,
64    ) -> Result<(), AMQPError> {
65        let config = revolt_config::config().await;
66        let payload = FRReceivedPayload {
67            from_user: sent_request_user.to_owned(),
68            user: received_request_user.id.clone(),
69        };
70        let payload = to_string(&payload).unwrap();
71
72        debug!(
73            "Sending friend request received payload on channel {}: {}",
74            config.pushd.get_fr_received_routing_key(),
75            payload
76        );
77
78        self.channel
79            .basic_publish(
80                BasicProperties::default()
81                    .with_content_type("application/json")
82                    .with_persistence(true)
83                    .finish(),
84                payload.into(),
85                BasicPublishArguments::new(
86                    &config.pushd.exchange,
87                    &config.pushd.get_fr_received_routing_key(),
88                ),
89            )
90            .await
91    }
92
93    pub async fn generic_message(
94        &self,
95        user: &User,
96        title: String,
97        body: String,
98        icon: Option<String>,
99    ) -> Result<(), AMQPError> {
100        let config = revolt_config::config().await;
101        let payload = GenericPayload {
102            title,
103            body,
104            icon,
105            user: user.to_owned(),
106        };
107        let payload = to_string(&payload).unwrap();
108
109        debug!(
110            "Sending generic payload on channel {}: {}",
111            config.pushd.get_generic_routing_key(),
112            payload
113        );
114
115        self.channel
116            .basic_publish(
117                BasicProperties::default()
118                    .with_content_type("application/json")
119                    .with_persistence(true)
120                    .finish(),
121                payload.into(),
122                BasicPublishArguments::new(
123                    &config.pushd.exchange,
124                    &config.pushd.get_generic_routing_key(),
125                ),
126            )
127            .await
128    }
129
130    pub async fn message_sent(
131        &self,
132        recipients: Vec<String>,
133        payload: PushNotification,
134    ) -> Result<(), AMQPError> {
135        if recipients.is_empty() {
136            return Ok(());
137        }
138
139        let config = revolt_config::config().await;
140
141        let online_ids = filter_online(&recipients).await;
142        let recipients = (&recipients.into_iter().collect::<HashSet<String>>() - &online_ids)
143            .into_iter()
144            .collect::<Vec<String>>();
145
146        let payload = MessageSentPayload {
147            notification: payload,
148            users: recipients,
149        };
150        let payload = to_string(&payload).unwrap();
151
152        debug!(
153            "Sending message payload on channel {}: {}",
154            config.pushd.get_message_routing_key(),
155            payload
156        );
157
158        self.channel
159            .basic_publish(
160                BasicProperties::default()
161                    .with_content_type("application/json")
162                    .with_persistence(true)
163                    .finish(),
164                payload.into(),
165                BasicPublishArguments::new(
166                    &config.pushd.exchange,
167                    &config.pushd.get_message_routing_key(),
168                ),
169            )
170            .await
171    }
172
173    pub async fn mass_mention_message_sent(
174        &self,
175        server_id: String,
176        payload: Vec<PushNotification>,
177    ) -> Result<(), AMQPError> {
178        let config = revolt_config::config().await;
179
180        let payload = MassMessageSentPayload {
181            notifications: payload,
182            server_id,
183        };
184        let payload = to_string(&payload).unwrap();
185
186        let routing_key = config.pushd.get_mass_mention_routing_key();
187
188        debug!(
189            "Sending mass mention payload on channel {}: {}",
190            routing_key, payload
191        );
192
193        self.channel
194            .basic_publish(
195                BasicProperties::default()
196                    .with_content_type("application/json")
197                    .with_persistence(true)
198                    .finish(),
199                payload.into(),
200                BasicPublishArguments::new(&config.pushd.exchange, routing_key.as_str()),
201            )
202            .await
203    }
204
205    pub async fn ack_message(
206        &self,
207        user_id: String,
208        channel_id: String,
209        message_id: String,
210    ) -> Result<(), AMQPError> {
211        let config = revolt_config::config().await;
212
213        let payload = AckPayload {
214            user_id: user_id.clone(),
215            channel_id: channel_id.clone(),
216            message_id,
217        };
218        let payload = to_string(&payload).unwrap();
219
220        info!(
221            "Sending ack payload on channel {}: {}",
222            config.pushd.ack_queue, payload
223        );
224
225        let mut headers = FieldTable::new();
226        headers.insert(
227            "x-deduplication-header".try_into().unwrap(),
228            format!("{}-{}", &user_id, &channel_id).into(),
229        );
230
231        self.channel
232            .basic_publish(
233                BasicProperties::default()
234                    .with_content_type("application/json")
235                    .with_persistence(true)
236                    //.with_headers(headers)
237                    .finish(),
238                payload.into(),
239                BasicPublishArguments::new(&config.pushd.exchange, &config.pushd.ack_queue),
240            )
241            .await
242    }
243}