1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
use std::fmt::Debug;

use crate::{
  BasicProperties,
  message::Delivery,
  types::ShortString,
};

#[derive(Debug)]
pub(crate) struct Consumer {
  tag:             ShortString,
  no_local:        bool,
  no_ack:          bool,
  exclusive:       bool,
  subscriber:      Box<dyn ConsumerSubscriber>,
  current_message: Option<Delivery>,
}

impl Consumer {
  pub(crate) fn new(tag: ShortString, no_local: bool, no_ack: bool, exclusive: bool, subscriber: Box<dyn ConsumerSubscriber>) -> Consumer {
    Consumer {
      tag,
      no_local,
      no_ack,
      exclusive,
      subscriber,
      current_message: None,
    }
  }

  pub(crate) fn start_new_delivery(&mut self, delivery: Delivery) {
    self.current_message = Some(delivery)
  }

  pub(crate) fn set_delivery_properties(&mut self, properties: BasicProperties) {
    if let Some(delivery) = self.current_message.as_mut() {
      delivery.properties = properties;
    }
  }

  pub(crate) fn receive_delivery_content(&mut self, payload: Vec<u8>) {
    if let Some(delivery) = self.current_message.as_mut() {
      delivery.receive_content(payload);
    }
  }

  pub(crate) fn new_delivery_complete(&mut self) {
    if let Some(delivery) = self.current_message.take() {
      self.subscriber.new_delivery(delivery);
    }
  }

  pub(crate) fn drop_prefetched_messages(&self) {
    self.subscriber.drop_prefetched_messages();
  }

  pub(crate) fn cancel(&self) {
    self.subscriber.cancel();
  }
}

pub trait ConsumerSubscriber: Debug + Send + Sync {
  fn new_delivery(&self, delivery: Delivery);
  fn drop_prefetched_messages(&self);
  fn cancel(&self);
}