lapin_async/
consumer.rs

1use std::fmt::Debug;
2
3use crate::{
4  BasicProperties,
5  message::Delivery,
6  types::ShortString,
7};
8
9#[derive(Debug)]
10pub(crate) struct Consumer {
11  tag:             ShortString,
12  no_local:        bool,
13  no_ack:          bool,
14  exclusive:       bool,
15  subscriber:      Box<dyn ConsumerSubscriber>,
16  current_message: Option<Delivery>,
17}
18
19impl Consumer {
20  pub(crate) fn new(tag: ShortString, no_local: bool, no_ack: bool, exclusive: bool, subscriber: Box<dyn ConsumerSubscriber>) -> Consumer {
21    Consumer {
22      tag,
23      no_local,
24      no_ack,
25      exclusive,
26      subscriber,
27      current_message: None,
28    }
29  }
30
31  pub(crate) fn start_new_delivery(&mut self, delivery: Delivery) {
32    self.current_message = Some(delivery)
33  }
34
35  pub(crate) fn set_delivery_properties(&mut self, properties: BasicProperties) {
36    if let Some(delivery) = self.current_message.as_mut() {
37      delivery.properties = properties;
38    }
39  }
40
41  pub(crate) fn receive_delivery_content(&mut self, payload: Vec<u8>) {
42    if let Some(delivery) = self.current_message.as_mut() {
43      delivery.receive_content(payload);
44    }
45  }
46
47  pub(crate) fn new_delivery_complete(&mut self) {
48    if let Some(delivery) = self.current_message.take() {
49      self.subscriber.new_delivery(delivery);
50    }
51  }
52
53  pub(crate) fn drop_prefetched_messages(&self) {
54    self.subscriber.drop_prefetched_messages();
55  }
56
57  pub(crate) fn cancel(&self) {
58    self.subscriber.cancel();
59  }
60}
61
62#[deprecated(note = "use lapin instead")]
63pub trait ConsumerSubscriber: Debug + Send + Sync {
64  fn new_delivery(&self, delivery: Delivery);
65  fn drop_prefetched_messages(&self);
66  fn cancel(&self);
67}