Skip to main content

ntex_amqp/
control.rs

1use std::{cell::RefCell, collections::VecDeque, fmt, io};
2
3use ntex_amqp_codec::protocol;
4use ntex_util::{future::Either, task::LocalWaker};
5
6use crate::cell::Cell;
7use crate::error::AmqpProtocolError;
8use crate::rcvlink::ReceiverLink;
9use crate::session::{Session, SessionInner};
10use crate::sndlink::SenderLink;
11use crate::types::Wrapper;
12
13pub struct ControlFrame(pub(super) Cell<FrameInner>);
14
15pub(super) struct FrameInner {
16    pub(super) kind: ControlFrameKind,
17    pub(super) session: Option<Cell<SessionInner>>,
18}
19
20impl fmt::Debug for ControlFrame {
21    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22        f.debug_struct("ControlFrame")
23            .field("kind", &self.0.get_ref().kind)
24            .finish()
25    }
26}
27
28#[derive(Debug)]
29pub enum ControlFrameKind {
30    AttachSender(protocol::Attach, Wrapper<protocol::Attach>, SenderLink),
31    AttachReceiver(protocol::Attach, Wrapper<protocol::Attach>, ReceiverLink),
32    Flow(protocol::Flow, SenderLink),
33    LocalDetachSender(protocol::Detach, SenderLink),
34    RemoteDetachSender(protocol::Detach, SenderLink),
35    LocalDetachReceiver(protocol::Detach, ReceiverLink),
36    RemoteDetachReceiver(protocol::Detach, ReceiverLink),
37    LocalSessionEnded(Vec<Either<SenderLink, ReceiverLink>>),
38    RemoteSessionEnded(Vec<Either<SenderLink, ReceiverLink>>),
39    ProtocolError(AmqpProtocolError),
40    Disconnected(Option<io::Error>),
41    Closed,
42}
43
44impl ControlFrame {
45    pub(crate) fn new(session: Cell<SessionInner>, kind: ControlFrameKind) -> Self {
46        ControlFrame(Cell::new(FrameInner {
47            session: Some(session),
48            kind,
49        }))
50    }
51
52    pub(crate) fn new_kind(kind: ControlFrameKind) -> Self {
53        ControlFrame(Cell::new(FrameInner {
54            session: None,
55            kind,
56        }))
57    }
58
59    pub(crate) fn clone(&self) -> Self {
60        ControlFrame(self.0.clone())
61    }
62
63    pub(crate) fn session_cell(&self) -> &Cell<SessionInner> {
64        self.0.get_ref().session.as_ref().unwrap()
65    }
66
67    #[inline]
68    pub fn kind(&self) -> &ControlFrameKind {
69        &self.0.kind
70    }
71
72    pub fn session(&self) -> Option<Session> {
73        self.0.get_ref().session.clone().map(Session::new)
74    }
75}
76
77#[derive(Default, Debug)]
78pub(crate) struct ControlQueue {
79    pub(crate) pending: RefCell<VecDeque<ControlFrame>>,
80    pub(crate) waker: LocalWaker,
81}
82
83impl ControlQueue {
84    pub(crate) fn enqueue_frame(&self, frame: ControlFrame) {
85        self.pending.borrow_mut().push_back(frame);
86        self.waker.wake();
87    }
88}