1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::{
  borrow::Borrow,
  collections::HashMap,
  hash::Hash,
};

use crate::{
  BasicProperties,
  consumer::Consumer,
  message::BasicGetMessage,
  types::ShortString,
  wait::WaitHandle,
};

#[derive(Clone, Debug)]
#[deprecated(note = "use lapin instead")]
pub struct Queue {
  name:           ShortString,
  message_count:  u32,
  consumer_count: u32,
}

impl Queue {
  #[deprecated(note = "use lapin instead")]
  pub fn name(&self) -> &ShortString {
    &self.name
  }

  #[deprecated(note = "use lapin instead")]
  pub fn message_count(&self) -> u32 {
    self.message_count
  }

  #[deprecated(note = "use lapin instead")]
  pub fn consumer_count(&self) -> u32 {
    self.consumer_count
  }
}

#[derive(Debug)]
pub(crate) struct QueueState {
  name:                ShortString,
  consumers:           HashMap<ShortString, Consumer>,
  current_get_message: Option<(BasicGetMessage, WaitHandle<Option<BasicGetMessage>>)>,
}

impl Queue {
  pub(crate) fn new(name: ShortString, message_count: u32, consumer_count: u32) -> Self {
    Self { name, message_count, consumer_count }
  }
}

impl Borrow<str> for Queue {
  fn borrow(&self) -> &str {
    self.name.as_str()
  }
}

impl QueueState {
  pub(crate) fn register_consumer(&mut self, consumer_tag: ShortString, consumer: Consumer) {
    self.consumers.insert(consumer_tag, consumer);
  }

  pub(crate) fn deregister_consumer<S: Hash + Eq + ?Sized>(&mut self, consumer_tag: &S) -> Option<Consumer> where ShortString: Borrow<S> {
    self.consumers.remove(consumer_tag)
  }

  pub(crate) fn get_consumer<S: Hash + Eq + ?Sized>(&mut self, consumer_tag: &S) -> Option<&mut Consumer> where ShortString: Borrow<S> {
    self.consumers.get_mut(consumer_tag.borrow())
  }

  pub(crate) fn name(&self) -> ShortString {
    self.name.clone()
  }

  pub(crate) fn drop_prefetched_messages(&mut self) {
    for consumer in self.consumers.values() {
      consumer.drop_prefetched_messages();
    }
  }

  pub(crate) fn start_new_delivery(&mut self, delivery: BasicGetMessage, wait_handle: WaitHandle<Option<BasicGetMessage>>) {
    self.current_get_message = Some((delivery, wait_handle));
  }

  pub(crate) fn set_delivery_properties(&mut self, properties: BasicProperties) {
    if let Some(delivery) = self.current_get_message.as_mut() {
      delivery.0.delivery.properties = properties;
    }
  }

  pub(crate) fn receive_delivery_content(&mut self, payload: Vec<u8>) {
    if let Some(delivery) = self.current_get_message.as_mut() {
      delivery.0.delivery.receive_content(payload);
    }
  }

  pub(crate) fn new_delivery_complete(&mut self) {
    if let Some((message, wait_handle)) = self.current_get_message.take() {
      wait_handle.finish(Some(message));
    }
  }
}

impl From<Queue> for QueueState {
  fn from(queue: Queue) -> Self {
    Self {
      name:                queue.name,
      consumers:           HashMap::new(),
      current_get_message: None,
    }
  }
}