AsyncClient

Struct AsyncClient 

Source
pub struct AsyncClient<T>
where T: Send + Sync + 'static,
{ /* private fields */ }
Expand description

§Async Client

loop {
    let config = Config::new().unwrap();
    let state = Arc::new(Mutex::new(0));

    let mut client = AsyncClient::new(config, state.clone());

    let rx = match client.connect().await {
        Ok(rx) => rx,
        Err(e) => {
            println!("Connection error: {}", e);
            sleep(Duration::from_secs(1)).await;
            continue;
        }
    };

    client
        .subscribe("topic", |state, msg| async move {
            let mut state = state.lock().await;
            *state += 1;
            println!("Received message: {} {:?}", state, msg);
        })
        .await?;

    let should_exit = select! {
        // Receive signal when connection closed
        result = rx => {
            client.shutdown().await;

            match result {
                // Server closed connection
                Ok(AmqError::TcpServerClosed) => {
                    true
                }
                // Other error
                Ok(e) => {
                    println!("{}", e);
                    false
                }
                Err(e) => {
                    println!("Receive signal error: {:?}", e);
                    false
                }
            }
        }

        // Send message every 1s
        _ = async {
            loop {
                sleep(Duration::from_secs(1)).await;
                let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
            }
        } => {
            false // Not reached as written: the loop above ignores publish errors and never completes
        }
    };

    if should_exit {
        break;
    }

    println!("Reconnecting...");
}

Implementations§

Source§

impl<T> Client<T>
where T: Send + Sync + 'static,

Source

pub fn new(config: Config, state: Arc<T>) -> Self

§Create a new async client. (tokio)
Examples found in repository?
examples/async_pubsub.rs (line 13)
7async fn main() -> Result<(), Box<dyn Error>> {
8    let state = Arc::new(Mutex::new(0));
9
10    loop {
11        let config = Config::new().unwrap();
12
13        let mut client = AsyncClient::new(config, state.clone());
14
15        let rx = match client.connect().await {
16            Ok(rx) => rx,
17            Err(e) => {
18                println!("Connection error: {}", e);
19                sleep(Duration::from_secs(1)).await;
20                continue;
21            }
22        };
23
24        client
25            .subscribe("topic", |state, msg| async move {
26                let mut state = state.lock().await;
27                *state += 1;
28                println!("Received message: {} {:?}", state, msg);
29            })
30            .await?;
31
32        let should_exit = select! {
33            // Receive signal when connection closed
34            result = rx => {
35                client.shutdown().await;
36
37                match result {
38                    // Server closed connection
39                    Ok(AmqError::TcpServerClosed) => {
40                        true
41                    }
42                    // Other error
43                    Ok(e) => {
44                        println!("{}", e);
45                        false
46                    }
47                    Err(e) => {
48                        println!("Receive signal error: {:?}", e);
49                        false
50                    }
51                }
52            }
53
54            // Send message every 1s
55            _ = async {
56                loop {
57                    sleep(Duration::from_secs(1)).await;
58                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59                }
60            } => {
61                false // Exit loop on error, and reconnect
62            }
63        };
64
65        if should_exit {
66            break;
67        }
68
69        println!("Reconnecting...");
70    }
71
72    Ok(())
73}
Source

pub async fn subscribe<F, Fut>( &mut self, topic: &str, f: F, ) -> Result<(), AmqError>
where F: Fn(Arc<T>, Vec<u8>) -> Fut + Send + Sync + 'static, Fut: Future<Output = ()> + Send + 'static,

§Subscribe to a topic.
client.subscribe("topic", |_state, msg| async move {
    println!("Received message: {:?}", msg);
}).await.unwrap();
Examples found in repository?
examples/async_pubsub.rs (lines 25-29)
7async fn main() -> Result<(), Box<dyn Error>> {
8    let state = Arc::new(Mutex::new(0));
9
10    loop {
11        let config = Config::new().unwrap();
12
13        let mut client = AsyncClient::new(config, state.clone());
14
15        let rx = match client.connect().await {
16            Ok(rx) => rx,
17            Err(e) => {
18                println!("Connection error: {}", e);
19                sleep(Duration::from_secs(1)).await;
20                continue;
21            }
22        };
23
24        client
25            .subscribe("topic", |state, msg| async move {
26                let mut state = state.lock().await;
27                *state += 1;
28                println!("Received message: {} {:?}", state, msg);
29            })
30            .await?;
31
32        let should_exit = select! {
33            // Receive signal when connection closed
34            result = rx => {
35                client.shutdown().await;
36
37                match result {
38                    // Server closed connection
39                    Ok(AmqError::TcpServerClosed) => {
40                        true
41                    }
42                    // Other error
43                    Ok(e) => {
44                        println!("{}", e);
45                        false
46                    }
47                    Err(e) => {
48                        println!("Receive signal error: {:?}", e);
49                        false
50                    }
51                }
52            }
53
54            // Send message every 1s
55            _ = async {
56                loop {
57                    sleep(Duration::from_secs(1)).await;
58                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59                }
60            } => {
61                false // Exit loop on error, and reconnect
62            }
63        };
64
65        if should_exit {
66            break;
67        }
68
69        println!("Reconnecting...");
70    }
71
72    Ok(())
73}
Source

pub async fn unsubscribe(&mut self, topic: &str) -> Result<(), AmqError>

§Unsubscribe from a topic.
client.unsubscribe("topic").await.unwrap();
Source

pub async fn publish( &mut self, topic: &str, content: Vec<u8>, ) -> Result<(), AmqError>

§Publish a message.
client.publish("topic", "Hello, world!".as_bytes().to_vec()).await.unwrap();
Examples found in repository?
examples/async_pubsub.rs (line 58)
7async fn main() -> Result<(), Box<dyn Error>> {
8    let state = Arc::new(Mutex::new(0));
9
10    loop {
11        let config = Config::new().unwrap();
12
13        let mut client = AsyncClient::new(config, state.clone());
14
15        let rx = match client.connect().await {
16            Ok(rx) => rx,
17            Err(e) => {
18                println!("Connection error: {}", e);
19                sleep(Duration::from_secs(1)).await;
20                continue;
21            }
22        };
23
24        client
25            .subscribe("topic", |state, msg| async move {
26                let mut state = state.lock().await;
27                *state += 1;
28                println!("Received message: {} {:?}", state, msg);
29            })
30            .await?;
31
32        let should_exit = select! {
33            // Receive signal when connection closed
34            result = rx => {
35                client.shutdown().await;
36
37                match result {
38                    // Server closed connection
39                    Ok(AmqError::TcpServerClosed) => {
40                        true
41                    }
42                    // Other error
43                    Ok(e) => {
44                        println!("{}", e);
45                        false
46                    }
47                    Err(e) => {
48                        println!("Receive signal error: {:?}", e);
49                        false
50                    }
51                }
52            }
53
54            // Send message every 1s
55            _ = async {
56                loop {
57                    sleep(Duration::from_secs(1)).await;
58                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59                }
60            } => {
61                false // Exit loop on error, and reconnect
62            }
63        };
64
65        if should_exit {
66            break;
67        }
68
69        println!("Reconnecting...");
70    }
71
72    Ok(())
73}
Source

pub async fn consume<F, Fut>( &mut self, topic: &str, f: F, ) -> Result<(), AmqError>
where F: Fn(Arc<T>, Vec<u8>) -> Fut + Send + Sync + 'static, Fut: Future<Output = ()> + Send + 'static,

§Add a consume callback for a topic.
client.consume("topic", |_state, msg| async move {
    println!("Received message: {:?}", msg);
}).await.unwrap();
Source

pub async fn unconsume(&mut self, topic: &str) -> Result<(), AmqError>

§Remove a consume callback for a topic.
client.unconsume("topic").await.unwrap();
Source

pub async fn ack(&mut self, message_id: u64) -> Result<(), AmqError>

§Acknowledge a message.
client.ack(message_id).await.unwrap();
Source

pub async fn ack_multi(&mut self, message_ids: Vec<u64>) -> Result<(), AmqError>

§Acknowledge multiple messages.
client.ack_multi(vec![message_id1, message_id2]).await.unwrap();
Source

pub async fn connect(&mut self) -> Result<Receiver<AmqError>, Box<dyn Error>>

§Connect to the server and start the receive task.
Examples found in repository?
examples/async_pubsub.rs (line 15)
7async fn main() -> Result<(), Box<dyn Error>> {
8    let state = Arc::new(Mutex::new(0));
9
10    loop {
11        let config = Config::new().unwrap();
12
13        let mut client = AsyncClient::new(config, state.clone());
14
15        let rx = match client.connect().await {
16            Ok(rx) => rx,
17            Err(e) => {
18                println!("Connection error: {}", e);
19                sleep(Duration::from_secs(1)).await;
20                continue;
21            }
22        };
23
24        client
25            .subscribe("topic", |state, msg| async move {
26                let mut state = state.lock().await;
27                *state += 1;
28                println!("Received message: {} {:?}", state, msg);
29            })
30            .await?;
31
32        let should_exit = select! {
33            // Receive signal when connection closed
34            result = rx => {
35                client.shutdown().await;
36
37                match result {
38                    // Server closed connection
39                    Ok(AmqError::TcpServerClosed) => {
40                        true
41                    }
42                    // Other error
43                    Ok(e) => {
44                        println!("{}", e);
45                        false
46                    }
47                    Err(e) => {
48                        println!("Receive signal error: {:?}", e);
49                        false
50                    }
51                }
52            }
53
54            // Send message every 1s
55            _ = async {
56                loop {
57                    sleep(Duration::from_secs(1)).await;
58                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59                }
60            } => {
61                false // Exit loop on error, and reconnect
62            }
63        };
64
65        if should_exit {
66            break;
67        }
68
69        println!("Reconnecting...");
70    }
71
72    Ok(())
73}
Source

pub async fn shutdown(&mut self)

§Shut down the client.
Examples found in repository?
examples/async_pubsub.rs (line 35)
7async fn main() -> Result<(), Box<dyn Error>> {
8    let state = Arc::new(Mutex::new(0));
9
10    loop {
11        let config = Config::new().unwrap();
12
13        let mut client = AsyncClient::new(config, state.clone());
14
15        let rx = match client.connect().await {
16            Ok(rx) => rx,
17            Err(e) => {
18                println!("Connection error: {}", e);
19                sleep(Duration::from_secs(1)).await;
20                continue;
21            }
22        };
23
24        client
25            .subscribe("topic", |state, msg| async move {
26                let mut state = state.lock().await;
27                *state += 1;
28                println!("Received message: {} {:?}", state, msg);
29            })
30            .await?;
31
32        let should_exit = select! {
33            // Receive signal when connection closed
34            result = rx => {
35                client.shutdown().await;
36
37                match result {
38                    // Server closed connection
39                    Ok(AmqError::TcpServerClosed) => {
40                        true
41                    }
42                    // Other error
43                    Ok(e) => {
44                        println!("{}", e);
45                        false
46                    }
47                    Err(e) => {
48                        println!("Receive signal error: {:?}", e);
49                        false
50                    }
51                }
52            }
53
54            // Send message every 1s
55            _ = async {
56                loop {
57                    sleep(Duration::from_secs(1)).await;
58                    let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59                }
60            } => {
61                false // Exit loop on error, and reconnect
62            }
63        };
64
65        if should_exit {
66            break;
67        }
68
69        println!("Reconnecting...");
70    }
71
72    Ok(())
73}

Auto Trait Implementations§

§

impl<T> Freeze for Client<T>

§

impl<T> !RefUnwindSafe for Client<T>

§

impl<T> Send for Client<T>

§

impl<T> Sync for Client<T>

§

impl<T> Unpin for Client<T>

§

impl<T> !UnwindSafe for Client<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.