vertx_rust/http/
mod.rs

1use std::convert::Infallible;
2use std::net::SocketAddr;
3use std::panic::RefUnwindSafe;
4use std::sync::Arc;
5
6use chrono::{DateTime, Local};
7use hashbrown::hash_map::{Iter};
8use hashbrown::HashMap;
9use hyper::{Body, HeaderMap, Method, Response, StatusCode, Uri, Version};
10use hyper::body::HttpBody;
11use hyper::header::HeaderValue;
12use hyper::http::{Error, Extensions};
13use hyper::server::conn::AddrStream;
14use hyper::server::Server;
15use hyper::service::{make_service_fn, service_fn};
16use log::info;
17use regex::{Captures, Regex};
18use tokio::runtime::{Builder, Runtime};
19
20use crate::vertx::{cm::ClusterManager, EventBus};
21
22#[cfg(feature = "client")]
23pub mod client;
24
25
26pub struct Request {
27    pub request: hyper::Request<Body>,
28    pub(crate) paths: HashMap<String, String>,
29    pub request_timestamp: DateTime<Local>,
30    pub body: Vec<u8>,
31}
32
33impl Request {
34
35    #[inline]
36    pub fn path_value (&self, key: &str) -> Option<&String> {
37        self.paths.get(key)
38    }
39
40    #[inline]
41    pub fn path_iter (&self) -> Iter<String, String> {
42        self.paths.iter()
43    }
44
45    #[inline]
46    pub fn into_body (self) -> Body {
47        self.request.into_body()
48    }
49
50    #[inline]
51    pub fn method(&self) -> &Method {
52        &self.request.method()
53    }
54
55    #[inline]
56    pub fn uri(&self) -> &Uri {
57        &self.request.uri()
58    }
59
60    #[inline]
61    pub fn version(&self) -> Version {
62        self.request.version()
63    }
64
65    #[inline]
66    pub fn headers(&self) -> &HeaderMap<HeaderValue> {
67        &self.request.headers()
68    }
69
70    #[inline]
71    pub fn extensions(&self) -> &Extensions {
72        &self.request.extensions()
73    }
74
75    #[inline]
76    pub fn body(&self) -> &Body {
77        &self.request.body()
78    }
79
80}
81
82pub struct HttpServer<CM: 'static + ClusterManager + Send + Sync +  RefUnwindSafe> {
83    pub port: u16,
84    event_bus: Option<Arc<EventBus<CM>>>,
85    callers: Arc<
86        HashMap<
87            (String, Method),
88            Arc<
89                dyn FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
90                    + 'static
91                    + Send
92                    + Sync,
93            >,
94        >,
95    >,
96    regexes: Arc<HashMap<String, (Regex, String)>>,
97    rt: Runtime,
98    main_reg: Regex,
99}
100
101impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> HttpServer<CM> {
102    pub(crate) fn new(event_bus: Option<Arc<EventBus<CM>>>) -> HttpServer<CM> {
103        HttpServer {
104            port: 0,
105            event_bus,
106            callers: Arc::new(HashMap::new()),
107            regexes: Arc::new(HashMap::new()),
108            main_reg: Regex::new("(:\\w+)").unwrap(),
109            rt: Builder::new_multi_thread()
110                .worker_threads(12)
111                .enable_all()
112                .build()
113                .unwrap(),
114        }
115    }
116
117    pub fn get<OP>(&mut self, path: &str, op: OP) -> &mut Self
118    where
119        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
120            + 'static
121            + Send
122            + Sync,
123    {
124        self.add_op(path, Method::GET, op)
125    }
126
127    pub fn post<OP>(&mut self, path: &str, op: OP) -> &mut Self
128    where
129        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
130            + 'static
131            + Send
132            + Sync,
133    {
134        self.add_op(path, Method::POST, op)
135    }
136
137    pub fn put<OP>(&mut self, path: &str, op: OP) -> &mut Self
138    where
139        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
140            + 'static
141            + Send
142            + Sync,
143    {
144        self.add_op(path, Method::PUT, op)
145    }
146
147    pub fn delete<OP>(&mut self, path: &str, op: OP) -> &mut Self
148    where
149        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
150            + 'static
151            + Send
152            + Sync,
153    {
154        self.add_op(path, Method::DELETE, op)
155    }
156
157    pub fn head<OP>(&mut self, path: &str, op: OP) -> &mut Self
158    where
159        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
160            + 'static
161            + Send
162            + Sync,
163    {
164        self.add_op(path, Method::HEAD, op)
165    }
166
167    pub fn patch<OP>(&mut self, path: &str, op: OP) -> &mut Self
168    where
169        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
170            + 'static
171            + Send
172            + Sync,
173    {
174        self.add_op(path, Method::PATCH, op)
175    }
176
177    pub fn options<OP>(&mut self, path: &str, op: OP) -> &mut Self
178    where
179        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
180            + 'static
181            + Send
182            + Sync,
183    {
184        self.add_op(path, Method::OPTIONS, op)
185    }
186
187    pub fn connect<OP>(&mut self, path: &str, op: OP) -> &mut Self
188    where
189        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
190            + 'static
191            + Send
192            + Sync,
193    {
194        self.add_op(path, Method::CONNECT, op)
195    }
196
197    pub fn trace<OP>(&mut self, path: &str, op: OP) -> &mut Self
198    where
199        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
200            + 'static
201            + Send
202            + Sync,
203    {
204        self.add_op(path, Method::TRACE, op)
205    }
206
207    fn add_op<OP>(&mut self, path: &str, method: Method, op: OP) -> &mut Self
208    where
209        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
210            + 'static
211            + Send
212            + Sync,
213    {
214        let callers = Arc::get_mut(&mut self.callers).unwrap();
215        let reg_path = self.main_reg.replace_all(path, "(\\w+)");
216        callers.insert((reg_path.to_string(), method), Arc::new(op));
217
218        let regexes = Arc::get_mut(&mut self.regexes).unwrap();
219        let path = path.replace(":", "");
220        regexes.insert(reg_path.to_string(), (Regex::new(&reg_path).unwrap(), path));
221
222        self
223    }
224
225    pub fn listen_with_default<OP>(&mut self, port: u16, mut default: OP)
226    where
227        OP: FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
228            + 'static
229            + Send
230            + Sync
231            + Copy,
232    {
233        let ev = self.event_bus.as_ref().unwrap().clone();
234
235        let callers = self.callers.clone();
236        let addr = SocketAddr::from(([0, 0, 0, 0], port));
237        let new_service = make_service_fn(move |_conn: &AddrStream| {
238            let ev = ev.clone();
239            let callers = callers.clone();
240            async move {
241                let x = Ok::<_, Infallible>(service_fn(move |mut req: hyper::Request<Body>| {
242                    let ev = ev.clone();
243                    let callers = callers.clone();
244                    async move {
245                        let mut data = vec![];
246                        let body = req.body_mut();
247                        loop {
248                            let bytes = body.data().await;
249                            match bytes {
250                                Some(bytes) => {
251                                    match bytes {
252                                        Ok(bytes) => {
253                                            data.extend_from_slice(&bytes);
254                                        }
255                                        Err(_) => {
256                                            break;
257                                        }
258                                    }
259                                }
260                                None => {break;}
261                            }
262                        }
263                        let ev = ev.to_owned();
264                        let op = callers.get(&(req.uri().path().to_owned(), req.method().clone()));
265                        let request = Request {
266                            request: req,
267                            paths: Default::default(),
268                            request_timestamp: Local::now(),
269                            body: data
270                        };
271                        match op {
272                            Some(op) => {
273                                let mut op = op.clone();
274                                unsafe {
275                                    let op = Arc::get_mut_unchecked(&mut op);
276                                    op(request, ev)
277                                }
278                            }
279                            None => default(request, ev),
280                        }
281                    }
282                }));
283
284                x
285            }
286        });
287
288        self.rt.spawn(async move {
289            let server = Server::bind(&addr).serve(new_service);
290            info!("start http_server on http://{}", addr);
291            let _ = server.await;
292        });
293    }
294
295    pub fn listen(&mut self, port: u16) {
296        let ev = self.event_bus.as_ref().unwrap().clone();
297
298        let callers = self.callers.clone();
299        let regexes = self.regexes.clone();
300        let addr = SocketAddr::from(([0, 0, 0, 0], port));
301        let new_service = make_service_fn(move |_conn: &AddrStream| {
302            let ev = ev.clone();
303            let callers = callers.clone();
304            let regexes = regexes.clone();
305            async move {
306                Ok::<_, Infallible>(service_fn(move |mut req: hyper::Request<Body>| {
307                    let ev = ev.clone();
308                    let callers = callers.clone();
309                    let regexes = regexes.clone();
310                    async move {
311                        let mut data = vec![];
312                        let body = req.body_mut();
313                        loop {
314                            let bytes = body.data().await;
315                            match bytes {
316                                Some(bytes) => {
317                                    match bytes {
318                                        Ok(bytes) => {
319                                            data.extend_from_slice(&bytes);
320                                        }
321                                        Err(_) => {
322                                            break;
323                                        }
324                                    }
325                                }
326                                None => {break;}
327                            }
328                        }
329                        let ev = ev.to_owned();
330                        let path = req.uri().path().to_owned();
331                        let mut path_key = &path;
332                        let mut paths = HashMap::new();
333                        for (k, v) in regexes.iter() {
334                            if v.0.is_match(&path) {
335                                path_key = k;
336                                let caps_base = v.0.captures_iter(&v.1);
337                                let caps : Vec<Captures> = v.0.captures_iter(&path).collect();
338                                for (i, c) in caps_base.enumerate() {
339                                    for idx in 1..c.len() {
340                                        if let Some(name) = c.get(idx) {
341                                            if let Some(value) = caps[i].get(idx) {
342                                                let _ = paths.insert(name.as_str().to_string(),
343                                                                     value.as_str().to_string()
344                                                );
345                                            }
346                                        }
347                                    }
348
349                                }
350                                break;
351                            }
352                        }
353                        let op = callers.get(&(path_key.to_owned(), req.method().clone()));
354                        let request = Request {
355                            request: req,
356                            paths,
357                            request_timestamp: Local::now(),
358                            body: data
359                        };
360                        <HttpServer<CM>>::invoke_function(request, ev, op)
361                    }
362                }))
363            }
364        });
365
366        self.rt.spawn(async move {
367            let server = Server::bind(&addr).serve(new_service);
368            info!("start http_server on http://{}", addr);
369            let _ = server.await;
370        });
371    }
372
373    #[inline]
374    fn invoke_function(
375        req: Request,
376        ev: Arc<EventBus<CM>>,
377        op: Option<
378            &Arc<
379                dyn FnMut(Request, Arc<EventBus<CM>>) -> Result<Response<Body>, Error>
380                    + Send
381                    + Sync,
382            >,
383        >,
384    ) -> Result<Response<Body>, Error> {
385        match op {
386            Some(op) => {
387                let mut op = op.clone();
388                unsafe {
389                    let op = Arc::get_mut_unchecked(&mut op);
390                    op(req, ev)
391                }
392            }
393            None => Ok(Response::builder()
394                .status(StatusCode::NOT_FOUND)
395                .body(b"NOTFOUND".to_vec().into())
396                .unwrap()),
397        }
398    }
399}