1use std::{collections::HashMap, convert::Infallible, error::Error};
2
3use crate::{filter, pipeline, util::from_utf8, AsyncFramework, AsyncPipeline, Pipeline, RawAsyncFramework, RawAsyncPipeline};
4use http_body_util::{BodyExt, Full};
5use hyper::{body::{Body, Bytes, Incoming}, header::HeaderValue, server::conn::http1, service::service_fn, Method, Response};
6use hyper_util::rt::TokioIo;
7use regex::Regex;
8use tokio::net::TcpListener;
9use async_trait::async_trait;
10use serde_urlencoded::from_str;
11
12pub use hyper::Request;
13
14struct HttpServer {
15 address: String
16}
17
18#[async_trait]
19impl RawAsyncFramework<Request<HttpContext<Bytes>>,Response<Full<Bytes>>,Response<Full<Bytes>>> for HttpServer {
20 async fn run(&self, pipeline: AsyncPipeline<Request<HttpContext<Bytes>>,Response<Full<Bytes>>,Response<Full<Bytes>>>) {
21 let listener = TcpListener::bind(&self.address).await.unwrap();
22 loop {
23 let (tcp, _) = listener.accept().await.unwrap();
24 let io = TokioIo::new(tcp);
25 if let Err(err) = http1::Builder::new()
26 .serve_connection(io, service_fn(|req| {
27 async {
28 Ok::<_,Infallible>(match (Ok(req) & to_bytes() & wrap_context() & pipeline.clone()).await {
29 Ok(a) => a.clone(),
30 Err(a) => a.clone(),
31 })
32 }
33 }))
34 .await
35 {
36 println!("Error serving connection: {:?}", err);
37 }
38 }
39 }
40}
41
42pub fn http_server(address: &str) -> AsyncFramework<Request<HttpContext<Bytes>>,Response<Full<Bytes>>,Response<Full<Bytes>>> {
43 AsyncFramework::new(HttpServer {
44 address: address.to_string()
45 })
46}
47
48pub type HttpResponse = Response<Full<Bytes>>;
49#[derive(Debug)]
52pub struct HttpContext<T> {
53 pub params: HashMap<String,String>,
54 pub body: T
55}
56
57impl<T: Clone> Clone for HttpContext<T> {
58 fn clone(&self) -> Self {
59 Self { params: self.params.clone(), body: self.body.clone() }
60 }
61}
62
63impl<T> HttpContext<T> {
64 pub fn new(params: HashMap<String,String>, body: T) -> HttpContext<T>{
65 HttpContext {
66 params,
67 body
68 }
69 }
70}
71
72pub fn method_is<T: Clone + 'static>(method: Method) -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>>{
73 filter(move|r : &Request<HttpContext<T>>| r.method() == &method, Response::builder().status(405).body(Full::new(Bytes::from(""))).unwrap())
74}
75
76pub fn http_get<T: Clone + 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
77 method_is::<T>(Method::GET)
78}
79
80pub fn http_post<T: Clone + 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
81 method_is::<T>(Method::POST)
82}
83
84pub fn http_put<T: Clone + 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
85 method_is::<T>(Method::PUT)
86}
87
88pub fn http_delete<T: Clone + 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
89 method_is::<T>(Method::DELETE)
90}
91
92pub fn http_error<T: Clone + 'static>(status: u16, message: &'static str) -> Result<T, Response<Full<Bytes>>> {
93 Err(Response::builder().status(status).body(Full::new(Bytes::from(message))).unwrap())
94}
95
96pub fn http_ok<MT: Into<Bytes>, ET: Clone + 'static>(status: u16, message: MT) -> Result<Response<Full<Bytes>>, ET> {
97 Ok(Response::builder().status(status).body(Full::new(message.into())).unwrap())
98}
99
100pub fn set_header(key: &'static str, value: &'static str) -> Pipeline<Response<Full<Bytes>>,Response<Full<Bytes>>,Response<Full<Bytes>>> {
101 pipeline(move |r: Response<Full<Bytes>>| {
102 let mut r = r.clone();
103 r.headers_mut().insert(key, HeaderValue::from_str(value).unwrap());
104 Ok(r)
105 })
106}
107
108struct ToByte;
109
110#[async_trait]
111impl RawAsyncPipeline<Request<Incoming>, Request<Bytes>, Response<Full<Bytes>>> for ToByte {
112 async fn async_run(&self,r: Request<Incoming>) -> Result<Request<Bytes>, Response<Full<Bytes>>> {
113 let mut body: Option<Incoming> = None;
114 let r = r.map(|b| body = Some(b));
115 match body.unwrap().collect().await {
116 Ok(rr) => {
117 Ok(r.map(|_| rr.to_bytes()))
118 },
119 Err(_) => Err(Response::builder().status(400).body(Full::new(Bytes::from(""))).unwrap()),
120 }
121 }
122}
123
124fn to_bytes() -> AsyncPipeline<Request<Incoming>,Request<Bytes>, Response<Full<Bytes>>> {
125 AsyncPipeline::new(ToByte)
126}
127
128fn wrap_context<T: Send + 'static>() -> Pipeline<Request<T>,Request<HttpContext<T>>, Response<Full<Bytes>>> {
129 pipeline(|r: Request<T>| Ok(r.map(|body| HttpContext::new(HashMap::new(), body))))
130}
131
132pub fn to_string() -> Pipeline<Request<HttpContext<Bytes>>, Request<HttpContext<String>>, Response<Full<Bytes>>> {
133 request(pipeline(|b: Bytes| Ok(b.to_vec())) & from_utf8())
134}
135
136pub fn from_body<T: Body + Send + Sync + 'static>() -> Pipeline<Request<HttpContext<T>>, HttpContext<T>, Response<Full<Bytes>>> {
137 pipeline(|bv: Request<HttpContext<T>>| Ok(bv.into_body()))
138}
139
140pub fn request<VT : Send + Sync + 'static,RT:Send + Sync + 'static,ET: Send + Sync + Error + 'static>(pipline: Pipeline<VT,RT, ET>) -> Pipeline<Request<HttpContext<VT>>,Request<HttpContext<RT>>, Response<Full<Bytes>>> {
141 pipeline(move |r: Request<HttpContext<VT>>| {
142 let req = r.map(|v: HttpContext<VT>| HttpContext::new(v.params,Ok(v.body) & pipline.clone()));
143 match &req.body().body {
144 Ok(_) => Ok(req.map(|r| HttpContext::new(r.params, r.body.unwrap()))),
145 Err(e) => Err(Response::builder().status(400).body(Full::new(Bytes::from(e.to_string()))).unwrap())
146 }
147 })
148}
149
150pub fn from_path<T: 'static>(path: &str) -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
151 let path_re_base = Regex::new("/:(\\w+)").unwrap();
152 let path_params: Vec<String> = path_re_base.clone().captures_iter(path).map(|m| m.get(1).unwrap().as_str().to_string()).collect();
153 let re_text = path_params.iter().fold(format!("^{path}$"),|p: String,pp| p.replace(&format!(":{pp}"), &format!("(?<{pp}>[^/]+)")));
154 let path_re = Regex::new(&re_text).unwrap();
155 pipeline(move |r: Request<HttpContext<T>>| {
156 let rr = &r;
157 let params = rr.body().params.clone();
158 let new_params = match path_re.clone().captures(rr.uri().path()) {
159 Some(r) => {
160 let mut new_paramas = params.clone();
161 for pp in path_params.clone().iter() {
162 if let Some(r) = r.name(pp) {
163 new_paramas.insert(pp.clone(), r.as_str().to_string());
164 }
165 }
166 Ok(new_paramas)
167 },
168 None => Err(Response::builder().status(404).body(Full::new(Bytes::from(""))).unwrap()),
169 };
170 match new_params {
171 Ok(params) => Ok(r.map(|old| HttpContext::new(params, old.body))),
172 Err(e) => Err(e)
173 }
174 })
175}
176
177pub fn from_query<T: 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
178 pipeline(|r: Request<HttpContext<T>>| {
179 let rr = &r;
180 let params = rr.body().params.clone();
181 let new_params = match from_str::<HashMap<String,String>>(rr.uri().query().unwrap_or_default()) {
182 Ok(v) => {
183 let mut new_paramas = params.clone();
184 for (k,v) in v.iter() {
185 new_paramas.insert(k.clone(), v.clone());
186 }
187 Ok(new_paramas)
188 },
189 Err(e) => Err(Response::builder().status(400).body(Full::new(Bytes::from(e.to_string()))).unwrap()),
190 };
191
192 match new_params {
193 Ok(params) => Ok(r.map(|old| HttpContext::new(params, old.body))),
194 Err(e) => Err(e)
195 }
196 })
197}
198
199
200pub fn to_body<T: Into<Bytes> + Send + Sync + 'static>() -> Pipeline<T, Response<Full<Bytes>>, Response<Full<Bytes>>> {
201 pipeline(|s: T| Ok(Response::builder().status(200).body(Full::new(s.into())).unwrap()))
202}