1use std::collections::HashSet;
2
3use crate::events::rabbit::*;
4use crate::User;
5use amqprs::channel::{BasicPublishArguments, ExchangeDeclareArguments};
6use amqprs::connection::OpenConnectionArguments;
7use amqprs::{channel::Channel, connection::Connection, error::Error as AMQPError};
8use amqprs::{BasicProperties, FieldTable};
9use revolt_models::v0::PushNotification;
10use revolt_presence::filter_online;
11
12use serde_json::to_string;
13
14#[derive(Clone)]
15pub struct AMQP {
16 #[allow(unused)]
17 connection: Connection,
18 channel: Channel,
19}
20
21impl AMQP {
22 pub fn new(connection: Connection, channel: Channel) -> AMQP {
23 AMQP {
24 connection,
25 channel,
26 }
27 }
28
29 pub async fn new_auto() -> AMQP {
30 let config = revolt_config::config().await;
31
32 let connection = Connection::open(&OpenConnectionArguments::new(
33 &config.rabbit.host,
34 config.rabbit.port,
35 &config.rabbit.username,
36 &config.rabbit.password,
37 ))
38 .await
39 .expect("Failed to connect to RabbitMQ");
40
41 let channel = connection
42 .open_channel(None)
43 .await
44 .expect("Failed to open RabbitMQ channel");
45
46 channel
47 .exchange_declare(
48 ExchangeDeclareArguments::new(&config.pushd.exchange, "direct")
49 .durable(true)
50 .finish(),
51 )
52 .await
53 .expect("Failed to declare exchange");
54
55 AMQP::new(connection, channel)
56 }
57
58 pub async fn friend_request_accepted(
59 &self,
60 accepted_request_user: &User,
61 sent_request_user: &User,
62 ) -> Result<(), AMQPError> {
63 let config = revolt_config::config().await;
64 let payload = FRAcceptedPayload {
65 accepted_user: accepted_request_user.to_owned(),
66 user: sent_request_user.id.clone(),
67 };
68 let payload = to_string(&payload).unwrap();
69
70 debug!(
71 "Sending friend request accept payload on channel {}: {}",
72 config.pushd.get_fr_accepted_routing_key(),
73 payload
74 );
75 self.channel
76 .basic_publish(
77 BasicProperties::default()
78 .with_content_type("application/json")
79 .with_persistence(true)
80 .finish(),
81 payload.into(),
82 BasicPublishArguments::new(
83 &config.pushd.exchange,
84 &config.pushd.get_fr_accepted_routing_key(),
85 ),
86 )
87 .await
88 }
89
90 pub async fn friend_request_received(
91 &self,
92 received_request_user: &User,
93 sent_request_user: &User,
94 ) -> Result<(), AMQPError> {
95 let config = revolt_config::config().await;
96 let payload = FRReceivedPayload {
97 from_user: sent_request_user.to_owned(),
98 user: received_request_user.id.clone(),
99 };
100 let payload = to_string(&payload).unwrap();
101
102 debug!(
103 "Sending friend request received payload on channel {}: {}",
104 config.pushd.get_fr_received_routing_key(),
105 payload
106 );
107
108 self.channel
109 .basic_publish(
110 BasicProperties::default()
111 .with_content_type("application/json")
112 .with_persistence(true)
113 .finish(),
114 payload.into(),
115 BasicPublishArguments::new(
116 &config.pushd.exchange,
117 &config.pushd.get_fr_received_routing_key(),
118 ),
119 )
120 .await
121 }
122
123 pub async fn generic_message(
124 &self,
125 user: &User,
126 title: String,
127 body: String,
128 icon: Option<String>,
129 ) -> Result<(), AMQPError> {
130 let config = revolt_config::config().await;
131 let payload = GenericPayload {
132 title,
133 body,
134 icon,
135 user: user.to_owned(),
136 };
137 let payload = to_string(&payload).unwrap();
138
139 debug!(
140 "Sending generic payload on channel {}: {}",
141 config.pushd.get_generic_routing_key(),
142 payload
143 );
144
145 self.channel
146 .basic_publish(
147 BasicProperties::default()
148 .with_content_type("application/json")
149 .with_persistence(true)
150 .finish(),
151 payload.into(),
152 BasicPublishArguments::new(
153 &config.pushd.exchange,
154 &config.pushd.get_generic_routing_key(),
155 ),
156 )
157 .await
158 }
159
160 pub async fn message_sent(
161 &self,
162 recipients: Vec<String>,
163 payload: PushNotification,
164 ) -> Result<(), AMQPError> {
165 if recipients.is_empty() {
166 return Ok(());
167 }
168
169 let config = revolt_config::config().await;
170
171 let online_ids = filter_online(&recipients).await;
172 let recipients = (&recipients.into_iter().collect::<HashSet<String>>() - &online_ids)
173 .into_iter()
174 .collect::<Vec<String>>();
175
176 let payload = MessageSentPayload {
177 notification: payload,
178 users: recipients,
179 };
180 let payload = to_string(&payload).unwrap();
181
182 debug!(
183 "Sending message payload on channel {}: {}",
184 config.pushd.get_message_routing_key(),
185 payload
186 );
187
188 self.channel
189 .basic_publish(
190 BasicProperties::default()
191 .with_content_type("application/json")
192 .with_persistence(true)
193 .finish(),
194 payload.into(),
195 BasicPublishArguments::new(
196 &config.pushd.exchange,
197 &config.pushd.get_message_routing_key(),
198 ),
199 )
200 .await
201 }
202
203 pub async fn mass_mention_message_sent(
204 &self,
205 server_id: String,
206 payload: Vec<PushNotification>,
207 ) -> Result<(), AMQPError> {
208 let config = revolt_config::config().await;
209
210 let payload = MassMessageSentPayload {
211 notifications: payload,
212 server_id,
213 };
214 let payload = to_string(&payload).unwrap();
215
216 let routing_key = config.pushd.get_mass_mention_routing_key();
217
218 debug!(
219 "Sending mass mention payload on channel {}: {}",
220 routing_key, payload
221 );
222
223 self.channel
224 .basic_publish(
225 BasicProperties::default()
226 .with_content_type("application/json")
227 .with_persistence(true)
228 .finish(),
229 payload.into(),
230 BasicPublishArguments::new(&config.pushd.exchange, routing_key.as_str()),
231 )
232 .await
233 }
234
235 pub async fn ack_message(
236 &self,
237 user_id: String,
238 channel_id: String,
239 message_id: String,
240 ) -> Result<(), AMQPError> {
241 let config = revolt_config::config().await;
242
243 let payload = AckPayload {
244 user_id: user_id.clone(),
245 channel_id: channel_id.clone(),
246 message_id,
247 };
248 let payload = to_string(&payload).unwrap();
249
250 info!(
251 "Sending ack payload on channel {}: {}",
252 config.pushd.ack_queue, payload
253 );
254
255 let mut headers = FieldTable::new();
256 headers.insert(
257 "x-deduplication-header".try_into().unwrap(),
258 format!("{}-{}", &user_id, &channel_id).into(),
259 );
260
261 self.channel
262 .basic_publish(
263 BasicProperties::default()
264 .with_content_type("application/json")
265 .with_persistence(true)
266 .finish(),
268 payload.into(),
269 BasicPublishArguments::new(&config.pushd.exchange, &config.pushd.ack_queue),
270 )
271 .await
272 }
273
274 pub async fn dm_call_updated(
279 &self,
280 initiator_id: &str,
281 channel_id: &str,
282 started_at: Option<&str>,
283 ended: bool,
284 recipients: Option<Vec<String>>,
285 ) -> Result<(), AMQPError> {
286 let config = revolt_config::config().await;
287
288 let payload = InternalDmCallPayload {
289 payload: DmCallPayload {
290 initiator_id: initiator_id.to_string(),
291 channel_id: channel_id.to_string(),
292 started_at: started_at.map(|f| f.to_string()),
293 ended,
294 },
295 recipients,
296 };
297 let payload = to_string(&payload).unwrap();
298
299 debug!(
300 "Sending dm call update payload on channel {}: {}",
301 config.pushd.get_dm_call_routing_key(),
302 payload
303 );
304
305 self.channel
306 .basic_publish(
307 BasicProperties::default()
308 .with_content_type("application/json")
309 .with_persistence(true)
310 .finish(),
311 payload.into(),
312 BasicPublishArguments::new(
313 &config.pushd.exchange,
314 &config.pushd.get_dm_call_routing_key(),
315 ),
316 )
317 .await
318 }
319}