1use std::convert::Infallible;
2use std::net::SocketAddr;
3use std::panic::RefUnwindSafe;
4use std::sync::Arc;
5
6use chrono::{DateTime, Local};
7use hashbrown::hash_map::{Iter};
8use hashbrown::HashMap;
9use hyper::{Body, HeaderMap, Method, Response, StatusCode, Uri, Version};
10use hyper::body::HttpBody;
11use hyper::header::HeaderValue;
12use hyper::http::{Error, Extensions};
13use hyper::server::conn::AddrStream;
14use hyper::server::Server;
15use hyper::service::{make_service_fn, service_fn};
16use log::info;
17use regex::{Captures, Regex};
18use tokio::runtime::{Builder, Runtime};
19
20use crate::vertx::{cm::ClusterManager, EventBus};
21
22#[cfg(feature = "client")]
23pub mod client;
24
25
26pub struct Request {
27 pub request: hyper::Request<Body>,
28 pub(crate) paths: HashMap<String, String>,
29 pub request_timestamp: DateTime<Local>,
30 pub body: Vec<u8>,
31}
32
33impl Request {
34
35 #[inline]
36 pub fn path_value (&self, key: &str) -> Option<&String> {
37 self.paths.get(key)
38 }
39
40 #[inline]
41 pub fn path_iter (&self) -> Iter<String, String> {
42 self.paths.iter()
43 }
44
45 #[inline]
46 pub fn into_body (self) -> Body {
47 self.request.into_body()
48 }
49
50 #[inline]
51 pub fn method(&self) -> &Method {
52 &self.request.method()
53 }
54
55 #[inline]
56 pub fn uri(&self) -> &Uri {
57 &self.request.uri()
58 }
59
60 #[inline]
61 pub fn version(&self) -> Version {
62 self.request.version()
63 }
64
65 #[inline]
66 pub fn headers(&self) -> &HeaderMap<HeaderValue> {
67 &self.request.headers()
68 }
69
70 #[inline]
71 pub fn extensions(&self) -> &Extensions {
72 &self.request.extensions()
73 }
74
75 #[inline]
76 pub fn body(&self) -> &Body {
77 &self.request.body()
78 }
79
80}
81
82pub struct HttpServer<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> {
83 pub port: u16,
84 event_bus: Option<Arc<EventBus<CM>>>,
85 callers: Arc<
86 HashMap<
87 (String, Method),
88 Arc<
89 dyn FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
90 + 'static
91 + Send
92 + Sync,
93 >,
94 >,
95 >,
96 regexes: Arc<HashMap<String, (Regex, String)>>,
97 rt: Runtime,
98 main_reg: Regex,
99}
100
101impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> HttpServer<CM> {
102 pub(crate) fn new(event_bus: Option<Arc<EventBus<CM>>>) -> HttpServer<CM> {
103 HttpServer {
104 port: 0,
105 event_bus,
106 callers: Arc::new(HashMap::new()),
107 regexes: Arc::new(HashMap::new()),
108 main_reg: Regex::new("(:\\w+)").unwrap(),
109 rt: Builder::new_multi_thread()
110 .worker_threads(12)
111 .enable_all()
112 .build()
113 .unwrap(),
114 }
115 }
116
117 pub fn get<OP>(&mut self, path: &str, op: OP) -> &mut Self
118 where
119 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
120 + 'static
121 + Send
122 + Sync,
123 {
124 self.add_op(path, Method::GET, op)
125 }
126
127 pub fn post<OP>(&mut self, path: &str, op: OP) -> &mut Self
128 where
129 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
130 + 'static
131 + Send
132 + Sync,
133 {
134 self.add_op(path, Method::POST, op)
135 }
136
137 pub fn put<OP>(&mut self, path: &str, op: OP) -> &mut Self
138 where
139 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
140 + 'static
141 + Send
142 + Sync,
143 {
144 self.add_op(path, Method::PUT, op)
145 }
146
147 pub fn delete<OP>(&mut self, path: &str, op: OP) -> &mut Self
148 where
149 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
150 + 'static
151 + Send
152 + Sync,
153 {
154 self.add_op(path, Method::DELETE, op)
155 }
156
157 pub fn head<OP>(&mut self, path: &str, op: OP) -> &mut Self
158 where
159 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
160 + 'static
161 + Send
162 + Sync,
163 {
164 self.add_op(path, Method::HEAD, op)
165 }
166
167 pub fn patch<OP>(&mut self, path: &str, op: OP) -> &mut Self
168 where
169 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
170 + 'static
171 + Send
172 + Sync,
173 {
174 self.add_op(path, Method::PATCH, op)
175 }
176
177 pub fn options<OP>(&mut self, path: &str, op: OP) -> &mut Self
178 where
179 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
180 + 'static
181 + Send
182 + Sync,
183 {
184 self.add_op(path, Method::OPTIONS, op)
185 }
186
187 pub fn connect<OP>(&mut self, path: &str, op: OP) -> &mut Self
188 where
189 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
190 + 'static
191 + Send
192 + Sync,
193 {
194 self.add_op(path, Method::CONNECT, op)
195 }
196
197 pub fn trace<OP>(&mut self, path: &str, op: OP) -> &mut Self
198 where
199 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
200 + 'static
201 + Send
202 + Sync,
203 {
204 self.add_op(path, Method::TRACE, op)
205 }
206
207 fn add_op<OP>(&mut self, path: &str, method: Method, op: OP) -> &mut Self
208 where
209 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
210 + 'static
211 + Send
212 + Sync,
213 {
214 let callers = Arc::get_mut(&mut self.callers).unwrap();
215 let reg_path = self.main_reg.replace_all(path, "(\\w+)");
216 callers.insert((reg_path.to_string(), method), Arc::new(op));
217
218 let regexes = Arc::get_mut(&mut self.regexes).unwrap();
219 let path = path.replace(":", "");
220 regexes.insert(reg_path.to_string(), (Regex::new(®_path).unwrap(), path));
221
222 self
223 }
224
225 pub fn listen_with_default<OP>(&mut self, port: u16, mut default: OP)
226 where
227 OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
228 + 'static
229 + Send
230 + Sync
231 + Copy,
232 {
233 let ev = self.event_bus.as_ref().unwrap().clone();
234
235 let callers = self.callers.clone();
236 let addr = SocketAddr::from(([0, 0, 0, 0], port));
237 let new_service = make_service_fn(move |_conn: &AddrStream| {
238 let ev = ev.clone();
239 let callers = callers.clone();
240 async move {
241 let x = Ok::<_, Infallible>(service_fn(move |mut req: hyper::Request<Body>| {
242 let ev = ev.clone();
243 let callers = callers.clone();
244 async move {
245 let mut data = vec![];
246 let body = req.body_mut();
247 loop {
248 let bytes = body.data().await;
249 match bytes {
250 Some(bytes) => {
251 match bytes {
252 Ok(bytes) => {
253 data.extend_from_slice(&bytes);
254 }
255 Err(_) => {
256 break;
257 }
258 }
259 }
260 None => {break;}
261 }
262 }
263 let ev = ev.to_owned();
264 let op = callers.get(&(req.uri().path().to_owned(), req.method().clone()));
265 let request = Request {
266 request: req,
267 paths: Default::default(),
268 request_timestamp: Local::now(),
269 body: data
270 };
271 match op {
272 Some(op) => {
273 let mut op = op.clone();
274 unsafe {
275 let op = Arc::get_mut_unchecked(&mut op);
276 op(request, ev)
277 }
278 }
279 None => default(request, ev),
280 }
281 }
282 }));
283
284 x
285 }
286 });
287
288 self.rt.spawn(async move {
289 let server = Server::bind(&addr).serve(new_service);
290 info!("start http_server on http://{}", addr);
291 let _ = server.await;
292 });
293 }
294
295 pub fn listen(&mut self, port: u16) {
296 let ev = self.event_bus.as_ref().unwrap().clone();
297
298 let callers = self.callers.clone();
299 let regexes = self.regexes.clone();
300 let addr = SocketAddr::from(([0, 0, 0, 0], port));
301 let new_service = make_service_fn(move |_conn: &AddrStream| {
302 let ev = ev.clone();
303 let callers = callers.clone();
304 let regexes = regexes.clone();
305 async move {
306 Ok::<_, Infallible>(service_fn(move |mut req: hyper::Request<Body>| {
307 let ev = ev.clone();
308 let callers = callers.clone();
309 let regexes = regexes.clone();
310 async move {
311 let mut data = vec![];
312 let body = req.body_mut();
313 loop {
314 let bytes = body.data().await;
315 match bytes {
316 Some(bytes) => {
317 match bytes {
318 Ok(bytes) => {
319 data.extend_from_slice(&bytes);
320 }
321 Err(_) => {
322 break;
323 }
324 }
325 }
326 None => {break;}
327 }
328 }
329 let ev = ev.to_owned();
330 let path = req.uri().path().to_owned();
331 let mut path_key = &path;
332 let mut paths = HashMap::new();
333 for (k, v) in regexes.iter() {
334 if v.0.is_match(&path) {
335 path_key = k;
336 let caps_base = v.0.captures_iter(&v.1);
337 let caps : Vec<Captures> = v.0.captures_iter(&path).collect();
338 for (i, c) in caps_base.enumerate() {
339 for idx in 1..c.len() {
340 if let Some(name) = c.get(idx) {
341 if let Some(value) = caps[i].get(idx) {
342 let _ = paths.insert(name.as_str().to_string(),
343 value.as_str().to_string()
344 );
345 }
346 }
347 }
348
349 }
350 break;
351 }
352 }
353 let op = callers.get(&(path_key.to_owned(), req.method().clone()));
354 let request = Request {
355 request: req,
356 paths,
357 request_timestamp: Local::now(),
358 body: data
359 };
360 <HttpServer<CM>>::invoke_function(request, ev, op)
361 }
362 }))
363 }
364 });
365
366 self.rt.spawn(async move {
367 let server = Server::bind(&addr).serve(new_service);
368 info!("start http_server on http://{}", addr);
369 let _ = server.await;
370 });
371 }
372
373 #[inline]
374 fn invoke_function(
375 req: Request,
376 ev: Arc<EventBus<CM>>,
377 op: Option<
378 &Arc<
379 dyn FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
380 + Send
381 + Sync,
382 >,
383 >,
384 ) -> Result<Response<Body>, Error> {
385 match op {
386 Some(op) => {
387 let mut op = op.clone();
388 unsafe {
389 let op = Arc::get_mut_unchecked(&mut op);
390 op(req, ev)
391 }
392 }
393 None => Ok(Response::builder()
394 .status(StatusCode::NOT_FOUND)
395 .body(b"NOTFOUND".to_vec().into())
396 .unwrap()),
397 }
398 }
399}