1#![allow(clippy::let_underscore_future)]
2use std::{fmt, marker::PhantomData, rc::Rc};
3
4use ntex_io::{DispatcherConfig, IoBoxed};
5use ntex_router::{IntoPattern, Router, RouterBuilder};
6use ntex_service::{boxed, fn_service, IntoService, Pipeline, Service};
7use ntex_util::future::{Either, Ready};
8use ntex_util::time::{sleep, Millis, Seconds};
9
10use crate::v3::{codec, shared::MqttShared, sink::MqttSink, ControlAck, Publish};
11use crate::{error::MqttError, io::Dispatcher};
12
13use super::{control::Control, dispatcher::create_dispatcher};
14
15pub struct Client {
17 io: IoBoxed,
18 shared: Rc<MqttShared>,
19 keepalive: Seconds,
20 session_present: bool,
21 max_receive: usize,
22 config: DispatcherConfig,
23}
24
25impl fmt::Debug for Client {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 f.debug_struct("v3::Client")
28 .field("keepalive", &self.keepalive)
29 .field("session_present", &self.session_present)
30 .field("max_receive", &self.max_receive)
31 .field("config", &self.config)
32 .finish()
33 }
34}
35
36impl Client {
37 pub(super) fn new(
39 io: IoBoxed,
40 shared: Rc<MqttShared>,
41 session_present: bool,
42 keepalive_timeout: Seconds,
43 max_receive: usize,
44 config: DispatcherConfig,
45 ) -> Self {
46 Client {
47 io,
48 shared,
49 session_present,
50 max_receive,
51 config,
52 keepalive: keepalive_timeout,
53 }
54 }
55}
56
57impl Client {
58 #[inline]
59 pub fn sink(&self) -> MqttSink {
61 MqttSink::new(self.shared.clone())
62 }
63
64 #[inline]
65 pub fn session_present(&self) -> bool {
67 self.session_present
68 }
69
70 pub fn resource<T, F, U>(self, address: T, service: F) -> ClientRouter<U::Error, U::Error>
72 where
73 T: IntoPattern,
74 F: IntoService<U, Publish>,
75 U: Service<Publish, Response = ()> + 'static,
76 {
77 let mut builder = Router::build();
78 builder.path(address, 0);
79 let handlers = vec![Pipeline::new(boxed::service(service.into_service()))];
80
81 ClientRouter {
82 builder,
83 handlers,
84 io: self.io,
85 shared: self.shared,
86 keepalive: self.keepalive,
87 config: self.config,
88 max_receive: self.max_receive,
89 _t: PhantomData,
90 }
91 }
92
93 pub async fn start_default(self) {
97 if self.keepalive.non_zero() {
98 let _ =
99 ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
100 }
101
102 let dispatcher = create_dispatcher(
103 self.shared.clone(),
104 self.max_receive,
105 fn_service(|pkt| Ready::Ok(Either::Right(pkt))),
106 fn_service(|msg: Control<()>| Ready::<_, ()>::Ok(msg.disconnect())),
107 );
108
109 let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await;
110 }
111
112 pub async fn start<F, S, E>(self, service: F) -> Result<(), MqttError<E>>
114 where
115 E: 'static,
116 F: IntoService<S, Control<E>> + 'static,
117 S: Service<Control<E>, Response = ControlAck, Error = E> + 'static,
118 {
119 if self.keepalive.non_zero() {
120 let _ =
121 ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
122 }
123
124 let dispatcher = create_dispatcher(
125 self.shared.clone(),
126 self.max_receive,
127 fn_service(|pkt| Ready::Ok(Either::Right(pkt))),
128 service.into_service(),
129 );
130
131 Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await
132 }
133
134 pub fn into_inner(self) -> (IoBoxed, codec::Codec) {
136 (self.io, self.shared.codec.clone())
137 }
138}
139
140type Handler<E> = boxed::BoxService<Publish, (), E>;
141
142pub struct ClientRouter<Err, PErr> {
144 builder: RouterBuilder<usize>,
145 handlers: Vec<Pipeline<Handler<PErr>>>,
146 io: IoBoxed,
147 shared: Rc<MqttShared>,
148 keepalive: Seconds,
149 max_receive: usize,
150 config: DispatcherConfig,
151 _t: PhantomData<Err>,
152}
153
154impl<Err, PErr> fmt::Debug for ClientRouter<Err, PErr> {
155 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156 f.debug_struct("v3::ClientRouter")
157 .field("keepalive", &self.keepalive)
158 .field("max_receive", &self.max_receive)
159 .field("config", &self.config)
160 .finish()
161 }
162}
163
164impl<Err, PErr> ClientRouter<Err, PErr>
165where
166 Err: From<PErr> + 'static,
167 PErr: 'static,
168{
169 pub fn resource<T, F, S>(mut self, address: T, service: F) -> Self
171 where
172 T: IntoPattern,
173 F: IntoService<S, Publish>,
174 S: Service<Publish, Response = (), Error = PErr> + 'static,
175 {
176 self.builder.path(address, self.handlers.len());
177 self.handlers.push(Pipeline::new(boxed::service(service.into_service())));
178 self
179 }
180
181 pub async fn start_default(self) {
183 if self.keepalive.non_zero() {
184 let _ =
185 ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
186 }
187
188 let dispatcher = create_dispatcher(
189 self.shared.clone(),
190 self.max_receive,
191 dispatch(self.builder.finish(), self.handlers),
192 fn_service(|msg: Control<Err>| Ready::<_, Err>::Ok(msg.disconnect())),
193 );
194
195 let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await;
196 }
197
198 pub async fn start<F, S>(self, service: F) -> Result<(), MqttError<Err>>
200 where
201 F: IntoService<S, Control<Err>>,
202 S: Service<Control<Err>, Response = ControlAck, Error = Err> + 'static,
203 {
204 if self.keepalive.non_zero() {
205 let _ =
206 ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
207 }
208
209 let dispatcher = create_dispatcher(
210 self.shared.clone(),
211 self.max_receive,
212 dispatch(self.builder.finish(), self.handlers),
213 service.into_service(),
214 );
215
216 Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await
217 }
218}
219
220fn dispatch<Err, PErr>(
221 router: Router<usize>,
222 handlers: Vec<Pipeline<Handler<PErr>>>,
223) -> impl Service<Publish, Response = Either<(), Publish>, Error = Err>
224where
225 PErr: 'static,
226 Err: From<PErr>,
227{
228 let handlers = Rc::new(handlers);
229
230 fn_service(move |mut req: Publish| {
231 if let Some((idx, _info)) = router.recognize(req.topic_mut()) {
232 let idx = *idx;
234 let handlers = handlers.clone();
235 Either::Left(async move { call(req, handlers[idx].clone()).await })
236 } else {
237 Either::Right(Ready::<_, Err>::Ok(Either::Right(req)))
238 }
239 })
240}
241
242async fn call<S, Err, PErr>(req: Publish, srv: Pipeline<S>) -> Result<Either<(), Publish>, Err>
243where
244 S: Service<Publish, Response = (), Error = PErr>,
245 Err: From<PErr>,
246{
247 match srv.call(req).await {
248 Ok(_) => Ok(Either::Left(())),
249 Err(err) => Err(err.into()),
250 }
251}
252
253async fn keepalive(sink: MqttSink, timeout: Seconds) {
254 log::debug!("start mqtt client keep-alive task");
255
256 let keepalive = Millis::from(timeout);
257 loop {
258 sleep(keepalive).await;
259
260 if !sink.is_open() || !sink.ping() {
261 log::debug!("mqtt client connection is closed, stopping keep-alive task");
263 break;
264 }
265 }
266}