1use futures_lite::{Future, StreamExt};
2use std::{collections::HashMap, sync::Arc, time::Duration};
3
4use lapin::{
5 message::Delivery,
6 options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
7 types::FieldTable,
8 BasicProperties, Channel, Connection,
9};
10use tracing::{debug, error, info, warn};
11
12use crate::{
13 models::{CreateMatch, CreatedMatch, GameServerCreate, MatchAbrubtClose, MatchResult},
14 MessageHandler,
15};
16
17async fn try_connect(amqp_url: &str) -> Connection {
18 loop {
19 match Connection::connect(amqp_url, lapin::ConnectionProperties::default()).await {
20 Ok(conn) => return conn,
21 Err(err) => {
22 error!("Could not connect to RabbitMQ: {:?}", err);
23 tokio::time::sleep(Duration::from_secs(5)).await;
24 }
25 }
26 }
27}
28
29async fn setup_queue_and_listen<F, Fut>(amqp_url: String, queue_name: String, on_message: F)
33where
34 F: Fn(Delivery) -> Fut + Send + Sync + Clone + 'static,
35 Fut: Future<Output = ()> + Send + 'static,
36{
37 tokio::spawn(async move {
38 loop {
39 let conn = try_connect(&amqp_url).await;
40
41 let channel = match conn.create_channel().await {
42 Ok(ch) => Arc::new(ch),
43 Err(e) => {
44 error!("Failed to create channel: {:?}", e);
45 tokio::time::sleep(Duration::from_secs(5)).await;
46 continue;
47 }
48 };
49
50 if let Err(e) = channel
51 .queue_declare(&queue_name, QueueDeclareOptions::default(), FieldTable::default())
52 .await
53 {
54 error!("Failed to declare queue {}: {:?}", queue_name, e);
55 tokio::time::sleep(Duration::from_secs(5)).await;
56 continue;
57 }
58
59 let mut consumer = match channel
60 .basic_consume(
61 &queue_name,
62 &uuid::Uuid::new_v4().to_string(),
64 BasicConsumeOptions::default(),
65 FieldTable::default(),
66 )
67 .await
68 {
69 Ok(c) => c,
70 Err(e) => {
71 error!("Failed to start consumer for {}: {:?}", queue_name, e);
72 tokio::time::sleep(Duration::from_secs(5)).await;
73 continue;
74 }
75 };
76
77 info!("Listening on queue: {}", queue_name);
78
79 while let Some(delivery) = consumer.next().await {
80 match delivery {
81 Ok(delivery) => {
82 let on_message = on_message.clone();
83 tokio::spawn(async move { on_message(delivery).await });
84 }
85 Err(err) => {
86 error!("Consumer error on {}, reconnecting: {:?}", queue_name, err);
87 break;
88 }
89 }
90 }
91
92 warn!("Consumer loop for {} ended, reconnecting in 5s...", queue_name);
93 tokio::time::sleep(Duration::from_secs(5)).await;
94 }
95 });
96}
97
98pub struct RabbitMQCommunicator {
99 amqp_url: String,
100 channel: Arc<tokio::sync::RwLock<Arc<Channel>>>,
102 queues: HashMap<String, HashMap<String, String>>,
103}
104
105impl RabbitMQCommunicator {
106 pub async fn connect(amqp_url: &str) -> Self {
107 let conn = try_connect(amqp_url).await;
108 let channel = Arc::new(
109 conn.create_channel()
110 .await
111 .expect("Could not create publish channel"),
112 );
113
114 Self {
115 amqp_url: amqp_url.to_string(),
116 channel: Arc::new(tokio::sync::RwLock::new(channel)),
117 queues: Self::load_default_queues(),
118 }
119 }
120
121 fn load_default_queues() -> HashMap<String, HashMap<String, String>> {
122 let content = include_str!("../queues.yml");
123 serde_yaml::from_str(content).expect("Failed to parse queues file")
124 }
125
126 fn get_queue_name(&self, name: &str, action: &str) -> &str {
127 self.queues
128 .get(name)
129 .expect(&format!("Queue {} not found", name))
130 .get(action)
131 .expect(&format!("Action {} not found for queue {}", action, name))
132 }
133
134 pub fn load_queues(&mut self, path: &str) {
135 let content = std::fs::read_to_string(path).expect("Failed to read file");
136 self.queues = serde_yaml::from_str(&content).expect("Failed to parse routes file");
137 }
138
139 async fn publish_with_retry(&self, queue: &str, data: Vec<u8>) {
141 loop {
142 let channel = self.channel.read().await.clone();
143 match channel
144 .basic_publish(
145 "",
146 queue,
147 BasicPublishOptions::default(),
148 &data,
149 BasicProperties::default(),
150 )
151 .await
152 {
153 Ok(_) => return,
154 Err(err) => {
155 error!("Publish to {} failed: {:?}, reconnecting...", queue, err);
156 let conn = try_connect(&self.amqp_url).await;
157 match conn.create_channel().await {
158 Ok(new_ch) => *self.channel.write().await = Arc::new(new_ch),
159 Err(e) => error!("Failed to create channel after reconnect: {:?}", e),
160 }
161 }
162 }
163 }
164 }
165}
166
167impl super::Communicator for RabbitMQCommunicator {
168 async fn on_match_abrupt_close<F, Fut>(&self, callback: F)
169 where
170 F: MessageHandler<MatchAbrubtClose, Fut>,
171 Fut: Future<Output = ()> + Send + 'static,
172 {
173 let queue = self.get_queue_name("match", "abrupt_close").to_string();
174 setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
175 let callback = callback.clone();
176 async move {
177 if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
178 error!("Failed to ack delivery: {:?}", e);
179 return;
180 }
181 let reason: MatchAbrubtClose = serde_json::from_slice(&delivery.data).unwrap();
182 callback(reason).await;
183 }
184 })
185 .await;
186 }
187
188 async fn on_game_create<F, Fut>(&self, callback: F)
189 where
190 F: MessageHandler<crate::models::GameServerCreate, Fut>,
191 Fut: Future<Output = String> + Send + 'static,
192 {
193 let queue = self.get_queue_name("game", "create").to_string();
194 setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
195 let callback = callback.clone();
196 async move {
197 if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
198 error!("Failed to ack delivery: {:?}", e);
199 return;
200 }
201 let created_game: GameServerCreate =
202 serde_json::from_slice(&delivery.data).unwrap();
203 callback(created_game).await;
204 }
205 })
206 .await;
207 }
208
209 async fn on_match_created<F, Fut>(&self, callback: F)
210 where
211 F: MessageHandler<crate::models::CreatedMatch, Fut>,
212 Fut: Future<Output = ()> + Send + 'static,
213 {
214 let queue = self.get_queue_name("match", "created").to_string();
215 setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
216 let callback = callback.clone();
217 async move {
218 if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
219 error!("Failed to ack delivery: {:?}", e);
220 return;
221 }
222 let created_match: CreatedMatch = serde_json::from_slice(&delivery.data).unwrap();
223 callback(created_match).await;
224 }
225 })
226 .await;
227 }
228
229 async fn on_match_result<F, Fut>(&self, callback: F)
230 where
231 F: MessageHandler<crate::models::MatchResult, Fut>,
232 Fut: Future<Output = ()> + Send + 'static,
233 {
234 let queue = self.get_queue_name("match", "result").to_string();
235 setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
236 let callback = callback.clone();
237 async move {
238 if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
239 error!("Failed to ack delivery: {:?}", e);
240 return;
241 }
242 let result: MatchResult = serde_json::from_slice(&delivery.data).unwrap();
243 callback(result).await;
244 }
245 })
246 .await;
247 }
248
249 async fn on_match_create<F, Fut>(&self, callback: F)
250 where
251 F: MessageHandler<crate::models::CreateMatch, Fut>,
252 Fut: Future<Output = ()> + Send + 'static,
253 {
254 let queue = self.get_queue_name("match", "create").to_string();
255 setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
256 let callback = callback.clone();
257 async move {
258 if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
259 error!("Failed to ack delivery: {:?}", e);
260 return;
261 }
262 let result: CreateMatch = serde_json::from_slice(&delivery.data).unwrap();
263 callback(result).await;
264 }
265 })
266 .await;
267 }
268
269 async fn create_game(
270 &self,
271 game_server: &GameServerCreate,
272 ) -> Result<(), Box<dyn std::error::Error>> {
273 self.publish_with_retry(
274 self.get_queue_name("game", "create"),
275 serde_json::to_vec(&game_server).unwrap(),
276 )
277 .await;
278 Ok(())
279 }
280
281 async fn create_match(&self, match_request: &CreateMatch) {
282 self.publish_with_retry(
283 self.get_queue_name("match", "create"),
284 serde_json::to_vec(&match_request).unwrap(),
285 )
286 .await;
287 }
288
289 async fn report_match_abrupt_close(&self, match_close: &MatchAbrubtClose) {
290 self.publish_with_retry(
291 self.get_queue_name("match", "abrupt_close"),
292 serde_json::to_vec(&match_close).unwrap(),
293 )
294 .await;
295 }
296
297 async fn report_match_created(&self, created_match: &CreatedMatch) {
298 self.publish_with_retry(
299 self.get_queue_name("match", "created"),
300 serde_json::to_vec(&created_match).unwrap(),
301 )
302 .await;
303 }
304
305 async fn create_ai_task(&self, task: &crate::models::Task) {
306 self.publish_with_retry(
307 self.get_queue_name("ai", "task"),
308 serde_json::to_vec(&task).unwrap(),
309 )
310 .await;
311 }
312
313 async fn report_match_result(&self, match_result: &MatchResult) {
314 self.publish_with_retry(
315 self.get_queue_name("match", "result"),
316 serde_json::to_vec(&match_result).unwrap(),
317 )
318 .await;
319 }
320
321 async fn send_health_check(&self, client_id: String) {
322 self.publish_with_retry(
323 self.get_queue_name("health_check", "check"),
324 client_id.into_bytes(),
325 )
326 .await;
327 }
328
329 async fn on_health_check<F, Fut>(&self, callback: F)
330 where
331 F: MessageHandler<String, Fut>,
332 Fut: Future<Output = ()> + Send + 'static,
333 {
334 let queue = self.get_queue_name("health_check", "check").to_string();
335 setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
336 let callback = callback.clone();
337 async move {
338 debug!("Received healthcheck event");
339 if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
340 error!("Failed to ack delivery: {:?}", e);
341 return;
342 }
343 let client_id = String::from_utf8(delivery.data.to_vec()).unwrap();
344 callback(client_id).await;
345 }
346 })
347 .await;
348 }
349
350 async fn on_ai_register<F, Fut>(&self, callback: F)
351 where
352 F: MessageHandler<crate::models::AIPlayerRegister, Fut>,
353 Fut: Future<Output = ()> + Send + 'static,
354 {
355 let queue = self.get_queue_name("ai", "register").to_string();
356 setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
357 let callback = callback.clone();
358 async move {
359 debug!("Received AI register event");
360 if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
361 error!("Failed to ack delivery: {:?}", e);
362 return;
363 }
364 let ai_register: crate::models::AIPlayerRegister =
365 serde_json::from_slice(&delivery.data).unwrap();
366 callback(ai_register).await;
367 }
368 })
369 .await;
370 }
371
372 async fn register_ai_player(&self, ai_player: &crate::models::AIPlayerRegister) {
373 self.publish_with_retry(
374 self.get_queue_name("ai", "register"),
375 serde_json::to_vec(&ai_player).unwrap(),
376 )
377 .await;
378 }
379}