ntex_mqtt/v5/
router.rs

1use std::{cell::RefCell, num::NonZeroU16, rc::Rc, task::Context};
2
3use ntex_bytes::ByteString;
4use ntex_router::{IntoPattern, Path, RouterBuilder};
5use ntex_service::boxed::{self, BoxService, BoxServiceFactory};
6use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
7use ntex_util::HashMap;
8
9use super::{publish::Publish, publish::PublishAck, Session};
10
/// Boxed service factory that builds a publish handler for a `Session<S>`.
///
/// `E` is used both as the handler's service error and as the factory's init error.
type Handler<S, E> = BoxServiceFactory<Session<S>, Publish, PublishAck, E, E>;
/// Boxed publish handler service: takes a `Publish` and answers with a `PublishAck`.
type HandlerService<E> = BoxService<Publish, PublishAck, E>;
13
/// Router - structure that follows the builder pattern
/// for building publish packet router instances for mqtt server.
///
/// Resources are added with [`Router::resource`]; [`Router::finish`] turns the
/// builder into a [`RouterFactory`].
pub struct Router<S, Err> {
    /// Pattern table under construction; each pattern stores an index into `handlers`.
    router: RouterBuilder<usize>,
    /// Handler factories, in registration order.
    handlers: Vec<Handler<S, Err>>,
    /// Factory for the fallback handler supplied to [`Router::new`].
    default: Handler<S, Err>,
}
21
22impl<S, Err> Router<S, Err>
23where
24    S: 'static,
25    Err: 'static,
26{
27    /// Create mqtt application router.
28    ///
29    /// Default service to be used if no matching resource could be found.
30    pub fn new<F, U>(default_service: F) -> Self
31    where
32        F: IntoServiceFactory<U, Publish, Session<S>>,
33        U: ServiceFactory<
34                Publish,
35                Session<S>,
36                Response = PublishAck,
37                Error = Err,
38                InitError = Err,
39            > + 'static,
40    {
41        Router {
42            router: ntex_router::Router::build(),
43            handlers: Vec::new(),
44            default: boxed::factory(default_service.into_factory()),
45        }
46    }
47
48    /// Configure mqtt resource for a specific topic.
49    pub fn resource<T, F, U>(mut self, address: T, service: F) -> Self
50    where
51        T: IntoPattern,
52        F: IntoServiceFactory<U, Publish, Session<S>>,
53        U: ServiceFactory<Publish, Session<S>, Response = PublishAck, Error = Err> + 'static,
54        Err: From<U::InitError>,
55    {
56        self.router.path(address, self.handlers.len());
57        self.handlers.push(boxed::factory(service.into_factory().map_init_err(Err::from)));
58        self
59    }
60
61    /// Finish router configuration and create router service factory
62    pub fn finish(self) -> RouterFactory<S, Err> {
63        RouterFactory {
64            router: self.router.finish(),
65            handlers: Rc::new(self.handlers),
66            default: self.default,
67        }
68    }
69}
70
71impl<S, Err> IntoServiceFactory<RouterFactory<S, Err>, Publish, Session<S>> for Router<S, Err>
72where
73    S: 'static,
74    Err: 'static,
75{
76    fn into_factory(self) -> RouterFactory<S, Err> {
77        self.finish()
78    }
79}
80
/// Service factory produced by [`Router::finish`].
///
/// Creates one [`RouterService`] per `Session<S>`.
pub struct RouterFactory<S, Err> {
    /// Finished pattern table; matched patterns yield an index into `handlers`.
    router: ntex_router::Router<usize>,
    /// Handler factories, in registration order.
    handlers: Rc<Vec<Handler<S, Err>>>,
    /// Factory for the fallback handler.
    default: Handler<S, Err>,
}
86
87impl<S, Err> ServiceFactory<Publish, Session<S>> for RouterFactory<S, Err>
88where
89    S: 'static,
90    Err: 'static,
91{
92    type Response = PublishAck;
93    type Error = Err;
94    type InitError = Err;
95    type Service = RouterService<Err>;
96
97    async fn create(&self, session: Session<S>) -> Result<Self::Service, Err> {
98        let default = self.default.create(session.clone()).await?;
99
100        let mut handlers = Vec::with_capacity(self.handlers.len());
101        for f in self.handlers.as_ref() {
102            handlers.push(f.create(session.clone()).await?);
103        }
104
105        Ok(RouterService {
106            default,
107            handlers,
108            router: self.router.clone(),
109            aliases: RefCell::new(HashMap::default()),
110        })
111    }
112}
113
/// Publish routing service created by [`RouterFactory`].
pub struct RouterService<Err> {
    /// Pattern table; a match yields an index into `handlers`.
    router: ntex_router::Router<usize>,
    /// Fallback handler used when no route handles the publish.
    default: HandlerService<Err>,
    /// Route handlers, indexed by the value stored in `router`.
    handlers: Vec<HandlerService<Err>>,
    /// Topic alias -> (handler index, matched topic path) recorded from earlier publishes.
    aliases: RefCell<HashMap<NonZeroU16, (usize, Path<ByteString>)>>,
}
120
121impl<Err: 'static> Service<Publish> for RouterService<Err> {
122    type Response = PublishAck;
123    type Error = Err;
124
125    #[inline]
126    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
127        for hnd in self.handlers.iter() {
128            ctx.ready(hnd).await?;
129        }
130        ctx.ready(&self.default).await
131    }
132
133    #[inline]
134    fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
135        for hnd in &self.handlers {
136            hnd.poll(cx)?;
137        }
138        self.default.poll(cx)
139    }
140
141    #[allow(clippy::await_holding_refcell_ref)]
142    async fn call(
143        &self,
144        mut req: Publish,
145        ctx: ServiceCtx<'_, Self>,
146    ) -> Result<Self::Response, Self::Error> {
147        if !req.publish_topic().is_empty() {
148            if let Some((idx, _info)) = self.router.recognize(req.topic_mut()) {
149                // save info for topic alias
150                if let Some(alias) = req.packet().properties.topic_alias {
151                    self.aliases.borrow_mut().insert(alias, (*idx, req.topic().clone()));
152                }
153                return ctx.call(&self.handlers[*idx], req).await;
154            }
155        }
156        // handle publish with topic alias
157        else if let Some(ref alias) = req.packet().properties.topic_alias {
158            let aliases = self.aliases.borrow();
159            if let Some(item) = aliases.get(alias) {
160                let idx = item.0;
161                *req.topic_mut() = item.1.clone();
162                drop(aliases);
163                return ctx.call(&self.handlers[idx], req).await;
164            } else {
165                log::error!("Unknown topic alias: {:?}", alias);
166            }
167        }
168        ctx.call(&self.default, req).await
169    }
170}