ntex_mqtt/v3/client/
connection.rs

1#![allow(clippy::let_underscore_future)]
2use std::{fmt, marker::PhantomData, rc::Rc};
3
4use ntex_io::{DispatcherConfig, IoBoxed};
5use ntex_router::{IntoPattern, Router, RouterBuilder};
6use ntex_service::{boxed, fn_service, IntoService, Pipeline, Service};
7use ntex_util::future::{Either, Ready};
8use ntex_util::time::{sleep, Millis, Seconds};
9
10use crate::v3::{codec, shared::MqttShared, sink::MqttSink, ControlAck, Publish};
11use crate::{error::MqttError, io::Dispatcher};
12
13use super::{control::Control, dispatcher::create_dispatcher};
14
15/// Mqtt client
16pub struct Client {
17    io: IoBoxed,
18    shared: Rc<MqttShared>,
19    keepalive: Seconds,
20    session_present: bool,
21    max_receive: usize,
22    config: DispatcherConfig,
23}
24
25impl fmt::Debug for Client {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        f.debug_struct("v3::Client")
28            .field("keepalive", &self.keepalive)
29            .field("session_present", &self.session_present)
30            .field("max_receive", &self.max_receive)
31            .field("config", &self.config)
32            .finish()
33    }
34}
35
36impl Client {
37    /// Construct new `Dispatcher` instance with outgoing messages stream.
38    pub(super) fn new(
39        io: IoBoxed,
40        shared: Rc<MqttShared>,
41        session_present: bool,
42        keepalive_timeout: Seconds,
43        max_receive: usize,
44        config: DispatcherConfig,
45    ) -> Self {
46        Client {
47            io,
48            shared,
49            session_present,
50            max_receive,
51            config,
52            keepalive: keepalive_timeout,
53        }
54    }
55}
56
57impl Client {
58    #[inline]
59    /// Get client sink
60    pub fn sink(&self) -> MqttSink {
61        MqttSink::new(self.shared.clone())
62    }
63
64    #[inline]
65    /// Indicates whether there is already stored Session state
66    pub fn session_present(&self) -> bool {
67        self.session_present
68    }
69
70    /// Configure mqtt resource for a specific topic
71    pub fn resource<T, F, U>(self, address: T, service: F) -> ClientRouter<U::Error, U::Error>
72    where
73        T: IntoPattern,
74        F: IntoService<U, Publish>,
75        U: Service<Publish, Response = ()> + 'static,
76    {
77        let mut builder = Router::build();
78        builder.path(address, 0);
79        let handlers = vec![Pipeline::new(boxed::service(service.into_service()))];
80
81        ClientRouter {
82            builder,
83            handlers,
84            io: self.io,
85            shared: self.shared,
86            keepalive: self.keepalive,
87            config: self.config,
88            max_receive: self.max_receive,
89            _t: PhantomData,
90        }
91    }
92
93    /// Run client with default control messages handler.
94    ///
95    /// Default handler closes connection on any control message.
96    pub async fn start_default(self) {
97        if self.keepalive.non_zero() {
98            let _ =
99                ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
100        }
101
102        let dispatcher = create_dispatcher(
103            self.shared.clone(),
104            self.max_receive,
105            fn_service(|pkt| Ready::Ok(Either::Right(pkt))),
106            fn_service(|msg: Control<()>| Ready::<_, ()>::Ok(msg.disconnect())),
107        );
108
109        let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await;
110    }
111
112    /// Run client with provided control messages handler
113    pub async fn start<F, S, E>(self, service: F) -> Result<(), MqttError<E>>
114    where
115        E: 'static,
116        F: IntoService<S, Control<E>> + 'static,
117        S: Service<Control<E>, Response = ControlAck, Error = E> + 'static,
118    {
119        if self.keepalive.non_zero() {
120            let _ =
121                ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
122        }
123
124        let dispatcher = create_dispatcher(
125            self.shared.clone(),
126            self.max_receive,
127            fn_service(|pkt| Ready::Ok(Either::Right(pkt))),
128            service.into_service(),
129        );
130
131        Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await
132    }
133
134    /// Get negotiated io stream and codec
135    pub fn into_inner(self) -> (IoBoxed, codec::Codec) {
136        (self.io, self.shared.codec.clone())
137    }
138}
139
140type Handler<E> = boxed::BoxService<Publish, (), E>;
141
142/// Mqtt client with routing capabilities
143pub struct ClientRouter<Err, PErr> {
144    builder: RouterBuilder<usize>,
145    handlers: Vec<Pipeline<Handler<PErr>>>,
146    io: IoBoxed,
147    shared: Rc<MqttShared>,
148    keepalive: Seconds,
149    max_receive: usize,
150    config: DispatcherConfig,
151    _t: PhantomData<Err>,
152}
153
154impl<Err, PErr> fmt::Debug for ClientRouter<Err, PErr> {
155    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156        f.debug_struct("v3::ClientRouter")
157            .field("keepalive", &self.keepalive)
158            .field("max_receive", &self.max_receive)
159            .field("config", &self.config)
160            .finish()
161    }
162}
163
164impl<Err, PErr> ClientRouter<Err, PErr>
165where
166    Err: From<PErr> + 'static,
167    PErr: 'static,
168{
169    /// Configure mqtt resource for a specific topic
170    pub fn resource<T, F, S>(mut self, address: T, service: F) -> Self
171    where
172        T: IntoPattern,
173        F: IntoService<S, Publish>,
174        S: Service<Publish, Response = (), Error = PErr> + 'static,
175    {
176        self.builder.path(address, self.handlers.len());
177        self.handlers.push(Pipeline::new(boxed::service(service.into_service())));
178        self
179    }
180
181    /// Run client with default control messages handler
182    pub async fn start_default(self) {
183        if self.keepalive.non_zero() {
184            let _ =
185                ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
186        }
187
188        let dispatcher = create_dispatcher(
189            self.shared.clone(),
190            self.max_receive,
191            dispatch(self.builder.finish(), self.handlers),
192            fn_service(|msg: Control<Err>| Ready::<_, Err>::Ok(msg.disconnect())),
193        );
194
195        let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await;
196    }
197
198    /// Run client and handle control messages
199    pub async fn start<F, S>(self, service: F) -> Result<(), MqttError<Err>>
200    where
201        F: IntoService<S, Control<Err>>,
202        S: Service<Control<Err>, Response = ControlAck, Error = Err> + 'static,
203    {
204        if self.keepalive.non_zero() {
205            let _ =
206                ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
207        }
208
209        let dispatcher = create_dispatcher(
210            self.shared.clone(),
211            self.max_receive,
212            dispatch(self.builder.finish(), self.handlers),
213            service.into_service(),
214        );
215
216        Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await
217    }
218}
219
220fn dispatch<Err, PErr>(
221    router: Router<usize>,
222    handlers: Vec<Pipeline<Handler<PErr>>>,
223) -> impl Service<Publish, Response = Either<(), Publish>, Error = Err>
224where
225    PErr: 'static,
226    Err: From<PErr>,
227{
228    let handlers = Rc::new(handlers);
229
230    fn_service(move |mut req: Publish| {
231        if let Some((idx, _info)) = router.recognize(req.topic_mut()) {
232            // exec handler
233            let idx = *idx;
234            let handlers = handlers.clone();
235            Either::Left(async move { call(req, handlers[idx].clone()).await })
236        } else {
237            Either::Right(Ready::<_, Err>::Ok(Either::Right(req)))
238        }
239    })
240}
241
242async fn call<S, Err, PErr>(req: Publish, srv: Pipeline<S>) -> Result<Either<(), Publish>, Err>
243where
244    S: Service<Publish, Response = (), Error = PErr>,
245    Err: From<PErr>,
246{
247    match srv.call(req).await {
248        Ok(_) => Ok(Either::Left(())),
249        Err(err) => Err(err.into()),
250    }
251}
252
253async fn keepalive(sink: MqttSink, timeout: Seconds) {
254    log::debug!("start mqtt client keep-alive task");
255
256    let keepalive = Millis::from(timeout);
257    loop {
258        sleep(keepalive).await;
259
260        if !sink.is_open() || !sink.ping() {
261            // connection is closed
262            log::debug!("mqtt client connection is closed, stopping keep-alive task");
263            break;
264        }
265    }
266}