1use std::future::Future;
2use std::pin::Pin;
3use std::rc::Rc;
4use std::task::{Context, Poll};
5
6use actix_router::{IntoPattern, RouterBuilder};
7use actix_service::boxed::{self, BoxService, BoxServiceFactory};
8use actix_service::{fn_service, IntoServiceFactory, Service, ServiceFactory};
9use futures::future::{join_all, ok, JoinAll, LocalBoxFuture};
10
11use crate::publish::Publish;
12
13type Handler<S, E> = BoxServiceFactory<S, Publish<S>, (), E, E>;
14type HandlerService<S, E> = BoxService<Publish<S>, (), E>;
15
16pub struct Router<S, E> {
19 router: RouterBuilder<usize>,
20 handlers: Vec<Handler<S, E>>,
21 default: Handler<S, E>,
22}
23
24impl<S, E> Router<S, E>
25where
26 S: Clone + 'static,
27 E: 'static,
28{
29 pub fn new() -> Self {
33 Router {
34 router: actix_router::Router::build(),
35 handlers: Vec::new(),
36 default: boxed::factory(
37 fn_service(|p: Publish<S>| {
38 log::warn!("Unknown topic {:?}", p.publish_topic());
39 ok::<_, E>(())
40 })
41 .map_init_err(|_| panic!()),
42 ),
43 }
44 }
45
46 pub fn resource<T, F, U: 'static>(mut self, address: T, service: F) -> Self
48 where
49 T: IntoPattern,
50 F: IntoServiceFactory<U>,
51 U: ServiceFactory<Config = S, Request = Publish<S>, Response = (), Error = E>,
52 E: From<U::InitError>,
53 {
54 self.router.path(address, self.handlers.len());
55 self.handlers
56 .push(boxed::factory(service.into_factory().map_init_err(E::from)));
57 self
58 }
59
60 pub fn default_resource<F, U: 'static>(mut self, service: F) -> Self
62 where
63 F: IntoServiceFactory<U>,
64 U: ServiceFactory<
65 Config = S,
66 Request = Publish<S>,
67 Response = (),
68 Error = E,
69 InitError = E,
70 >,
71 {
72 self.default = boxed::factory(service.into_factory());
73 self
74 }
75}
76
77impl<S, E> IntoServiceFactory<RouterFactory<S, E>> for Router<S, E>
78where
79 S: Clone + 'static,
80 E: 'static,
81{
82 fn into_factory(self) -> RouterFactory<S, E> {
83 RouterFactory {
84 router: Rc::new(self.router.finish()),
85 handlers: self.handlers,
86 default: self.default,
87 }
88 }
89}
90
91pub struct RouterFactory<S, E> {
92 router: Rc<actix_router::Router<usize>>,
93 handlers: Vec<Handler<S, E>>,
94 default: Handler<S, E>,
95}
96
97impl<S, E> ServiceFactory for RouterFactory<S, E>
98where
99 S: Clone + 'static,
100 E: 'static,
101{
102 type Config = S;
103 type Request = Publish<S>;
104 type Response = ();
105 type Error = E;
106 type InitError = E;
107 type Service = RouterService<S, E>;
108 type Future = RouterFactoryFut<S, E>;
109
110 fn new_service(&self, session: S) -> Self::Future {
111 let fut: Vec<_> = self
112 .handlers
113 .iter()
114 .map(|h| h.new_service(session.clone()))
115 .collect();
116
117 RouterFactoryFut {
118 router: self.router.clone(),
119 handlers: join_all(fut),
120 default: Some(either::Either::Left(self.default.new_service(session))),
121 }
122 }
123}
124
125pub struct RouterFactoryFut<S, E> {
126 router: Rc<actix_router::Router<usize>>,
127 handlers: JoinAll<LocalBoxFuture<'static, Result<HandlerService<S, E>, E>>>,
128 default: Option<
129 either::Either<
130 LocalBoxFuture<'static, Result<HandlerService<S, E>, E>>,
131 HandlerService<S, E>,
132 >,
133 >,
134}
135
136impl<S, E> Future for RouterFactoryFut<S, E> {
137 type Output = Result<RouterService<S, E>, E>;
138
139 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
140 let res = match self.default.as_mut().unwrap() {
141 either::Either::Left(ref mut fut) => {
142 let default = match futures::ready!(Pin::new(fut).poll(cx)) {
143 Ok(default) => default,
144 Err(e) => return Poll::Ready(Err(e)),
145 };
146 self.default = Some(either::Either::Right(default));
147 return self.poll(cx);
148 }
149 either::Either::Right(_) => futures::ready!(Pin::new(&mut self.handlers).poll(cx)),
150 };
151
152 let mut handlers = Vec::new();
153 for handler in res {
154 match handler {
155 Ok(h) => handlers.push(h),
156 Err(e) => return Poll::Ready(Err(e)),
157 }
158 }
159
160 Poll::Ready(Ok(RouterService {
161 handlers,
162 router: self.router.clone(),
163 default: self.default.take().unwrap().right().unwrap(),
164 }))
165 }
166}
167
168pub struct RouterService<S, E> {
169 router: Rc<actix_router::Router<usize>>,
170 handlers: Vec<BoxService<Publish<S>, (), E>>,
171 default: BoxService<Publish<S>, (), E>,
172}
173
174impl<S, E> Service for RouterService<S, E>
175where
176 S: 'static,
177 E: 'static,
178{
179 type Request = Publish<S>;
180 type Response = ();
181 type Error = E;
182 type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
183
184 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
185 let mut not_ready = false;
186 for hnd in &mut self.handlers {
187 if let Poll::Pending = hnd.poll_ready(cx)? {
188 not_ready = true;
189 }
190 }
191
192 if not_ready {
193 Poll::Pending
194 } else {
195 Poll::Ready(Ok(()))
196 }
197 }
198
199 fn call(&mut self, mut req: Publish<S>) -> Self::Future {
200 if let Some((idx, _info)) = self.router.recognize(req.topic_mut()) {
201 self.handlers[*idx].call(req)
202 } else {
203 self.default.call(req)
204 }
205 }
206}