async_pubsub/
async_pubsub.rs1use std::{error::Error, sync::Arc, time::Duration};
2
3use amq::{error::AmqError, AsyncClient, Config};
4use tokio::{select, sync::Mutex, time::sleep};
5
6#[tokio::main]
7async fn main() -> Result<(), Box<dyn Error>> {
8 let state = Arc::new(Mutex::new(0));
9
10 loop {
11 let config = Config::new().unwrap();
12
13 let mut client = AsyncClient::new(config, state.clone());
14
15 let rx = match client.connect().await {
16 Ok(rx) => rx,
17 Err(e) => {
18 println!("Connection error: {}", e);
19 sleep(Duration::from_secs(1)).await;
20 continue;
21 }
22 };
23
24 client
25 .subscribe("topic", |state, msg| async move {
26 let mut state = state.lock().await;
27 *state += 1;
28 println!("Received message: {} {:?}", state, msg);
29 })
30 .await?;
31
32 let should_exit = select! {
33 result = rx => {
35 client.shutdown().await;
36
37 match result {
38 Ok(AmqError::TcpServerClosed) => {
40 true
41 }
42 Ok(e) => {
44 println!("{}", e);
45 false
46 }
47 Err(e) => {
48 println!("Receive signal error: {:?}", e);
49 false
50 }
51 }
52 }
53
54 _ = async {
56 loop {
57 sleep(Duration::from_secs(1)).await;
58 let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59 }
60 } => {
61 false }
63 };
64
65 if should_exit {
66 break;
67 }
68
69 println!("Reconnecting...");
70 }
71
72 Ok(())
73}