async_pubsub/
async_pubsub.rs

1use std::{error::Error, sync::Arc, time::Duration};
2
3use amq::{error::AmqError, AsyncClient, Config};
4use tokio::{select, sync::Mutex, time::sleep};
5
6#[tokio::main]
7async fn main() -> Result<(), Box<dyn Error>> {
8    let state = Arc::new(Mutex::new(0));
9
10    loop {
11        let config = Config::new().unwrap();
12
13        let mut client = AsyncClient::new(config, state.clone());
14
15        let rx = match client.connect().await {
16            Ok(rx) => rx,
17            Err(e) => {
18                println!("Connection error: {}", e);
19                sleep(Duration::from_secs(1)).await;
20                continue;
21            }
22        };
23
24        client
25            .subscribe("topic", |state, msg| async move {
26                let mut state = state.lock().await;
27                *state += 1;
28                println!("Received message: {} {:?}", state, msg);
29            })
30            .await?;
31
32        let should_exit = select! {
33            // Receive signal when connection closed
34            result = rx => {
35                client.shutdown().await;
36
37                match result {
38                    // Server closed connection
39                    Ok(AmqError::TcpServerClosed) => {
40                        true
41                    }
42                    // Other error
43                    Ok(e) => {
44                        println!("{}", e);
45                        false
46                    }
47                    Err(e) => {
48                        println!("Receive signal error: {:?}", e);
49                        false
50                    }
51                }
52            }
53
54            // Send message every 1s
55            _ = async {
56                loop {
57                    sleep(Duration::from_secs(1)).await;
58                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59                }
60            } => {
61                false // Exit loop on error, and reconnect
62            }
63        };
64
65        if should_exit {
66            break;
67        }
68
69        println!("Reconnecting...");
70    }
71
72    Ok(())
73}