actix_mqtt/
router.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::rc::Rc;
4use std::task::{Context, Poll};
5
6use actix_router::{IntoPattern, RouterBuilder};
7use actix_service::boxed::{self, BoxService, BoxServiceFactory};
8use actix_service::{fn_service, IntoServiceFactory, Service, ServiceFactory};
9use futures::future::{join_all, ok, JoinAll, LocalBoxFuture};
10
11use crate::publish::Publish;
12
13type Handler<S, E> = BoxServiceFactory<S, Publish<S>, (), E, E>;
14type HandlerService<S, E> = BoxService<Publish<S>, (), E>;
15
16/// Router - structure that follows the builder pattern
17/// for building publish packet router instances for mqtt server.
18pub struct Router<S, E> {
19    router: RouterBuilder<usize>,
20    handlers: Vec<Handler<S, E>>,
21    default: Handler<S, E>,
22}
23
24impl<S, E> Router<S, E>
25where
26    S: Clone + 'static,
27    E: 'static,
28{
29    /// Create mqtt application.
30    ///
31    /// **Note** Default service acks all publish packets
32    pub fn new() -> Self {
33        Router {
34            router: actix_router::Router::build(),
35            handlers: Vec::new(),
36            default: boxed::factory(
37                fn_service(|p: Publish<S>| {
38                    log::warn!("Unknown topic {:?}", p.publish_topic());
39                    ok::<_, E>(())
40                })
41                .map_init_err(|_| panic!()),
42            ),
43        }
44    }
45
46    /// Configure mqtt resource for a specific topic.
47    pub fn resource<T, F, U: 'static>(mut self, address: T, service: F) -> Self
48    where
49        T: IntoPattern,
50        F: IntoServiceFactory<U>,
51        U: ServiceFactory<Config = S, Request = Publish<S>, Response = (), Error = E>,
52        E: From<U::InitError>,
53    {
54        self.router.path(address, self.handlers.len());
55        self.handlers
56            .push(boxed::factory(service.into_factory().map_init_err(E::from)));
57        self
58    }
59
60    /// Default service to be used if no matching resource could be found.
61    pub fn default_resource<F, U: 'static>(mut self, service: F) -> Self
62    where
63        F: IntoServiceFactory<U>,
64        U: ServiceFactory<
65            Config = S,
66            Request = Publish<S>,
67            Response = (),
68            Error = E,
69            InitError = E,
70        >,
71    {
72        self.default = boxed::factory(service.into_factory());
73        self
74    }
75}
76
77impl<S, E> IntoServiceFactory<RouterFactory<S, E>> for Router<S, E>
78where
79    S: Clone + 'static,
80    E: 'static,
81{
82    fn into_factory(self) -> RouterFactory<S, E> {
83        RouterFactory {
84            router: Rc::new(self.router.finish()),
85            handlers: self.handlers,
86            default: self.default,
87        }
88    }
89}
90
91pub struct RouterFactory<S, E> {
92    router: Rc<actix_router::Router<usize>>,
93    handlers: Vec<Handler<S, E>>,
94    default: Handler<S, E>,
95}
96
97impl<S, E> ServiceFactory for RouterFactory<S, E>
98where
99    S: Clone + 'static,
100    E: 'static,
101{
102    type Config = S;
103    type Request = Publish<S>;
104    type Response = ();
105    type Error = E;
106    type InitError = E;
107    type Service = RouterService<S, E>;
108    type Future = RouterFactoryFut<S, E>;
109
110    fn new_service(&self, session: S) -> Self::Future {
111        let fut: Vec<_> = self
112            .handlers
113            .iter()
114            .map(|h| h.new_service(session.clone()))
115            .collect();
116
117        RouterFactoryFut {
118            router: self.router.clone(),
119            handlers: join_all(fut),
120            default: Some(either::Either::Left(self.default.new_service(session))),
121        }
122    }
123}
124
125pub struct RouterFactoryFut<S, E> {
126    router: Rc<actix_router::Router<usize>>,
127    handlers: JoinAll<LocalBoxFuture<'static, Result<HandlerService<S, E>, E>>>,
128    default: Option<
129        either::Either<
130            LocalBoxFuture<'static, Result<HandlerService<S, E>, E>>,
131            HandlerService<S, E>,
132        >,
133    >,
134}
135
136impl<S, E> Future for RouterFactoryFut<S, E> {
137    type Output = Result<RouterService<S, E>, E>;
138
139    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
140        let res = match self.default.as_mut().unwrap() {
141            either::Either::Left(ref mut fut) => {
142                let default = match futures::ready!(Pin::new(fut).poll(cx)) {
143                    Ok(default) => default,
144                    Err(e) => return Poll::Ready(Err(e)),
145                };
146                self.default = Some(either::Either::Right(default));
147                return self.poll(cx);
148            }
149            either::Either::Right(_) => futures::ready!(Pin::new(&mut self.handlers).poll(cx)),
150        };
151
152        let mut handlers = Vec::new();
153        for handler in res {
154            match handler {
155                Ok(h) => handlers.push(h),
156                Err(e) => return Poll::Ready(Err(e)),
157            }
158        }
159
160        Poll::Ready(Ok(RouterService {
161            handlers,
162            router: self.router.clone(),
163            default: self.default.take().unwrap().right().unwrap(),
164        }))
165    }
166}
167
168pub struct RouterService<S, E> {
169    router: Rc<actix_router::Router<usize>>,
170    handlers: Vec<BoxService<Publish<S>, (), E>>,
171    default: BoxService<Publish<S>, (), E>,
172}
173
174impl<S, E> Service for RouterService<S, E>
175where
176    S: 'static,
177    E: 'static,
178{
179    type Request = Publish<S>;
180    type Response = ();
181    type Error = E;
182    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
183
184    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
185        let mut not_ready = false;
186        for hnd in &mut self.handlers {
187            if let Poll::Pending = hnd.poll_ready(cx)? {
188                not_ready = true;
189            }
190        }
191
192        if not_ready {
193            Poll::Pending
194        } else {
195            Poll::Ready(Ok(()))
196        }
197    }
198
199    fn call(&mut self, mut req: Publish<S>) -> Self::Future {
200        if let Some((idx, _info)) = self.router.recognize(req.topic_mut()) {
201            self.handlers[*idx].call(req)
202        } else {
203            self.default.call(req)
204        }
205    }
206}