// sync_pubsub/sync_pubsub.rs
use std::{
    error::Error,
    sync::{mpsc::TryRecvError, Arc, Mutex},
    thread::sleep,
    time::Duration,
};

use amq::{error::AmqError, Config, SyncClient};

10#[tokio::main]
11async fn main() -> Result<(), Box<dyn Error>> {
12 let state = Arc::new(Mutex::new(0));
13
14 loop {
15 let config = Config::new().unwrap();
16
17 let mut client = SyncClient::new(config, state.clone());
18
19 let rx = match client.connect() {
20 Ok(rx) => rx,
21 Err(e) => {
22 println!("Connection error: {}", e);
23 sleep(Duration::from_secs(1));
24 continue;
25 }
26 };
27
28 client.subscribe("topic", |state, msg| {
29 let mut state = state.lock().unwrap();
30 *state += 1;
31 println!("Received message: {} {:?}", state, msg);
32 })?;
33
34 let should_exit;
35 loop {
36 let result = rx.try_recv();
37 match result {
38 Ok(AmqError::TcpServerClosed) => {
39 should_exit = true;
40 break;
41 }
42 Ok(e) => {
43 println!("{}", e);
44 should_exit = false;
45 break;
46 }
47 Err(TryRecvError::Empty) => {
48 sleep(Duration::from_secs(1));
49 let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec());
50 }
51 Err(e) => {
52 println!("Receive signal error: {:?}", e);
53 should_exit = false;
54 break;
55 }
56 }
57 }
58
59 if should_exit {
60 break;
61 }
62
63 println!("Reconnecting...");
64 }
65
66 Ok(())
67}