rainmaker_components/http/
http_linux.rs1#![cfg(target_os = "linux")]
2
/// Log target attached to the log records emitted by this module.
static LOGGER_TARGET: &str = "http_server";
4
5use std::collections::HashMap;
6use std::thread::{self, JoinHandle};
7use std::{net::SocketAddr, sync::mpsc};
8
9use log::error;
10
11use crate::error::Error;
12use crate::http::base::*;
13use crate::utils::{wrap_in_arc_mutex, WrappedInArcMutex};
14
15impl From<&tiny_http::Method> for HttpMethod {
16 fn from(inp: &tiny_http::Method) -> Self {
17 match inp {
18 tiny_http::Method::Get => HttpMethod::GET,
19 tiny_http::Method::Post => HttpMethod::POST,
20 _ => unreachable!(),
21 }
22 }
23}
24
25impl From<&mut tiny_http::Request> for HttpRequest {
26 fn from(req: &mut tiny_http::Request) -> Self {
27 let buf_len = req.body_length().unwrap_or(0);
28 let mut buf = vec![0; buf_len];
29 req.as_reader().read_exact(&mut buf).unwrap();
30
31 Self {
32 method: req.method().into(),
33 url: req.url().to_string(),
34 data: buf,
35 }
36 }
37}
38
39pub trait HttpEndpointCallback<'a> = Fn(HttpRequest) -> HttpResponse + Send + Sync + 'a;
40
41type HttpCallbackMethodMapping<'a> = HashMap<HttpMethod, Box<dyn HttpEndpointCallback<'static>>>;
45pub struct HttpServerLinux {
46 callbacks: WrappedInArcMutex<HashMap<String, HttpCallbackMethodMapping<'static>>>,
49 execution_thread_handle: Option<JoinHandle<()>>,
50 executor_channel: mpsc::Sender<()>,
51}
52
53impl HttpServer<HttpServerLinux> {
54 pub fn new(config: HttpConfiguration) -> Result<Self, Error> {
55 let callbacks: HashMap<String, HttpCallbackMethodMapping<'static>> = HashMap::new();
56 let callbacks_mutex = wrap_in_arc_mutex(callbacks);
57 let callbacks_mutex_clone = callbacks_mutex.clone();
58
59 let server = tiny_http::Server::http(SocketAddr::new(config.addr, config.port)).unwrap();
60
61 let (sender, recver) = mpsc::channel::<()>();
63
64 let executor_joinhandle = thread::spawn(move || {
66 let callbacks = callbacks_mutex_clone.to_owned();
67 while recver.try_recv().is_err() {
68 let mut req = match server.recv() {
70 Ok(r) => r,
71 Err(e) => {
72 error!(target: LOGGER_TARGET, "unable to receive http request: {e}");
73 continue;
74 }
75 };
76
77 let req_ep = req.url();
78 let req_method = req.method();
79
80 let callbacks_lock = callbacks.lock().unwrap();
81
82 let res = match callbacks_lock.get(req_ep) {
83 Some(h) => match h.get(&req_method.into()) {
84 Some(cb) => {
85 cb(HttpRequest::from(&mut req))
87 }
88 None => HttpResponse::from_bytes("invalid method"),
89 },
90 None => HttpResponse::from_bytes("invalid url"),
91 };
92
93 req.respond(tiny_http::Response::from_data(res.get_bytes_vectored()))
94 .unwrap();
95 }
96 });
97
98 Ok(Self(HttpServerLinux {
101 callbacks: callbacks_mutex,
102 execution_thread_handle: Some(executor_joinhandle),
103 executor_channel: sender,
104 }))
105 }
106
107 pub fn add_listener<T>(&mut self, path: String, method: HttpMethod, callback: T)
108 where
109 T: HttpEndpointCallback<'static>,
110 {
111 let mut paths_hmap = self.0.callbacks.lock().unwrap();
113 let _ = paths_hmap.try_insert(path.clone(), HashMap::new()); let callbacks_hashmap = paths_hmap.get_mut(&path).unwrap();
117 match callbacks_hashmap.try_insert(method, Box::new(callback)) {
118 Ok(_) => {
119 log::debug!(target: LOGGER_TARGET, "Registered handler for {path}")
120 }
121 Err(_) => {
122 error!(target: LOGGER_TARGET, "handler for {path} for already exists");
123 }
124 }
125 }
126}
127
128impl Drop for HttpServerLinux {
129 fn drop(&mut self) {
130 self.executor_channel.send(()).unwrap();
132
133 let join_handle = self.execution_thread_handle.take();
134 join_handle.map(|s| s.join());
136 }
137}