1use std::{cell::RefCell, fmt, marker, num::NonZeroU16, rc::Rc};
2
3use ntex_bytes::ByteString;
4use ntex_io::{DispatcherConfig, IoBoxed};
5use ntex_router::{IntoPattern, Path, Router, RouterBuilder};
6use ntex_service::{boxed, fn_service, IntoService, Pipeline, Service};
7use ntex_util::time::{sleep, Millis, Seconds};
8use ntex_util::{future::Either, future::Ready, HashMap};
9
10use crate::v5::publish::{Publish, PublishAck};
11use crate::v5::{codec, shared::MqttShared, sink::MqttSink, ControlAck};
12use crate::{error::MqttError, io::Dispatcher};
13
14use super::{control::Control, dispatcher::create_dispatcher};
15
16pub struct Client {
18 io: IoBoxed,
19 shared: Rc<MqttShared>,
20 keepalive: Seconds,
21 max_receive: usize,
22 config: DispatcherConfig,
23 pkt: Box<codec::ConnectAck>,
24}
25
26impl fmt::Debug for Client {
27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28 f.debug_struct("v5::Client")
29 .field("keepalive", &self.keepalive)
30 .field("max_receive", &self.max_receive)
31 .field("connect", &self.pkt)
32 .field("config", &self.config)
33 .finish()
34 }
35}
36
37impl Client {
38 pub(super) fn new(
40 io: IoBoxed,
41 shared: Rc<MqttShared>,
42 pkt: Box<codec::ConnectAck>,
43 max_receive: u16,
44 keepalive: Seconds,
45 config: DispatcherConfig,
46 ) -> Self {
47 Client { io, pkt, shared, keepalive, config, max_receive: max_receive as usize }
48 }
49}
50
51impl Client {
52 #[inline]
53 pub fn sink(&self) -> MqttSink {
55 MqttSink::new(self.shared.clone())
56 }
57
58 #[inline]
59 pub fn session_present(&self) -> bool {
61 self.pkt.session_present
62 }
63
64 #[inline]
65 pub fn packet(&self) -> &codec::ConnectAck {
67 &self.pkt
68 }
69
70 #[inline]
71 pub fn packet_mut(&mut self) -> &mut codec::ConnectAck {
73 &mut self.pkt
74 }
75
76 pub fn resource<T, F, U, E>(self, address: T, service: F) -> ClientRouter<E, U::Error>
78 where
79 T: IntoPattern,
80 F: IntoService<U, Publish>,
81 U: Service<Publish, Response = PublishAck> + 'static,
82 E: From<U::Error>,
83 PublishAck: TryFrom<U::Error, Error = E>,
84 {
85 let mut builder = Router::build();
86 builder.path(address, 0);
87 let handlers = vec![Pipeline::new(boxed::service(service.into_service()))];
88
89 ClientRouter {
90 builder,
91 handlers,
92 io: self.io,
93 shared: self.shared,
94 keepalive: self.keepalive,
95 config: self.config,
96 max_receive: self.max_receive,
97 _t: marker::PhantomData,
98 }
99 }
100
101 pub async fn start_default(self) {
105 if self.keepalive.non_zero() {
106 let _ =
107 ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
108 }
109
110 let dispatcher = create_dispatcher(
111 MqttSink::new(self.shared.clone()),
112 self.max_receive,
113 16,
114 fn_service(|pkt| Ready::Ok(Either::Left(pkt))),
115 fn_service(|msg: Control<()>| {
116 Ready::Ok(msg.disconnect(codec::Disconnect::default()))
117 }),
118 );
119
120 let _ = Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await;
121 }
122
123 pub async fn start<F, S, E>(self, service: F) -> Result<(), MqttError<E>>
125 where
126 E: 'static,
127 F: IntoService<S, Control<E>> + 'static,
128 S: Service<Control<E>, Response = ControlAck, Error = E> + 'static,
129 {
130 if self.keepalive.non_zero() {
131 let _ =
132 ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
133 }
134
135 let dispatcher = create_dispatcher(
136 MqttSink::new(self.shared.clone()),
137 self.max_receive,
138 16,
139 fn_service(|pkt| Ready::Ok(Either::Left(pkt))),
140 service.into_service(),
141 );
142
143 Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await
144 }
145
146 pub fn into_inner(self) -> (IoBoxed, codec::Codec) {
148 (self.io, self.shared.codec.clone())
149 }
150}
151
152type Handler<E> = boxed::BoxService<Publish, PublishAck, E>;
153
154pub struct ClientRouter<Err, PErr> {
156 io: IoBoxed,
157 builder: RouterBuilder<usize>,
158 handlers: Vec<Pipeline<Handler<PErr>>>,
159 shared: Rc<MqttShared>,
160 keepalive: Seconds,
161 config: DispatcherConfig,
162 max_receive: usize,
163 _t: marker::PhantomData<Err>,
164}
165
166impl<Err, PErr> fmt::Debug for ClientRouter<Err, PErr> {
167 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168 f.debug_struct("v5::ClientRouter")
169 .field("keepalive", &self.keepalive)
170 .field("config", &self.config)
171 .field("max_receive", &self.max_receive)
172 .finish()
173 }
174}
175
176impl<Err, PErr> ClientRouter<Err, PErr>
177where
178 Err: From<PErr> + 'static,
179 PublishAck: TryFrom<PErr, Error = Err>,
180 PErr: 'static,
181{
182 pub fn resource<T, F, S>(mut self, address: T, service: F) -> Self
184 where
185 T: IntoPattern,
186 F: IntoService<S, Publish>,
187 S: Service<Publish, Response = PublishAck, Error = PErr> + 'static,
188 {
189 self.builder.path(address, self.handlers.len());
190 self.handlers.push(Pipeline::new(boxed::service(service.into_service())));
191 self
192 }
193
194 pub async fn start_default(self) {
196 if self.keepalive.non_zero() {
197 let _ =
198 ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
199 }
200
201 let dispatcher = create_dispatcher(
202 MqttSink::new(self.shared.clone()),
203 self.max_receive,
204 16,
205 dispatch(self.builder.finish(), self.handlers),
206 fn_service(|msg: Control<Err>| {
207 Ready::Ok(msg.disconnect(codec::Disconnect::default()))
208 }),
209 );
210
211 let _ = Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await;
212 }
213
214 pub async fn start<F, S>(self, service: F) -> Result<(), MqttError<Err>>
216 where
217 F: IntoService<S, Control<Err>>,
218 S: Service<Control<Err>, Response = ControlAck, Error = Err> + 'static,
219 {
220 if self.keepalive.non_zero() {
221 let _ =
222 ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
223 }
224
225 let dispatcher = create_dispatcher(
226 MqttSink::new(self.shared.clone()),
227 self.max_receive,
228 16,
229 dispatch(self.builder.finish(), self.handlers),
230 service.into_service(),
231 );
232
233 Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await
234 }
235
236 pub fn into_inner(self) -> (IoBoxed, codec::Codec) {
238 (self.io, self.shared.codec.clone())
239 }
240}
241
242fn dispatch<Err, PErr>(
243 router: Router<usize>,
244 handlers: Vec<Pipeline<Handler<PErr>>>,
245) -> impl Service<Publish, Response = Either<Publish, PublishAck>, Error = Err>
246where
247 PErr: 'static,
248 PublishAck: TryFrom<PErr, Error = Err>,
249{
250 let aliases: RefCell<HashMap<NonZeroU16, (usize, Path<ByteString>)>> =
252 RefCell::new(HashMap::default());
253 let handlers = Rc::new(handlers);
254
255 fn_service(move |mut req: Publish| {
256 let idx = if !req.publish_topic().is_empty() {
257 if let Some((idx, _info)) = router.recognize(req.topic_mut()) {
258 if let Some(alias) = req.packet().properties.topic_alias {
260 aliases.borrow_mut().insert(alias, (*idx, req.topic().clone()));
261 }
262 *idx
263 } else {
264 return Either::Right(Ready::<_, Err>::Ok(Either::Left(req)));
265 }
266 }
267 else if let Some(ref alias) = req.packet().properties.topic_alias {
269 let aliases = aliases.borrow();
270 if let Some(item) = aliases.get(alias) {
271 *req.topic_mut() = item.1.clone();
272 item.0
273 } else {
274 log::error!("Unknown topic alias: {:?}", alias);
275 return Either::Right(Ready::<_, Err>::Ok(Either::Left(req)));
276 }
277 } else {
278 return Either::Right(Ready::<_, Err>::Ok(Either::Left(req)));
279 };
280
281 let handlers = handlers.clone();
283 Either::Left(async move { call(req, handlers[idx].clone()).await })
284 })
285}
286
287async fn call<S, Err>(
288 req: Publish,
289 srv: Pipeline<S>,
290) -> Result<Either<Publish, PublishAck>, Err>
291where
292 S: Service<Publish, Response = PublishAck>,
293 PublishAck: TryFrom<S::Error, Error = Err>,
294{
295 match srv.call(req).await {
296 Ok(ack) => Ok(Either::Right(ack)),
297 Err(err) => match PublishAck::try_from(err) {
298 Ok(ack) => Ok(Either::Right(ack)),
299 Err(err) => Err(err),
300 },
301 }
302}
303
304async fn keepalive(sink: MqttSink, timeout: Seconds) {
305 log::debug!("start mqtt client keep-alive task");
306
307 let keepalive = Millis::from(timeout);
308 loop {
309 sleep(keepalive).await;
310
311 if !sink.is_open() || !sink.ping() {
312 log::debug!("mqtt client connection is closed, stopping keep-alive task");
314 break;
315 }
316 }
317}