1use std::{
2 collections::HashMap,
3 io,
4 pin::Pin,
5 sync::{atomic::AtomicU64, Arc},
6 task::{Context, Poll},
7 time::{Duration, Instant},
8};
9use std::{future::Future, sync::atomic::Ordering};
10
11use futures::{
12 stream::{SplitSink, SplitStream},
13 Stream, StreamExt, TryFutureExt,
14};
15use pin_project::pin_project;
16use rabbitmq_stream_protocol::commands::exchange_command_versions::{
17 ExchangeCommandVersionsRequest, ExchangeCommandVersionsResponse,
18};
19use tokio::io::AsyncRead;
20use tokio::io::AsyncWrite;
21use tokio::io::ReadBuf;
22use tokio::sync::RwLock;
23use tokio::{net::TcpStream, sync::Notify};
24use tokio_rustls::client::TlsStream;
25
26use tokio_util::codec::Framed;
27use tracing::{trace, warn};
28
29use crate::{error::ClientError, RabbitMQStreamResult};
30pub use message::ClientMessage;
31pub use metadata::{Broker, StreamMetadata};
32pub use metrics::MetricsCollector;
33pub use options::{ClientOptions, TlsConfiguration, TlsConfigurationBuilder};
34use rabbitmq_stream_protocol::{
35 commands::{
36 close::{CloseRequest, CloseResponse},
37 consumer_update_request::ConsumerUpdateRequestCommand,
38 create_stream::CreateStreamCommand,
39 create_super_stream::CreateSuperStreamCommand,
40 credit::CreditCommand,
41 declare_publisher::DeclarePublisherCommand,
42 delete::Delete,
43 delete_publisher::DeletePublisherCommand,
44 delete_super_stream::DeleteSuperStreamCommand,
45 generic::GenericResponse,
46 heart_beat::HeartBeatCommand,
47 metadata::MetadataCommand,
48 open::{OpenCommand, OpenResponse},
49 peer_properties::{PeerPropertiesCommand, PeerPropertiesResponse},
50 publish::PublishCommand,
51 query_offset::{QueryOffsetRequest, QueryOffsetResponse},
52 query_publisher_sequence::{QueryPublisherRequest, QueryPublisherResponse},
53 sasl_authenticate::SaslAuthenticateCommand,
54 sasl_handshake::{SaslHandshakeCommand, SaslHandshakeResponse},
55 store_offset::StoreOffset,
56 subscribe::{OffsetSpecification, SubscribeCommand},
57 superstream_partitions::SuperStreamPartitionsRequest,
58 superstream_partitions::SuperStreamPartitionsResponse,
59 superstream_route::SuperStreamRouteRequest,
60 superstream_route::SuperStreamRouteResponse,
61 tune::TunesCommand,
62 unsubscribe::UnSubscribeCommand,
63 },
64 types::PublishedMessage,
65 FromResponse, Request, Response, ResponseCode, ResponseKind,
66};
67
68pub use self::handler::{MessageHandler, MessageResult};
69use self::{
70 channel::{channel, ChannelReceiver, ChannelSender},
71 codec::RabbitMqStreamCodec,
72 dispatcher::Dispatcher,
73 message::BaseMessage,
74};
75
76mod channel;
77mod codec;
78mod dispatcher;
79mod handler;
80mod message;
81mod metadata;
82mod metrics;
83mod options;
84mod task;
85
86#[pin_project(project = StreamProj)]
87#[derive(Debug)]
88pub enum GenericTcpStream {
89 Tcp(#[pin] TcpStream),
90 SecureTcp(#[pin] Box<TlsStream<TcpStream>>),
91}
92
93impl AsyncRead for GenericTcpStream {
94 fn poll_read(
95 self: Pin<&mut Self>,
96 cx: &mut Context<'_>,
97 buf: &mut ReadBuf<'_>,
98 ) -> Poll<io::Result<()>> {
99 match self.project() {
100 StreamProj::Tcp(tcp_stream) => tcp_stream.poll_read(cx, buf),
101 StreamProj::SecureTcp(tls_stream) => tls_stream.poll_read(cx, buf),
102 }
103 }
104}
105
106impl AsyncWrite for GenericTcpStream {
107 fn poll_write(
108 self: Pin<&mut Self>,
109 cx: &mut Context<'_>,
110 buf: &[u8],
111 ) -> Poll<io::Result<usize>> {
112 match self.project() {
113 StreamProj::Tcp(tcp_stream) => tcp_stream.poll_write(cx, buf),
114 StreamProj::SecureTcp(tls_stream) => tls_stream.poll_write(cx, buf),
115 }
116 }
117
118 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
119 match self.project() {
120 StreamProj::Tcp(tcp_stream) => tcp_stream.poll_flush(cx),
121 StreamProj::SecureTcp(tls_stream) => tls_stream.poll_flush(cx),
122 }
123 }
124
125 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
126 match self.project() {
127 StreamProj::Tcp(tcp_stream) => tcp_stream.poll_shutdown(cx),
128 StreamProj::SecureTcp(tls_stream) => tls_stream.poll_shutdown(cx),
129 }
130 }
131}
132
133type SinkConnection = SplitSink<Framed<GenericTcpStream, RabbitMqStreamCodec>, Request>;
134type StreamConnection = SplitStream<Framed<GenericTcpStream, RabbitMqStreamCodec>>;
135
136pub struct ClientState {
137 server_properties: HashMap<String, String>,
138 connection_properties: HashMap<String, String>,
139 handler: Option<Arc<dyn MessageHandler>>,
140 heartbeat: u32,
141 max_frame_size: u32,
142 last_heatbeat: Instant,
143 heartbeat_task: Option<task::TaskHandle>,
144 last_received_message: Arc<RwLock<Instant>>,
145}
146
147#[derive(Clone)]
151pub struct Client {
152 dispatcher: Dispatcher<Client>,
153 channel: Arc<ChannelSender<SinkConnection>>,
154 state: Arc<RwLock<ClientState>>,
155 opts: ClientOptions,
156 tune_notifier: Arc<Notify>,
157 publish_sequence: Arc<AtomicU64>,
158 filtering_supported: bool,
159 client_properties: HashMap<String, String>,
160}
161
162impl Client {
163 pub async fn connect(opts: impl Into<ClientOptions>) -> Result<Client, ClientError> {
164 let broker = opts.into();
165
166 let (sender, receiver) = Client::create_connection(&broker).await?;
167
168 let last_received_message = Arc::new(RwLock::new(Instant::now()));
169
170 let dispatcher = Dispatcher::new();
171 let state = ClientState {
172 server_properties: HashMap::new(),
173 connection_properties: HashMap::new(),
174 handler: None,
175 heartbeat: broker.heartbeat,
176 max_frame_size: broker.max_frame_size,
177 last_heatbeat: Instant::now(),
178 heartbeat_task: None,
179 last_received_message: last_received_message.clone(),
180 };
181 let mut client = Client {
182 dispatcher,
183 opts: broker,
184 channel: Arc::new(sender),
185 state: Arc::new(RwLock::new(state)),
186 tune_notifier: Arc::new(Notify::new()),
187 publish_sequence: Arc::new(AtomicU64::new(1)),
188 filtering_supported: false,
189 client_properties: HashMap::new(),
190 };
191
192 const VERSION: &str = env!("CARGO_PKG_VERSION");
193
194 client
195 .client_properties
196 .insert(String::from("product"), String::from("RabbitMQ"));
197 client
198 .client_properties
199 .insert(String::from("version"), String::from(VERSION));
200 client
201 .client_properties
202 .insert(String::from("platform"), String::from("Rust"));
203 client.client_properties.insert(
204 String::from("copyright"),
205 String::from("Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."));
206 client.client_properties.insert(
207 String::from("information"),
208 String::from(
209 "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/",
210 ),
211 );
212 client.client_properties.insert(
213 String::from("connection_name"),
214 client.opts.client_provided_name.clone(),
215 );
216
217 client.initialize(receiver).await?;
218
219 let command_versions = client.exchange_command_versions().await?;
220 let (_, max_version) = command_versions.key_version(2);
221 if max_version >= 2 {
222 client.filtering_supported = true
223 }
224 Ok(client)
225 }
226
227 pub async fn server_properties(&self) -> HashMap<String, String> {
229 self.state.read().await.server_properties.clone()
230 }
231
232 pub async fn connection_properties(&self) -> HashMap<String, String> {
234 self.state.read().await.connection_properties.clone()
235 }
236
237 pub async fn set_handler<H: MessageHandler>(&self, handler: H) {
238 let mut state = self.state.write().await;
239
240 state.handler = Some(Arc::new(handler));
241 }
242
243 pub fn is_closed(&self) -> bool {
244 self.channel.is_closed()
245 }
246
247 pub async fn close(&self) -> RabbitMQStreamResult<()> {
248 if self.is_closed() {
249 return Err(ClientError::AlreadyClosed);
250 }
251 let _: CloseResponse = self
252 .send_and_receive(|correlation_id| {
253 CloseRequest::new(correlation_id, ResponseCode::Ok, "Ok".to_owned())
254 })
255 .await?;
256
257 let mut state = self.state.write().await;
258 state.heartbeat_task.take();
260 drop(state);
261
262 self.force_drop_connection().await
263 }
264
265 async fn force_drop_connection(&self) -> RabbitMQStreamResult<()> {
266 self.channel.close().await
267 }
268
269 pub async fn subscribe(
270 &self,
271 subscription_id: u8,
272 stream: &str,
273 offset_specification: OffsetSpecification,
274 credit: u16,
275 properties: HashMap<String, String>,
276 ) -> RabbitMQStreamResult<GenericResponse> {
277 self.send_and_receive(|correlation_id| {
278 SubscribeCommand::new(
279 correlation_id,
280 subscription_id,
281 stream.to_owned(),
282 offset_specification,
283 credit,
284 properties,
285 )
286 })
287 .await
288 }
289
290 pub async fn unsubscribe(&self, subscription_id: u8) -> RabbitMQStreamResult<GenericResponse> {
291 self.send_and_receive(|correlation_id| {
292 UnSubscribeCommand::new(correlation_id, subscription_id)
293 })
294 .await
295 }
296
297 pub async fn partitions(
298 &self,
299 super_stream: String,
300 ) -> RabbitMQStreamResult<SuperStreamPartitionsResponse> {
301 self.send_and_receive(|correlation_id| {
302 SuperStreamPartitionsRequest::new(correlation_id, super_stream)
303 })
304 .await
305 }
306
307 pub async fn route(
308 &self,
309 routing_key: String,
310 super_stream: String,
311 ) -> RabbitMQStreamResult<SuperStreamRouteResponse> {
312 self.send_and_receive(|correlation_id| {
313 SuperStreamRouteRequest::new(correlation_id, routing_key, super_stream)
314 })
315 .await
316 }
317
318 pub async fn create_stream(
319 &self,
320 stream: &str,
321 options: HashMap<String, String>,
322 ) -> RabbitMQStreamResult<GenericResponse> {
323 self.send_and_receive(|correlation_id| {
324 CreateStreamCommand::new(correlation_id, stream.to_owned(), options)
325 })
326 .await
327 }
328
329 pub async fn create_super_stream(
330 &self,
331 super_stream: &str,
332 partitions: Vec<String>,
333 binding_keys: Vec<String>,
334 options: HashMap<String, String>,
335 ) -> RabbitMQStreamResult<GenericResponse> {
336 self.send_and_receive(|correlation_id| {
337 CreateSuperStreamCommand::new(
338 correlation_id,
339 super_stream.to_owned(),
340 partitions,
341 binding_keys,
342 options,
343 )
344 })
345 .await
346 }
347
348 pub async fn delete_stream(&self, stream: &str) -> RabbitMQStreamResult<GenericResponse> {
349 self.send_and_receive(|correlation_id| Delete::new(correlation_id, stream.to_owned()))
350 .await
351 }
352
353 pub async fn delete_super_stream(
354 &self,
355 super_stream: &str,
356 ) -> RabbitMQStreamResult<GenericResponse> {
357 self.send_and_receive(|correlation_id| {
358 DeleteSuperStreamCommand::new(correlation_id, super_stream.to_owned())
359 })
360 .await
361 }
362
363 pub async fn credit(&self, subscription_id: u8, credit: u16) -> RabbitMQStreamResult<()> {
364 self.send(CreditCommand::new(subscription_id, credit)).await
365 }
366
367 pub async fn metadata(
368 &self,
369 streams: Vec<String>,
370 ) -> RabbitMQStreamResult<HashMap<String, StreamMetadata>> {
371 self.send_and_receive(|correlation_id| MetadataCommand::new(correlation_id, streams))
372 .await
373 .map(metadata::from_response)
374 }
375
376 pub async fn store_offset(
377 &self,
378 reference: &str,
379 stream: &str,
380 offset: u64,
381 ) -> RabbitMQStreamResult<()> {
382 self.send(StoreOffset::new(
383 reference.to_owned(),
384 stream.to_owned(),
385 offset,
386 ))
387 .await
388 }
389
390 pub async fn query_offset(&self, reference: String, stream: &str) -> Result<u64, ClientError> {
391 let response = self
392 .send_and_receive::<QueryOffsetResponse, _, _>(|correlation_id| {
393 QueryOffsetRequest::new(correlation_id, reference, stream.to_owned())
394 })
395 .await?;
396
397 if !response.is_ok() {
398 Err(ClientError::RequestError(response.code().clone()))
399 } else {
400 Ok(response.from_response())
401 }
402 }
403
404 pub async fn declare_publisher(
405 &self,
406 publisher_id: u8,
407 publisher_reference: Option<String>,
408 stream: &str,
409 ) -> RabbitMQStreamResult<GenericResponse> {
410 self.send_and_receive(|correlation_id| {
411 DeclarePublisherCommand::new(
412 correlation_id,
413 publisher_id,
414 publisher_reference,
415 stream.to_owned(),
416 )
417 })
418 .await
419 }
420
421 pub async fn delete_publisher(
422 &self,
423 publisher_id: u8,
424 ) -> RabbitMQStreamResult<GenericResponse> {
425 self.send_and_receive(|correlation_id| {
426 DeletePublisherCommand::new(correlation_id, publisher_id)
427 })
428 .await
429 }
430
431 pub async fn publish<T: BaseMessage>(
432 &self,
433 publisher_id: u8,
434 messages: impl Into<Vec<T>>,
435 version: u16,
436 ) -> RabbitMQStreamResult<Vec<u64>> {
437 let messages: Vec<PublishedMessage> = messages
438 .into()
439 .into_iter()
440 .map(|message| {
441 let publishing_id: u64 = message
442 .publishing_id()
443 .unwrap_or_else(|| self.publish_sequence.fetch_add(1, Ordering::Relaxed));
444 let filter_value = message.filter_value();
445 PublishedMessage::new(publishing_id, message.to_message(), filter_value)
446 })
447 .collect();
448 let sequences = messages
449 .iter()
450 .map(rabbitmq_stream_protocol::types::PublishedMessage::publishing_id)
451 .collect();
452 let len = messages.len();
453
454 self.send(PublishCommand::new(publisher_id, messages, version))
456 .await?;
457
458 self.opts.collector.publish(len as u64).await;
459
460 Ok(sequences)
461 }
462
463 pub async fn query_publisher_sequence(
464 &self,
465 reference: &str,
466 stream: &str,
467 ) -> Result<u64, ClientError> {
468 self.send_and_receive::<QueryPublisherResponse, _, _>(|correlation_id| {
469 QueryPublisherRequest::new(correlation_id, reference.to_owned(), stream.to_owned())
470 })
471 .await
472 .map(|sequence| sequence.from_response())
473 }
474
475 pub async fn exchange_command_versions(
476 &self,
477 ) -> RabbitMQStreamResult<ExchangeCommandVersionsResponse> {
478 self.send_and_receive::<ExchangeCommandVersionsResponse, _, _>(|correlation_id| {
479 ExchangeCommandVersionsRequest::new(correlation_id, vec![])
480 })
481 .await
482 }
483
484 pub fn filtering_supported(&self) -> bool {
485 self.filtering_supported
486 }
487
488 pub async fn set_heartbeat(&self, heartbeat: u32) {
490 let mut state = self.state.write().await;
491 state.heartbeat = heartbeat;
492 state.heartbeat_task =
494 self.start_hearbeat_task(heartbeat, state.last_received_message.clone());
495 }
496
497 async fn create_connection(
498 broker: &ClientOptions,
499 ) -> Result<
500 (
501 ChannelSender<SinkConnection>,
502 ChannelReceiver<StreamConnection>,
503 ),
504 ClientError,
505 > {
506 let stream = broker.build_generic_tcp_stream().await?;
507 let stream = Framed::new(stream, RabbitMqStreamCodec {});
508
509 let (sink, stream) = stream.split();
510 let (tx, rx) = channel(sink, stream);
511
512 Ok((tx, rx))
513 }
514
515 async fn initialize<T>(&mut self, receiver: ChannelReceiver<T>) -> Result<(), ClientError>
516 where
517 T: Stream<Item = Result<Response, ClientError>> + Unpin + Send,
518 T: 'static,
519 {
520 self.dispatcher.set_handler(self.clone()).await;
521 self.dispatcher.start(receiver).await;
522
523 self.with_state_lock(self.peer_properties(), move |state, server_properties| {
524 state.server_properties = server_properties;
525 })
526 .await?;
527 self.authenticate().await?;
528
529 self.wait_for_tune_data().await?;
530
531 self.with_state_lock(self.open(), |state, connection_properties| {
532 state.connection_properties = connection_properties;
533 })
534 .await?;
535
536 let mut state = self.state.write().await;
538 state.heartbeat_task =
539 self.start_hearbeat_task(state.heartbeat, state.last_received_message.clone());
540 drop(state);
541
542 Ok(())
543 }
544
545 async fn with_state_lock<T>(
546 &self,
547 task: impl Future<Output = RabbitMQStreamResult<T>>,
548 mut updater: impl FnMut(&mut ClientState, T),
549 ) -> RabbitMQStreamResult<()> {
550 let result = task.await?;
551
552 let mut state = self.state.write().await;
553
554 updater(&mut state, result);
555
556 Ok(())
557 }
558
559 fn negotiate_value(&self, client: u32, server: u32) -> u32 {
560 match (client, server) {
561 (client, server) if client == 0 || server == 0 => client.max(server),
562 (client, server) => client.min(server),
563 }
564 }
565
566 async fn wait_for_tune_data(&mut self) -> Result<(), ClientError> {
567 self.tune_notifier.notified().await;
568 Ok(())
569 }
570
571 async fn authenticate(&self) -> Result<(), ClientError> {
572 self.sasl_mechanism()
573 .and_then(|mechanisms| self.handle_authentication(mechanisms))
574 .await
575 }
576
577 async fn handle_authentication(&self, _mechanism: Vec<String>) -> Result<(), ClientError> {
578 let auth_data = format!("\u{0000}{}\u{0000}{}", self.opts.user, self.opts.password);
579
580 let response = self
581 .send_and_receive::<GenericResponse, _, _>(|correlation_id| {
582 SaslAuthenticateCommand::new(
583 correlation_id,
584 "PLAIN".to_owned(),
585 auth_data.as_bytes().to_vec(),
586 )
587 })
588 .await?;
589
590 if response.is_ok() {
591 Ok(())
592 } else {
593 Err(ClientError::RequestError(response.code().clone()))
594 }
595 }
596
597 async fn sasl_mechanism(&self) -> Result<Vec<String>, ClientError> {
598 self.send_and_receive::<SaslHandshakeResponse, _, _>(|correlation_id| {
599 SaslHandshakeCommand::new(correlation_id)
600 })
601 .await
602 .map(|handshake| handshake.mechanisms)
603 }
604
605 async fn send_and_receive<T, R, M>(&self, msg_factory: M) -> Result<T, ClientError>
606 where
607 R: Into<Request>,
608 T: FromResponse,
609 M: FnOnce(u32) -> R,
610 {
611 let Some((correlation_id, mut receiver)) = self.dispatcher.response_channel() else {
612 trace!("Connection is closed here");
613 return Err(ClientError::ConnectionClosed);
614 };
615
616 self.channel
617 .send(msg_factory(correlation_id).into())
618 .await?;
619
620 let response = receiver.recv().await.ok_or(ClientError::ConnectionClosed)?;
621
622 self.handle_response::<T>(response).await
623 }
624
625 async fn send<R>(&self, msg: R) -> Result<(), ClientError>
626 where
627 R: Into<Request>,
628 {
629 self.channel.send(msg.into()).await?;
630 Ok(())
631 }
632
633 async fn handle_response<T: FromResponse>(&self, response: Response) -> Result<T, ClientError> {
634 response.get::<T>().ok_or_else(|| {
635 ClientError::CastError(format!(
636 "Cannot cast response to {}",
637 std::any::type_name::<T>()
638 ))
639 })
640 }
641
642 async fn open(&self) -> Result<HashMap<String, String>, ClientError> {
643 self.send_and_receive::<OpenResponse, _, _>(|correlation_id| {
644 OpenCommand::new(correlation_id, self.opts.v_host.clone())
645 })
646 .await
647 .and_then(|open| {
648 if open.is_ok() {
649 Ok(open.connection_properties)
650 } else {
651 Err(ClientError::RequestError(open.code().clone()))
652 }
653 })
654 }
655
656 async fn peer_properties(&self) -> Result<HashMap<String, String>, ClientError> {
657 self.send_and_receive::<PeerPropertiesResponse, _, _>(|correlation_id| {
658 PeerPropertiesCommand::new(correlation_id, self.client_properties.clone())
659 })
660 .await
661 .map(|peer_properties| peer_properties.server_properties)
662 }
663
664 async fn handle_tune_command(&self, tunes: &TunesCommand) {
665 let mut state = self.state.write().await;
666 state.heartbeat = self.negotiate_value(self.opts.heartbeat, tunes.heartbeat);
667 state.max_frame_size = self.negotiate_value(self.opts.max_frame_size, tunes.max_frame_size);
668
669 let heart_beat = state.heartbeat;
670 let max_frame_size = state.max_frame_size;
671
672 trace!(
673 "Handling tune with frame size {} and heartbeat {}",
674 max_frame_size,
675 heart_beat
676 );
677
678 if state.heartbeat_task.take().is_some() {
679 state.heartbeat_task =
681 self.start_hearbeat_task(state.heartbeat, state.last_received_message.clone());
682 }
683
684 drop(state);
685
686 let _ = self
687 .channel
688 .send(TunesCommand::new(max_frame_size, heart_beat).into())
689 .await;
690
691 self.tune_notifier.notify_one();
692 }
693
694 fn start_hearbeat_task(
695 &self,
696 heartbeat: u32,
697 last_received_message: Arc<RwLock<Instant>>,
698 ) -> Option<task::TaskHandle> {
699 if heartbeat == 0 {
700 return None;
701 }
702 let heartbeat_interval = (heartbeat / 2).max(1);
703 let channel = self.channel.clone();
704
705 let client = self.clone();
706
707 let heartbeat_task: task::TaskHandle = tokio::spawn(async move {
708 let timeout_threashold = u64::from(heartbeat * 4);
709
710 loop {
711 trace!("Sending heartbeat");
712 if channel
713 .send(HeartBeatCommand::default().into())
714 .await
715 .is_err()
716 {
717 break;
718 }
719 tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await;
720
721 let now = Instant::now();
722 let last_message = last_received_message.read().await;
723 if now.duration_since(*last_message) >= Duration::from_secs(timeout_threashold) {
724 warn!("Heartbeat timeout reached. Force closing connection.");
725 if !client.is_closed() {
726 if let Err(e) = client.close().await {
727 warn!("Error closing client: {}", e);
728 }
729 }
730 break;
731 }
732 }
733
734 warn!("Heartbeat task stopped. Force closing connection");
735 })
736 .into();
737
738 Some(heartbeat_task)
739 }
740
741 async fn handle_heart_beat_command(&self) {
742 trace!("Received heartbeat");
743 let mut state = self.state.write().await;
744 state.last_heatbeat = Instant::now();
745 }
746
747 pub async fn consumer_update(
748 &self,
749 correlation_id: u32,
750 offset_specification: OffsetSpecification,
751 ) -> RabbitMQStreamResult<GenericResponse> {
752 self.send_and_receive(|_| {
753 ConsumerUpdateRequestCommand::new(correlation_id, 1, offset_specification)
754 })
755 .await
756 }
757}
758
759#[async_trait::async_trait]
760impl MessageHandler for Client {
761 async fn handle_message(&self, item: MessageResult) -> RabbitMQStreamResult<()> {
762 match &item {
763 Some(Ok(response)) => {
764 {
766 let s = self.state.read().await;
767 let mut last_received_message = s.last_received_message.write().await;
768 *last_received_message = Instant::now();
769 drop(last_received_message);
770 drop(s);
771 }
772
773 match response.kind_ref() {
774 ResponseKind::Tunes(tune) => self.handle_tune_command(tune).await,
775 ResponseKind::Heartbeat(_) => self.handle_heart_beat_command().await,
776 _ => {
777 if let Some(handler) = self.state.read().await.handler.as_ref() {
778 let handler = handler.clone();
779
780 tokio::task::spawn(async move { handler.handle_message(item).await });
781 }
782 }
783 }
784 }
785 Some(Err(err)) => {
786 trace!(?err);
787 if let Some(handler) = self.state.read().await.handler.as_ref() {
788 let handler = handler.clone();
789
790 tokio::task::spawn(async move { handler.handle_message(item).await });
791 }
792 }
793 None => {
794 trace!("Closing client");
795 if let Some(handler) = self.state.read().await.handler.as_ref() {
796 let handler = handler.clone();
797 tokio::task::spawn(async move { handler.handle_message(None).await });
798 }
799 }
800 }
801
802 Ok(())
803 }
804}