ntex_mqtt/v5/client/
connection.rs

1use std::{cell::RefCell, fmt, marker, num::NonZeroU16, rc::Rc};
2
3use ntex_bytes::ByteString;
4use ntex_io::{DispatcherConfig, IoBoxed};
5use ntex_router::{IntoPattern, Path, Router, RouterBuilder};
6use ntex_service::{boxed, fn_service, IntoService, Pipeline, Service};
7use ntex_util::time::{sleep, Millis, Seconds};
8use ntex_util::{future::Either, future::Ready, HashMap};
9
10use crate::v5::publish::{Publish, PublishAck};
11use crate::v5::{codec, shared::MqttShared, sink::MqttSink, ControlAck};
12use crate::{error::MqttError, io::Dispatcher};
13
14use super::{control::Control, dispatcher::create_dispatcher};
15
16/// Mqtt client
17pub struct Client {
18    io: IoBoxed,
19    shared: Rc<MqttShared>,
20    keepalive: Seconds,
21    max_receive: usize,
22    config: DispatcherConfig,
23    pkt: Box<codec::ConnectAck>,
24}
25
26impl fmt::Debug for Client {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        f.debug_struct("v5::Client")
29            .field("keepalive", &self.keepalive)
30            .field("max_receive", &self.max_receive)
31            .field("connect", &self.pkt)
32            .field("config", &self.config)
33            .finish()
34    }
35}
36
37impl Client {
38    /// Construct new `Dispatcher` instance with outgoing messages stream.
39    pub(super) fn new(
40        io: IoBoxed,
41        shared: Rc<MqttShared>,
42        pkt: Box<codec::ConnectAck>,
43        max_receive: u16,
44        keepalive: Seconds,
45        config: DispatcherConfig,
46    ) -> Self {
47        Client { io, pkt, shared, keepalive, config, max_receive: max_receive as usize }
48    }
49}
50
51impl Client {
52    #[inline]
53    /// Get client sink
54    pub fn sink(&self) -> MqttSink {
55        MqttSink::new(self.shared.clone())
56    }
57
58    #[inline]
59    /// Indicates whether there is already stored Session state
60    pub fn session_present(&self) -> bool {
61        self.pkt.session_present
62    }
63
64    #[inline]
65    /// Get reference to `ConnectAck` packet
66    pub fn packet(&self) -> &codec::ConnectAck {
67        &self.pkt
68    }
69
70    #[inline]
71    /// Get mutable reference to `ConnectAck` packet
72    pub fn packet_mut(&mut self) -> &mut codec::ConnectAck {
73        &mut self.pkt
74    }
75
76    /// Configure mqtt resource for a specific topic
77    pub fn resource<T, F, U, E>(self, address: T, service: F) -> ClientRouter<E, U::Error>
78    where
79        T: IntoPattern,
80        F: IntoService<U, Publish>,
81        U: Service<Publish, Response = PublishAck> + 'static,
82        E: From<U::Error>,
83        PublishAck: TryFrom<U::Error, Error = E>,
84    {
85        let mut builder = Router::build();
86        builder.path(address, 0);
87        let handlers = vec![Pipeline::new(boxed::service(service.into_service()))];
88
89        ClientRouter {
90            builder,
91            handlers,
92            io: self.io,
93            shared: self.shared,
94            keepalive: self.keepalive,
95            config: self.config,
96            max_receive: self.max_receive,
97            _t: marker::PhantomData,
98        }
99    }
100
101    /// Run client with default control messages handler.
102    ///
103    /// Default handler closes connection on any control message.
104    pub async fn start_default(self) {
105        if self.keepalive.non_zero() {
106            let _ =
107                ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
108        }
109
110        let dispatcher = create_dispatcher(
111            MqttSink::new(self.shared.clone()),
112            self.max_receive,
113            16,
114            fn_service(|pkt| Ready::Ok(Either::Left(pkt))),
115            fn_service(|msg: Control<()>| {
116                Ready::Ok(msg.disconnect(codec::Disconnect::default()))
117            }),
118        );
119
120        let _ = Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await;
121    }
122
123    /// Run client with provided control messages handler
124    pub async fn start<F, S, E>(self, service: F) -> Result<(), MqttError<E>>
125    where
126        E: 'static,
127        F: IntoService<S, Control<E>> + 'static,
128        S: Service<Control<E>, Response = ControlAck, Error = E> + 'static,
129    {
130        if self.keepalive.non_zero() {
131            let _ =
132                ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
133        }
134
135        let dispatcher = create_dispatcher(
136            MqttSink::new(self.shared.clone()),
137            self.max_receive,
138            16,
139            fn_service(|pkt| Ready::Ok(Either::Left(pkt))),
140            service.into_service(),
141        );
142
143        Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await
144    }
145
146    /// Get negotiated io stream and codec
147    pub fn into_inner(self) -> (IoBoxed, codec::Codec) {
148        (self.io, self.shared.codec.clone())
149    }
150}
151
152type Handler<E> = boxed::BoxService<Publish, PublishAck, E>;
153
154/// Mqtt client with routing capabilities
155pub struct ClientRouter<Err, PErr> {
156    io: IoBoxed,
157    builder: RouterBuilder<usize>,
158    handlers: Vec<Pipeline<Handler<PErr>>>,
159    shared: Rc<MqttShared>,
160    keepalive: Seconds,
161    config: DispatcherConfig,
162    max_receive: usize,
163    _t: marker::PhantomData<Err>,
164}
165
166impl<Err, PErr> fmt::Debug for ClientRouter<Err, PErr> {
167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168        f.debug_struct("v5::ClientRouter")
169            .field("keepalive", &self.keepalive)
170            .field("config", &self.config)
171            .field("max_receive", &self.max_receive)
172            .finish()
173    }
174}
175
176impl<Err, PErr> ClientRouter<Err, PErr>
177where
178    Err: From<PErr> + 'static,
179    PublishAck: TryFrom<PErr, Error = Err>,
180    PErr: 'static,
181{
182    /// Configure mqtt resource for a specific topic
183    pub fn resource<T, F, S>(mut self, address: T, service: F) -> Self
184    where
185        T: IntoPattern,
186        F: IntoService<S, Publish>,
187        S: Service<Publish, Response = PublishAck, Error = PErr> + 'static,
188    {
189        self.builder.path(address, self.handlers.len());
190        self.handlers.push(Pipeline::new(boxed::service(service.into_service())));
191        self
192    }
193
194    /// Run client with default control messages handler
195    pub async fn start_default(self) {
196        if self.keepalive.non_zero() {
197            let _ =
198                ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
199        }
200
201        let dispatcher = create_dispatcher(
202            MqttSink::new(self.shared.clone()),
203            self.max_receive,
204            16,
205            dispatch(self.builder.finish(), self.handlers),
206            fn_service(|msg: Control<Err>| {
207                Ready::Ok(msg.disconnect(codec::Disconnect::default()))
208            }),
209        );
210
211        let _ = Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await;
212    }
213
214    /// Run client and handle control messages
215    pub async fn start<F, S>(self, service: F) -> Result<(), MqttError<Err>>
216    where
217        F: IntoService<S, Control<Err>>,
218        S: Service<Control<Err>, Response = ControlAck, Error = Err> + 'static,
219    {
220        if self.keepalive.non_zero() {
221            let _ =
222                ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
223        }
224
225        let dispatcher = create_dispatcher(
226            MqttSink::new(self.shared.clone()),
227            self.max_receive,
228            16,
229            dispatch(self.builder.finish(), self.handlers),
230            service.into_service(),
231        );
232
233        Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await
234    }
235
236    /// Get negotiated io stream and codec
237    pub fn into_inner(self) -> (IoBoxed, codec::Codec) {
238        (self.io, self.shared.codec.clone())
239    }
240}
241
242fn dispatch<Err, PErr>(
243    router: Router<usize>,
244    handlers: Vec<Pipeline<Handler<PErr>>>,
245) -> impl Service<Publish, Response = Either<Publish, PublishAck>, Error = Err>
246where
247    PErr: 'static,
248    PublishAck: TryFrom<PErr, Error = Err>,
249{
250    // let handlers =
251    let aliases: RefCell<HashMap<NonZeroU16, (usize, Path<ByteString>)>> =
252        RefCell::new(HashMap::default());
253    let handlers = Rc::new(handlers);
254
255    fn_service(move |mut req: Publish| {
256        let idx = if !req.publish_topic().is_empty() {
257            if let Some((idx, _info)) = router.recognize(req.topic_mut()) {
258                // save info for topic alias
259                if let Some(alias) = req.packet().properties.topic_alias {
260                    aliases.borrow_mut().insert(alias, (*idx, req.topic().clone()));
261                }
262                *idx
263            } else {
264                return Either::Right(Ready::<_, Err>::Ok(Either::Left(req)));
265            }
266        }
267        // handle publish with topic alias
268        else if let Some(ref alias) = req.packet().properties.topic_alias {
269            let aliases = aliases.borrow();
270            if let Some(item) = aliases.get(alias) {
271                *req.topic_mut() = item.1.clone();
272                item.0
273            } else {
274                log::error!("Unknown topic alias: {:?}", alias);
275                return Either::Right(Ready::<_, Err>::Ok(Either::Left(req)));
276            }
277        } else {
278            return Either::Right(Ready::<_, Err>::Ok(Either::Left(req)));
279        };
280
281        // exec handler
282        let handlers = handlers.clone();
283        Either::Left(async move { call(req, handlers[idx].clone()).await })
284    })
285}
286
287async fn call<S, Err>(
288    req: Publish,
289    srv: Pipeline<S>,
290) -> Result<Either<Publish, PublishAck>, Err>
291where
292    S: Service<Publish, Response = PublishAck>,
293    PublishAck: TryFrom<S::Error, Error = Err>,
294{
295    match srv.call(req).await {
296        Ok(ack) => Ok(Either::Right(ack)),
297        Err(err) => match PublishAck::try_from(err) {
298            Ok(ack) => Ok(Either::Right(ack)),
299            Err(err) => Err(err),
300        },
301    }
302}
303
304async fn keepalive(sink: MqttSink, timeout: Seconds) {
305    log::debug!("start mqtt client keep-alive task");
306
307    let keepalive = Millis::from(timeout);
308    loop {
309        sleep(keepalive).await;
310
311        if !sink.is_open() || !sink.ping() {
312            // connection is closed
313            log::debug!("mqtt client connection is closed, stopping keep-alive task");
314            break;
315        }
316    }
317}