strut_rabbitmq/transport/inbound/
subscriber.rs

1use crate::transport::inbound::envelope::DecoderError;
2use crate::util::Push;
3use crate::{
4    Connector, Decoder, Envelope, ExchangeKind, Gateway, Handle, Header, Ingress, NoopDecoder,
5    StringDecoder,
6};
7use futures::StreamExt;
8use lapin::message::Delivery;
9use lapin::options::{
10    BasicConsumeOptions, BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions,
11    QueueDeclareOptions,
12};
13use lapin::types::FieldTable;
14use lapin::{
15    Channel, Consumer as LapinConsumer, Error as LapinError, Queue as LapinQueue,
16    Result as LapinResult,
17};
18use nonempty::NonEmpty;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21use strut_util::Backoff;
22use thiserror::Error;
23use tokio::select;
24use tokio::sync::{Mutex as AsyncMutex, MutexGuard, Notify};
25use tracing::{debug, error, warn};
26
/// Shorthand for a [`Subscriber`] that does not decode consumed messages.
///
/// Pairs the unit payload type `()` with the [`NoopDecoder`].
pub type UndecodedSubscriber = Subscriber<(), NoopDecoder>;

/// Shorthand for a [`Subscriber`] that decodes messages into [`String`]s.
///
/// Pairs the [`String`] payload type with the [`StringDecoder`].
pub type StringSubscriber = Subscriber<String, StringDecoder>;

/// Shorthand for a [`Subscriber`] that decodes messages using
/// [`JsonDecoder`](crate::JsonDecoder).
///
/// Only available when the `json` feature is enabled.
#[cfg(feature = "json")]
pub type JsonSubscriber<T> = Subscriber<T, crate::JsonDecoder<T>>;
37
/// Receives incoming [`Envelope`]s from the RabbitMQ cluster, passing them
/// through a pre-set [`Decoder`] before returning to the caller.
///
/// The underlying consumer is not created on construction: it is fetched the
/// first time a message is requested and then kept for later calls.
pub struct Subscriber<T, D>
where
    D: Decoder<Result = T>,
{
    /// Unique, human-readable name of this subscriber; used in log records and
    /// passed to `basic_consume` when consuming starts.
    name: Arc<str>,
    /// Definition of what to consume and how (exchange, queue, bindings,
    /// batching, acknowledgement mode).
    ingress: Ingress,
    /// Source of fresh channels.
    gateway: Gateway,
    /// The current consumer, if any. `None` means a fresh consumer must be
    /// fetched before the next poll.
    consumer: AsyncMutex<Option<LapinConsumer>>,
    /// Decoder applied to every incoming delivery.
    decoder: D,
    /// Batch size minus one: the number of messages to collect after the first
    /// message of a batch.
    batch_tail_size: usize,
}
51
/// Represents the outcome of polling a single message from a [`LapinConsumer`].
enum PollOutcome<T> {
    /// Successfully polled and decoded an [`Envelope`].
    Envelope(Envelope<T>),
    /// A [`LapinError`] delivered from the [`LapinConsumer`]. The error itself
    /// is logged where it is encountered and is not carried in this variant.
    ConsumerError,
    /// A message is successfully polled, but could not be decoded.
    Gibberish,
    /// The [`LapinConsumer`] is permanently out of messages and cannot be used
    /// any further.
    OutOfMessages,
}
64
/// Represents the outcome of assembling a batch of incoming messages.
enum BatchState {
    /// The batch is not complete yet: keep polling.
    InProgress,
    /// Polled enough messages to fill the batch.
    Completed,
    /// The batch timer fired before the batch was filled: stop polling.
    TimedOut,
    /// [`LapinConsumer`] signaled it’s out of messages.
    DriedOut,
}
76
/// Represents failure to issue at least one of the declarations that are
/// required before the subscriber can start consuming messages.
///
/// The following types of declarations can potentially fail:
///
/// - Declare an exchange.
/// - Declare a queue.
/// - Bind a queue to an exchange.
///
/// The declarations are based on the [`Ingress`] definition.
#[derive(Error, Debug)]
#[error("failed to issue RabbitMQ declarations from the subscriber '{subscriber}': {error}")]
pub struct DeclarationError {
    /// Name of the subscriber that attempted the declarations.
    subscriber: String,
    /// Textual rendering of the underlying error.
    error: String,
}
93
94impl<T, D> Subscriber<T, D>
95where
96    D: Decoder<Result = T>,
97{
98    /// Creates and returns a new [`Subscriber`] for the given [`Ingress`] and
99    /// [`Decoder`].
100    pub fn new(gateway: Gateway, ingress: Ingress, decoder: D) -> Self {
101        let name = Self::compose_name(&ingress);
102        let consumer = AsyncMutex::new(None);
103        let batch_tail_size = usize::from(ingress.batch_size()) - 1;
104
105        Self {
106            name,
107            ingress,
108            gateway,
109            consumer,
110            decoder,
111            batch_tail_size,
112        }
113    }
114
115    /// Starts a new [`Connector`] with the given [`Handle`] and uses it to create
116    /// and return a new [`Subscriber`] for the given [`Ingress`] and [`Decoder`].
117    pub fn start(handle: impl AsRef<Handle>, ingress: Ingress, decoder: D) -> Self {
118        let gateway = Connector::start(handle);
119
120        Self::new(gateway, ingress, decoder)
121    }
122
123    /// Composes a globally unique, human-readable name for this [`Subscriber`].
124    fn compose_name(ingress: &Ingress) -> Arc<str> {
125        static COUNTER: AtomicUsize = AtomicUsize::new(0);
126
127        Arc::from(format!(
128            "rabbitmq:sub:{}:{}",
129            ingress.name(),
130            COUNTER.fetch_add(1, Ordering::Relaxed),
131        ))
132    }
133}
134
135impl<T, D> Subscriber<T, D>
136where
137    D: Decoder<Result = T>,
138{
139    /// Reports the name of this [`Subscriber`].
140    pub fn name(&self) -> &str {
141        &self.name
142    }
143}
144
145impl Subscriber<(), NoopDecoder> {
146    /// A shorthand for calling [`new`](Subscriber::new) with a [`NoopDecoder`].
147    pub fn new_undecoded(gateway: Gateway, ingress: Ingress) -> Self {
148        Self::new(gateway, ingress, NoopDecoder)
149    }
150
151    /// A shorthand for calling [`start`](Subscriber::start) with a
152    /// [`NoopDecoder`].
153    pub fn start_undecoded(handle: &Handle, ingress: Ingress) -> Self {
154        Self::start(handle, ingress, NoopDecoder)
155    }
156}
157
#[cfg(feature = "json")]
impl<T> Subscriber<T, crate::JsonDecoder<T>>
where
    T: serde::de::DeserializeOwned,
{
    /// Same as [`new`](Subscriber::new), with the decoder fixed to a default
    /// [`JsonDecoder`](crate::JsonDecoder).
    pub fn new_json(gateway: Gateway, ingress: Ingress) -> Self {
        let decoder = crate::JsonDecoder::default();

        Self::new(gateway, ingress, decoder)
    }

    /// Same as [`start`](Subscriber::start), with the decoder fixed to a
    /// default [`JsonDecoder`](crate::JsonDecoder).
    pub fn start_json(handle: &Handle, ingress: Ingress) -> Self {
        let decoder = crate::JsonDecoder::default();

        Self::start(handle, ingress, decoder)
    }
}
175
176impl<T, D> Subscriber<T, D>
177where
178    D: Decoder<Result = T>,
179{
180    /// Waits for the connection to RabbitMQ to become available, then issues
181    /// the declarations necessary for consuming messages with the [`Ingress`]
182    /// configured on this subscriber. The declarations include declaring an
183    /// exchange (if not a built-in exchange), declaring a queue, and binding
184    /// the queue to the exchange in some way. Such declarations are repeatable
185    /// (assuming the configuration options don’t change), so it shouldn’t hurt
186    /// to call this method any number of times.
187    ///
188    /// If and when this method returns [`Ok`], it can be reasonably expected
189    /// that the following calls to [`receive`](Subscriber::receive) or
190    /// [`receive_many`](Subscriber::receive_many) will be able to eventually
191    /// deliver incoming messages, assuming the connectivity to RabbitMQ
192    /// remains.
193    ///
194    /// If any of the declarations fail (e.g., a queue by that name already
195    /// exists with different configuration), this method returns a
196    /// [`DeclarationError`].
197    pub async fn try_declare(&self) -> Result<(), DeclarationError> {
198        // Patiently wait for a fresh channel
199        let channel = self.gateway.channel().await;
200
201        // Try to issue declarations (exchange, queue, bindings)
202        match self.issue_declarations(&channel).await {
203            // Success: queue is irrelevant, we just return
204            Ok(_queue) => Ok(()),
205
206            // Error: return the error
207            Err(error) => Err(DeclarationError::new(self.name.as_ref(), error)),
208        }
209    }
210
211    /// Repeatedly calls [`try_declare`](Subscriber::try_declare) until it
212    /// succeeds, with an exponential backoff.
213    ///
214    /// Most declaration errors can only be fixed outside the application, by
215    /// changing the broker configuration (e.g., deleting a conflicting queue).
216    /// In such cases, this method may be used, to keep the subscriber spinning
217    /// (and alerting about the declaration failure) until the issue is fixed
218    /// externally, at which point the declarations will eventually succeed, and
219    /// this method will return.
220    pub async fn declare(&self) {
221        // Prepare to backoff
222        let backoff = Backoff::default();
223
224        loop {
225            match self.try_declare().await {
226                // Success: we just return
227                Ok(()) => return,
228
229                // Error: alert and wait a bit
230                Err(error) => {
231                    warn!(
232                        alert = true,
233                        subscriber = self.name.as_ref(),
234                        ?error,
235                        error_message = %error,
236                        "Failed to declare an exchange or a queue",
237                    );
238
239                    backoff.sleep_next().await;
240                }
241            }
242        }
243    }
244
245    /// Receives a single, decode-able message from the broker. Will wait as
246    /// long as it takes for the first decode-able message to arrive.
247    pub async fn receive(&self) -> Envelope<T> {
248        // Grab the consumer (keep the guard until we return)
249        let (mut consumer_guard, mut consumer) = self.grab_consumer().await;
250
251        // Poll until an envelope is received
252        let envelope = self.poll(&mut consumer).await;
253
254        // Put the consumer back under lock
255        *consumer_guard = Some(consumer);
256
257        envelope
258    }
259
260    /// Receives a batch of up to [batch_size](Ingress::batch_size) of
261    /// decode-able messages from the broker. Will wait as long as it takes for
262    /// the first decode-able message to arrive, after which will take no longer
263    /// than [`BATCH_TIMEOUT`] to complete the batch before returning. The final
264    /// batch will thus always contain at least one message.
265    pub async fn receive_many(&self) -> NonEmpty<Envelope<T>> {
266        // Grab the consumer (keep the guard until the batch is assembled)
267        let (mut consumer_guard, mut consumer) = self.grab_consumer().await;
268
269        // Poll the head of the batch (the first envelope)
270        let batch_head = self.poll(&mut consumer).await;
271
272        // Now that we’ve got the first message of the batch, set a timeout for receiving the full batch
273
274        // Create the notification mechanism for the batch timer
275        let notify_in = Arc::new(Notify::new());
276        let notify_out = Arc::clone(&notify_in);
277
278        // Start the batch timer (within this time limit additional messages may fall into the same batch)
279        let batch_timeout = self.ingress.batch_timeout();
280        let handle = tokio::spawn(async move {
281            tokio::time::sleep(batch_timeout).await;
282            notify_in.notify_one();
283        });
284
285        // Prepare storage
286        let mut batch_tail = Vec::with_capacity(self.batch_tail_size);
287
288        // Complete batch within timeout
289        let batch_state = self
290            .complete_batch(&mut consumer, &mut batch_tail, notify_out)
291            .await;
292
293        // Check if the batch state spells success
294        if batch_state.represents_healthy_consumer() {
295            // Put the consumer back and release the lock
296            *consumer_guard = Some(consumer);
297        }
298
299        // Drop the consumer guard
300        drop(consumer_guard);
301
302        // Whether or not the timer completed, we don’t need it anymore
303        handle.abort();
304
305        NonEmpty::from((batch_head, batch_tail))
306    }
307}
308
309impl<T, D> Subscriber<T, D>
310where
311    D: Decoder<Result = T>,
312{
313    /// Infinitely polls the given [`LapinConsumer`] until it yields a
314    /// decode-able [`Envelope`], then returns it. This method will re-fetch the
315    /// consumer as many times as needed.
316    async fn poll(&self, consumer: &mut LapinConsumer) -> Envelope<T> {
317        // Keep trying until we poll the first message
318        loop {
319            // Try to poll a message
320            let outcome = self.try_poll(consumer).await;
321
322            // Check if we have a good envelope
323            if let PollOutcome::Envelope(envelope) = outcome {
324                // Good envelope: return
325                return envelope;
326            }
327
328            // No luck with last outcome: go toward retrying
329
330            // If the consumer dried out, re-fetch it
331            if outcome.represents_empty_consumer() {
332                *consumer = self.fetch_consumer().await;
333            }
334        }
335    }
336
337    /// Completes the given batch tail within the given timeout by repeatedly
338    /// polling the given consumer.
339    async fn complete_batch(
340        &self,
341        consumer: &mut LapinConsumer,
342        batch_tail: &mut Vec<Envelope<T>>,
343        timeout: Arc<Notify>,
344    ) -> BatchState {
345        // Start collecting additional messages into the batch (within both the time and the count limits)
346        while batch_tail.len() < self.batch_tail_size {
347            let state = select! {
348                biased;
349                _ = timeout.notified() => BatchState::TimedOut,
350                outcome = self.try_poll(consumer) => self.receive_outcome(outcome, batch_tail),
351            };
352
353            match state {
354                BatchState::InProgress => continue,
355                BatchState::Completed | BatchState::TimedOut | BatchState::DriedOut => {
356                    return state;
357                }
358            }
359        }
360
361        BatchState::Completed
362    }
363
364    /// Abstracts two asynchronous calls (next delivery from the consumer, and
365    /// unwrapping of the delivery) into a single asynchronous call for convenient
366    /// use in a `select` block.
367    async fn try_poll(&self, consumer: &mut LapinConsumer) -> PollOutcome<T> {
368        // Fetch and unwrap next delivery
369        self.unwrap_delivery(consumer.next().await).await
370    }
371
372    /// Pushes a [`PollOutcome`] onto the given batch, and translates that outcome
373    /// into the current [`BatchState`].
374    fn receive_outcome(&self, outcome: PollOutcome<T>, batch: &mut Vec<Envelope<T>>) -> BatchState {
375        match outcome {
376            PollOutcome::Envelope(envelope) => {
377                batch.push(envelope);
378                BatchState::InProgress
379            }
380            PollOutcome::OutOfMessages => BatchState::DriedOut,
381            PollOutcome::ConsumerError | PollOutcome::Gibberish => BatchState::InProgress,
382        }
383    }
384
385    /// Peels the layers off the given incoming delivery.
386    async fn unwrap_delivery(
387        &self,
388        option_delivery_result: Option<LapinResult<Delivery>>,
389    ) -> PollOutcome<T> {
390        // Unwrap the outer option
391        let delivery_result = match option_delivery_result {
392            Some(delivery_result) => delivery_result,
393            None => {
394                debug!(
395                    subscriber = self.name.as_ref(),
396                    "Ran out of messages on a RabbitMQ consumer",
397                );
398
399                return PollOutcome::OutOfMessages;
400            }
401        };
402
403        // Unwrap the inner result
404        let delivery = match delivery_result {
405            Ok(delivery) => delivery,
406            Err(error) => {
407                warn!(
408                    alert = true,
409                    subscriber = self.name.as_ref(),
410                    ?error,
411                    error_message = %error,
412                    "Received an error from a RabbitMQ consumer",
413                );
414
415                return PollOutcome::ConsumerError;
416            }
417        };
418
419        // Decode an envelope
420        let envelope_result = Envelope::try_from(
421            self.name.clone(),
422            &self.decoder,
423            delivery,
424            self.ingress.delivers_pending(),
425        );
426
427        // Inspect the result
428        match envelope_result {
429            Ok(envelope) => PollOutcome::Envelope(envelope),
430            Err(error) => {
431                self.discard_gibberish(error).await;
432
433                PollOutcome::Gibberish
434            }
435        }
436    }
437
438    /// Handles and discards the given un-decodable inbound message.
439    async fn discard_gibberish(&self, decoder_error: DecoderError<D>) {
440        // Destruct the decoder error
441        let DecoderError {
442            bytes,
443            mut acker,
444            error,
445        } = decoder_error;
446
447        // Report the un-decodable message
448        error!(
449            alert = true,
450            subscriber = self.name.as_ref(),
451            ?error,
452            error_message = %error,
453            byte_preview = String::from_utf8_lossy(&bytes).as_ref(),
454            "Failed to decode an inbound RabbitMQ message",
455        );
456
457        // Finalize the message
458        if let Some(acker) = acker.take() {
459            self.ingress
460                .gibberish_behavior()
461                .apply(self.name.as_ref(), &acker, &bytes)
462                .await;
463        }
464    }
465
466    /// Obtains the lock on the consumer and returns both the guard and the consumer
467    async fn grab_consumer(&self) -> (MutexGuard<'_, Option<LapinConsumer>>, LapinConsumer) {
468        // Obtain the consumer guard
469        let mut consumer_guard = self.consumer.lock().await;
470
471        // Either take the consumer or fetch a fresh one
472        let consumer = match consumer_guard.take() {
473            Some(consumer) => consumer,
474            None => self.fetch_consumer().await,
475        };
476
477        // Return the pair
478        (consumer_guard, consumer)
479    }
480
481    /// Fetches a fresh [`LapinConsumer`], building it from scratch on a fresh
482    /// [`Channel`].
483    async fn fetch_consumer(&self) -> LapinConsumer {
484        loop {
485            // Patiently wait for a fresh channel
486            let channel = self.gateway.channel().await;
487
488            // Try to build a consumer
489            match self.build_consumer(&channel).await {
490                // Successfully built a consumer: return
491                Ok(consumer) => return consumer,
492
493                // Failed to build a consumer for some reason: report and retry
494                Err(error) => {
495                    warn!(
496                        alert = true,
497                        subscriber = self.name.as_ref(),
498                        ?error,
499                        error_message = %error,
500                        "Failed to build a RabbitMQ message consumer",
501                    );
502                }
503            }
504        }
505    }
506
507    /// Builds a [`LapinConsumer`] on top of the given [`Channel`].
508    async fn build_consumer(&self, channel: &Channel) -> LapinResult<LapinConsumer> {
509        // Declare the RabbitMQ queue
510        let queue = self.issue_declarations(channel).await?;
511
512        // Initiate consuming of messages
513        channel
514            .basic_consume(
515                queue.name().as_str(),
516                &self.name,
517                BasicConsumeOptions {
518                    no_local: false,
519                    no_ack: self.ingress.no_ack(),
520                    exclusive: false,
521                    nowait: false,
522                },
523                FieldTable::default(),
524            )
525            .await
526    }
527
528    /// Declares the exchange (if necessary), the queue, and binds the queue to
529    /// the exchange.
530    async fn issue_declarations(&self, channel: &Channel) -> LapinResult<LapinQueue> {
531        // Set prefetch count on the channel if relevant
532        if let Some(prefetch_count) = self.ingress.prefetch_count() {
533            channel
534                .basic_qos(prefetch_count.into(), BasicQosOptions { global: false })
535                .await?;
536        }
537
538        // If the exchange is not built-in, declare it first
539        if self.ingress.exchange().is_custom() {
540            // Extract custom exchange reference for convenience
541            let exchange = self.ingress.exchange();
542
543            // Prepare args
544            let mut args = FieldTable::default();
545
546            // Check special exchange case
547            if exchange.kind() == ExchangeKind::HashId {
548                // For routing on message ID, we need a custom arg
549                args.push("hash-property", "message_id");
550            }
551
552            channel
553                .exchange_declare(
554                    exchange.name(),
555                    exchange.kind().lapin_value(),
556                    ExchangeDeclareOptions {
557                        passive: false,
558                        durable: exchange.durable(),
559                        auto_delete: exchange.auto_delete(),
560                        internal: false,
561                        nowait: false,
562                    },
563                    args,
564                )
565                .await?;
566        }
567
568        // Explicitly indicate desired queue kind
569        let mut args = FieldTable::default();
570        args.push("x-queue-type", self.ingress.queue().kind().rabbitmq_value());
571
572        // Declare the queue
573        let queue = channel
574            .queue_declare(
575                self.ingress.queue().name().as_ref(),
576                QueueDeclareOptions {
577                    passive: false,
578                    durable: self.ingress.durable(),
579                    exclusive: self.ingress.exclusive(),
580                    auto_delete: self.ingress.auto_delete(),
581                    nowait: false,
582                },
583                args,
584            )
585            .await?;
586
587        // Bind the queue to the exchange
588        self.bind_queue_to_exchange(channel, &queue).await?;
589
590        Ok(queue)
591    }
592
593    /// Binds the given [`LapinQueue`] to the exchange configured on this
594    /// subscriber’s [`Ingress`].
595    async fn bind_queue_to_exchange(
596        &self,
597        channel: &Channel,
598        queue: &LapinQueue,
599    ) -> LapinResult<()> {
600        // Default exchange forbids any kind of binding
601        if self.ingress.exchange().is_default() {
602            return Ok(());
603        }
604
605        // Proceed according to the exchange kind
606        match self.ingress.exchange().kind() {
607            ExchangeKind::Direct | ExchangeKind::Topic => {
608                self.bind_queue_to_key_exchange(channel, queue).await
609            }
610            ExchangeKind::Fanout => self.bind_queue_to_fanout_exchange(channel, queue).await,
611            ExchangeKind::Headers => self.bind_queue_to_headers_exchange(channel, queue).await,
612            ExchangeKind::HashKey | ExchangeKind::HashId => {
613                self.bind_queue_to_hash_exchange(channel, queue).await
614            }
615        }
616    }
617
618    /// Binds the given [`LapinQueue`] to the **key-based exchange**
619    /// ([direct](ExchangeKind::Direct) or [topic](ExchangeKind::Topic))
620    /// configured on this subscriber’s [`Ingress`].
621    async fn bind_queue_to_key_exchange(
622        &self,
623        channel: &Channel,
624        queue: &LapinQueue,
625    ) -> LapinResult<()> {
626        // Bind the same queue to the same exchange once for every binding key
627        for binding_key in self.ingress.binding_keys() {
628            channel
629                .queue_bind(
630                    queue.name().as_str(),
631                    self.ingress.exchange().name(),
632                    binding_key,
633                    QueueBindOptions { nowait: false },
634                    FieldTable::default(),
635                )
636                .await?;
637        }
638
639        Ok(())
640    }
641
642    /// Binds the given [`LapinQueue`] to the
643    /// [**fanout exchange**](ExchangeKind::Fanout) configured on this
644    /// subscriber’s [`Ingress`].
645    async fn bind_queue_to_fanout_exchange(
646        &self,
647        channel: &Channel,
648        queue: &LapinQueue,
649    ) -> LapinResult<()> {
650        channel
651            .queue_bind(
652                queue.name().as_str(),
653                self.ingress.exchange().name(),
654                "", // Routing key is irrelevant for fanout exchanges
655                QueueBindOptions { nowait: false },
656                FieldTable::default(),
657            )
658            .await?;
659
660        Ok(())
661    }
662
663    /// Binds the given [`LapinQueue`] to the
664    /// [**headers exchange**](ExchangeKind::Headers) configured on this
665    /// subscriber’s [`Ingress`].
666    async fn bind_queue_to_headers_exchange(
667        &self,
668        channel: &Channel,
669        queue: &LapinQueue,
670    ) -> LapinResult<()> {
671        let mut args = FieldTable::default();
672
673        args.push("x-match", self.ingress.headers_behavior().rabbitmq_value());
674
675        for (key, value) in self.ingress.binding_headers() {
676            match value {
677                Header::Boolean(value) => args.push(key, *value),
678                Header::Int(value) => args.push(key, *value),
679                Header::UInt(value) => args.push(key, *value),
680                Header::String(value) => args.push(key, value.as_ref()),
681            }
682        }
683
684        channel
685            .queue_bind(
686                queue.name().as_str(),
687                self.ingress.exchange().name(),
688                "", // Routing key is irrelevant for header-based matching
689                QueueBindOptions { nowait: false },
690                args,
691            )
692            .await?;
693
694        Ok(())
695    }
696
697    /// Binds the given [`LapinQueue`] to the **consistent hash exchange**
698    /// ([key-based](ExchangeKind::HashKey) or
699    /// [message ID-based](ExchangeKind::HashId)) configured on this
700    /// subscriber’s [`Ingress`].
701    async fn bind_queue_to_hash_exchange(
702        &self,
703        channel: &Channel,
704        queue: &LapinQueue,
705    ) -> LapinResult<()> {
706        channel
707            .queue_bind(
708                queue.name().as_str(),
709                self.ingress.exchange().name(),
710                "1", // Always bind with an equal weight of 1
711                QueueBindOptions { nowait: false },
712                FieldTable::default(),
713            )
714            .await?;
715
716        Ok(())
717    }
718}
719
720impl<T> PollOutcome<T> {
721    /// Reports whether the [`LapinConsumer`] can be judged as “empty” based
722    /// on this outcome alone. An empty consumer will not yield any more messages
723    /// if polled.
724    fn represents_empty_consumer(&self) -> bool {
725        match self {
726            Self::OutOfMessages => true,
727            Self::Envelope(_) | Self::Gibberish | Self::ConsumerError => true,
728        }
729    }
730}
731
732impl BatchState {
733    /// Reports whether the [`LapinConsumer`] can be judged as “healthy” based
734    /// on this batch state alone. A healthy consumer is a consumer that we can
735    /// still use to try and poll for more messages.
736    fn represents_healthy_consumer(&self) -> bool {
737        match self {
738            BatchState::InProgress | BatchState::Completed | BatchState::TimedOut => true,
739            BatchState::DriedOut => false,
740        }
741    }
742}
743
744impl<T> From<PollOutcome<T>> for Option<Envelope<T>> {
745    fn from(value: PollOutcome<T>) -> Self {
746        match value {
747            PollOutcome::Envelope(envelope) => Some(envelope),
748            PollOutcome::ConsumerError | PollOutcome::Gibberish | PollOutcome::OutOfMessages => {
749                None
750            }
751        }
752    }
753}
754
755impl DeclarationError {
756    fn new(subscriber: &str, error: LapinError) -> Self {
757        Self {
758            subscriber: subscriber.to_string(),
759            error: error.to_string(),
760        }
761    }
762}