1use log::trace;
2use amq_protocol::frame::AMQPFrame;
3use parking_lot::Mutex;
4
5use std::{
6 collections::{VecDeque, HashMap},
7 sync::Arc,
8};
9
10use crate::{
11 channel::Reply,
12 id_sequence::IdSequence,
13 wait::{Wait, WaitHandle},
14};
15
16pub(crate) type SendId = u64;
17
18#[derive(Clone, Debug)]
19pub(crate) enum Priority {
20 LOW,
21 NORMAL,
22 CRITICAL,
23}
24
25impl Default for Priority {
26 fn default() -> Self {
27 Priority::NORMAL
28 }
29}
30
31#[derive(Clone, Debug, Default)]
32pub(crate) struct Frames {
33 inner: Arc<Mutex<Inner>>,
34}
35
36impl Frames {
37 pub(crate) fn push(&self, channel_id: u16, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Wait<()> {
38 self.inner.lock().push(channel_id, priority, frame, expected_reply)
39 }
40
41 pub(crate) fn retry(&self, send_id: SendId, frame: AMQPFrame) {
42 self.inner.lock().priority_frames.push_back((send_id, frame))
43 }
44
45 pub(crate) fn pop(&self, flow: bool) -> Option<(SendId, AMQPFrame)> {
46 self.inner.lock().pop(flow)
47 }
48
49 pub(crate) fn next_expected_reply(&self, channel_id: u16) -> Option<Reply> {
50 self.inner.lock().expected_replies.get_mut(&channel_id).and_then(|replies| replies.pop_front())
51 }
52
53 pub(crate) fn clear_expected_replies(&self, channel_id: u16) {
54 self.inner.lock().expected_replies.remove(&channel_id);
55 }
56
57 pub(crate) fn mark_sent(&self, send_id: SendId) {
58 if let Some(send) = self.inner.lock().outbox.remove(&send_id) {
59 send.finish(());
60 }
61 }
62
63 pub(crate) fn drop_pending(&self) {
64 self.inner.lock().drop_pending();
65 }
66}
67
68#[derive(Debug)]
69struct Inner {
70 priority_frames: VecDeque<(SendId, AMQPFrame)>,
71 frames: VecDeque<(SendId, AMQPFrame)>,
72 low_prio_frames: VecDeque<(SendId, AMQPFrame)>,
73 expected_replies: HashMap<u16, VecDeque<Reply>>,
74 outbox: HashMap<SendId, WaitHandle<()>>,
75 send_id: IdSequence<SendId>,
76}
77
78impl Default for Inner {
79 fn default() -> Self {
80 Self {
81 priority_frames: VecDeque::default(),
82 frames: VecDeque::default(),
83 low_prio_frames: VecDeque::default(),
84 expected_replies: HashMap::default(),
85 outbox: HashMap::default(),
86 send_id: IdSequence::new(false),
87 }
88 }
89}
90
91impl Inner {
92 fn push(&mut self, channel_id: u16, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Wait<()> {
93 let send_id = if let Priority::CRITICAL = priority { 0 } else { self.send_id.next() };
94 match priority {
95 Priority::LOW => self.low_prio_frames.push_back((send_id, frame)),
96 Priority::NORMAL => self.frames.push_back((send_id, frame)),
97 Priority::CRITICAL => self.priority_frames.push_front((send_id, frame)),
98 }
99 let (wait, wait_handle) = Wait::new();
100 self.outbox.insert(send_id, wait_handle);
101 if let Some(reply) = expected_reply {
102 trace!("channel {} state is now waiting for {:?}", channel_id, reply);
103 self.expected_replies.entry(channel_id).or_default().push_back(reply);
104 }
105 wait
106 }
107
108 fn pop(&mut self, flow: bool) -> Option<(SendId, AMQPFrame)> {
109 self.priority_frames.pop_front().or_else(|| self.frames.pop_front()).or_else(|| if flow { self.low_prio_frames.pop_front() } else { None })
110 }
111
112 fn drop_pending(&mut self) {
113 self.priority_frames.clear();
114 self.frames.clear();
115 self.low_prio_frames.clear();
116 self.expected_replies.clear();
117 for (_, wait_handle) in self.outbox.drain() {
118 wait_handle.finish(());
119 }
120 }
121}