Skip to main content

lapin_async/
frames.rs

1use log::trace;
2use amq_protocol::frame::AMQPFrame;
3use parking_lot::Mutex;
4
5use std::{
6  collections::{VecDeque, HashMap},
7  sync::Arc,
8};
9
10use crate::{
11  channel::Reply,
12  id_sequence::IdSequence,
13  wait::{Wait, WaitHandle},
14};
15
/// Identifier carried alongside every queued frame; it is also the key under
/// which the frame's completion handle is stored in the outbox.
pub(crate) type SendId = u64;
17
/// Scheduling class of an outgoing frame; it decides which internal queue the
/// frame is stored in and therefore when it is handed out again.
///
/// The enum is fieldless, so it is `Copy` and comparable: callers can pass it
/// by value and test it with `==` without cloning or pattern matching.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Priority {
  /// Stored in the low-priority queue, which is only drained while `flow` is enabled.
  LOW,
  /// Regular traffic, stored in the main queue. This is the default priority.
  NORMAL,
  /// Inserted at the front of the priority queue, ahead of anything already waiting.
  CRITICAL,
}
24
25impl Default for Priority {
26  fn default() -> Self {
27    Priority::NORMAL
28  }
29}
30
31#[derive(Clone, Debug, Default)]
32pub(crate) struct Frames {
33  inner: Arc<Mutex<Inner>>,
34}
35
36impl Frames {
37  pub(crate) fn push(&self, channel_id: u16, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Wait<()> {
38    self.inner.lock().push(channel_id, priority, frame, expected_reply)
39  }
40
41  pub(crate) fn retry(&self, send_id: SendId, frame: AMQPFrame) {
42    self.inner.lock().priority_frames.push_back((send_id, frame))
43  }
44
45  pub(crate) fn pop(&self, flow: bool) -> Option<(SendId, AMQPFrame)> {
46    self.inner.lock().pop(flow)
47  }
48
49  pub(crate) fn next_expected_reply(&self, channel_id: u16) -> Option<Reply> {
50    self.inner.lock().expected_replies.get_mut(&channel_id).and_then(|replies| replies.pop_front())
51  }
52
53  pub(crate) fn clear_expected_replies(&self, channel_id: u16) {
54    self.inner.lock().expected_replies.remove(&channel_id);
55  }
56
57  pub(crate) fn mark_sent(&self, send_id: SendId) {
58    if let Some(send) = self.inner.lock().outbox.remove(&send_id) {
59      send.finish(());
60    }
61  }
62
63  pub(crate) fn drop_pending(&self) {
64    self.inner.lock().drop_pending();
65  }
66}
67
68#[derive(Debug)]
69struct Inner {
70  priority_frames:  VecDeque<(SendId, AMQPFrame)>,
71  frames:           VecDeque<(SendId, AMQPFrame)>,
72  low_prio_frames:  VecDeque<(SendId, AMQPFrame)>,
73  expected_replies: HashMap<u16, VecDeque<Reply>>,
74  outbox:           HashMap<SendId, WaitHandle<()>>,
75  send_id:          IdSequence<SendId>,
76}
77
78impl Default for Inner {
79  fn default() -> Self {
80    Self {
81      priority_frames:  VecDeque::default(),
82      frames:           VecDeque::default(),
83      low_prio_frames:  VecDeque::default(),
84      expected_replies: HashMap::default(),
85      outbox:           HashMap::default(),
86      send_id:          IdSequence::new(false),
87    }
88  }
89}
90
91impl Inner {
92  fn push(&mut self, channel_id: u16, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Wait<()> {
93    let send_id = if let Priority::CRITICAL = priority { 0 } else { self.send_id.next() };
94    match priority {
95      Priority::LOW      => self.low_prio_frames.push_back((send_id, frame)),
96      Priority::NORMAL   => self.frames.push_back((send_id, frame)),
97      Priority::CRITICAL => self.priority_frames.push_front((send_id, frame)),
98    }
99    let (wait, wait_handle) = Wait::new();
100    self.outbox.insert(send_id, wait_handle);
101    if let Some(reply) = expected_reply {
102      trace!("channel {} state is now waiting for {:?}", channel_id, reply);
103      self.expected_replies.entry(channel_id).or_default().push_back(reply);
104    }
105    wait
106  }
107
108  fn pop(&mut self, flow: bool) -> Option<(SendId, AMQPFrame)> {
109    self.priority_frames.pop_front().or_else(|| self.frames.pop_front()).or_else(|| if flow { self.low_prio_frames.pop_front() } else { None })
110  }
111
112  fn drop_pending(&mut self) {
113    self.priority_frames.clear();
114    self.frames.clear();
115    self.low_prio_frames.clear();
116    self.expected_replies.clear();
117    for (_, wait_handle) in self.outbox.drain() {
118      wait_handle.finish(());
119    }
120  }
121}