1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use std::collections::HashMap;

use crate::{
  channel::BasicProperties,
  consumer::Consumer,
  message::BasicGetMessage,
  types::ShortString,
  wait::WaitHandle,
};

/// Plain description of a queue: its name together with two counters.
///
/// Instances are built with [`Queue::new`], which stores its arguments
/// unchanged.
#[derive(Clone, Debug)]
pub struct Queue {
  /// Name identifying the queue.
  pub name: ShortString,
  /// Message count recorded for this queue
  /// (presumably as last reported by the broker; confirm against callers).
  pub message_count: u32,
  /// Consumer count recorded for this queue
  /// (presumably as last reported by the broker; confirm against callers).
  pub consumer_count: u32,
}

/// Mutable bookkeeping kept for a single queue: the consumers registered on
/// it and, if one is in flight, the [`BasicGetMessage`] currently being
/// assembled.
#[derive(Debug)]
pub struct QueueState {
  /// Name of the queue this state belongs to.
  pub name: ShortString,
  /// Consumers registered on this queue, keyed by a `ShortString`
  /// (presumably the consumer tag; confirm against callers).
  pub consumers: HashMap<ShortString, Consumer>,
  /// Message currently being assembled, paired with the handle that is
  /// finished with that message once it is complete. `None` when no
  /// delivery is pending.
  current_get_message: Option<(BasicGetMessage, WaitHandle<Option<BasicGetMessage>>)>,
}

impl Queue {
  pub fn new(name: ShortString, message_count: u32, consumer_count: u32) -> Self {
    Self { name, message_count, consumer_count }
  }
}

impl QueueState {
  /// Asks every consumer registered on this queue to drop its prefetched
  /// messages.
  pub fn drop_prefetched_messages(&mut self) {
    self.consumers.values().for_each(|registered| {
      registered.drop_prefetched_messages();
    });
  }

  /// Records `delivery` as the message being assembled, paired with the
  /// `wait_handle` to finish once it is complete.
  ///
  /// Any pair that was already pending is overwritten.
  pub fn start_new_delivery(&mut self, delivery: BasicGetMessage, wait_handle: WaitHandle<Option<BasicGetMessage>>) {
    let pending = (delivery, wait_handle);
    self.current_get_message = Some(pending);
  }

  /// Stores `properties` on the pending message.
  ///
  /// Does nothing when no delivery is in progress.
  pub fn set_delivery_properties(&mut self, properties: BasicProperties) {
    if let Some((message, _)) = &mut self.current_get_message {
      message.delivery.properties = properties;
    }
  }

  /// Hands `payload` to the pending message's delivery via `receive_content`.
  ///
  /// Does nothing when no delivery is in progress.
  pub fn receive_delivery_content(&mut self, payload: Vec<u8>) {
    if let Some((message, _)) = &mut self.current_get_message {
      message.delivery.receive_content(payload);
    }
  }

  /// Takes the pending message out of the state, leaving `None` behind, and
  /// finishes its wait handle with `Some(message)`.
  ///
  /// Does nothing when no delivery is in progress.
  pub fn new_delivery_complete(&mut self) {
    let finished = self.current_get_message.take();
    if let Some((message, waiter)) = finished {
      waiter.finish(Some(message));
    }
  }
}

impl From<Queue> for QueueState {
  fn from(queue: Queue) -> Self {
    Self {
      name:                queue.name,
      consumers:           HashMap::new(),
      current_get_message: None,
    }
  }
}