Skip to main content

gn_communicator/
rabbitmq.rs

1use futures_lite::{Future, StreamExt};
2use std::{collections::HashMap, sync::Arc, time::Duration};
3
4use lapin::{
5    message::Delivery,
6    options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
7    types::FieldTable,
8    BasicProperties, Channel, Connection,
9};
10use tracing::{debug, error, info, warn};
11
12use crate::{
13    models::{CreateMatch, CreatedMatch, GameServerCreate, MatchAbrubtClose, MatchResult},
14    MessageHandler,
15};
16
17async fn try_connect(amqp_url: &str) -> Connection {
18    loop {
19        match Connection::connect(amqp_url, lapin::ConnectionProperties::default()).await {
20            Ok(conn) => return conn,
21            Err(err) => {
22                error!("Could not connect to RabbitMQ: {:?}", err);
23                tokio::time::sleep(Duration::from_secs(5)).await;
24            }
25        }
26    }
27}
28
29/// Spawn a consumer loop that automatically reconnects when the connection drops.
30/// Each invocation creates its own AMQP connection so consumers are isolated from
31/// each other and from the publish channel.
32async fn setup_queue_and_listen<F, Fut>(amqp_url: String, queue_name: String, on_message: F)
33where
34    F: Fn(Delivery) -> Fut + Send + Sync + Clone + 'static,
35    Fut: Future<Output = ()> + Send + 'static,
36{
37    tokio::spawn(async move {
38        loop {
39            let conn = try_connect(&amqp_url).await;
40
41            let channel = match conn.create_channel().await {
42                Ok(ch) => Arc::new(ch),
43                Err(e) => {
44                    error!("Failed to create channel: {:?}", e);
45                    tokio::time::sleep(Duration::from_secs(5)).await;
46                    continue;
47                }
48            };
49
50            if let Err(e) = channel
51                .queue_declare(&queue_name, QueueDeclareOptions::default(), FieldTable::default())
52                .await
53            {
54                error!("Failed to declare queue {}: {:?}", queue_name, e);
55                tokio::time::sleep(Duration::from_secs(5)).await;
56                continue;
57            }
58
59            let mut consumer = match channel
60                .basic_consume(
61                    &queue_name,
62                    // Use a unique tag so re-registration never conflicts with a stale consumer
63                    &uuid::Uuid::new_v4().to_string(),
64                    BasicConsumeOptions::default(),
65                    FieldTable::default(),
66                )
67                .await
68            {
69                Ok(c) => c,
70                Err(e) => {
71                    error!("Failed to start consumer for {}: {:?}", queue_name, e);
72                    tokio::time::sleep(Duration::from_secs(5)).await;
73                    continue;
74                }
75            };
76
77            info!("Listening on queue: {}", queue_name);
78
79            while let Some(delivery) = consumer.next().await {
80                match delivery {
81                    Ok(delivery) => {
82                        let on_message = on_message.clone();
83                        tokio::spawn(async move { on_message(delivery).await });
84                    }
85                    Err(err) => {
86                        error!("Consumer error on {}, reconnecting: {:?}", queue_name, err);
87                        break;
88                    }
89                }
90            }
91
92            warn!("Consumer loop for {} ended, reconnecting in 5s...", queue_name);
93            tokio::time::sleep(Duration::from_secs(5)).await;
94        }
95    });
96}
97
98pub struct RabbitMQCommunicator {
99    amqp_url: String,
100    /// Shared publish channel — replaced atomically when a publish fails.
101    channel: Arc<tokio::sync::RwLock<Arc<Channel>>>,
102    queues: HashMap<String, HashMap<String, String>>,
103}
104
105impl RabbitMQCommunicator {
106    pub async fn connect(amqp_url: &str) -> Self {
107        let conn = try_connect(amqp_url).await;
108        let channel = Arc::new(
109            conn.create_channel()
110                .await
111                .expect("Could not create publish channel"),
112        );
113
114        Self {
115            amqp_url: amqp_url.to_string(),
116            channel: Arc::new(tokio::sync::RwLock::new(channel)),
117            queues: Self::load_default_queues(),
118        }
119    }
120
121    fn load_default_queues() -> HashMap<String, HashMap<String, String>> {
122        let content = include_str!("../queues.yml");
123        serde_yaml::from_str(content).expect("Failed to parse queues file")
124    }
125
126    fn get_queue_name(&self, name: &str, action: &str) -> &str {
127        self.queues
128            .get(name)
129            .expect(&format!("Queue {} not found", name))
130            .get(action)
131            .expect(&format!("Action {} not found for queue {}", action, name))
132    }
133
134    pub fn load_queues(&mut self, path: &str) {
135        let content = std::fs::read_to_string(path).expect("Failed to read file");
136        self.queues = serde_yaml::from_str(&content).expect("Failed to parse routes file");
137    }
138
139    /// Publish `data` to `queue`, transparently reconnecting if the channel is dead.
140    async fn publish_with_retry(&self, queue: &str, data: Vec<u8>) {
141        loop {
142            let channel = self.channel.read().await.clone();
143            match channel
144                .basic_publish(
145                    "",
146                    queue,
147                    BasicPublishOptions::default(),
148                    &data,
149                    BasicProperties::default(),
150                )
151                .await
152            {
153                Ok(_) => return,
154                Err(err) => {
155                    error!("Publish to {} failed: {:?}, reconnecting...", queue, err);
156                    let conn = try_connect(&self.amqp_url).await;
157                    match conn.create_channel().await {
158                        Ok(new_ch) => *self.channel.write().await = Arc::new(new_ch),
159                        Err(e) => error!("Failed to create channel after reconnect: {:?}", e),
160                    }
161                }
162            }
163        }
164    }
165}
166
167impl super::Communicator for RabbitMQCommunicator {
168    async fn on_match_abrupt_close<F, Fut>(&self, callback: F)
169    where
170        F: MessageHandler<MatchAbrubtClose, Fut>,
171        Fut: Future<Output = ()> + Send + 'static,
172    {
173        let queue = self.get_queue_name("match", "abrupt_close").to_string();
174        setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
175            let callback = callback.clone();
176            async move {
177                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
178                    error!("Failed to ack delivery: {:?}", e);
179                    return;
180                }
181                let reason: MatchAbrubtClose = serde_json::from_slice(&delivery.data).unwrap();
182                callback(reason).await;
183            }
184        })
185        .await;
186    }
187
188    async fn on_game_create<F, Fut>(&self, callback: F)
189    where
190        F: MessageHandler<crate::models::GameServerCreate, Fut>,
191        Fut: Future<Output = String> + Send + 'static,
192    {
193        let queue = self.get_queue_name("game", "create").to_string();
194        setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
195            let callback = callback.clone();
196            async move {
197                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
198                    error!("Failed to ack delivery: {:?}", e);
199                    return;
200                }
201                let created_game: GameServerCreate =
202                    serde_json::from_slice(&delivery.data).unwrap();
203                callback(created_game).await;
204            }
205        })
206        .await;
207    }
208
209    async fn on_match_created<F, Fut>(&self, callback: F)
210    where
211        F: MessageHandler<crate::models::CreatedMatch, Fut>,
212        Fut: Future<Output = ()> + Send + 'static,
213    {
214        let queue = self.get_queue_name("match", "created").to_string();
215        setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
216            let callback = callback.clone();
217            async move {
218                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
219                    error!("Failed to ack delivery: {:?}", e);
220                    return;
221                }
222                let created_match: CreatedMatch = serde_json::from_slice(&delivery.data).unwrap();
223                callback(created_match).await;
224            }
225        })
226        .await;
227    }
228
229    async fn on_match_result<F, Fut>(&self, callback: F)
230    where
231        F: MessageHandler<crate::models::MatchResult, Fut>,
232        Fut: Future<Output = ()> + Send + 'static,
233    {
234        let queue = self.get_queue_name("match", "result").to_string();
235        setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
236            let callback = callback.clone();
237            async move {
238                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
239                    error!("Failed to ack delivery: {:?}", e);
240                    return;
241                }
242                let result: MatchResult = serde_json::from_slice(&delivery.data).unwrap();
243                callback(result).await;
244            }
245        })
246        .await;
247    }
248
249    async fn on_match_create<F, Fut>(&self, callback: F)
250    where
251        F: MessageHandler<crate::models::CreateMatch, Fut>,
252        Fut: Future<Output = ()> + Send + 'static,
253    {
254        let queue = self.get_queue_name("match", "create").to_string();
255        setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
256            let callback = callback.clone();
257            async move {
258                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
259                    error!("Failed to ack delivery: {:?}", e);
260                    return;
261                }
262                let result: CreateMatch = serde_json::from_slice(&delivery.data).unwrap();
263                callback(result).await;
264            }
265        })
266        .await;
267    }
268
269    async fn create_game(
270        &self,
271        game_server: &GameServerCreate,
272    ) -> Result<(), Box<dyn std::error::Error>> {
273        self.publish_with_retry(
274            self.get_queue_name("game", "create"),
275            serde_json::to_vec(&game_server).unwrap(),
276        )
277        .await;
278        Ok(())
279    }
280
281    async fn create_match(&self, match_request: &CreateMatch) {
282        self.publish_with_retry(
283            self.get_queue_name("match", "create"),
284            serde_json::to_vec(&match_request).unwrap(),
285        )
286        .await;
287    }
288
289    async fn report_match_abrupt_close(&self, match_close: &MatchAbrubtClose) {
290        self.publish_with_retry(
291            self.get_queue_name("match", "abrupt_close"),
292            serde_json::to_vec(&match_close).unwrap(),
293        )
294        .await;
295    }
296
297    async fn report_match_created(&self, created_match: &CreatedMatch) {
298        self.publish_with_retry(
299            self.get_queue_name("match", "created"),
300            serde_json::to_vec(&created_match).unwrap(),
301        )
302        .await;
303    }
304
305    async fn create_ai_task(&self, task: &crate::models::Task) {
306        self.publish_with_retry(
307            self.get_queue_name("ai", "task"),
308            serde_json::to_vec(&task).unwrap(),
309        )
310        .await;
311    }
312
313    async fn report_match_result(&self, match_result: &MatchResult) {
314        self.publish_with_retry(
315            self.get_queue_name("match", "result"),
316            serde_json::to_vec(&match_result).unwrap(),
317        )
318        .await;
319    }
320
321    async fn send_health_check(&self, client_id: String) {
322        self.publish_with_retry(
323            self.get_queue_name("health_check", "check"),
324            client_id.into_bytes(),
325        )
326        .await;
327    }
328
329    async fn on_health_check<F, Fut>(&self, callback: F)
330    where
331        F: MessageHandler<String, Fut>,
332        Fut: Future<Output = ()> + Send + 'static,
333    {
334        let queue = self.get_queue_name("health_check", "check").to_string();
335        setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
336            let callback = callback.clone();
337            async move {
338                debug!("Received healthcheck event");
339                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
340                    error!("Failed to ack delivery: {:?}", e);
341                    return;
342                }
343                let client_id = String::from_utf8(delivery.data.to_vec()).unwrap();
344                callback(client_id).await;
345            }
346        })
347        .await;
348    }
349
350    async fn on_ai_register<F, Fut>(&self, callback: F)
351    where
352        F: MessageHandler<crate::models::AIPlayerRegister, Fut>,
353        Fut: Future<Output = ()> + Send + 'static,
354    {
355        let queue = self.get_queue_name("ai", "register").to_string();
356        setup_queue_and_listen(self.amqp_url.clone(), queue, move |delivery| {
357            let callback = callback.clone();
358            async move {
359                debug!("Received AI register event");
360                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
361                    error!("Failed to ack delivery: {:?}", e);
362                    return;
363                }
364                let ai_register: crate::models::AIPlayerRegister =
365                    serde_json::from_slice(&delivery.data).unwrap();
366                callback(ai_register).await;
367            }
368        })
369        .await;
370    }
371
372    async fn register_ai_player(&self, ai_player: &crate::models::AIPlayerRegister) {
373        self.publish_with_retry(
374            self.get_queue_name("ai", "register"),
375            serde_json::to_vec(&ai_player).unwrap(),
376        )
377        .await;
378    }
379}