1use std::{cell::RefCell, num::NonZeroU16, rc::Rc, task::Context};
2
3use ntex_bytes::ByteString;
4use ntex_router::{IntoPattern, Path, RouterBuilder};
5use ntex_service::boxed::{self, BoxService, BoxServiceFactory};
6use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
7use ntex_util::HashMap;
8
9use super::{publish::Publish, publish::PublishAck, Session};
10
/// Boxed service factory that, given a `Session<S>`, builds a handler turning a
/// `Publish` into a `PublishAck`; `E` is both the service error and the init error.
type Handler<S, E> = BoxServiceFactory<Session<S>, Publish, PublishAck, E, E>;
/// Boxed, already-constructed handler service: `Publish` in, `PublishAck` out, error `E`.
type HandlerService<E> = BoxService<Publish, PublishAck, E>;
13
/// Builder that collects topic patterns together with the handler factory
/// registered for each of them, plus a default handler factory.
pub struct Router<S, Err> {
    /// Pattern table under construction; each pattern is stored with the index
    /// of its handler factory inside `handlers`.
    router: RouterBuilder<usize>,
    /// Handler factories, in registration order.
    handlers: Vec<Handler<S, Err>>,
    /// Factory for the default handler supplied at construction time.
    default: Handler<S, Err>,
}
21
22impl<S, Err> Router<S, Err>
23where
24 S: 'static,
25 Err: 'static,
26{
27 pub fn new<F, U>(default_service: F) -> Self
31 where
32 F: IntoServiceFactory<U, Publish, Session<S>>,
33 U: ServiceFactory<
34 Publish,
35 Session<S>,
36 Response = PublishAck,
37 Error = Err,
38 InitError = Err,
39 > + 'static,
40 {
41 Router {
42 router: ntex_router::Router::build(),
43 handlers: Vec::new(),
44 default: boxed::factory(default_service.into_factory()),
45 }
46 }
47
48 pub fn resource<T, F, U>(mut self, address: T, service: F) -> Self
50 where
51 T: IntoPattern,
52 F: IntoServiceFactory<U, Publish, Session<S>>,
53 U: ServiceFactory<Publish, Session<S>, Response = PublishAck, Error = Err> + 'static,
54 Err: From<U::InitError>,
55 {
56 self.router.path(address, self.handlers.len());
57 self.handlers.push(boxed::factory(service.into_factory().map_init_err(Err::from)));
58 self
59 }
60
61 pub fn finish(self) -> RouterFactory<S, Err> {
63 RouterFactory {
64 router: self.router.finish(),
65 handlers: Rc::new(self.handlers),
66 default: self.default,
67 }
68 }
69}
70
71impl<S, Err> IntoServiceFactory<RouterFactory<S, Err>, Publish, Session<S>> for Router<S, Err>
72where
73 S: 'static,
74 Err: 'static,
75{
76 fn into_factory(self) -> RouterFactory<S, Err> {
77 self.finish()
78 }
79}
80
/// Service factory holding a finished pattern table and the handler factories
/// used to build a `RouterService` for a session.
pub struct RouterFactory<S, Err> {
    /// Finished pattern table; each entry carries an index into `handlers`.
    router: ntex_router::Router<usize>,
    /// Handler factories behind a reference-counted pointer.
    handlers: Rc<Vec<Handler<S, Err>>>,
    /// Factory for the default handler.
    default: Handler<S, Err>,
}
86
87impl<S, Err> ServiceFactory<Publish, Session<S>> for RouterFactory<S, Err>
88where
89 S: 'static,
90 Err: 'static,
91{
92 type Response = PublishAck;
93 type Error = Err;
94 type InitError = Err;
95 type Service = RouterService<Err>;
96
97 async fn create(&self, session: Session<S>) -> Result<Self::Service, Err> {
98 let default = self.default.create(session.clone()).await?;
99
100 let mut handlers = Vec::with_capacity(self.handlers.len());
101 for f in self.handlers.as_ref() {
102 handlers.push(f.create(session.clone()).await?);
103 }
104
105 Ok(RouterService {
106 default,
107 handlers,
108 router: self.router.clone(),
109 aliases: RefCell::new(HashMap::default()),
110 })
111 }
112}
113
/// Service that forwards each publish either to the handler whose pattern
/// matches its topic or to the default handler.
pub struct RouterService<Err> {
    /// Pattern table; a match yields an index into `handlers`.
    router: ntex_router::Router<usize>,
    /// Handler used when no routed handler is selected.
    default: HandlerService<Err>,
    /// Routed handlers, indexed by the values stored in `router`.
    handlers: Vec<HandlerService<Err>>,
    /// Topic alias -> (handler index, topic path), recorded when a publish
    /// carrying that alias matched a route.
    aliases: RefCell<HashMap<NonZeroU16, (usize, Path<ByteString>)>>,
}
120
121impl<Err: 'static> Service<Publish> for RouterService<Err> {
122 type Response = PublishAck;
123 type Error = Err;
124
125 #[inline]
126 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
127 for hnd in self.handlers.iter() {
128 ctx.ready(hnd).await?;
129 }
130 ctx.ready(&self.default).await
131 }
132
133 #[inline]
134 fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
135 for hnd in &self.handlers {
136 hnd.poll(cx)?;
137 }
138 self.default.poll(cx)
139 }
140
141 #[allow(clippy::await_holding_refcell_ref)]
142 async fn call(
143 &self,
144 mut req: Publish,
145 ctx: ServiceCtx<'_, Self>,
146 ) -> Result<Self::Response, Self::Error> {
147 if !req.publish_topic().is_empty() {
148 if let Some((idx, _info)) = self.router.recognize(req.topic_mut()) {
149 if let Some(alias) = req.packet().properties.topic_alias {
151 self.aliases.borrow_mut().insert(alias, (*idx, req.topic().clone()));
152 }
153 return ctx.call(&self.handlers[*idx], req).await;
154 }
155 }
156 else if let Some(ref alias) = req.packet().properties.topic_alias {
158 let aliases = self.aliases.borrow();
159 if let Some(item) = aliases.get(alias) {
160 let idx = item.0;
161 *req.topic_mut() = item.1.clone();
162 drop(aliases);
163 return ctx.call(&self.handlers[idx], req).await;
164 } else {
165 log::error!("Unknown topic alias: {:?}", alias);
166 }
167 }
168 ctx.call(&self.default, req).await
169 }
170}