Skip to main content

RabbitMqClientHandler

Struct RabbitMqClientHandler 

Source
pub struct RabbitMqClientHandler<BasicConsumerType: AsyncConsumer + Clone + Send + 'static> { /* private fields */ }
Expand description

This struct holds the state of the RabbitMQ client; the crate's resiliency revolves around it. It handles connection errors, keeps a backup of the list of consumers (basic.consume) you declare, and recreates them after a failure. If you also need to publish to RabbitMQ, a PublishChannel can be created from it and is regenerated if needed.

Implementations§

Source§

impl<BasicConsumerType: AsyncConsumer + Clone + Send + 'static> RabbitMqClientHandler<BasicConsumerType>

Source

pub async fn new( open_connection_arguments: OpenConnectionArguments, ) -> RabbitMqClientHandler<BasicConsumerType>

Examples found in repository?
examples/one_consumer_no_publisher.rs (line 36)
31async fn main() {
32    // Declare a RabbitMqClientHandler
33    let rabbitmq_connection_arguments =
34        ConnectionArguments::new("localhost", 5672, "guest", "guest");
35    let mut resilient_rabbitmq_connection: RabbitMqClientHandler<MyRabbitMQClient> =
36        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
37
38    // Declare a consumer on an 'incoming' queue and include it in the RabbitMqClientHandler
39    let basic_consumer_args = ConsumerArguments::new("incoming", "my_first_resilient_consumer");
40    let basic_consumer = MyRabbitMQClient;
41    resilient_rabbitmq_connection
42        .new_consumer(basic_consumer, basic_consumer_args)
43        .await;
44
45    // Keep alive the RabbitMqClientHandler
46    resilient_rabbitmq_connection.keep_alive().await;
47}
More examples
Hide additional examples
examples/only_a_publisher.rs (line 31)
26async fn main() {
27    // Declare a RabbitMqClientHandler
28    let rabbitmq_connection_arguments =
29        ConnectionArguments::new("localhost", 5672, "guest", "guest");
30    let mut resilient_rabbitmq_client: RabbitMqClientHandler<MyRabbitMQClient> =
31        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
32
33    // Declare a PublishChannel
34    let mut publisher = resilient_rabbitmq_client.create_publisher().await.unwrap();
35    // Get a channel from it. This channel is meant to be used to communicate with the PublishChannel type.
36    let publish_sender = resilient_rabbitmq_client
37        .get_publish_sender_clone()
38        .unwrap();
39
40    // For the example, we spawn a task which will publish some random data on an "incoming" routing key every second.
41    tokio::task::spawn(async move {
42        loop {
43            publish_sender
44                .send(DataToPublish::from(
45                    "amq.direct".to_string(),
46                    "incoming".to_string(),
47                    "random data".to_string(),
48                ))
49                .await
50                .unwrap();
51
52            sleep(Duration::from_secs(1)).await;
53        }
54    });
55
56    // Keep alive both the client and the publisher
57    tokio::join!(
58        publisher.keep_ready_to_publish(),
59        resilient_rabbitmq_client.keep_alive()
60    );
61}
examples/multiple_kinds_of_consumers.rs (line 93)
88async fn main() {
89    // Declare a RabbitMqClientHandler
90    let rabbitmq_connection_arguments =
91        ConnectionArguments::new("localhost", 5672, "guest", "guest");
92    let mut resilient_rabbitmq_connection: RabbitMqClientHandler<RabbitMQClient> =
93        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
94
95    // First kind of client : consumes from a specific queue a given data format
96    let basic_consumer_first_kind_args =
97        ConsumerArguments::new("first_queue", "first_kind_of_consumer").finish();
98    let basic_consumer_first_kind = FirstKindOfRabbitMQClient;
99    resilient_rabbitmq_connection
100        .new_consumer(
101            RabbitMQClient::FirstKind(basic_consumer_first_kind),
102            basic_consumer_first_kind_args,
103        )
104        .await;
105
106    // Second kind of client : consumes from another queue another kind of data
107    let basic_consumer_second_kind_args =
108        ConsumerArguments::new("second_queue", "second_kind_of_consumer").finish();
109    let basic_consumer_second_kind = SecondKindOfRabbitMQClient;
110    resilient_rabbitmq_connection
111        .new_consumer(
112            RabbitMQClient::SecondKind(basic_consumer_second_kind),
113            basic_consumer_second_kind_args,
114        )
115        .await;
116
117    // Keep alive the RabbitMqClientHandler
118    resilient_rabbitmq_connection.keep_alive().await;
119}
Source

pub async fn try_reconnect(&mut self) -> Result<(), String>

Source

pub async fn create_publisher(&mut self) -> Result<PublishChannel, String>

Examples found in repository?
examples/only_a_publisher.rs (line 34)
26async fn main() {
27    // Declare a RabbitMqClientHandler
28    let rabbitmq_connection_arguments =
29        ConnectionArguments::new("localhost", 5672, "guest", "guest");
30    let mut resilient_rabbitmq_client: RabbitMqClientHandler<MyRabbitMQClient> =
31        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
32
33    // Declare a PublishChannel
34    let mut publisher = resilient_rabbitmq_client.create_publisher().await.unwrap();
35    // Get a channel from it. This channel is meant to be used to communicate with the PublishChannel type.
36    let publish_sender = resilient_rabbitmq_client
37        .get_publish_sender_clone()
38        .unwrap();
39
40    // For the example, we spawn a task which will publish some random data on an "incoming" routing key every second.
41    tokio::task::spawn(async move {
42        loop {
43            publish_sender
44                .send(DataToPublish::from(
45                    "amq.direct".to_string(),
46                    "incoming".to_string(),
47                    "random data".to_string(),
48                ))
49                .await
50                .unwrap();
51
52            sleep(Duration::from_secs(1)).await;
53        }
54    });
55
56    // Keep alive both the client and the publisher
57    tokio::join!(
58        publisher.keep_ready_to_publish(),
59        resilient_rabbitmq_client.keep_alive()
60    );
61}
Source

pub async fn new_consumer( &mut self, basic_consumer: BasicConsumerType, basic_consumer_args: BasicConsumeArguments, )

Examples found in repository?
examples/one_consumer_no_publisher.rs (line 42)
31async fn main() {
32    // Declare a RabbitMqClientHandler
33    let rabbitmq_connection_arguments =
34        ConnectionArguments::new("localhost", 5672, "guest", "guest");
35    let mut resilient_rabbitmq_connection: RabbitMqClientHandler<MyRabbitMQClient> =
36        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
37
38    // Declare a consumer on an 'incoming' queue and include it in the RabbitMqClientHandler
39    let basic_consumer_args = ConsumerArguments::new("incoming", "my_first_resilient_consumer");
40    let basic_consumer = MyRabbitMQClient;
41    resilient_rabbitmq_connection
42        .new_consumer(basic_consumer, basic_consumer_args)
43        .await;
44
45    // Keep alive the RabbitMqClientHandler
46    resilient_rabbitmq_connection.keep_alive().await;
47}
More examples
Hide additional examples
examples/multiple_kinds_of_consumers.rs (lines 100-103)
88async fn main() {
89    // Declare a RabbitMqClientHandler
90    let rabbitmq_connection_arguments =
91        ConnectionArguments::new("localhost", 5672, "guest", "guest");
92    let mut resilient_rabbitmq_connection: RabbitMqClientHandler<RabbitMQClient> =
93        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
94
95    // First kind of client : consumes from a specific queue a given data format
96    let basic_consumer_first_kind_args =
97        ConsumerArguments::new("first_queue", "first_kind_of_consumer").finish();
98    let basic_consumer_first_kind = FirstKindOfRabbitMQClient;
99    resilient_rabbitmq_connection
100        .new_consumer(
101            RabbitMQClient::FirstKind(basic_consumer_first_kind),
102            basic_consumer_first_kind_args,
103        )
104        .await;
105
106    // Second kind of client : consumes from another queue another kind of data
107    let basic_consumer_second_kind_args =
108        ConsumerArguments::new("second_queue", "second_kind_of_consumer").finish();
109    let basic_consumer_second_kind = SecondKindOfRabbitMQClient;
110    resilient_rabbitmq_connection
111        .new_consumer(
112            RabbitMQClient::SecondKind(basic_consumer_second_kind),
113            basic_consumer_second_kind_args,
114        )
115        .await;
116
117    // Keep alive the RabbitMqClientHandler
118    resilient_rabbitmq_connection.keep_alive().await;
119}
Source

pub async fn create_consumer( &mut self, basic_consumer: BasicConsumerType, basic_consumer_args: BasicConsumeArguments, ) -> Result<(), Error>

Source

pub fn get_publish_sender_clone(&self) -> Result<Sender<DataToPublish>, String>

Examples found in repository?
examples/only_a_publisher.rs (line 37)
26async fn main() {
27    // Declare a RabbitMqClientHandler
28    let rabbitmq_connection_arguments =
29        ConnectionArguments::new("localhost", 5672, "guest", "guest");
30    let mut resilient_rabbitmq_client: RabbitMqClientHandler<MyRabbitMQClient> =
31        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
32
33    // Declare a PublishChannel
34    let mut publisher = resilient_rabbitmq_client.create_publisher().await.unwrap();
35    // Get a channel from it. This channel is meant to be used to communicate with the PublishChannel type.
36    let publish_sender = resilient_rabbitmq_client
37        .get_publish_sender_clone()
38        .unwrap();
39
40    // For the example, we spawn a task which will publish some random data on an "incoming" routing key every second.
41    tokio::task::spawn(async move {
42        loop {
43            publish_sender
44                .send(DataToPublish::from(
45                    "amq.direct".to_string(),
46                    "incoming".to_string(),
47                    "random data".to_string(),
48                ))
49                .await
50                .unwrap();
51
52            sleep(Duration::from_secs(1)).await;
53        }
54    });
55
56    // Keep alive both the client and the publisher
57    tokio::join!(
58        publisher.keep_ready_to_publish(),
59        resilient_rabbitmq_client.keep_alive()
60    );
61}
Source

pub async fn keep_alive(&mut self)

Examples found in repository?
examples/one_consumer_no_publisher.rs (line 46)
31async fn main() {
32    // Declare a RabbitMqClientHandler
33    let rabbitmq_connection_arguments =
34        ConnectionArguments::new("localhost", 5672, "guest", "guest");
35    let mut resilient_rabbitmq_connection: RabbitMqClientHandler<MyRabbitMQClient> =
36        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
37
38    // Declare a consumer on an 'incoming' queue and include it in the RabbitMqClientHandler
39    let basic_consumer_args = ConsumerArguments::new("incoming", "my_first_resilient_consumer");
40    let basic_consumer = MyRabbitMQClient;
41    resilient_rabbitmq_connection
42        .new_consumer(basic_consumer, basic_consumer_args)
43        .await;
44
45    // Keep alive the RabbitMqClientHandler
46    resilient_rabbitmq_connection.keep_alive().await;
47}
More examples
Hide additional examples
examples/only_a_publisher.rs (line 59)
26async fn main() {
27    // Declare a RabbitMqClientHandler
28    let rabbitmq_connection_arguments =
29        ConnectionArguments::new("localhost", 5672, "guest", "guest");
30    let mut resilient_rabbitmq_client: RabbitMqClientHandler<MyRabbitMQClient> =
31        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
32
33    // Declare a PublishChannel
34    let mut publisher = resilient_rabbitmq_client.create_publisher().await.unwrap();
35    // Get a channel from it. This channel is meant to be used to communicate with the PublishChannel type.
36    let publish_sender = resilient_rabbitmq_client
37        .get_publish_sender_clone()
38        .unwrap();
39
40    // For the example, we spawn a task which will publish some random data on an "incoming" routing key every second.
41    tokio::task::spawn(async move {
42        loop {
43            publish_sender
44                .send(DataToPublish::from(
45                    "amq.direct".to_string(),
46                    "incoming".to_string(),
47                    "random data".to_string(),
48                ))
49                .await
50                .unwrap();
51
52            sleep(Duration::from_secs(1)).await;
53        }
54    });
55
56    // Keep alive both the client and the publisher
57    tokio::join!(
58        publisher.keep_ready_to_publish(),
59        resilient_rabbitmq_client.keep_alive()
60    );
61}
examples/multiple_kinds_of_consumers.rs (line 118)
88async fn main() {
89    // Declare a RabbitMqClientHandler
90    let rabbitmq_connection_arguments =
91        ConnectionArguments::new("localhost", 5672, "guest", "guest");
92    let mut resilient_rabbitmq_connection: RabbitMqClientHandler<RabbitMQClient> =
93        RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
94
95    // First kind of client : consumes from a specific queue a given data format
96    let basic_consumer_first_kind_args =
97        ConsumerArguments::new("first_queue", "first_kind_of_consumer").finish();
98    let basic_consumer_first_kind = FirstKindOfRabbitMQClient;
99    resilient_rabbitmq_connection
100        .new_consumer(
101            RabbitMQClient::FirstKind(basic_consumer_first_kind),
102            basic_consumer_first_kind_args,
103        )
104        .await;
105
106    // Second kind of client : consumes from another queue another kind of data
107    let basic_consumer_second_kind_args =
108        ConsumerArguments::new("second_queue", "second_kind_of_consumer").finish();
109    let basic_consumer_second_kind = SecondKindOfRabbitMQClient;
110    resilient_rabbitmq_connection
111        .new_consumer(
112            RabbitMQClient::SecondKind(basic_consumer_second_kind),
113            basic_consumer_second_kind_args,
114        )
115        .await;
116
117    // Keep alive the RabbitMqClientHandler
118    resilient_rabbitmq_connection.keep_alive().await;
119}

Auto Trait Implementations§

§

impl<BasicConsumerType> Freeze for RabbitMqClientHandler<BasicConsumerType>

§

impl<BasicConsumerType> !RefUnwindSafe for RabbitMqClientHandler<BasicConsumerType>

§

impl<BasicConsumerType> Send for RabbitMqClientHandler<BasicConsumerType>

§

impl<BasicConsumerType> Sync for RabbitMqClientHandler<BasicConsumerType>
where BasicConsumerType: Sync,

§

impl<BasicConsumerType> Unpin for RabbitMqClientHandler<BasicConsumerType>
where BasicConsumerType: Unpin,

§

impl<BasicConsumerType> UnsafeUnpin for RabbitMqClientHandler<BasicConsumerType>

§

impl<BasicConsumerType> !UnwindSafe for RabbitMqClientHandler<BasicConsumerType>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more