ntex_mqtt/v3/
router.rs

1use std::{rc::Rc, task::Context};
2
3use ntex_router::{IntoPattern, RouterBuilder};
4use ntex_service::boxed::{self, BoxService, BoxServiceFactory};
5use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
6
7use super::{publish::Publish, Session};
8
9type Handler<S, E> = BoxServiceFactory<Session<S>, Publish, (), E, E>;
10type HandlerService<E> = BoxService<Publish, (), E>;
11
12/// Router - structure that follows the builder pattern
13/// for building publish packet router instances for mqtt server.
14pub struct Router<S, Err> {
15    router: RouterBuilder<usize>,
16    handlers: Vec<Handler<S, Err>>,
17    default: Handler<S, Err>,
18}
19
20impl<S, Err> Router<S, Err>
21where
22    S: 'static,
23    Err: 'static,
24{
25    /// Create mqtt application router.
26    ///
27    /// Default service to be used if no matching resource could be found.
28    pub fn new<F, U>(default_service: F) -> Self
29    where
30        F: IntoServiceFactory<U, Publish, Session<S>>,
31        U: ServiceFactory<Publish, Session<S>, Response = (), Error = Err, InitError = Err>
32            + 'static,
33    {
34        Router {
35            router: ntex_router::Router::build(),
36            handlers: Vec::new(),
37            default: boxed::factory(default_service.into_factory()),
38        }
39    }
40
41    /// Configure mqtt resource for a specific topic.
42    pub fn resource<T, F, U>(mut self, address: T, service: F) -> Self
43    where
44        T: IntoPattern,
45        F: IntoServiceFactory<U, Publish, Session<S>>,
46        U: ServiceFactory<Publish, Session<S>, Response = (), Error = Err> + 'static,
47        Err: From<U::InitError>,
48    {
49        self.router.path(address, self.handlers.len());
50        self.handlers.push(boxed::factory(service.into_factory().map_init_err(Err::from)));
51        self
52    }
53}
54
55impl<S, Err> IntoServiceFactory<RouterFactory<S, Err>, Publish, Session<S>> for Router<S, Err>
56where
57    S: 'static,
58    Err: 'static,
59{
60    fn into_factory(self) -> RouterFactory<S, Err> {
61        RouterFactory {
62            router: Rc::new(self.router.finish()),
63            handlers: self.handlers,
64            default: self.default,
65        }
66    }
67}
68
69pub struct RouterFactory<S, Err> {
70    router: Rc<ntex_router::Router<usize>>,
71    handlers: Vec<Handler<S, Err>>,
72    default: Handler<S, Err>,
73}
74
75impl<S, Err> ServiceFactory<Publish, Session<S>> for RouterFactory<S, Err>
76where
77    S: 'static,
78    Err: 'static,
79{
80    type Response = ();
81    type Error = Err;
82    type InitError = Err;
83    type Service = RouterService<Err>;
84
85    async fn create(&self, session: Session<S>) -> Result<Self::Service, Self::Error> {
86        let fut: Vec<_> = self.handlers.iter().map(|h| h.create(session.clone())).collect();
87
88        let mut handlers = Vec::new();
89        for handler in fut {
90            handlers.push(handler.await?);
91        }
92
93        Ok(RouterService {
94            handlers,
95            router: self.router.clone(),
96            default: self.default.create(session).await?,
97        })
98    }
99}
100
101pub struct RouterService<Err> {
102    router: Rc<ntex_router::Router<usize>>,
103    handlers: Vec<HandlerService<Err>>,
104    default: HandlerService<Err>,
105}
106
107impl<Err> Service<Publish> for RouterService<Err> {
108    type Response = ();
109    type Error = Err;
110
111    #[inline]
112    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
113        for hnd in &self.handlers {
114            ctx.ready(hnd).await?;
115        }
116        ctx.ready(&self.default).await
117    }
118
119    #[inline]
120    fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
121        for hnd in &self.handlers {
122            hnd.poll(cx)?;
123        }
124        self.default.poll(cx)
125    }
126
127    #[inline]
128    async fn call(
129        &self,
130        mut req: Publish,
131        ctx: ServiceCtx<'_, Self>,
132    ) -> Result<Self::Response, Self::Error> {
133        if let Some((idx, _info)) = self.router.recognize(req.topic_mut()) {
134            ctx.call(&self.handlers[*idx], req).await
135        } else {
136            ctx.call(&self.default, req).await
137        }
138    }
139}