1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use log::trace;
use amq_protocol::frame::AMQPFrame;
use parking_lot::Mutex;

use std::{
  collections::{VecDeque, HashMap},
  sync::Arc,
};

use crate::{
  channel::Reply,
  id_sequence::IdSequence,
  wait::{Wait, WaitHandle},
};

pub type SendId = u64;

#[derive(Clone, Debug)]
pub enum Priority {
  LOW,
  NORMAL,
  HIGH,
  CRITICAL,
}

impl Default for Priority {
  fn default() -> Self {
    Priority::NORMAL
  }
}

#[derive(Clone, Debug, Default)]
pub struct Frames {
  inner: Arc<Mutex<Inner>>,
}

impl Frames {
  pub fn push(&self, channel_id: u16, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Wait<()> {
    self.inner.lock().push(channel_id, priority, frame, expected_reply)
  }

  pub fn retry(&self, send_id: SendId, frame: AMQPFrame) {
    self.inner.lock().priority_frames.push_back((send_id, frame))
  }

  pub fn pop(&self) -> Option<(SendId, AMQPFrame)> {
    self.inner.lock().pop()
  }

  pub fn is_empty(&self) -> bool {
    self.inner.lock().is_empty()
  }

  pub fn next_expected_reply(&self, channel_id: u16) -> Option<Reply> {
    self.inner.lock().expected_replies.get_mut(&channel_id).and_then(|replies| replies.pop_front())
  }

  pub fn clear_expected_replies(&self, channel_id: u16) {
    self.inner.lock().expected_replies.remove(&channel_id);
  }

  pub fn mark_sent(&self, send_id: SendId) {
    if let Some(send) = self.inner.lock().outbox.remove(&send_id) {
      send.finish(());
    }
  }
}

#[derive(Debug)]
pub struct Inner {
  priority_frames:  VecDeque<(SendId, AMQPFrame)>,
  frames:           VecDeque<(SendId, AMQPFrame)>,
  low_prio_frames:  VecDeque<(SendId, AMQPFrame)>,
  expected_replies: HashMap<u16, VecDeque<Reply>>,
  outbox:           HashMap<SendId, WaitHandle<()>>,
  send_id:          IdSequence<SendId>,
}

impl Default for Inner {
  fn default() -> Self {
    Self {
      priority_frames:  VecDeque::default(),
      frames:           VecDeque::default(),
      low_prio_frames:  VecDeque::default(),
      expected_replies: HashMap::default(),
      outbox:           HashMap::default(),
      send_id:          IdSequence::new(false),
    }
  }
}

impl Inner {
  fn push(&mut self, channel_id: u16, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Wait<()> {
    let send_id = if let Priority::CRITICAL = priority { 0 } else { self.send_id.next() };
    match priority {
      Priority::LOW      => self.low_prio_frames.push_back((send_id, frame)),
      Priority::NORMAL   => self.frames.push_back((send_id, frame)),
      Priority::HIGH     => self.priority_frames.push_back((send_id, frame)),
      Priority::CRITICAL => self.priority_frames.push_front((send_id, frame)),
    }
    let (wait, wait_handle) = Wait::new();
    self.outbox.insert(send_id, wait_handle);
    if let Some(reply) = expected_reply {
      trace!("channel {} state is now waiting for {:?}", channel_id, reply);
      self.expected_replies.entry(channel_id).or_default().push_back(reply);
    }
    wait
  }

  fn pop(&mut self) -> Option<(SendId, AMQPFrame)> {
    self.priority_frames.pop_front().or_else(|| self.frames.pop_front()).or_else(|| self.low_prio_frames.pop_front())
  }

  fn is_empty(&self) -> bool {
    self.priority_frames.is_empty() && self.frames.is_empty() && self.low_prio_frames.is_empty()
  }
}