1use std::{io, marker::PhantomData};
3
4use ntex_service::{Service, ServiceCtx, ServiceFactory};
5
6use crate::error;
7
8#[derive(Debug)]
10pub enum Control<E> {
11 WrBackpressure(WrBackpressure),
13 Stop(Reason<E>),
18}
19
20#[derive(Debug)]
22pub enum Reason<E> {
23 Error(Error<E>),
25 Protocol(ProtocolError),
27 PeerGone(PeerGone),
29}
30
31impl<E> Control<E> {
32 pub(super) fn wr(state: bool) -> Self {
33 Control::WrBackpressure(WrBackpressure(state))
34 }
35
36 pub(super) fn err(err: E) -> Self {
37 Control::Stop(Reason::Error(Error::new(err)))
38 }
39
40 pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
41 Control::Stop(Reason::PeerGone(PeerGone(err)))
42 }
43
44 pub(super) fn proto(err: error::ProtocolError) -> Self {
45 Control::Stop(Reason::Protocol(ProtocolError::new(err)))
46 }
47}
48
49#[derive(Debug, Copy, Clone)]
51pub struct WrBackpressure(bool);
52
53impl WrBackpressure {
54 #[inline]
55 pub fn enabled(&self) -> bool {
57 self.0
58 }
59}
60
61#[derive(Debug, Clone)]
63pub struct Error<E> {
64 err: E,
65}
66
67impl<E> Error<E> {
68 pub fn new(err: E) -> Self {
69 Self { err }
70 }
71
72 #[inline]
73 pub fn get_ref(&self) -> &E {
75 &self.err
76 }
77
78 #[inline]
79 pub fn into(self) -> E {
81 self.err
82 }
83}
84
85#[derive(Debug, Clone)]
87pub struct ProtocolError {
88 err: error::ProtocolError,
89}
90
91impl ProtocolError {
92 pub fn new(err: error::ProtocolError) -> Self {
93 Self { err }
94 }
95
96 #[inline]
97 pub fn get_ref(&self) -> &error::ProtocolError {
99 &self.err
100 }
101
102 #[inline]
103 pub fn into(self) -> error::ProtocolError {
105 self.err
106 }
107}
108
109#[derive(Debug)]
110pub struct PeerGone(pub(crate) Option<io::Error>);
112
113impl PeerGone {
114 #[inline]
115 pub fn err(&self) -> Option<&io::Error> {
117 self.0.as_ref()
118 }
119
120 #[inline]
121 pub fn into(self) -> Option<io::Error> {
123 self.0
124 }
125}
126
127#[derive(Debug)]
129pub struct DefaultControlService<S, E, R, Err = ()>(PhantomData<(S, E, R, Err)>);
130
131impl<S, E, R, Err> Default for DefaultControlService<S, E, R, Err> {
132 fn default() -> Self {
133 DefaultControlService(PhantomData)
134 }
135}
136
137impl<S, E, R, Err> ServiceFactory<Control<E>, S> for DefaultControlService<S, E, R, Err> {
138 type Response = Option<R>;
139 type Error = E;
140 type InitError = Err;
141 type Service = DefaultControlService<S, E, R, ()>;
142
143 async fn create(&self, _: S) -> Result<Self::Service, Self::InitError> {
144 Ok(DefaultControlService(PhantomData))
145 }
146}
147
148impl<S, E, R> Service<Control<E>> for DefaultControlService<S, E, R, ()> {
149 type Response = Option<R>;
150 type Error = E;
151
152 async fn call(
153 &self,
154 _: Control<E>,
155 _: ServiceCtx<'_, Self>,
156 ) -> Result<Self::Response, Self::Error> {
157 log::warn!("MQTT5 Control service is not configured");
158 Ok(None)
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165
166 #[test]
167 fn test_debug() {
168 assert!(format!("{:?}", WrBackpressure(false)).contains("WrBackpressure"));
170 assert!(format!("{:?}", Error { err: () }).contains("Error"));
171 assert!(format!("{:?}", PeerGone(None)).contains("PeerGone"));
172 }
173}