lopin/
http_server.rs

1use std::{collections::HashMap, convert::Infallible, error::Error};
2
3use crate::{filter, pipeline, util::from_utf8, AsyncFramework, AsyncPipeline, Pipeline, RawAsyncFramework, RawAsyncPipeline};
4use http_body_util::{BodyExt, Full};
5use hyper::{body::{Body, Bytes, Incoming}, header::HeaderValue, server::conn::http1, service::service_fn, Method, Response};
6use hyper_util::rt::TokioIo;
7use regex::Regex;
8use tokio::net::TcpListener;
9use async_trait::async_trait;
10use serde_urlencoded::from_str;
11
12pub use hyper::Request;
13
14struct HttpServer {
15  address: String
16}
17
18#[async_trait]
19impl RawAsyncFramework<Request<HttpContext<Bytes>>,Response<Full<Bytes>>,Response<Full<Bytes>>> for HttpServer {
20  async fn run(&self, pipeline: AsyncPipeline<Request<HttpContext<Bytes>>,Response<Full<Bytes>>,Response<Full<Bytes>>>) {
21    let listener = TcpListener::bind(&self.address).await.unwrap();
22    loop {
23      let (tcp, _) = listener.accept().await.unwrap();
24      let io = TokioIo::new(tcp);
25      if let Err(err) = http1::Builder::new()
26          .serve_connection(io, service_fn(|req| {
27            async {
28              Ok::<_,Infallible>(match (Ok(req) & to_bytes() & wrap_context() & pipeline.clone()).await {
29                Ok(a) => a.clone(),
30                Err(a) => a.clone(),
31              })
32            }
33          }))
34          .await
35      {
36          println!("Error serving connection: {:?}", err);
37      }
38    }
39  }
40}
41
42pub fn http_server(address: &str) -> AsyncFramework<Request<HttpContext<Bytes>>,Response<Full<Bytes>>,Response<Full<Bytes>>> {
43  AsyncFramework::new(HttpServer {
44    address: address.to_string()
45  })
46}
47
48pub type HttpResponse = Response<Full<Bytes>>;
49// pub type HttpAsyncPipeline<VT,RT> = AsyncPipeline<Request<HttpContext<VT>>, RT, HttpResponse>;
50
51#[derive(Debug)]
52pub struct HttpContext<T> {
53  pub params: HashMap<String,String>,
54  pub body: T
55}
56
57impl<T: Clone> Clone for HttpContext<T>  {
58    fn clone(&self) -> Self {
59        Self { params: self.params.clone(), body: self.body.clone() }
60    }
61}
62
63impl<T> HttpContext<T> {
64  pub fn new(params: HashMap<String,String>, body: T) -> HttpContext<T>{
65    HttpContext {
66      params,
67      body
68    }
69  }
70}
71
72pub fn method_is<T: Clone + 'static>(method: Method) -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>>{
73  filter(move|r : &Request<HttpContext<T>>| r.method() == &method, Response::builder().status(405).body(Full::new(Bytes::from(""))).unwrap())
74}
75
76pub fn http_get<T: Clone + 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
77  method_is::<T>(Method::GET)
78}
79
80pub fn http_post<T: Clone + 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
81  method_is::<T>(Method::POST)
82}
83
84pub fn http_put<T: Clone + 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
85  method_is::<T>(Method::PUT)
86}
87
88pub fn http_delete<T: Clone + 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
89  method_is::<T>(Method::DELETE)
90}
91
92pub fn http_error<T: Clone + 'static>(status: u16, message: &'static str) -> Result<T, Response<Full<Bytes>>> {
93  Err(Response::builder().status(status).body(Full::new(Bytes::from(message))).unwrap())
94}
95
96pub fn http_ok<MT: Into<Bytes>, ET: Clone + 'static>(status: u16, message: MT) -> Result<Response<Full<Bytes>>, ET> {
97  Ok(Response::builder().status(status).body(Full::new(message.into())).unwrap())
98}
99
100pub fn set_header(key: &'static str, value: &'static str) -> Pipeline<Response<Full<Bytes>>,Response<Full<Bytes>>,Response<Full<Bytes>>> {
101  pipeline(move |r: Response<Full<Bytes>>| {
102    let mut r = r.clone();
103    r.headers_mut().insert(key, HeaderValue::from_str(value).unwrap());
104    Ok(r)
105  })
106}
107
108struct ToByte;
109
110#[async_trait]
111impl RawAsyncPipeline<Request<Incoming>, Request<Bytes>, Response<Full<Bytes>>> for ToByte {
112  async fn async_run(&self,r: Request<Incoming>) -> Result<Request<Bytes>, Response<Full<Bytes>>> {
113    let mut body: Option<Incoming> = None;
114    let r = r.map(|b| body = Some(b));
115    match body.unwrap().collect().await {
116      Ok(rr) => {
117        Ok(r.map(|_| rr.to_bytes()))
118      },
119      Err(_) => Err(Response::builder().status(400).body(Full::new(Bytes::from(""))).unwrap()),
120    }
121  }
122}
123
124fn to_bytes() -> AsyncPipeline<Request<Incoming>,Request<Bytes>, Response<Full<Bytes>>> {
125  AsyncPipeline::new(ToByte)
126}
127
128fn wrap_context<T: Send  + 'static>() -> Pipeline<Request<T>,Request<HttpContext<T>>, Response<Full<Bytes>>> {
129  pipeline(|r: Request<T>| Ok(r.map(|body| HttpContext::new(HashMap::new(), body))))
130}
131
132pub fn to_string() -> Pipeline<Request<HttpContext<Bytes>>, Request<HttpContext<String>>, Response<Full<Bytes>>> {
133  request(pipeline(|b: Bytes| Ok(b.to_vec())) & from_utf8())
134}
135
136pub fn from_body<T: Body + Send + Sync + 'static>() -> Pipeline<Request<HttpContext<T>>, HttpContext<T>, Response<Full<Bytes>>> {
137  pipeline(|bv: Request<HttpContext<T>>| Ok(bv.into_body()))
138}
139
140pub fn request<VT : Send + Sync + 'static,RT:Send + Sync + 'static,ET: Send + Sync + Error + 'static>(pipline: Pipeline<VT,RT, ET>) -> Pipeline<Request<HttpContext<VT>>,Request<HttpContext<RT>>, Response<Full<Bytes>>> {
141  pipeline(move |r: Request<HttpContext<VT>>| {
142    let req = r.map(|v: HttpContext<VT>| HttpContext::new(v.params,Ok(v.body) & pipline.clone()));
143    match &req.body().body {
144      Ok(_) => Ok(req.map(|r| HttpContext::new(r.params, r.body.unwrap()))),
145      Err(e) => Err(Response::builder().status(400).body(Full::new(Bytes::from(e.to_string()))).unwrap())
146    }
147  })
148}
149
150pub fn from_path<T: 'static>(path: &str) -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
151  let path_re_base = Regex::new("/:(\\w+)").unwrap();
152  let path_params: Vec<String> = path_re_base.clone().captures_iter(path).map(|m| m.get(1).unwrap().as_str().to_string()).collect();
153  let re_text = path_params.iter().fold(format!("^{path}$"),|p: String,pp| p.replace(&format!(":{pp}"), &format!("(?<{pp}>[^/]+)")));
154  let path_re = Regex::new(&re_text).unwrap();
155  pipeline(move |r: Request<HttpContext<T>>| {
156    let rr = &r;
157    let params = rr.body().params.clone();
158    let new_params = match path_re.clone().captures(rr.uri().path()) {
159        Some(r) => {
160          let mut new_paramas = params.clone();
161          for pp in path_params.clone().iter() {
162            if let Some(r) = r.name(pp) {
163              new_paramas.insert(pp.clone(), r.as_str().to_string());
164            }
165          }
166          Ok(new_paramas)
167        },
168        None => Err(Response::builder().status(404).body(Full::new(Bytes::from(""))).unwrap()),
169    };
170    match new_params {
171      Ok(params) => Ok(r.map(|old| HttpContext::new(params, old.body))),
172      Err(e) => Err(e)
173    }
174  })
175}
176
177pub fn from_query<T: 'static>() -> Pipeline<Request<HttpContext<T>>, Request<HttpContext<T>>, Response<Full<Bytes>>> {
178  pipeline(|r: Request<HttpContext<T>>| {
179    let rr = &r;
180    let params = rr.body().params.clone();
181    let new_params = match from_str::<HashMap<String,String>>(rr.uri().query().unwrap_or_default()) {
182      Ok(v) => {
183        let mut new_paramas = params.clone();
184        for (k,v) in v.iter() {
185          new_paramas.insert(k.clone(), v.clone());
186        }
187        Ok(new_paramas)
188      },
189      Err(e) => Err(Response::builder().status(400).body(Full::new(Bytes::from(e.to_string()))).unwrap()),
190    };
191
192    match new_params {
193      Ok(params) => Ok(r.map(|old| HttpContext::new(params, old.body))),
194      Err(e) => Err(e)
195    }
196  })
197}
198
199
200pub fn to_body<T: Into<Bytes> + Send + Sync + 'static>() -> Pipeline<T, Response<Full<Bytes>>, Response<Full<Bytes>>> {
201  pipeline(|s: T| Ok(Response::builder().status(200).body(Full::new(s.into())).unwrap()))
202}