rainmaker_components/http/
http_linux.rs

1#![cfg(target_os = "linux")]
2
3static LOGGER_TARGET: &str = "http_server";
4
5use std::collections::HashMap;
6use std::thread::{self, JoinHandle};
7use std::{net::SocketAddr, sync::mpsc};
8
9use log::error;
10
11use crate::error::Error;
12use crate::http::base::*;
13use crate::utils::{wrap_in_arc_mutex, WrappedInArcMutex};
14
15impl From<&tiny_http::Method> for HttpMethod {
16    fn from(inp: &tiny_http::Method) -> Self {
17        match inp {
18            tiny_http::Method::Get => HttpMethod::GET,
19            tiny_http::Method::Post => HttpMethod::POST,
20            _ => unreachable!(),
21        }
22    }
23}
24
25impl From<&mut tiny_http::Request> for HttpRequest {
26    fn from(req: &mut tiny_http::Request) -> Self {
27        let buf_len = req.body_length().unwrap_or(0);
28        let mut buf = vec![0; buf_len];
29        req.as_reader().read_exact(&mut buf).unwrap();
30
31        Self {
32            method: req.method().into(),
33            url: req.url().to_string(),
34            data: buf,
35        }
36    }
37}
38
39pub trait HttpEndpointCallback<'a> = Fn(HttpRequest) -> HttpResponse + Send + Sync + 'a;
40
41// http server from esp-idf-svc starts listening as soon as it is initialized and supports registering callback handlers later on
42// however tiny_http is a blocking server
43// we linux http server with idf by creating a hashmap mutex and spawning tiny_http in a separate thread
44type HttpCallbackMethodMapping<'a> = HashMap<HttpMethod, Box<dyn HttpEndpointCallback<'static>>>;
45pub struct HttpServerLinux {
46    // inner hashmap to store mapping of endpoint method with callback
47    // outer hashmap to store mapping to endpoint url with inner hashmap
48    callbacks: WrappedInArcMutex<HashMap<String, HttpCallbackMethodMapping<'static>>>,
49    execution_thread_handle: Option<JoinHandle<()>>,
50    executor_channel: mpsc::Sender<()>,
51}
52
53impl HttpServer<HttpServerLinux> {
54    pub fn new(config: HttpConfiguration) -> Result<Self, Error> {
55        let callbacks: HashMap<String, HttpCallbackMethodMapping<'static>> = HashMap::new();
56        let callbacks_mutex = wrap_in_arc_mutex(callbacks);
57        let callbacks_mutex_clone = callbacks_mutex.clone();
58
59        let server = tiny_http::Server::http(SocketAddr::new(config.addr, config.port)).unwrap();
60
61        // use a channel to send a dummy data to stop http server
62        let (sender, recver) = mpsc::channel::<()>();
63
64        // execute a server in a separate thread
65        let executor_joinhandle = thread::spawn(move || {
66            let callbacks = callbacks_mutex_clone.to_owned();
67            while recver.try_recv().is_err() {
68                // untill there is no data in the buffer
69                let mut req = match server.recv() {
70                    Ok(r) => r,
71                    Err(e) => {
72                        error!(target: LOGGER_TARGET, "unable to receive http request: {e}");
73                        continue;
74                    }
75                };
76
77                let req_ep = req.url();
78                let req_method = req.method();
79
80                let callbacks_lock = callbacks.lock().unwrap();
81
82                let res = match callbacks_lock.get(req_ep) {
83                    Some(h) => match h.get(&req_method.into()) {
84                        Some(cb) => {
85                            // callback exists, execute callback
86                            cb(HttpRequest::from(&mut req))
87                        }
88                        None => HttpResponse::from_bytes("invalid method"),
89                    },
90                    None => HttpResponse::from_bytes("invalid url"),
91                };
92
93                req.respond(tiny_http::Response::from_data(res.get_bytes_vectored()))
94                    .unwrap();
95            }
96        });
97
98        // let executor_joinhandle = thread::spawn(|| {});
99
100        Ok(Self(HttpServerLinux {
101            callbacks: callbacks_mutex,
102            execution_thread_handle: Some(executor_joinhandle),
103            executor_channel: sender,
104        }))
105    }
106
107    pub fn add_listener<T>(&mut self, path: String, method: HttpMethod, callback: T)
108    where
109        T: HttpEndpointCallback<'static>,
110    {
111        // if inner hashmap does not exist for a path, create it
112        let mut paths_hmap = self.0.callbacks.lock().unwrap();
113        let _ = paths_hmap.try_insert(path.clone(), HashMap::new()); // we can safely ignore the err
114
115        // insert the callback and check for error
116        let callbacks_hashmap = paths_hmap.get_mut(&path).unwrap();
117        match callbacks_hashmap.try_insert(method, Box::new(callback)) {
118            Ok(_) => {
119                log::debug!(target: LOGGER_TARGET, "Registered handler for {path}")
120            }
121            Err(_) => {
122                error!(target: LOGGER_TARGET, "handler for {path} for already exists");
123            }
124        }
125    }
126}
127
128impl Drop for HttpServerLinux {
129    fn drop(&mut self) {
130        // send a message to stop the server from listening
131        self.executor_channel.send(()).unwrap();
132
133        let join_handle = self.execution_thread_handle.take();
134        // wait for thread to gracefully exit
135        join_handle.map(|s| s.join());
136    }
137}