pub struct RabbitMqClientHandler<BasicConsumerType: AsyncConsumer + Clone + Send + 'static> { /* private fields */ }
Expand description
This struct holds the state of the RabbitMQ client, and the whole resiliency mechanism revolves around it. It handles connection errors, keeps a backup of the list of consumers (basic.consume) you declare, and recreates them when something goes wrong. If you also need to publish to RabbitMQ, a PublishChannel will be extracted from it and regenerated if needed.
Implementations§
Source§ impl<BasicConsumerType: AsyncConsumer + Clone + Send + 'static> RabbitMqClientHandler<BasicConsumerType>
impl<BasicConsumerType: AsyncConsumer + Clone + Send + 'static> RabbitMqClientHandler<BasicConsumerType>
Source pub async fn new(
open_connection_arguments: OpenConnectionArguments,
) -> RabbitMqClientHandler<BasicConsumerType>
pub async fn new( open_connection_arguments: OpenConnectionArguments, ) -> RabbitMqClientHandler<BasicConsumerType>
Examples found in repository?
examples/one_consumer_no_publisher.rs (line 36)
31async fn main() {
32 // Declare a RabbitMqClientHandler
33 let rabbitmq_connection_arguments =
34 ConnectionArguments::new("localhost", 5672, "guest", "guest");
35 let mut resilient_rabbitmq_connection: RabbitMqClientHandler<MyRabbitMQClient> =
36 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
37
38 // Declare a consumer on an 'incoming' queue and include it in the RabbitMqClientHandler
39 let basic_consumer_args = ConsumerArguments::new("incoming", "my_first_resilient_consumer");
40 let basic_consumer = MyRabbitMQClient;
41 resilient_rabbitmq_connection
42 .new_consumer(basic_consumer, basic_consumer_args)
43 .await;
44
45 // Keep alive the RabbitMqClientHandler
46 resilient_rabbitmq_connection.keep_alive().await;
47}
More examples
examples/only_a_publisher.rs (line 31)
26async fn main() {
27 // Declare a RabbitMqClientHandler
28 let rabbitmq_connection_arguments =
29 ConnectionArguments::new("localhost", 5672, "guest", "guest");
30 let mut resilient_rabbitmq_client: RabbitMqClientHandler<MyRabbitMQClient> =
31 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
32
33 // Declare a PublishChannel
34 let mut publisher = resilient_rabbitmq_client.create_publisher().await.unwrap();
35 // Get a channel from it. This channel is meant to be used to communicate with the PublishChannel type.
36 let publish_sender = resilient_rabbitmq_client
37 .get_publish_sender_clone()
38 .unwrap();
39
40 // For the example, we spawn a task which will publish some random data on an "incoming" routing key every second.
41 tokio::task::spawn(async move {
42 loop {
43 publish_sender
44 .send(DataToPublish::from(
45 "amq.direct".to_string(),
46 "incoming".to_string(),
47 "random data".to_string(),
48 ))
49 .await
50 .unwrap();
51
52 sleep(Duration::from_secs(1)).await;
53 }
54 });
55
56 // Keep alive both the client and the publisher
57 tokio::join!(
58 publisher.keep_ready_to_publish(),
59 resilient_rabbitmq_client.keep_alive()
60 );
61}
examples/multiple_kinds_of_consumers.rs (line 93)
88async fn main() {
89 // Declare a RabbitMqClientHandler
90 let rabbitmq_connection_arguments =
91 ConnectionArguments::new("localhost", 5672, "guest", "guest");
92 let mut resilient_rabbitmq_connection: RabbitMqClientHandler<RabbitMQClient> =
93 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
94
95 // First kind of client : consumes from a specific queue a given data format
96 let basic_consumer_first_kind_args =
97 ConsumerArguments::new("first_queue", "first_kind_of_consumer").finish();
98 let basic_consumer_first_kind = FirstKindOfRabbitMQClient;
99 resilient_rabbitmq_connection
100 .new_consumer(
101 RabbitMQClient::FirstKind(basic_consumer_first_kind),
102 basic_consumer_first_kind_args,
103 )
104 .await;
105
106 // Second kind of client : consumes from another queue another kind of data
107 let basic_consumer_second_kind_args =
108 ConsumerArguments::new("second_queue", "second_kind_of_consumer").finish();
109 let basic_consumer_second_kind = SecondKindOfRabbitMQClient;
110 resilient_rabbitmq_connection
111 .new_consumer(
112 RabbitMQClient::SecondKind(basic_consumer_second_kind),
113 basic_consumer_second_kind_args,
114 )
115 .await;
116
117 // Keep alive the RabbitMqClientHandler
118 resilient_rabbitmq_connection.keep_alive().await;
119}
pub async fn try_reconnect(&mut self) -> Result<(), String>
Source pub async fn create_publisher(&mut self) -> Result<PublishChannel, String>
pub async fn create_publisher(&mut self) -> Result<PublishChannel, String>
Examples found in repository?
examples/only_a_publisher.rs (line 34)
26async fn main() {
27 // Declare a RabbitMqClientHandler
28 let rabbitmq_connection_arguments =
29 ConnectionArguments::new("localhost", 5672, "guest", "guest");
30 let mut resilient_rabbitmq_client: RabbitMqClientHandler<MyRabbitMQClient> =
31 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
32
33 // Declare a PublishChannel
34 let mut publisher = resilient_rabbitmq_client.create_publisher().await.unwrap();
35 // Get a channel from it. This channel is meant to be used to communicate with the PublishChannel type.
36 let publish_sender = resilient_rabbitmq_client
37 .get_publish_sender_clone()
38 .unwrap();
39
40 // For the example, we spawn a task which will publish some random data on an "incoming" routing key every second.
41 tokio::task::spawn(async move {
42 loop {
43 publish_sender
44 .send(DataToPublish::from(
45 "amq.direct".to_string(),
46 "incoming".to_string(),
47 "random data".to_string(),
48 ))
49 .await
50 .unwrap();
51
52 sleep(Duration::from_secs(1)).await;
53 }
54 });
55
56 // Keep alive both the client and the publisher
57 tokio::join!(
58 publisher.keep_ready_to_publish(),
59 resilient_rabbitmq_client.keep_alive()
60 );
61}
Source pub async fn new_consumer(
&mut self,
basic_consumer: BasicConsumerType,
basic_consumer_args: BasicConsumeArguments,
)
pub async fn new_consumer( &mut self, basic_consumer: BasicConsumerType, basic_consumer_args: BasicConsumeArguments, )
Examples found in repository?
examples/one_consumer_no_publisher.rs (line 42)
31async fn main() {
32 // Declare a RabbitMqClientHandler
33 let rabbitmq_connection_arguments =
34 ConnectionArguments::new("localhost", 5672, "guest", "guest");
35 let mut resilient_rabbitmq_connection: RabbitMqClientHandler<MyRabbitMQClient> =
36 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
37
38 // Declare a consumer on an 'incoming' queue and include it in the RabbitMqClientHandler
39 let basic_consumer_args = ConsumerArguments::new("incoming", "my_first_resilient_consumer");
40 let basic_consumer = MyRabbitMQClient;
41 resilient_rabbitmq_connection
42 .new_consumer(basic_consumer, basic_consumer_args)
43 .await;
44
45 // Keep alive the RabbitMqClientHandler
46 resilient_rabbitmq_connection.keep_alive().await;
47}
More examples
examples/multiple_kinds_of_consumers.rs (lines 100-103)
88async fn main() {
89 // Declare a RabbitMqClientHandler
90 let rabbitmq_connection_arguments =
91 ConnectionArguments::new("localhost", 5672, "guest", "guest");
92 let mut resilient_rabbitmq_connection: RabbitMqClientHandler<RabbitMQClient> =
93 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
94
95 // First kind of client : consumes from a specific queue a given data format
96 let basic_consumer_first_kind_args =
97 ConsumerArguments::new("first_queue", "first_kind_of_consumer").finish();
98 let basic_consumer_first_kind = FirstKindOfRabbitMQClient;
99 resilient_rabbitmq_connection
100 .new_consumer(
101 RabbitMQClient::FirstKind(basic_consumer_first_kind),
102 basic_consumer_first_kind_args,
103 )
104 .await;
105
106 // Second kind of client : consumes from another queue another kind of data
107 let basic_consumer_second_kind_args =
108 ConsumerArguments::new("second_queue", "second_kind_of_consumer").finish();
109 let basic_consumer_second_kind = SecondKindOfRabbitMQClient;
110 resilient_rabbitmq_connection
111 .new_consumer(
112 RabbitMQClient::SecondKind(basic_consumer_second_kind),
113 basic_consumer_second_kind_args,
114 )
115 .await;
116
117 // Keep alive the RabbitMqClientHandler
118 resilient_rabbitmq_connection.keep_alive().await;
119}
pub async fn create_consumer( &mut self, basic_consumer: BasicConsumerType, basic_consumer_args: BasicConsumeArguments, ) -> Result<(), Error>
Source pub fn get_publish_sender_clone(&self) -> Result<Sender<DataToPublish>, String>
pub fn get_publish_sender_clone(&self) -> Result<Sender<DataToPublish>, String>
Examples found in repository?
examples/only_a_publisher.rs (line 37)
26async fn main() {
27 // Declare a RabbitMqClientHandler
28 let rabbitmq_connection_arguments =
29 ConnectionArguments::new("localhost", 5672, "guest", "guest");
30 let mut resilient_rabbitmq_client: RabbitMqClientHandler<MyRabbitMQClient> =
31 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
32
33 // Declare a PublishChannel
34 let mut publisher = resilient_rabbitmq_client.create_publisher().await.unwrap();
35 // Get a channel from it. This channel is meant to be used to communicate with the PublishChannel type.
36 let publish_sender = resilient_rabbitmq_client
37 .get_publish_sender_clone()
38 .unwrap();
39
40 // For the example, we spawn a task which will publish some random data on an "incoming" routing key every second.
41 tokio::task::spawn(async move {
42 loop {
43 publish_sender
44 .send(DataToPublish::from(
45 "amq.direct".to_string(),
46 "incoming".to_string(),
47 "random data".to_string(),
48 ))
49 .await
50 .unwrap();
51
52 sleep(Duration::from_secs(1)).await;
53 }
54 });
55
56 // Keep alive both the client and the publisher
57 tokio::join!(
58 publisher.keep_ready_to_publish(),
59 resilient_rabbitmq_client.keep_alive()
60 );
61}
Source pub async fn keep_alive(&mut self)
pub async fn keep_alive(&mut self)
Examples found in repository?
examples/one_consumer_no_publisher.rs (line 46)
31async fn main() {
32 // Declare a RabbitMqClientHandler
33 let rabbitmq_connection_arguments =
34 ConnectionArguments::new("localhost", 5672, "guest", "guest");
35 let mut resilient_rabbitmq_connection: RabbitMqClientHandler<MyRabbitMQClient> =
36 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
37
38 // Declare a consumer on an 'incoming' queue and include it in the RabbitMqClientHandler
39 let basic_consumer_args = ConsumerArguments::new("incoming", "my_first_resilient_consumer");
40 let basic_consumer = MyRabbitMQClient;
41 resilient_rabbitmq_connection
42 .new_consumer(basic_consumer, basic_consumer_args)
43 .await;
44
45 // Keep alive the RabbitMqClientHandler
46 resilient_rabbitmq_connection.keep_alive().await;
47}
More examples
examples/only_a_publisher.rs (line 59)
26async fn main() {
27 // Declare a RabbitMqClientHandler
28 let rabbitmq_connection_arguments =
29 ConnectionArguments::new("localhost", 5672, "guest", "guest");
30 let mut resilient_rabbitmq_client: RabbitMqClientHandler<MyRabbitMQClient> =
31 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
32
33 // Declare a PublishChannel
34 let mut publisher = resilient_rabbitmq_client.create_publisher().await.unwrap();
35 // Get a channel from it. This channel is meant to be used to communicate with the PublishChannel type.
36 let publish_sender = resilient_rabbitmq_client
37 .get_publish_sender_clone()
38 .unwrap();
39
40 // For the example, we spawn a task which will publish some random data on an "incoming" routing key every second.
41 tokio::task::spawn(async move {
42 loop {
43 publish_sender
44 .send(DataToPublish::from(
45 "amq.direct".to_string(),
46 "incoming".to_string(),
47 "random data".to_string(),
48 ))
49 .await
50 .unwrap();
51
52 sleep(Duration::from_secs(1)).await;
53 }
54 });
55
56 // Keep alive both the client and the publisher
57 tokio::join!(
58 publisher.keep_ready_to_publish(),
59 resilient_rabbitmq_client.keep_alive()
60 );
61}
examples/multiple_kinds_of_consumers.rs (line 118)
88async fn main() {
89 // Declare a RabbitMqClientHandler
90 let rabbitmq_connection_arguments =
91 ConnectionArguments::new("localhost", 5672, "guest", "guest");
92 let mut resilient_rabbitmq_connection: RabbitMqClientHandler<RabbitMQClient> =
93 RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
94
95 // First kind of client : consumes from a specific queue a given data format
96 let basic_consumer_first_kind_args =
97 ConsumerArguments::new("first_queue", "first_kind_of_consumer").finish();
98 let basic_consumer_first_kind = FirstKindOfRabbitMQClient;
99 resilient_rabbitmq_connection
100 .new_consumer(
101 RabbitMQClient::FirstKind(basic_consumer_first_kind),
102 basic_consumer_first_kind_args,
103 )
104 .await;
105
106 // Second kind of client : consumes from another queue another kind of data
107 let basic_consumer_second_kind_args =
108 ConsumerArguments::new("second_queue", "second_kind_of_consumer").finish();
109 let basic_consumer_second_kind = SecondKindOfRabbitMQClient;
110 resilient_rabbitmq_connection
111 .new_consumer(
112 RabbitMQClient::SecondKind(basic_consumer_second_kind),
113 basic_consumer_second_kind_args,
114 )
115 .await;
116
117 // Keep alive the RabbitMqClientHandler
118 resilient_rabbitmq_connection.keep_alive().await;
119}
Auto Trait Implementations§
impl<BasicConsumerType> Freeze for RabbitMqClientHandler<BasicConsumerType>
impl<BasicConsumerType> !RefUnwindSafe for RabbitMqClientHandler<BasicConsumerType>
impl<BasicConsumerType> Send for RabbitMqClientHandler<BasicConsumerType>
impl<BasicConsumerType> Sync for RabbitMqClientHandler<BasicConsumerType>
where
    BasicConsumerType: Sync,
impl<BasicConsumerType> Unpin for RabbitMqClientHandler<BasicConsumerType>
where
    BasicConsumerType: Unpin,
impl<BasicConsumerType> UnsafeUnpin for RabbitMqClientHandler<BasicConsumerType>
impl<BasicConsumerType> !UnwindSafe for RabbitMqClientHandler<BasicConsumerType>
Blanket Implementations§
Source§ impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§ fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more