1use std::{cell::RefCell, collections::VecDeque, fmt, io};
2
3use ntex_amqp_codec::protocol;
4use ntex_util::{future::Either, task::LocalWaker};
5
6use crate::cell::Cell;
7use crate::error::AmqpProtocolError;
8use crate::rcvlink::ReceiverLink;
9use crate::session::{Session, SessionInner};
10use crate::sndlink::SenderLink;
11use crate::types::Wrapper;
12
13pub struct ControlFrame(pub(super) Cell<FrameInner>);
14
15pub(super) struct FrameInner {
16 pub(super) kind: ControlFrameKind,
17 pub(super) session: Option<Cell<SessionInner>>,
18}
19
20impl fmt::Debug for ControlFrame {
21 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22 f.debug_struct("ControlFrame")
23 .field("kind", &self.0.get_ref().kind)
24 .finish()
25 }
26}
27
28#[derive(Debug)]
29pub enum ControlFrameKind {
30 AttachSender(protocol::Attach, Wrapper<protocol::Attach>, SenderLink),
31 AttachReceiver(protocol::Attach, Wrapper<protocol::Attach>, ReceiverLink),
32 Flow(protocol::Flow, SenderLink),
33 LocalDetachSender(protocol::Detach, SenderLink),
34 RemoteDetachSender(protocol::Detach, SenderLink),
35 LocalDetachReceiver(protocol::Detach, ReceiverLink),
36 RemoteDetachReceiver(protocol::Detach, ReceiverLink),
37 LocalSessionEnded(Vec<Either<SenderLink, ReceiverLink>>),
38 RemoteSessionEnded(Vec<Either<SenderLink, ReceiverLink>>),
39 ProtocolError(AmqpProtocolError),
40 Disconnected(Option<io::Error>),
41 Closed,
42}
43
44impl ControlFrame {
45 pub(crate) fn new(session: Cell<SessionInner>, kind: ControlFrameKind) -> Self {
46 ControlFrame(Cell::new(FrameInner {
47 session: Some(session),
48 kind,
49 }))
50 }
51
52 pub(crate) fn new_kind(kind: ControlFrameKind) -> Self {
53 ControlFrame(Cell::new(FrameInner {
54 session: None,
55 kind,
56 }))
57 }
58
59 pub(crate) fn clone(&self) -> Self {
60 ControlFrame(self.0.clone())
61 }
62
63 pub(crate) fn session_cell(&self) -> &Cell<SessionInner> {
64 self.0.get_ref().session.as_ref().unwrap()
65 }
66
67 #[inline]
68 pub fn kind(&self) -> &ControlFrameKind {
69 &self.0.kind
70 }
71
72 pub fn session(&self) -> Option<Session> {
73 self.0.get_ref().session.clone().map(Session::new)
74 }
75}
76
77#[derive(Default, Debug)]
78pub(crate) struct ControlQueue {
79 pub(crate) pending: RefCell<VecDeque<ControlFrame>>,
80 pub(crate) waker: LocalWaker,
81}
82
83impl ControlQueue {
84 pub(crate) fn enqueue_frame(&self, frame: ControlFrame) {
85 self.pending.borrow_mut().push_back(frame);
86 self.waker.wake();
87 }
88}