rabbitmq_stream_client/client/
mod.rs

1use std::{
2    collections::HashMap,
3    io,
4    pin::Pin,
5    sync::{atomic::AtomicU64, Arc},
6    task::{Context, Poll},
7    time::{Duration, Instant},
8};
9use std::{future::Future, sync::atomic::Ordering};
10
11use futures::{
12    stream::{SplitSink, SplitStream},
13    Stream, StreamExt, TryFutureExt,
14};
15use pin_project::pin_project;
16use rabbitmq_stream_protocol::commands::exchange_command_versions::{
17    ExchangeCommandVersionsRequest, ExchangeCommandVersionsResponse,
18};
19use tokio::io::AsyncRead;
20use tokio::io::AsyncWrite;
21use tokio::io::ReadBuf;
22use tokio::sync::RwLock;
23use tokio::{net::TcpStream, sync::Notify};
24use tokio_rustls::client::TlsStream;
25
26use tokio_util::codec::Framed;
27use tracing::{trace, warn};
28
29use crate::{error::ClientError, RabbitMQStreamResult};
30pub use message::ClientMessage;
31pub use metadata::{Broker, StreamMetadata};
32pub use metrics::MetricsCollector;
33pub use options::{ClientOptions, TlsConfiguration, TlsConfigurationBuilder};
34use rabbitmq_stream_protocol::{
35    commands::{
36        close::{CloseRequest, CloseResponse},
37        consumer_update_request::ConsumerUpdateRequestCommand,
38        create_stream::CreateStreamCommand,
39        create_super_stream::CreateSuperStreamCommand,
40        credit::CreditCommand,
41        declare_publisher::DeclarePublisherCommand,
42        delete::Delete,
43        delete_publisher::DeletePublisherCommand,
44        delete_super_stream::DeleteSuperStreamCommand,
45        generic::GenericResponse,
46        heart_beat::HeartBeatCommand,
47        metadata::MetadataCommand,
48        open::{OpenCommand, OpenResponse},
49        peer_properties::{PeerPropertiesCommand, PeerPropertiesResponse},
50        publish::PublishCommand,
51        query_offset::{QueryOffsetRequest, QueryOffsetResponse},
52        query_publisher_sequence::{QueryPublisherRequest, QueryPublisherResponse},
53        sasl_authenticate::SaslAuthenticateCommand,
54        sasl_handshake::{SaslHandshakeCommand, SaslHandshakeResponse},
55        store_offset::StoreOffset,
56        subscribe::{OffsetSpecification, SubscribeCommand},
57        superstream_partitions::SuperStreamPartitionsRequest,
58        superstream_partitions::SuperStreamPartitionsResponse,
59        superstream_route::SuperStreamRouteRequest,
60        superstream_route::SuperStreamRouteResponse,
61        tune::TunesCommand,
62        unsubscribe::UnSubscribeCommand,
63    },
64    types::PublishedMessage,
65    FromResponse, Request, Response, ResponseCode, ResponseKind,
66};
67
68pub use self::handler::{MessageHandler, MessageResult};
69use self::{
70    channel::{channel, ChannelReceiver, ChannelSender},
71    codec::RabbitMqStreamCodec,
72    dispatcher::Dispatcher,
73    message::BaseMessage,
74};
75
76mod channel;
77mod codec;
78mod dispatcher;
79mod handler;
80mod message;
81mod metadata;
82mod metrics;
83mod options;
84mod task;
85
/// Transport used by the client: either a plain TCP socket or a TLS session
/// layered on top of one.
///
/// `pin_project` generates the `StreamProj` projection that the
/// `AsyncRead`/`AsyncWrite` implementations match on to reach the pinned
/// inner stream.
#[pin_project(project = StreamProj)]
#[derive(Debug)]
pub enum GenericTcpStream {
    /// Plain TCP connection.
    Tcp(#[pin] TcpStream),
    /// TLS stream over TCP, kept behind a `Box`.
    SecureTcp(#[pin] Box<TlsStream<TcpStream>>),
}
92
93impl AsyncRead for GenericTcpStream {
94    fn poll_read(
95        self: Pin<&mut Self>,
96        cx: &mut Context<'_>,
97        buf: &mut ReadBuf<'_>,
98    ) -> Poll<io::Result<()>> {
99        match self.project() {
100            StreamProj::Tcp(tcp_stream) => tcp_stream.poll_read(cx, buf),
101            StreamProj::SecureTcp(tls_stream) => tls_stream.poll_read(cx, buf),
102        }
103    }
104}
105
106impl AsyncWrite for GenericTcpStream {
107    fn poll_write(
108        self: Pin<&mut Self>,
109        cx: &mut Context<'_>,
110        buf: &[u8],
111    ) -> Poll<io::Result<usize>> {
112        match self.project() {
113            StreamProj::Tcp(tcp_stream) => tcp_stream.poll_write(cx, buf),
114            StreamProj::SecureTcp(tls_stream) => tls_stream.poll_write(cx, buf),
115        }
116    }
117
118    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
119        match self.project() {
120            StreamProj::Tcp(tcp_stream) => tcp_stream.poll_flush(cx),
121            StreamProj::SecureTcp(tls_stream) => tls_stream.poll_flush(cx),
122        }
123    }
124
125    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
126        match self.project() {
127            StreamProj::Tcp(tcp_stream) => tcp_stream.poll_shutdown(cx),
128            StreamProj::SecureTcp(tls_stream) => tls_stream.poll_shutdown(cx),
129        }
130    }
131}
132
/// Write half of the framed connection; accepts `Request` frames.
type SinkConnection = SplitSink<Framed<GenericTcpStream, RabbitMqStreamCodec>, Request>;
/// Read half of the framed connection.
type StreamConnection = SplitStream<Framed<GenericTcpStream, RabbitMqStreamCodec>>;
135
/// Mutable per-connection state, shared between `Client` clones behind an
/// `Arc<RwLock<_>>`.
pub struct ClientState {
    // Filled from the peer-properties response during `initialize`.
    server_properties: HashMap<String, String>,
    // Filled from the open response during `initialize`.
    connection_properties: HashMap<String, String>,
    // Optional user handler; messages that are neither tune nor heartbeat
    // frames are forwarded to it.
    handler: Option<Arc<dyn MessageHandler>>,
    // Heartbeat value in seconds; starts as the configured value and is
    // replaced by the negotiated one when a tune frame is handled.
    heartbeat: u32,
    // Starts as the configured value and is replaced by the negotiated one
    // when a tune frame is handled.
    max_frame_size: u32,
    // Time the last heartbeat frame was received from the server.
    last_heatbeat: Instant,
    // Handle of the spawned heartbeat task; `None` when no task is running.
    heartbeat_task: Option<task::TaskHandle>,
    // Time any message was last received; read by the heartbeat task to
    // detect a timeout.
    last_received_message: Arc<RwLock<Instant>>,
}
146
/// Raw API for talking to RabbitMQ stream
///
/// For high level APIs check [`crate::Environment`]
#[derive(Clone)]
pub struct Client {
    // Hands out correlation ids and response channels for requests.
    dispatcher: Dispatcher<Client>,
    // Outgoing side of the connection, shared between clones.
    channel: Arc<ChannelSender<SinkConnection>>,
    // Connection state shared between clones.
    state: Arc<RwLock<ClientState>>,
    // Options this client was connected with.
    opts: ClientOptions,
    // Signalled once the tune frame has been handled during the handshake.
    tune_notifier: Arc<Notify>,
    // Source of publishing ids for messages that do not carry one.
    publish_sequence: Arc<AtomicU64>,
    // Set in `connect` from the command-version exchange.
    filtering_supported: bool,
    // Properties sent to the server in the peer-properties command.
    client_properties: HashMap<String, String>,
}
161
162impl Client {
163    pub async fn connect(opts: impl Into<ClientOptions>) -> Result<Client, ClientError> {
164        let broker = opts.into();
165
166        let (sender, receiver) = Client::create_connection(&broker).await?;
167
168        let last_received_message = Arc::new(RwLock::new(Instant::now()));
169
170        let dispatcher = Dispatcher::new();
171        let state = ClientState {
172            server_properties: HashMap::new(),
173            connection_properties: HashMap::new(),
174            handler: None,
175            heartbeat: broker.heartbeat,
176            max_frame_size: broker.max_frame_size,
177            last_heatbeat: Instant::now(),
178            heartbeat_task: None,
179            last_received_message: last_received_message.clone(),
180        };
181        let mut client = Client {
182            dispatcher,
183            opts: broker,
184            channel: Arc::new(sender),
185            state: Arc::new(RwLock::new(state)),
186            tune_notifier: Arc::new(Notify::new()),
187            publish_sequence: Arc::new(AtomicU64::new(1)),
188            filtering_supported: false,
189            client_properties: HashMap::new(),
190        };
191
192        const VERSION: &str = env!("CARGO_PKG_VERSION");
193
194        client
195            .client_properties
196            .insert(String::from("product"), String::from("RabbitMQ"));
197        client
198            .client_properties
199            .insert(String::from("version"), String::from(VERSION));
200        client
201            .client_properties
202            .insert(String::from("platform"), String::from("Rust"));
203        client.client_properties.insert(
204            String::from("copyright"),
205            String::from("Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."));
206        client.client_properties.insert(
207            String::from("information"),
208            String::from(
209                "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/",
210            ),
211        );
212        client.client_properties.insert(
213            String::from("connection_name"),
214            client.opts.client_provided_name.clone(),
215        );
216
217        client.initialize(receiver).await?;
218
219        let command_versions = client.exchange_command_versions().await?;
220        let (_, max_version) = command_versions.key_version(2);
221        if max_version >= 2 {
222            client.filtering_supported = true
223        }
224        Ok(client)
225    }
226
227    /// Get client's server properties.
228    pub async fn server_properties(&self) -> HashMap<String, String> {
229        self.state.read().await.server_properties.clone()
230    }
231
232    /// Get client's connection properties.
233    pub async fn connection_properties(&self) -> HashMap<String, String> {
234        self.state.read().await.connection_properties.clone()
235    }
236
237    pub async fn set_handler<H: MessageHandler>(&self, handler: H) {
238        let mut state = self.state.write().await;
239
240        state.handler = Some(Arc::new(handler));
241    }
242
    /// Reports whether the underlying channel is closed.
    pub fn is_closed(&self) -> bool {
        self.channel.is_closed()
    }
246
247    pub async fn close(&self) -> RabbitMQStreamResult<()> {
248        if self.is_closed() {
249            return Err(ClientError::AlreadyClosed);
250        }
251        let _: CloseResponse = self
252            .send_and_receive(|correlation_id| {
253                CloseRequest::new(correlation_id, ResponseCode::Ok, "Ok".to_owned())
254            })
255            .await?;
256
257        let mut state = self.state.write().await;
258        // This stop the tokio task that performs heartbeats
259        state.heartbeat_task.take();
260        drop(state);
261
262        self.force_drop_connection().await
263    }
264
    /// Closes the outgoing channel without sending a close request first.
    async fn force_drop_connection(&self) -> RabbitMQStreamResult<()> {
        self.channel.close().await
    }
268
269    pub async fn subscribe(
270        &self,
271        subscription_id: u8,
272        stream: &str,
273        offset_specification: OffsetSpecification,
274        credit: u16,
275        properties: HashMap<String, String>,
276    ) -> RabbitMQStreamResult<GenericResponse> {
277        self.send_and_receive(|correlation_id| {
278            SubscribeCommand::new(
279                correlation_id,
280                subscription_id,
281                stream.to_owned(),
282                offset_specification,
283                credit,
284                properties,
285            )
286        })
287        .await
288    }
289
290    pub async fn unsubscribe(&self, subscription_id: u8) -> RabbitMQStreamResult<GenericResponse> {
291        self.send_and_receive(|correlation_id| {
292            UnSubscribeCommand::new(correlation_id, subscription_id)
293        })
294        .await
295    }
296
297    pub async fn partitions(
298        &self,
299        super_stream: String,
300    ) -> RabbitMQStreamResult<SuperStreamPartitionsResponse> {
301        self.send_and_receive(|correlation_id| {
302            SuperStreamPartitionsRequest::new(correlation_id, super_stream)
303        })
304        .await
305    }
306
307    pub async fn route(
308        &self,
309        routing_key: String,
310        super_stream: String,
311    ) -> RabbitMQStreamResult<SuperStreamRouteResponse> {
312        self.send_and_receive(|correlation_id| {
313            SuperStreamRouteRequest::new(correlation_id, routing_key, super_stream)
314        })
315        .await
316    }
317
318    pub async fn create_stream(
319        &self,
320        stream: &str,
321        options: HashMap<String, String>,
322    ) -> RabbitMQStreamResult<GenericResponse> {
323        self.send_and_receive(|correlation_id| {
324            CreateStreamCommand::new(correlation_id, stream.to_owned(), options)
325        })
326        .await
327    }
328
329    pub async fn create_super_stream(
330        &self,
331        super_stream: &str,
332        partitions: Vec<String>,
333        binding_keys: Vec<String>,
334        options: HashMap<String, String>,
335    ) -> RabbitMQStreamResult<GenericResponse> {
336        self.send_and_receive(|correlation_id| {
337            CreateSuperStreamCommand::new(
338                correlation_id,
339                super_stream.to_owned(),
340                partitions,
341                binding_keys,
342                options,
343            )
344        })
345        .await
346    }
347
348    pub async fn delete_stream(&self, stream: &str) -> RabbitMQStreamResult<GenericResponse> {
349        self.send_and_receive(|correlation_id| Delete::new(correlation_id, stream.to_owned()))
350            .await
351    }
352
353    pub async fn delete_super_stream(
354        &self,
355        super_stream: &str,
356    ) -> RabbitMQStreamResult<GenericResponse> {
357        self.send_and_receive(|correlation_id| {
358            DeleteSuperStreamCommand::new(correlation_id, super_stream.to_owned())
359        })
360        .await
361    }
362
363    pub async fn credit(&self, subscription_id: u8, credit: u16) -> RabbitMQStreamResult<()> {
364        self.send(CreditCommand::new(subscription_id, credit)).await
365    }
366
367    pub async fn metadata(
368        &self,
369        streams: Vec<String>,
370    ) -> RabbitMQStreamResult<HashMap<String, StreamMetadata>> {
371        self.send_and_receive(|correlation_id| MetadataCommand::new(correlation_id, streams))
372            .await
373            .map(metadata::from_response)
374    }
375
376    pub async fn store_offset(
377        &self,
378        reference: &str,
379        stream: &str,
380        offset: u64,
381    ) -> RabbitMQStreamResult<()> {
382        self.send(StoreOffset::new(
383            reference.to_owned(),
384            stream.to_owned(),
385            offset,
386        ))
387        .await
388    }
389
390    pub async fn query_offset(&self, reference: String, stream: &str) -> Result<u64, ClientError> {
391        let response = self
392            .send_and_receive::<QueryOffsetResponse, _, _>(|correlation_id| {
393                QueryOffsetRequest::new(correlation_id, reference, stream.to_owned())
394            })
395            .await?;
396
397        if !response.is_ok() {
398            Err(ClientError::RequestError(response.code().clone()))
399        } else {
400            Ok(response.from_response())
401        }
402    }
403
404    pub async fn declare_publisher(
405        &self,
406        publisher_id: u8,
407        publisher_reference: Option<String>,
408        stream: &str,
409    ) -> RabbitMQStreamResult<GenericResponse> {
410        self.send_and_receive(|correlation_id| {
411            DeclarePublisherCommand::new(
412                correlation_id,
413                publisher_id,
414                publisher_reference,
415                stream.to_owned(),
416            )
417        })
418        .await
419    }
420
421    pub async fn delete_publisher(
422        &self,
423        publisher_id: u8,
424    ) -> RabbitMQStreamResult<GenericResponse> {
425        self.send_and_receive(|correlation_id| {
426            DeletePublisherCommand::new(correlation_id, publisher_id)
427        })
428        .await
429    }
430
431    pub async fn publish<T: BaseMessage>(
432        &self,
433        publisher_id: u8,
434        messages: impl Into<Vec<T>>,
435        version: u16,
436    ) -> RabbitMQStreamResult<Vec<u64>> {
437        let messages: Vec<PublishedMessage> = messages
438            .into()
439            .into_iter()
440            .map(|message| {
441                let publishing_id: u64 = message
442                    .publishing_id()
443                    .unwrap_or_else(|| self.publish_sequence.fetch_add(1, Ordering::Relaxed));
444                let filter_value = message.filter_value();
445                PublishedMessage::new(publishing_id, message.to_message(), filter_value)
446            })
447            .collect();
448        let sequences = messages
449            .iter()
450            .map(rabbitmq_stream_protocol::types::PublishedMessage::publishing_id)
451            .collect();
452        let len = messages.len();
453
454        // TODO batch publish with max frame size check
455        self.send(PublishCommand::new(publisher_id, messages, version))
456            .await?;
457
458        self.opts.collector.publish(len as u64).await;
459
460        Ok(sequences)
461    }
462
463    pub async fn query_publisher_sequence(
464        &self,
465        reference: &str,
466        stream: &str,
467    ) -> Result<u64, ClientError> {
468        self.send_and_receive::<QueryPublisherResponse, _, _>(|correlation_id| {
469            QueryPublisherRequest::new(correlation_id, reference.to_owned(), stream.to_owned())
470        })
471        .await
472        .map(|sequence| sequence.from_response())
473    }
474
475    pub async fn exchange_command_versions(
476        &self,
477    ) -> RabbitMQStreamResult<ExchangeCommandVersionsResponse> {
478        self.send_and_receive::<ExchangeCommandVersionsResponse, _, _>(|correlation_id| {
479            ExchangeCommandVersionsRequest::new(correlation_id, vec![])
480        })
481        .await
482    }
483
    /// Returns the filtering flag computed in `connect` from the
    /// command-version exchange.
    pub fn filtering_supported(&self) -> bool {
        self.filtering_supported
    }
487
488    /// Don't use this method in production code.
489    pub async fn set_heartbeat(&self, heartbeat: u32) {
490        let mut state = self.state.write().await;
491        state.heartbeat = heartbeat;
492        // Eventually, this drops the previous heartbeat task
493        state.heartbeat_task =
494            self.start_hearbeat_task(heartbeat, state.last_received_message.clone());
495    }
496
497    async fn create_connection(
498        broker: &ClientOptions,
499    ) -> Result<
500        (
501            ChannelSender<SinkConnection>,
502            ChannelReceiver<StreamConnection>,
503        ),
504        ClientError,
505    > {
506        let stream = broker.build_generic_tcp_stream().await?;
507        let stream = Framed::new(stream, RabbitMqStreamCodec {});
508
509        let (sink, stream) = stream.split();
510        let (tx, rx) = channel(sink, stream);
511
512        Ok((tx, rx))
513    }
514
    /// Runs the connection handshake on a freshly created connection.
    ///
    /// Steps, in order: register this client as the dispatcher's handler and
    /// start dispatching from `receiver`; exchange peer properties;
    /// authenticate; wait for the tune frame to be handled; open the
    /// connection; start the heartbeat task.
    ///
    /// # Errors
    ///
    /// Returns the first [`ClientError`] raised by any handshake step.
    async fn initialize<T>(&mut self, receiver: ChannelReceiver<T>) -> Result<(), ClientError>
    where
        T: Stream<Item = Result<Response, ClientError>> + Unpin + Send,
        T: 'static,
    {
        // The dispatcher receives a clone of the client as it is at this point.
        self.dispatcher.set_handler(self.clone()).await;
        self.dispatcher.start(receiver).await;

        self.with_state_lock(self.peer_properties(), move |state, server_properties| {
            state.server_properties = server_properties;
        })
        .await?;
        self.authenticate().await?;

        // `handle_tune_command` signals the notifier once the tune frame has been handled.
        self.wait_for_tune_data().await?;

        self.with_state_lock(self.open(), |state, connection_properties| {
            state.connection_properties = connection_properties;
        })
        .await?;

        // Start heartbeat task after connection is established
        let mut state = self.state.write().await;
        state.heartbeat_task =
            self.start_hearbeat_task(state.heartbeat, state.last_received_message.clone());
        drop(state);

        Ok(())
    }
544
545    async fn with_state_lock<T>(
546        &self,
547        task: impl Future<Output = RabbitMQStreamResult<T>>,
548        mut updater: impl FnMut(&mut ClientState, T),
549    ) -> RabbitMQStreamResult<()> {
550        let result = task.await?;
551
552        let mut state = self.state.write().await;
553
554        updater(&mut state, result);
555
556        Ok(())
557    }
558
559    fn negotiate_value(&self, client: u32, server: u32) -> u32 {
560        match (client, server) {
561            (client, server) if client == 0 || server == 0 => client.max(server),
562            (client, server) => client.min(server),
563        }
564    }
565
    /// Waits until `tune_notifier` is signalled, which `handle_tune_command`
    /// does once it has processed the tune frame. Always returns `Ok(())`.
    async fn wait_for_tune_data(&mut self) -> Result<(), ClientError> {
        self.tune_notifier.notified().await;
        Ok(())
    }
570
571    async fn authenticate(&self) -> Result<(), ClientError> {
572        self.sasl_mechanism()
573            .and_then(|mechanisms| self.handle_authentication(mechanisms))
574            .await
575    }
576
577    async fn handle_authentication(&self, _mechanism: Vec<String>) -> Result<(), ClientError> {
578        let auth_data = format!("\u{0000}{}\u{0000}{}", self.opts.user, self.opts.password);
579
580        let response = self
581            .send_and_receive::<GenericResponse, _, _>(|correlation_id| {
582                SaslAuthenticateCommand::new(
583                    correlation_id,
584                    "PLAIN".to_owned(),
585                    auth_data.as_bytes().to_vec(),
586                )
587            })
588            .await?;
589
590        if response.is_ok() {
591            Ok(())
592        } else {
593            Err(ClientError::RequestError(response.code().clone()))
594        }
595    }
596
597    async fn sasl_mechanism(&self) -> Result<Vec<String>, ClientError> {
598        self.send_and_receive::<SaslHandshakeResponse, _, _>(|correlation_id| {
599            SaslHandshakeCommand::new(correlation_id)
600        })
601        .await
602        .map(|handshake| handshake.mechanisms)
603    }
604
605    async fn send_and_receive<T, R, M>(&self, msg_factory: M) -> Result<T, ClientError>
606    where
607        R: Into<Request>,
608        T: FromResponse,
609        M: FnOnce(u32) -> R,
610    {
611        let Some((correlation_id, mut receiver)) = self.dispatcher.response_channel() else {
612            trace!("Connection is closed here");
613            return Err(ClientError::ConnectionClosed);
614        };
615
616        self.channel
617            .send(msg_factory(correlation_id).into())
618            .await?;
619
620        let response = receiver.recv().await.ok_or(ClientError::ConnectionClosed)?;
621
622        self.handle_response::<T>(response).await
623    }
624
625    async fn send<R>(&self, msg: R) -> Result<(), ClientError>
626    where
627        R: Into<Request>,
628    {
629        self.channel.send(msg.into()).await?;
630        Ok(())
631    }
632
633    async fn handle_response<T: FromResponse>(&self, response: Response) -> Result<T, ClientError> {
634        response.get::<T>().ok_or_else(|| {
635            ClientError::CastError(format!(
636                "Cannot cast response to {}",
637                std::any::type_name::<T>()
638            ))
639        })
640    }
641
642    async fn open(&self) -> Result<HashMap<String, String>, ClientError> {
643        self.send_and_receive::<OpenResponse, _, _>(|correlation_id| {
644            OpenCommand::new(correlation_id, self.opts.v_host.clone())
645        })
646        .await
647        .and_then(|open| {
648            if open.is_ok() {
649                Ok(open.connection_properties)
650            } else {
651                Err(ClientError::RequestError(open.code().clone()))
652            }
653        })
654    }
655
656    async fn peer_properties(&self) -> Result<HashMap<String, String>, ClientError> {
657        self.send_and_receive::<PeerPropertiesResponse, _, _>(|correlation_id| {
658            PeerPropertiesCommand::new(correlation_id, self.client_properties.clone())
659        })
660        .await
661        .map(|peer_properties| peer_properties.server_properties)
662    }
663
    /// Handles a tune frame from the server.
    ///
    /// Negotiates heartbeat and max frame size between the configured options
    /// and the server's proposal, stores the results in the state, restarts
    /// the heartbeat task if one was running, replies with a tune command
    /// carrying the negotiated values, and signals `tune_notifier`.
    async fn handle_tune_command(&self, tunes: &TunesCommand) {
        let mut state = self.state.write().await;
        state.heartbeat = self.negotiate_value(self.opts.heartbeat, tunes.heartbeat);
        state.max_frame_size = self.negotiate_value(self.opts.max_frame_size, tunes.max_frame_size);

        let heart_beat = state.heartbeat;
        let max_frame_size = state.max_frame_size;

        trace!(
            "Handling tune with frame size {} and heartbeat {}",
            max_frame_size,
            heart_beat
        );

        // Only restart when a heartbeat task already existed; `take()` removes
        // the old handle either way.
        if state.heartbeat_task.take().is_some() {
            // Restart the heartbeat task with the newly negotiated heartbeat
            state.heartbeat_task =
                self.start_hearbeat_task(state.heartbeat, state.last_received_message.clone());
        }

        // Release the state lock before sending on the channel.
        drop(state);

        // Best effort: a failure to send the tune reply is ignored.
        let _ = self
            .channel
            .send(TunesCommand::new(max_frame_size, heart_beat).into())
            .await;

        self.tune_notifier.notify_one();
    }
693
694    fn start_hearbeat_task(
695        &self,
696        heartbeat: u32,
697        last_received_message: Arc<RwLock<Instant>>,
698    ) -> Option<task::TaskHandle> {
699        if heartbeat == 0 {
700            return None;
701        }
702        let heartbeat_interval = (heartbeat / 2).max(1);
703        let channel = self.channel.clone();
704
705        let client = self.clone();
706
707        let heartbeat_task: task::TaskHandle = tokio::spawn(async move {
708            let timeout_threashold = u64::from(heartbeat * 4);
709
710            loop {
711                trace!("Sending heartbeat");
712                if channel
713                    .send(HeartBeatCommand::default().into())
714                    .await
715                    .is_err()
716                {
717                    break;
718                }
719                tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await;
720
721                let now = Instant::now();
722                let last_message = last_received_message.read().await;
723                if now.duration_since(*last_message) >= Duration::from_secs(timeout_threashold) {
724                    warn!("Heartbeat timeout reached. Force closing connection.");
725                    if !client.is_closed() {
726                        if let Err(e) = client.close().await {
727                            warn!("Error closing client: {}", e);
728                        }
729                    }
730                    break;
731                }
732            }
733
734            warn!("Heartbeat task stopped. Force closing connection");
735        })
736        .into();
737
738        Some(heartbeat_task)
739    }
740
    /// Records the arrival time of a heartbeat frame in the client state.
    async fn handle_heart_beat_command(&self) {
        trace!("Received heartbeat");
        let mut state = self.state.write().await;
        state.last_heatbeat = Instant::now();
    }
746
    /// Sends a consumer-update command carrying the caller-supplied
    /// `correlation_id` and `offset_specification`.
    ///
    /// NOTE(review): the correlation id allocated by `send_and_receive` is
    /// ignored in favour of the caller's, while the response is still awaited
    /// on the channel registered under the allocated id — confirm that a
    /// response is actually delivered there, otherwise this future never
    /// completes.
    pub async fn consumer_update(
        &self,
        correlation_id: u32,
        offset_specification: OffsetSpecification,
    ) -> RabbitMQStreamResult<GenericResponse> {
        self.send_and_receive(|_| {
            // NOTE(review): the literal `1` is presumably the "ok" response code — confirm.
            ConsumerUpdateRequestCommand::new(correlation_id, 1, offset_specification)
        })
        .await
    }
757}
758
759#[async_trait::async_trait]
760impl MessageHandler for Client {
761    async fn handle_message(&self, item: MessageResult) -> RabbitMQStreamResult<()> {
762        match &item {
763            Some(Ok(response)) => {
764                // Update last received message time: needed for heartbeat task
765                {
766                    let s = self.state.read().await;
767                    let mut last_received_message = s.last_received_message.write().await;
768                    *last_received_message = Instant::now();
769                    drop(last_received_message);
770                    drop(s);
771                }
772
773                match response.kind_ref() {
774                    ResponseKind::Tunes(tune) => self.handle_tune_command(tune).await,
775                    ResponseKind::Heartbeat(_) => self.handle_heart_beat_command().await,
776                    _ => {
777                        if let Some(handler) = self.state.read().await.handler.as_ref() {
778                            let handler = handler.clone();
779
780                            tokio::task::spawn(async move { handler.handle_message(item).await });
781                        }
782                    }
783                }
784            }
785            Some(Err(err)) => {
786                trace!(?err);
787                if let Some(handler) = self.state.read().await.handler.as_ref() {
788                    let handler = handler.clone();
789
790                    tokio::task::spawn(async move { handler.handle_message(item).await });
791                }
792            }
793            None => {
794                trace!("Closing client");
795                if let Some(handler) = self.state.read().await.handler.as_ref() {
796                    let handler = handler.clone();
797                    tokio::task::spawn(async move { handler.handle_message(None).await });
798                }
799            }
800        }
801
802        Ok(())
803    }
804}