1use std::collections::HashSet;
2
3use crate::events::rabbit::*;
4use crate::User;
5use amqprs::channel::BasicPublishArguments;
6use amqprs::{channel::Channel, connection::Connection, error::Error as AMQPError};
7use amqprs::{BasicProperties, FieldTable};
8use revolt_models::v0::PushNotification;
9use revolt_presence::filter_online;
10
11use serde_json::to_string;
12
13#[derive(Clone)]
14pub struct AMQP {
15 #[allow(unused)]
16 connection: Connection,
17 channel: Channel,
18}
19
20impl AMQP {
21 pub fn new(connection: Connection, channel: Channel) -> AMQP {
22 AMQP {
23 connection,
24 channel,
25 }
26 }
27
28 pub async fn friend_request_accepted(
29 &self,
30 accepted_request_user: &User,
31 sent_request_user: &User,
32 ) -> Result<(), AMQPError> {
33 let config = revolt_config::config().await;
34 let payload = FRAcceptedPayload {
35 accepted_user: accepted_request_user.to_owned(),
36 user: sent_request_user.id.clone(),
37 };
38 let payload = to_string(&payload).unwrap();
39
40 debug!(
41 "Sending friend request accept payload on channel {}: {}",
42 config.pushd.get_fr_accepted_routing_key(),
43 payload
44 );
45 self.channel
46 .basic_publish(
47 BasicProperties::default()
48 .with_content_type("application/json")
49 .with_persistence(true)
50 .finish(),
51 payload.into(),
52 BasicPublishArguments::new(
53 &config.pushd.exchange,
54 &config.pushd.get_fr_accepted_routing_key(),
55 ),
56 )
57 .await
58 }
59
60 pub async fn friend_request_received(
61 &self,
62 received_request_user: &User,
63 sent_request_user: &User,
64 ) -> Result<(), AMQPError> {
65 let config = revolt_config::config().await;
66 let payload = FRReceivedPayload {
67 from_user: sent_request_user.to_owned(),
68 user: received_request_user.id.clone(),
69 };
70 let payload = to_string(&payload).unwrap();
71
72 debug!(
73 "Sending friend request received payload on channel {}: {}",
74 config.pushd.get_fr_received_routing_key(),
75 payload
76 );
77
78 self.channel
79 .basic_publish(
80 BasicProperties::default()
81 .with_content_type("application/json")
82 .with_persistence(true)
83 .finish(),
84 payload.into(),
85 BasicPublishArguments::new(
86 &config.pushd.exchange,
87 &config.pushd.get_fr_received_routing_key(),
88 ),
89 )
90 .await
91 }
92
93 pub async fn generic_message(
94 &self,
95 user: &User,
96 title: String,
97 body: String,
98 icon: Option<String>,
99 ) -> Result<(), AMQPError> {
100 let config = revolt_config::config().await;
101 let payload = GenericPayload {
102 title,
103 body,
104 icon,
105 user: user.to_owned(),
106 };
107 let payload = to_string(&payload).unwrap();
108
109 debug!(
110 "Sending generic payload on channel {}: {}",
111 config.pushd.get_generic_routing_key(),
112 payload
113 );
114
115 self.channel
116 .basic_publish(
117 BasicProperties::default()
118 .with_content_type("application/json")
119 .with_persistence(true)
120 .finish(),
121 payload.into(),
122 BasicPublishArguments::new(
123 &config.pushd.exchange,
124 &config.pushd.get_generic_routing_key(),
125 ),
126 )
127 .await
128 }
129
130 pub async fn message_sent(
131 &self,
132 recipients: Vec<String>,
133 payload: PushNotification,
134 ) -> Result<(), AMQPError> {
135 if recipients.is_empty() {
136 return Ok(());
137 }
138
139 let config = revolt_config::config().await;
140
141 let online_ids = filter_online(&recipients).await;
142 let recipients = (&recipients.into_iter().collect::<HashSet<String>>() - &online_ids)
143 .into_iter()
144 .collect::<Vec<String>>();
145
146 let payload = MessageSentPayload {
147 notification: payload,
148 users: recipients,
149 };
150 let payload = to_string(&payload).unwrap();
151
152 debug!(
153 "Sending message payload on channel {}: {}",
154 config.pushd.get_message_routing_key(),
155 payload
156 );
157
158 self.channel
159 .basic_publish(
160 BasicProperties::default()
161 .with_content_type("application/json")
162 .with_persistence(true)
163 .finish(),
164 payload.into(),
165 BasicPublishArguments::new(
166 &config.pushd.exchange,
167 &config.pushd.get_message_routing_key(),
168 ),
169 )
170 .await
171 }
172
173 pub async fn mass_mention_message_sent(
174 &self,
175 server_id: String,
176 payload: Vec<PushNotification>,
177 ) -> Result<(), AMQPError> {
178 let config = revolt_config::config().await;
179
180 let payload = MassMessageSentPayload {
181 notifications: payload,
182 server_id,
183 };
184 let payload = to_string(&payload).unwrap();
185
186 let routing_key = config.pushd.get_mass_mention_routing_key();
187
188 debug!(
189 "Sending mass mention payload on channel {}: {}",
190 routing_key, payload
191 );
192
193 self.channel
194 .basic_publish(
195 BasicProperties::default()
196 .with_content_type("application/json")
197 .with_persistence(true)
198 .finish(),
199 payload.into(),
200 BasicPublishArguments::new(&config.pushd.exchange, routing_key.as_str()),
201 )
202 .await
203 }
204
205 pub async fn ack_message(
206 &self,
207 user_id: String,
208 channel_id: String,
209 message_id: String,
210 ) -> Result<(), AMQPError> {
211 let config = revolt_config::config().await;
212
213 let payload = AckPayload {
214 user_id: user_id.clone(),
215 channel_id: channel_id.clone(),
216 message_id,
217 };
218 let payload = to_string(&payload).unwrap();
219
220 info!(
221 "Sending ack payload on channel {}: {}",
222 config.pushd.ack_queue, payload
223 );
224
225 let mut headers = FieldTable::new();
226 headers.insert(
227 "x-deduplication-header".try_into().unwrap(),
228 format!("{}-{}", &user_id, &channel_id).into(),
229 );
230
231 self.channel
232 .basic_publish(
233 BasicProperties::default()
234 .with_content_type("application/json")
235 .with_persistence(true)
236 .finish(),
238 payload.into(),
239 BasicPublishArguments::new(&config.pushd.exchange, &config.pushd.ack_queue),
240 )
241 .await
242 }
243}