Skip to main content

ntex_mqtt/
control.rs

//! Control messages for the connection management service
2use std::{io, marker::PhantomData};
3
4use ntex_service::{Service, ServiceCtx, ServiceFactory};
5
6use crate::error;
7
/// Connection control messages
///
/// Generic over `E`, the application error type carried by
/// [`Reason::Error`] inside a [`Control::Stop`] message.
#[derive(Debug)]
pub enum Control<E> {
    /// Write back-pressure is enabled/disabled
    ///
    /// The wrapped [`WrBackpressure`] value tells which of the two it is.
    WrBackpressure(WrBackpressure),
    /// Dispatcher is preparing for shutdown.
    ///
    /// The control service will receive this message only once.
    /// After receiving this message dispatcher stops.
    /// The wrapped [`Reason`] describes why the dispatcher is stopping.
    Stop(Reason<E>),
}
19
/// Dispatcher stop reasons
///
/// Carried by [`Control::Stop`]. `E` is the application error type.
#[derive(Debug)]
pub enum Reason<E> {
    /// Unhandled application level error from handshake, publish and control services
    Error(Error<E>),
    /// Protocol level error
    Protocol(ProtocolError),
    /// Peer is gone
    ///
    /// The wrapped [`PeerGone`] optionally holds an `io::Error`.
    PeerGone(PeerGone),
}
30
31impl<E> Control<E> {
32    pub(super) fn wr(state: bool) -> Self {
33        Control::WrBackpressure(WrBackpressure(state))
34    }
35
36    pub(super) fn err(err: E) -> Self {
37        Control::Stop(Reason::Error(Error::new(err)))
38    }
39
40    pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
41        Control::Stop(Reason::PeerGone(PeerGone(err)))
42    }
43
44    pub(super) fn proto(err: error::ProtocolError) -> Self {
45        Control::Stop(Reason::Protocol(ProtocolError::new(err)))
46    }
47}
48
/// Write back-pressure control message
///
/// Wraps a single flag telling whether write back-pressure is enabled.
#[derive(Debug, Copy, Clone)]
pub struct WrBackpressure(bool);

impl WrBackpressure {
    /// Is write back-pressure enabled
    #[inline]
    pub fn enabled(&self) -> bool {
        let Self(flag) = *self;
        flag
    }
}
60
/// Service level error
///
/// Thin wrapper around an error value of type `E`.
#[derive(Debug, Clone)]
pub struct Error<E> {
    err: E,
}

impl<E> Error<E> {
    /// Wraps `err` into a service level error
    pub fn new(err: E) -> Self {
        Error { err }
    }

    /// Returns reference to the wrapped error
    #[inline]
    pub fn get_ref(&self) -> &E {
        let Self { err } = self;
        err
    }

    /// Return inner error
    #[inline]
    pub fn into(self) -> E {
        let Self { err } = self;
        err
    }
}
84
85/// Protocol level error
86#[derive(Debug, Clone)]
87pub struct ProtocolError {
88    err: error::ProtocolError,
89}
90
91impl ProtocolError {
92    pub fn new(err: error::ProtocolError) -> Self {
93        Self { err }
94    }
95
96    #[inline]
97    /// Returns reference to a protocol error
98    pub fn get_ref(&self) -> &error::ProtocolError {
99        &self.err
100    }
101
102    #[inline]
103    /// Return inner error
104    pub fn into(self) -> error::ProtocolError {
105        self.err
106    }
107}
108
/// Peer gone control message
///
/// Optionally holds the `io::Error` associated with the peer going away.
#[derive(Debug)]
pub struct PeerGone(pub(crate) Option<io::Error>);

impl PeerGone {
    /// Returns error reference
    #[inline]
    pub fn err(&self) -> Option<&io::Error> {
        let Self(inner) = self;
        inner.as_ref()
    }

    /// Take error
    #[inline]
    pub fn into(self) -> Option<io::Error> {
        let Self(inner) = self;
        inner
    }
}
126
/// Default control service
///
/// Zero-sized marker type: it stores no `S`, `E`, `R` or `Err` values and only
/// records the type parameters through `PhantomData`.
pub struct DefaultControlService<S, E, R, Err = ()>(PhantomData<(S, E, R, Err)>);

// `Debug` is written by hand instead of derived: a derive would require
// `S`, `E`, `R` and `Err` to implement `Debug` although no value of those
// types is stored. The output format is the same as the derived one.
impl<S, E, R, Err> std::fmt::Debug for DefaultControlService<S, E, R, Err> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("DefaultControlService").field(&self.0).finish()
    }
}
130
131impl<S, E, R, Err> Default for DefaultControlService<S, E, R, Err> {
132    fn default() -> Self {
133        DefaultControlService(PhantomData)
134    }
135}
136
137impl<S, E, R, Err> ServiceFactory<Control<E>, S> for DefaultControlService<S, E, R, Err> {
138    type Response = Option<R>;
139    type Error = E;
140    type InitError = Err;
141    type Service = DefaultControlService<S, E, R, ()>;
142
143    async fn create(&self, _: S) -> Result<Self::Service, Self::InitError> {
144        Ok(DefaultControlService(PhantomData))
145    }
146}
147
148impl<S, E, R> Service<Control<E>> for DefaultControlService<S, E, R, ()> {
149    type Response = Option<R>;
150    type Error = E;
151
152    async fn call(
153        &self,
154        _: Control<E>,
155        _: ServiceCtx<'_, Self>,
156    ) -> Result<Self::Response, Self::Error> {
157        log::warn!("MQTT5 Control service is not configured");
158        Ok(None)
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Debug` output of each message type mentions the type's name.
    #[test]
    fn test_debug() {
        // WrBackpressure, Error, PeerGone
        let cases = [
            (format!("{:?}", WrBackpressure(false)), "WrBackpressure"),
            (format!("{:?}", Error { err: () }), "Error"),
            (format!("{:?}", PeerGone(None)), "PeerGone"),
        ];
        for (rendered, type_name) in cases {
            assert!(rendered.contains(type_name));
        }
    }
}