strut_rabbitmq/transport/inbound/subscriber.rs
1use crate::transport::inbound::envelope::DecoderError;
2use crate::util::Push;
3use crate::{
4 Connector, Decoder, Envelope, ExchangeKind, Gateway, Handle, Header, Ingress, NoopDecoder,
5 StringDecoder,
6};
7use futures::StreamExt;
8use lapin::message::Delivery;
9use lapin::options::{
10 BasicConsumeOptions, BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions,
11 QueueDeclareOptions,
12};
13use lapin::types::FieldTable;
14use lapin::{
15 Channel, Consumer as LapinConsumer, Error as LapinError, Queue as LapinQueue,
16 Result as LapinResult,
17};
18use nonempty::NonEmpty;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21use strut_util::Backoff;
22use thiserror::Error;
23use tokio::select;
24use tokio::sync::{Mutex as AsyncMutex, MutexGuard, Notify};
25use tracing::{debug, error, warn};
26
27/// Shorthand for a [`Subscriber`] that does not decode consumed messages.
28pub type UndecodedSubscriber = Subscriber<(), NoopDecoder>;
29
30/// Shorthand for a [`Subscriber`] that decodes messages into [`String`]s.
31pub type StringSubscriber = Subscriber<String, StringDecoder>;
32
33/// Shorthand for a [`Subscriber`] that decodes messages using
34/// [`JsonDecoder`](crate::JsonDecoder).
35#[cfg(feature = "json")]
36pub type JsonSubscriber<T> = Subscriber<T, crate::JsonDecoder<T>>;
37
38/// Receives incoming [`Envelope`]s from the RabbitMQ cluster, passing them
39/// through a pre-set [`Decoder`] before returning to the caller.
40pub struct Subscriber<T, D>
41where
42 D: Decoder<Result = T>,
43{
44 name: Arc<str>,
45 ingress: Ingress,
46 gateway: Gateway,
47 consumer: AsyncMutex<Option<LapinConsumer>>,
48 decoder: D,
49 batch_tail_size: usize,
50}
51
52/// Represents the outcome of polling a single message from a [`LapinConsumer`].
53enum PollOutcome<T> {
54 /// Successfully polled and decoded an [`Envelope`].
55 Envelope(Envelope<T>),
56 /// A [`LapinError`] delivered from the [`LapinConsumer`].
57 ConsumerError,
58 /// A message is successfully polled, but could not be decoded.
59 Gibberish,
60 /// The [`LapinConsumer`] is permanently out of messages and cannot be used
61 /// any further.
62 OutOfMessages,
63}
64
/// The state of a batch of incoming messages while it is being assembled.
enum BatchState {
    /// The batch is not full yet: polling should continue.
    InProgress,
    /// The batch has received as many messages as it can hold.
    Completed,
    /// The time limit for assembling the batch has elapsed.
    TimedOut,
    /// The [`LapinConsumer`] reported that it is out of messages.
    DriedOut,
}
76
77/// Represents failure to issue at least one of the declarations that are
78/// required before the subscriber can start consuming messages.
79///
80/// The following types of declarations can potentially fail:
81///
82/// - Declare an exchange.
83/// - Declare a queue.
84/// - Bind a queue to an exchange.
85///
86/// The declarations are based on the [`Ingress`] definition.
87#[derive(Error, Debug)]
88#[error("failed to issue RabbitMQ declarations from the subscriber '{subscriber}': {error}")]
89pub struct DeclarationError {
90 subscriber: String,
91 error: String,
92}
93
94impl<T, D> Subscriber<T, D>
95where
96 D: Decoder<Result = T>,
97{
98 /// Creates and returns a new [`Subscriber`] for the given [`Ingress`] and
99 /// [`Decoder`].
100 pub fn new(gateway: Gateway, ingress: Ingress, decoder: D) -> Self {
101 let name = Self::compose_name(&ingress);
102 let consumer = AsyncMutex::new(None);
103 let batch_tail_size = usize::from(ingress.batch_size()) - 1;
104
105 Self {
106 name,
107 ingress,
108 gateway,
109 consumer,
110 decoder,
111 batch_tail_size,
112 }
113 }
114
115 /// Starts a new [`Connector`] with the given [`Handle`] and uses it to create
116 /// and return a new [`Subscriber`] for the given [`Ingress`] and [`Decoder`].
117 pub fn start(handle: impl AsRef<Handle>, ingress: Ingress, decoder: D) -> Self {
118 let gateway = Connector::start(handle);
119
120 Self::new(gateway, ingress, decoder)
121 }
122
123 /// Composes a globally unique, human-readable name for this [`Subscriber`].
124 fn compose_name(ingress: &Ingress) -> Arc<str> {
125 static COUNTER: AtomicUsize = AtomicUsize::new(0);
126
127 Arc::from(format!(
128 "rabbitmq:sub:{}:{}",
129 ingress.name(),
130 COUNTER.fetch_add(1, Ordering::Relaxed),
131 ))
132 }
133}
134
135impl<T, D> Subscriber<T, D>
136where
137 D: Decoder<Result = T>,
138{
139 /// Reports the name of this [`Subscriber`].
140 pub fn name(&self) -> &str {
141 &self.name
142 }
143}
144
145impl Subscriber<(), NoopDecoder> {
146 /// A shorthand for calling [`new`](Subscriber::new) with a [`NoopDecoder`].
147 pub fn new_undecoded(gateway: Gateway, ingress: Ingress) -> Self {
148 Self::new(gateway, ingress, NoopDecoder)
149 }
150
151 /// A shorthand for calling [`start`](Subscriber::start) with a
152 /// [`NoopDecoder`].
153 pub fn start_undecoded(handle: &Handle, ingress: Ingress) -> Self {
154 Self::start(handle, ingress, NoopDecoder)
155 }
156}
157
#[cfg(feature = "json")]
impl<T> Subscriber<T, crate::JsonDecoder<T>>
where
    T: serde::de::DeserializeOwned,
{
    /// Same as [`new`](Subscriber::new), with the decoder fixed to a default
    /// [`JsonDecoder`](crate::JsonDecoder).
    pub fn new_json(gateway: Gateway, ingress: Ingress) -> Self {
        let decoder = crate::JsonDecoder::default();

        Self::new(gateway, ingress, decoder)
    }

    /// Same as [`start`](Subscriber::start), with the decoder fixed to a
    /// default [`JsonDecoder`](crate::JsonDecoder).
    pub fn start_json(handle: &Handle, ingress: Ingress) -> Self {
        let decoder = crate::JsonDecoder::default();

        Self::start(handle, ingress, decoder)
    }
}
175
176impl<T, D> Subscriber<T, D>
177where
178 D: Decoder<Result = T>,
179{
180 /// Waits for the connection to RabbitMQ to become available, then issues
181 /// the declarations necessary for consuming messages with the [`Ingress`]
182 /// configured on this subscriber. The declarations include declaring an
183 /// exchange (if not a built-in exchange), declaring a queue, and binding
184 /// the queue to the exchange in some way. Such declarations are repeatable
185 /// (assuming the configuration options don’t change), so it shouldn’t hurt
186 /// to call this method any number of times.
187 ///
188 /// If and when this method returns [`Ok`], it can be reasonably expected
189 /// that the following calls to [`receive`](Subscriber::receive) or
190 /// [`receive_many`](Subscriber::receive_many) will be able to eventually
191 /// deliver incoming messages, assuming the connectivity to RabbitMQ
192 /// remains.
193 ///
194 /// If any of the declarations fail (e.g., a queue by that name already
195 /// exists with different configuration), this method returns a
196 /// [`DeclarationError`].
197 pub async fn try_declare(&self) -> Result<(), DeclarationError> {
198 // Patiently wait for a fresh channel
199 let channel = self.gateway.channel().await;
200
201 // Try to issue declarations (exchange, queue, bindings)
202 match self.issue_declarations(&channel).await {
203 // Success: queue is irrelevant, we just return
204 Ok(_queue) => Ok(()),
205
206 // Error: return the error
207 Err(error) => Err(DeclarationError::new(self.name.as_ref(), error)),
208 }
209 }
210
211 /// Repeatedly calls [`try_declare`](Subscriber::try_declare) until it
212 /// succeeds, with an exponential backoff.
213 ///
214 /// Most declaration errors can only be fixed outside the application, by
215 /// changing the broker configuration (e.g., deleting a conflicting queue).
216 /// In such cases, this method may be used, to keep the subscriber spinning
217 /// (and alerting about the declaration failure) until the issue is fixed
218 /// externally, at which point the declarations will eventually succeed, and
219 /// this method will return.
220 pub async fn declare(&self) {
221 // Prepare to backoff
222 let backoff = Backoff::default();
223
224 loop {
225 match self.try_declare().await {
226 // Success: we just return
227 Ok(()) => return,
228
229 // Error: alert and wait a bit
230 Err(error) => {
231 warn!(
232 alert = true,
233 subscriber = self.name.as_ref(),
234 ?error,
235 error_message = %error,
236 "Failed to declare an exchange or a queue",
237 );
238
239 backoff.sleep_next().await;
240 }
241 }
242 }
243 }
244
245 /// Receives a single, decode-able message from the broker. Will wait as
246 /// long as it takes for the first decode-able message to arrive.
247 pub async fn receive(&self) -> Envelope<T> {
248 // Grab the consumer (keep the guard until we return)
249 let (mut consumer_guard, mut consumer) = self.grab_consumer().await;
250
251 // Poll until an envelope is received
252 let envelope = self.poll(&mut consumer).await;
253
254 // Put the consumer back under lock
255 *consumer_guard = Some(consumer);
256
257 envelope
258 }
259
260 /// Receives a batch of up to [batch_size](Ingress::batch_size) of
261 /// decode-able messages from the broker. Will wait as long as it takes for
262 /// the first decode-able message to arrive, after which will take no longer
263 /// than [`BATCH_TIMEOUT`] to complete the batch before returning. The final
264 /// batch will thus always contain at least one message.
265 pub async fn receive_many(&self) -> NonEmpty<Envelope<T>> {
266 // Grab the consumer (keep the guard until the batch is assembled)
267 let (mut consumer_guard, mut consumer) = self.grab_consumer().await;
268
269 // Poll the head of the batch (the first envelope)
270 let batch_head = self.poll(&mut consumer).await;
271
272 // Now that we’ve got the first message of the batch, set a timeout for receiving the full batch
273
274 // Create the notification mechanism for the batch timer
275 let notify_in = Arc::new(Notify::new());
276 let notify_out = Arc::clone(¬ify_in);
277
278 // Start the batch timer (within this time limit additional messages may fall into the same batch)
279 let batch_timeout = self.ingress.batch_timeout();
280 let handle = tokio::spawn(async move {
281 tokio::time::sleep(batch_timeout).await;
282 notify_in.notify_one();
283 });
284
285 // Prepare storage
286 let mut batch_tail = Vec::with_capacity(self.batch_tail_size);
287
288 // Complete batch within timeout
289 let batch_state = self
290 .complete_batch(&mut consumer, &mut batch_tail, notify_out)
291 .await;
292
293 // Check if the batch state spells success
294 if batch_state.represents_healthy_consumer() {
295 // Put the consumer back and release the lock
296 *consumer_guard = Some(consumer);
297 }
298
299 // Drop the consumer guard
300 drop(consumer_guard);
301
302 // Whether or not the timer completed, we don’t need it anymore
303 handle.abort();
304
305 NonEmpty::from((batch_head, batch_tail))
306 }
307}
308
309impl<T, D> Subscriber<T, D>
310where
311 D: Decoder<Result = T>,
312{
313 /// Infinitely polls the given [`LapinConsumer`] until it yields a
314 /// decode-able [`Envelope`], then returns it. This method will re-fetch the
315 /// consumer as many times as needed.
316 async fn poll(&self, consumer: &mut LapinConsumer) -> Envelope<T> {
317 // Keep trying until we poll the first message
318 loop {
319 // Try to poll a message
320 let outcome = self.try_poll(consumer).await;
321
322 // Check if we have a good envelope
323 if let PollOutcome::Envelope(envelope) = outcome {
324 // Good envelope: return
325 return envelope;
326 }
327
328 // No luck with last outcome: go toward retrying
329
330 // If the consumer dried out, re-fetch it
331 if outcome.represents_empty_consumer() {
332 *consumer = self.fetch_consumer().await;
333 }
334 }
335 }
336
337 /// Completes the given batch tail within the given timeout by repeatedly
338 /// polling the given consumer.
339 async fn complete_batch(
340 &self,
341 consumer: &mut LapinConsumer,
342 batch_tail: &mut Vec<Envelope<T>>,
343 timeout: Arc<Notify>,
344 ) -> BatchState {
345 // Start collecting additional messages into the batch (within both the time and the count limits)
346 while batch_tail.len() < self.batch_tail_size {
347 let state = select! {
348 biased;
349 _ = timeout.notified() => BatchState::TimedOut,
350 outcome = self.try_poll(consumer) => self.receive_outcome(outcome, batch_tail),
351 };
352
353 match state {
354 BatchState::InProgress => continue,
355 BatchState::Completed | BatchState::TimedOut | BatchState::DriedOut => {
356 return state;
357 }
358 }
359 }
360
361 BatchState::Completed
362 }
363
364 /// Abstracts two asynchronous calls (next delivery from the consumer, and
365 /// unwrapping of the delivery) into a single asynchronous call for convenient
366 /// use in a `select` block.
367 async fn try_poll(&self, consumer: &mut LapinConsumer) -> PollOutcome<T> {
368 // Fetch and unwrap next delivery
369 self.unwrap_delivery(consumer.next().await).await
370 }
371
372 /// Pushes a [`PollOutcome`] onto the given batch, and translates that outcome
373 /// into the current [`BatchState`].
374 fn receive_outcome(&self, outcome: PollOutcome<T>, batch: &mut Vec<Envelope<T>>) -> BatchState {
375 match outcome {
376 PollOutcome::Envelope(envelope) => {
377 batch.push(envelope);
378 BatchState::InProgress
379 }
380 PollOutcome::OutOfMessages => BatchState::DriedOut,
381 PollOutcome::ConsumerError | PollOutcome::Gibberish => BatchState::InProgress,
382 }
383 }
384
385 /// Peels the layers off the given incoming delivery.
386 async fn unwrap_delivery(
387 &self,
388 option_delivery_result: Option<LapinResult<Delivery>>,
389 ) -> PollOutcome<T> {
390 // Unwrap the outer option
391 let delivery_result = match option_delivery_result {
392 Some(delivery_result) => delivery_result,
393 None => {
394 debug!(
395 subscriber = self.name.as_ref(),
396 "Ran out of messages on a RabbitMQ consumer",
397 );
398
399 return PollOutcome::OutOfMessages;
400 }
401 };
402
403 // Unwrap the inner result
404 let delivery = match delivery_result {
405 Ok(delivery) => delivery,
406 Err(error) => {
407 warn!(
408 alert = true,
409 subscriber = self.name.as_ref(),
410 ?error,
411 error_message = %error,
412 "Received an error from a RabbitMQ consumer",
413 );
414
415 return PollOutcome::ConsumerError;
416 }
417 };
418
419 // Decode an envelope
420 let envelope_result = Envelope::try_from(
421 self.name.clone(),
422 &self.decoder,
423 delivery,
424 self.ingress.delivers_pending(),
425 );
426
427 // Inspect the result
428 match envelope_result {
429 Ok(envelope) => PollOutcome::Envelope(envelope),
430 Err(error) => {
431 self.discard_gibberish(error).await;
432
433 PollOutcome::Gibberish
434 }
435 }
436 }
437
438 /// Handles and discards the given un-decodable inbound message.
439 async fn discard_gibberish(&self, decoder_error: DecoderError<D>) {
440 // Destruct the decoder error
441 let DecoderError {
442 bytes,
443 mut acker,
444 error,
445 } = decoder_error;
446
447 // Report the un-decodable message
448 error!(
449 alert = true,
450 subscriber = self.name.as_ref(),
451 ?error,
452 error_message = %error,
453 byte_preview = String::from_utf8_lossy(&bytes).as_ref(),
454 "Failed to decode an inbound RabbitMQ message",
455 );
456
457 // Finalize the message
458 if let Some(acker) = acker.take() {
459 self.ingress
460 .gibberish_behavior()
461 .apply(self.name.as_ref(), &acker, &bytes)
462 .await;
463 }
464 }
465
466 /// Obtains the lock on the consumer and returns both the guard and the consumer
467 async fn grab_consumer(&self) -> (MutexGuard<'_, Option<LapinConsumer>>, LapinConsumer) {
468 // Obtain the consumer guard
469 let mut consumer_guard = self.consumer.lock().await;
470
471 // Either take the consumer or fetch a fresh one
472 let consumer = match consumer_guard.take() {
473 Some(consumer) => consumer,
474 None => self.fetch_consumer().await,
475 };
476
477 // Return the pair
478 (consumer_guard, consumer)
479 }
480
481 /// Fetches a fresh [`LapinConsumer`], building it from scratch on a fresh
482 /// [`Channel`].
483 async fn fetch_consumer(&self) -> LapinConsumer {
484 loop {
485 // Patiently wait for a fresh channel
486 let channel = self.gateway.channel().await;
487
488 // Try to build a consumer
489 match self.build_consumer(&channel).await {
490 // Successfully built a consumer: return
491 Ok(consumer) => return consumer,
492
493 // Failed to build a consumer for some reason: report and retry
494 Err(error) => {
495 warn!(
496 alert = true,
497 subscriber = self.name.as_ref(),
498 ?error,
499 error_message = %error,
500 "Failed to build a RabbitMQ message consumer",
501 );
502 }
503 }
504 }
505 }
506
507 /// Builds a [`LapinConsumer`] on top of the given [`Channel`].
508 async fn build_consumer(&self, channel: &Channel) -> LapinResult<LapinConsumer> {
509 // Declare the RabbitMQ queue
510 let queue = self.issue_declarations(channel).await?;
511
512 // Initiate consuming of messages
513 channel
514 .basic_consume(
515 queue.name().as_str(),
516 &self.name,
517 BasicConsumeOptions {
518 no_local: false,
519 no_ack: self.ingress.no_ack(),
520 exclusive: false,
521 nowait: false,
522 },
523 FieldTable::default(),
524 )
525 .await
526 }
527
528 /// Declares the exchange (if necessary), the queue, and binds the queue to
529 /// the exchange.
530 async fn issue_declarations(&self, channel: &Channel) -> LapinResult<LapinQueue> {
531 // Set prefetch count on the channel if relevant
532 if let Some(prefetch_count) = self.ingress.prefetch_count() {
533 channel
534 .basic_qos(prefetch_count.into(), BasicQosOptions { global: false })
535 .await?;
536 }
537
538 // If the exchange is not built-in, declare it first
539 if self.ingress.exchange().is_custom() {
540 // Extract custom exchange reference for convenience
541 let exchange = self.ingress.exchange();
542
543 // Prepare args
544 let mut args = FieldTable::default();
545
546 // Check special exchange case
547 if exchange.kind() == ExchangeKind::HashId {
548 // For routing on message ID, we need a custom arg
549 args.push("hash-property", "message_id");
550 }
551
552 channel
553 .exchange_declare(
554 exchange.name(),
555 exchange.kind().lapin_value(),
556 ExchangeDeclareOptions {
557 passive: false,
558 durable: exchange.durable(),
559 auto_delete: exchange.auto_delete(),
560 internal: false,
561 nowait: false,
562 },
563 args,
564 )
565 .await?;
566 }
567
568 // Explicitly indicate desired queue kind
569 let mut args = FieldTable::default();
570 args.push("x-queue-type", self.ingress.queue().kind().rabbitmq_value());
571
572 // Declare the queue
573 let queue = channel
574 .queue_declare(
575 self.ingress.queue().name().as_ref(),
576 QueueDeclareOptions {
577 passive: false,
578 durable: self.ingress.durable(),
579 exclusive: self.ingress.exclusive(),
580 auto_delete: self.ingress.auto_delete(),
581 nowait: false,
582 },
583 args,
584 )
585 .await?;
586
587 // Bind the queue to the exchange
588 self.bind_queue_to_exchange(channel, &queue).await?;
589
590 Ok(queue)
591 }
592
593 /// Binds the given [`LapinQueue`] to the exchange configured on this
594 /// subscriber’s [`Ingress`].
595 async fn bind_queue_to_exchange(
596 &self,
597 channel: &Channel,
598 queue: &LapinQueue,
599 ) -> LapinResult<()> {
600 // Default exchange forbids any kind of binding
601 if self.ingress.exchange().is_default() {
602 return Ok(());
603 }
604
605 // Proceed according to the exchange kind
606 match self.ingress.exchange().kind() {
607 ExchangeKind::Direct | ExchangeKind::Topic => {
608 self.bind_queue_to_key_exchange(channel, queue).await
609 }
610 ExchangeKind::Fanout => self.bind_queue_to_fanout_exchange(channel, queue).await,
611 ExchangeKind::Headers => self.bind_queue_to_headers_exchange(channel, queue).await,
612 ExchangeKind::HashKey | ExchangeKind::HashId => {
613 self.bind_queue_to_hash_exchange(channel, queue).await
614 }
615 }
616 }
617
618 /// Binds the given [`LapinQueue`] to the **key-based exchange**
619 /// ([direct](ExchangeKind::Direct) or [topic](ExchangeKind::Topic))
620 /// configured on this subscriber’s [`Ingress`].
621 async fn bind_queue_to_key_exchange(
622 &self,
623 channel: &Channel,
624 queue: &LapinQueue,
625 ) -> LapinResult<()> {
626 // Bind the same queue to the same exchange once for every binding key
627 for binding_key in self.ingress.binding_keys() {
628 channel
629 .queue_bind(
630 queue.name().as_str(),
631 self.ingress.exchange().name(),
632 binding_key,
633 QueueBindOptions { nowait: false },
634 FieldTable::default(),
635 )
636 .await?;
637 }
638
639 Ok(())
640 }
641
642 /// Binds the given [`LapinQueue`] to the
643 /// [**fanout exchange**](ExchangeKind::Fanout) configured on this
644 /// subscriber’s [`Ingress`].
645 async fn bind_queue_to_fanout_exchange(
646 &self,
647 channel: &Channel,
648 queue: &LapinQueue,
649 ) -> LapinResult<()> {
650 channel
651 .queue_bind(
652 queue.name().as_str(),
653 self.ingress.exchange().name(),
654 "", // Routing key is irrelevant for fanout exchanges
655 QueueBindOptions { nowait: false },
656 FieldTable::default(),
657 )
658 .await?;
659
660 Ok(())
661 }
662
663 /// Binds the given [`LapinQueue`] to the
664 /// [**headers exchange**](ExchangeKind::Headers) configured on this
665 /// subscriber’s [`Ingress`].
666 async fn bind_queue_to_headers_exchange(
667 &self,
668 channel: &Channel,
669 queue: &LapinQueue,
670 ) -> LapinResult<()> {
671 let mut args = FieldTable::default();
672
673 args.push("x-match", self.ingress.headers_behavior().rabbitmq_value());
674
675 for (key, value) in self.ingress.binding_headers() {
676 match value {
677 Header::Boolean(value) => args.push(key, *value),
678 Header::Int(value) => args.push(key, *value),
679 Header::UInt(value) => args.push(key, *value),
680 Header::String(value) => args.push(key, value.as_ref()),
681 }
682 }
683
684 channel
685 .queue_bind(
686 queue.name().as_str(),
687 self.ingress.exchange().name(),
688 "", // Routing key is irrelevant for header-based matching
689 QueueBindOptions { nowait: false },
690 args,
691 )
692 .await?;
693
694 Ok(())
695 }
696
697 /// Binds the given [`LapinQueue`] to the **consistent hash exchange**
698 /// ([key-based](ExchangeKind::HashKey) or
699 /// [message ID-based](ExchangeKind::HashId)) configured on this
700 /// subscriber’s [`Ingress`].
701 async fn bind_queue_to_hash_exchange(
702 &self,
703 channel: &Channel,
704 queue: &LapinQueue,
705 ) -> LapinResult<()> {
706 channel
707 .queue_bind(
708 queue.name().as_str(),
709 self.ingress.exchange().name(),
710 "1", // Always bind with an equal weight of 1
711 QueueBindOptions { nowait: false },
712 FieldTable::default(),
713 )
714 .await?;
715
716 Ok(())
717 }
718}
719
720impl<T> PollOutcome<T> {
721 /// Reports whether the [`LapinConsumer`] can be judged as “empty” based
722 /// on this outcome alone. An empty consumer will not yield any more messages
723 /// if polled.
724 fn represents_empty_consumer(&self) -> bool {
725 match self {
726 Self::OutOfMessages => true,
727 Self::Envelope(_) | Self::Gibberish | Self::ConsumerError => true,
728 }
729 }
730}
731
732impl BatchState {
733 /// Reports whether the [`LapinConsumer`] can be judged as “healthy” based
734 /// on this batch state alone. A healthy consumer is a consumer that we can
735 /// still use to try and poll for more messages.
736 fn represents_healthy_consumer(&self) -> bool {
737 match self {
738 BatchState::InProgress | BatchState::Completed | BatchState::TimedOut => true,
739 BatchState::DriedOut => false,
740 }
741 }
742}
743
744impl<T> From<PollOutcome<T>> for Option<Envelope<T>> {
745 fn from(value: PollOutcome<T>) -> Self {
746 match value {
747 PollOutcome::Envelope(envelope) => Some(envelope),
748 PollOutcome::ConsumerError | PollOutcome::Gibberish | PollOutcome::OutOfMessages => {
749 None
750 }
751 }
752 }
753}
754
755impl DeclarationError {
756 fn new(subscriber: &str, error: LapinError) -> Self {
757 Self {
758 subscriber: subscriber.to_string(),
759 error: error.to_string(),
760 }
761 }
762}