Skip to main content

ntex_amqp/
types.rs

1use std::{cell::RefCell, fmt, rc::Rc};
2
3use ntex_bytes::ByteString;
4use ntex_router::Path;
5use ntex_util::future::Either;
6
7use crate::codec::protocol::{Accepted, Attach, DeliveryState, Detach, Error, Flow, Rejected};
8use crate::{
9    Handle, State, error::AmqpProtocolError, rcvlink::ReceiverLink, session::Session,
10    sndlink::SenderLink,
11};
12
13pub use crate::codec::protocol::Transfer;
14
15#[derive(Debug)]
16pub enum Message {
17    Attached(Attach, ReceiverLink),
18    Detached(ReceiverLink),
19    DetachedAll(Vec<ReceiverLink>),
20    Transfer(ReceiverLink),
21}
22
23#[derive(Debug)]
24pub(crate) enum Action {
25    None,
26    AttachSender(SenderLink, Attach, Attach),
27    AttachReceiver(ReceiverLink, Attach, Attach),
28    DetachSender(SenderLink, Detach),
29    DetachReceiver(ReceiverLink, Detach),
30    SessionEnded(Vec<Either<SenderLink, ReceiverLink>>),
31    Flow(SenderLink, Flow),
32    Transfer(ReceiverLink),
33    RemoteClose(AmqpProtocolError),
34}
35
36pub struct Link<S> {
37    pub(crate) state: State<S>,
38    pub(crate) link: ReceiverLink,
39    pub(crate) path: Path<ByteString>,
40    pub(crate) attach: Rc<Attach>,
41}
42
43impl<S> Link<S> {
44    pub(crate) fn new(
45        attach: Attach,
46        link: ReceiverLink,
47        state: State<S>,
48        path: ByteString,
49    ) -> Self {
50        Link {
51            state,
52            link,
53            attach: Rc::new(attach),
54            path: Path::new(path),
55        }
56    }
57
58    pub fn path(&self) -> &Path<ByteString> {
59        &self.path
60    }
61
62    pub fn path_mut(&mut self) -> &mut Path<ByteString> {
63        &mut self.path
64    }
65
66    pub fn state(&self) -> &State<S> {
67        &self.state
68    }
69
70    pub fn handle(&self) -> Handle {
71        self.link.handle()
72    }
73
74    pub fn frame(&self) -> &Attach {
75        self.attach.as_ref()
76    }
77
78    pub fn session(&self) -> &Session {
79        self.link.session()
80    }
81
82    pub fn receiver(&self) -> &ReceiverLink {
83        &self.link
84    }
85
86    pub fn receiver_mut(&mut self) -> &mut ReceiverLink {
87        &mut self.link
88    }
89
90    pub fn link_credit(&self, credit: u32) {
91        self.link.set_link_credit(credit);
92    }
93}
94
95impl<S> Clone for Link<S> {
96    fn clone(&self) -> Self {
97        Self {
98            state: self.state.clone(),
99            link: self.link.clone(),
100            path: self.path.clone(),
101            attach: self.attach.clone(),
102        }
103    }
104}
105
106impl<S> fmt::Debug for Link<S> {
107    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
108        fmt.debug_struct("Link<S>")
109            .field("frame", self.frame())
110            .finish()
111    }
112}
113
114#[derive(Debug, From)]
115pub enum Outcome {
116    Accept,
117    Reject,
118    Error(Error),
119}
120
121impl Outcome {
122    pub(crate) fn into_delivery_state(self) -> DeliveryState {
123        match self {
124            Outcome::Accept => DeliveryState::Accepted(Accepted {}),
125            Outcome::Reject => DeliveryState::Rejected(Rejected { error: None }),
126            Outcome::Error(e) => DeliveryState::Rejected(Rejected { error: Some(e) }),
127        }
128    }
129}
130
131#[derive(Debug)]
132pub struct Wrapper<T> {
133    inner: RefCell<Option<T>>,
134}
135
136impl<T> Wrapper<T> {
137    pub(crate) fn new(inner: T) -> Self {
138        Self {
139            inner: RefCell::new(Some(inner)),
140        }
141    }
142
143    pub(crate) fn take(&self) -> T {
144        self.inner.borrow_mut().take().unwrap()
145    }
146
147    #[allow(clippy::missing_panics_doc)]
148    pub fn with<F, R>(&self, f: F) -> R
149    where
150        F: FnOnce(&mut T) -> R,
151    {
152        f(self.inner.borrow_mut().as_mut().unwrap())
153    }
154}