pub struct AsyncClient<T>{ /* private fields */ }
Expand description
§Async Client
loop {
let config = Config::new().unwrap();
let state = Arc::new(Mutex::new(0));
let mut client = AsyncClient::new(config, state.clone());
let rx = match client.connect().await {
Ok(rx) => rx,
Err(e) => {
println!("Connection error: {}", e);
sleep(Duration::from_secs(1)).await;
continue;
}
};
client
.subscribe("topic", |state, msg| async move {
let mut state = state.lock().await;
*state += 1;
println!("Received message: {} {:?}", state, msg);
})
.await?;
let should_exit = select! {
// Receive signal when connection closed
result = rx => {
client.shutdown().await;
match result {
// Server closed connection
Ok(AmqError::TcpServerClosed) => {
true
}
// Other error
Ok(e) => {
println!("{}", e);
false
}
Err(e) => {
println!("Receive signal error: {:?}", e);
false
}
}
}
// Send message every 1s
_ = async {
loop {
sleep(Duration::from_secs(1)).await;
let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
}
} => {
false // Exit loop on error, and reconnect
}
};
if should_exit {
break;
}
println!("Reconnecting...");
}
Implementations§
Source§impl<T> Client<T>
impl<T> Client<T>
Source
pub fn new(config: Config, state: Arc<T>) -> Self
pub fn new(config: Config, state: Arc<T>) -> Self
§Create a new async client. (tokio)
Examples found in repository?
examples/async_pubsub.rs (line 13)
7async fn main() -> Result<(), Box<dyn Error>> {
8 let state = Arc::new(Mutex::new(0));
9
10 loop {
11 let config = Config::new().unwrap();
12
13 let mut client = AsyncClient::new(config, state.clone());
14
15 let rx = match client.connect().await {
16 Ok(rx) => rx,
17 Err(e) => {
18 println!("Connection error: {}", e);
19 sleep(Duration::from_secs(1)).await;
20 continue;
21 }
22 };
23
24 client
25 .subscribe("topic", |state, msg| async move {
26 let mut state = state.lock().await;
27 *state += 1;
28 println!("Received message: {} {:?}", state, msg);
29 })
30 .await?;
31
32 let should_exit = select! {
33 // Receive signal when connection closed
34 result = rx => {
35 client.shutdown().await;
36
37 match result {
38 // Server closed connection
39 Ok(AmqError::TcpServerClosed) => {
40 true
41 }
42 // Other error
43 Ok(e) => {
44 println!("{}", e);
45 false
46 }
47 Err(e) => {
48 println!("Receive signal error: {:?}", e);
49 false
50 }
51 }
52 }
53
54 // Send message every 1s
55 _ = async {
56 loop {
57 sleep(Duration::from_secs(1)).await;
58 let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59 }
60 } => {
61 false // Exit loop on error, and reconnect
62 }
63 };
64
65 if should_exit {
66 break;
67 }
68
69 println!("Reconnecting...");
70 }
71
72 Ok(())
73}
Source
pub async fn subscribe<F, Fut>(
&mut self,
topic: &str,
f: F,
) -> Result<(), AmqError>
pub async fn subscribe<F, Fut>( &mut self, topic: &str, f: F, ) -> Result<(), AmqError>
§Subscribe to a topic.
client.subscribe("topic", |_state, msg| async move {
println!("Received message: {:?}", msg);
}).await.unwrap();
Examples found in repository?
examples/async_pubsub.rs (lines 25-29)
7async fn main() -> Result<(), Box<dyn Error>> {
8 let state = Arc::new(Mutex::new(0));
9
10 loop {
11 let config = Config::new().unwrap();
12
13 let mut client = AsyncClient::new(config, state.clone());
14
15 let rx = match client.connect().await {
16 Ok(rx) => rx,
17 Err(e) => {
18 println!("Connection error: {}", e);
19 sleep(Duration::from_secs(1)).await;
20 continue;
21 }
22 };
23
24 client
25 .subscribe("topic", |state, msg| async move {
26 let mut state = state.lock().await;
27 *state += 1;
28 println!("Received message: {} {:?}", state, msg);
29 })
30 .await?;
31
32 let should_exit = select! {
33 // Receive signal when connection closed
34 result = rx => {
35 client.shutdown().await;
36
37 match result {
38 // Server closed connection
39 Ok(AmqError::TcpServerClosed) => {
40 true
41 }
42 // Other error
43 Ok(e) => {
44 println!("{}", e);
45 false
46 }
47 Err(e) => {
48 println!("Receive signal error: {:?}", e);
49 false
50 }
51 }
52 }
53
54 // Send message every 1s
55 _ = async {
56 loop {
57 sleep(Duration::from_secs(1)).await;
58 let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59 }
60 } => {
61 false // Exit loop on error, and reconnect
62 }
63 };
64
65 if should_exit {
66 break;
67 }
68
69 println!("Reconnecting...");
70 }
71
72 Ok(())
73}
Source
pub async fn unsubscribe(&mut self, topic: &str) -> Result<(), AmqError>
pub async fn unsubscribe(&mut self, topic: &str) -> Result<(), AmqError>
§Unsubscribe from a topic.
client.unsubscribe("topic").await.unwrap();
Source
pub async fn publish(
&mut self,
topic: &str,
content: Vec<u8>,
) -> Result<(), AmqError>
pub async fn publish( &mut self, topic: &str, content: Vec<u8>, ) -> Result<(), AmqError>
§Publish a message.
client.publish("topic", "Hello, world!".as_bytes().to_vec()).await.unwrap();
Examples found in repository?
examples/async_pubsub.rs (line 58)
7async fn main() -> Result<(), Box<dyn Error>> {
8 let state = Arc::new(Mutex::new(0));
9
10 loop {
11 let config = Config::new().unwrap();
12
13 let mut client = AsyncClient::new(config, state.clone());
14
15 let rx = match client.connect().await {
16 Ok(rx) => rx,
17 Err(e) => {
18 println!("Connection error: {}", e);
19 sleep(Duration::from_secs(1)).await;
20 continue;
21 }
22 };
23
24 client
25 .subscribe("topic", |state, msg| async move {
26 let mut state = state.lock().await;
27 *state += 1;
28 println!("Received message: {} {:?}", state, msg);
29 })
30 .await?;
31
32 let should_exit = select! {
33 // Receive signal when connection closed
34 result = rx => {
35 client.shutdown().await;
36
37 match result {
38 // Server closed connection
39 Ok(AmqError::TcpServerClosed) => {
40 true
41 }
42 // Other error
43 Ok(e) => {
44 println!("{}", e);
45 false
46 }
47 Err(e) => {
48 println!("Receive signal error: {:?}", e);
49 false
50 }
51 }
52 }
53
54 // Send message every 1s
55 _ = async {
56 loop {
57 sleep(Duration::from_secs(1)).await;
58 let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59 }
60 } => {
61 false // Exit loop on error, and reconnect
62 }
63 };
64
65 if should_exit {
66 break;
67 }
68
69 println!("Reconnecting...");
70 }
71
72 Ok(())
73}
Source
pub async fn consume<F, Fut>(
&mut self,
topic: &str,
f: F,
) -> Result<(), AmqError>
pub async fn consume<F, Fut>( &mut self, topic: &str, f: F, ) -> Result<(), AmqError>
§Add a consume callback for a topic.
client.consume("topic", |msg| async move {
println!("Received message: {:?}", msg);
}).await.unwrap();
Source
pub async fn unconsume(&mut self, topic: &str) -> Result<(), AmqError>
pub async fn unconsume(&mut self, topic: &str) -> Result<(), AmqError>
§Remove a consume callback for a topic.
client.unconsume("topic").await.unwrap();
Source
pub async fn ack(&mut self, message_id: u64) -> Result<(), AmqError>
pub async fn ack(&mut self, message_id: u64) -> Result<(), AmqError>
§Acknowledge a message.
client.ack(message_id).await.unwrap();
Source
pub async fn ack_multi(&mut self, message_ids: Vec<u64>) -> Result<(), AmqError>
pub async fn ack_multi(&mut self, message_ids: Vec<u64>) -> Result<(), AmqError>
§Acknowledge multiple messages.
client.ack_multi(vec![message_id1, message_id2]).await.unwrap();
Source
pub async fn connect(&mut self) -> Result<Receiver<AmqError>, Box<dyn Error>>
pub async fn connect(&mut self) -> Result<Receiver<AmqError>, Box<dyn Error>>
§Connect to the server and start the receive task.
Examples found in repository?
examples/async_pubsub.rs (line 15)
7async fn main() -> Result<(), Box<dyn Error>> {
8 let state = Arc::new(Mutex::new(0));
9
10 loop {
11 let config = Config::new().unwrap();
12
13 let mut client = AsyncClient::new(config, state.clone());
14
15 let rx = match client.connect().await {
16 Ok(rx) => rx,
17 Err(e) => {
18 println!("Connection error: {}", e);
19 sleep(Duration::from_secs(1)).await;
20 continue;
21 }
22 };
23
24 client
25 .subscribe("topic", |state, msg| async move {
26 let mut state = state.lock().await;
27 *state += 1;
28 println!("Received message: {} {:?}", state, msg);
29 })
30 .await?;
31
32 let should_exit = select! {
33 // Receive signal when connection closed
34 result = rx => {
35 client.shutdown().await;
36
37 match result {
38 // Server closed connection
39 Ok(AmqError::TcpServerClosed) => {
40 true
41 }
42 // Other error
43 Ok(e) => {
44 println!("{}", e);
45 false
46 }
47 Err(e) => {
48 println!("Receive signal error: {:?}", e);
49 false
50 }
51 }
52 }
53
54 // Send message every 1s
55 _ = async {
56 loop {
57 sleep(Duration::from_secs(1)).await;
58 let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59 }
60 } => {
61 false // Exit loop on error, and reconnect
62 }
63 };
64
65 if should_exit {
66 break;
67 }
68
69 println!("Reconnecting...");
70 }
71
72 Ok(())
73}
Source
pub async fn shutdown(&mut self)
pub async fn shutdown(&mut self)
§Shutdown the client.
Examples found in repository?
examples/async_pubsub.rs (line 35)
7async fn main() -> Result<(), Box<dyn Error>> {
8 let state = Arc::new(Mutex::new(0));
9
10 loop {
11 let config = Config::new().unwrap();
12
13 let mut client = AsyncClient::new(config, state.clone());
14
15 let rx = match client.connect().await {
16 Ok(rx) => rx,
17 Err(e) => {
18 println!("Connection error: {}", e);
19 sleep(Duration::from_secs(1)).await;
20 continue;
21 }
22 };
23
24 client
25 .subscribe("topic", |state, msg| async move {
26 let mut state = state.lock().await;
27 *state += 1;
28 println!("Received message: {} {:?}", state, msg);
29 })
30 .await?;
31
32 let should_exit = select! {
33 // Receive signal when connection closed
34 result = rx => {
35 client.shutdown().await;
36
37 match result {
38 // Server closed connection
39 Ok(AmqError::TcpServerClosed) => {
40 true
41 }
42 // Other error
43 Ok(e) => {
44 println!("{}", e);
45 false
46 }
47 Err(e) => {
48 println!("Receive signal error: {:?}", e);
49 false
50 }
51 }
52 }
53
54 // Send message every 1s
55 _ = async {
56 loop {
57 sleep(Duration::from_secs(1)).await;
58 let _ = client.publish("topic", "Hello, world!".as_bytes().to_vec()).await;
59 }
60 } => {
61 false // Exit loop on error, and reconnect
62 }
63 };
64
65 if should_exit {
66 break;
67 }
68
69 println!("Reconnecting...");
70 }
71
72 Ok(())
73}
Auto Trait Implementations§
impl<T> Freeze for Client<T>
impl<T> !RefUnwindSafe for Client<T>
impl<T> Send for Client<T>
impl<T> Sync for Client<T>
impl<T> Unpin for Client<T>
impl<T> !UnwindSafe for Client<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more