sync_pubsub/
sync_pubsub.rs

1use std::{
2    error::Error,
3    sync::{mpsc::TryRecvError, Arc, Mutex},
4    thread::sleep,
5    time::Duration,
6};
7
8use amq::{error::AmqError, Config, SyncClient};
9
10#[tokio::main]
11async fn main() -> Result<(), Box<dyn Error>> {
12    let state = Arc::new(Mutex::new(0));
13
14    loop {
15        let config = Config::new().unwrap();
16
17        let mut client = SyncClient::new(config, state.clone());
18
19        let rx = match client.connect() {
20            Ok(rx) => rx,
21            Err(e) => {
22                println!("Connection error: {}", e);
23                sleep(Duration::from_secs(1));
24                continue;
25            }
26        };
27
28        client.subscribe("topic", |state, msg| {
29            let mut state = state.lock().unwrap();
30            *state += 1;
31            println!("Received message: {} {:?}", state, msg);
32        })?;
33
34        let should_exit;
35        loop {
36            let result = rx.try_recv();
37            match result {
38                Ok(AmqError::TcpServerClosed) => {
39                    should_exit = true;
40                    break;
41                }
42                Ok(e) => {
43                    println!("{}", e);
44                    should_exit = false;
45                    break;
46                }
47                Err(TryRecvError::Empty) => {
48                    sleep(Duration::from_secs(1));
49                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec());
50                }
51                Err(e) => {
52                    println!("Receive signal error: {:?}", e);
53                    should_exit = false;
54                    break;
55                }
56            }
57        }
58
59        if should_exit {
60            break;
61        }
62
63        println!("Reconnecting...");
64    }
65
66    Ok(())
67}