rvlib/
httpserver.rs

1use httparse::{Request, EMPTY_HEADER};
2use rvimage_domain::{rverr, to_rv, RvResult};
3use std::{
4    fmt::Debug,
5    io::{prelude::*, Read},
6    net::TcpListener,
7    str,
8    sync::mpsc::{self, Receiver, Sender},
9    thread::{self, JoinHandle},
10};
11use tracing::info;
12
13#[derive(Debug, PartialEq)]
14enum HandleResult {
15    Path(String),
16    Terminate,
17}
18
19fn handle_connection(buffer: &[u8]) -> RvResult<HandleResult> {
20    let mut headers = [EMPTY_HEADER; 64];
21
22    let mut req = Request::new(&mut headers);
23    let res = req.parse(buffer);
24    res.map_err(to_rv)?;
25    match req.path {
26        Some(p) => Ok(if p == "/TERMINATE" {
27            HandleResult::Terminate
28        } else {
29            HandleResult::Path(
30                percent_encoding::percent_decode_str(&p[1..])
31                    .decode_utf8()
32                    .map_err(to_rv)?
33                    .to_string(),
34            )
35        }),
36        None => {
37            let msg = "could not find path in stream";
38            match str::from_utf8(buffer) {
39                Ok(b) => Err(rverr!("{} '{}'", msg, b)),
40                Err(e) => Err(rverr!("{}, {:?}", msg, e)),
41            }
42        }
43    }
44}
45
46fn to_rv_or_send<T, E>(tx: &Sender<RvResult<String>>, x: Result<T, E>) -> RvResult<T>
47where
48    E: Debug,
49{
50    match x {
51        Ok(r) => Ok(r),
52        Err(e) => {
53            let error_str = format!("{e:?}");
54            match tx.send(Err(to_rv(e))) {
55                Ok(()) => Err(rverr!("error in http server, {}", error_str)),
56                Err(e) => Err(rverr!("error {}, send error {:?}", error_str, e)),
57            }
58        }
59    }
60}
61pub type LaunchResultType = RvResult<(JoinHandle<RvResult<()>>, Receiver<RvResult<String>>)>;
62pub fn launch(address: String) -> LaunchResultType {
63    info!("spawning httpserver at {address}");
64    let (tx_from_server, rx_from_server) = mpsc::channel();
65    let handle = thread::spawn(move || -> RvResult<()> {
66        let bind_result = TcpListener::bind(address);
67        let listener = to_rv_or_send(&tx_from_server, bind_result)?;
68        let mut buffer = vec![0; 4096];
69        // since the requests will only change the shown image, they will be handled sequentially
70        for stream in listener.incoming() {
71            let stream_processing_result: RvResult<HandleResult> = {
72                let mut stream = stream.map_err(to_rv)?;
73                stream.read(&mut buffer).map_err(to_rv)?;
74                // we don't care about headers and bodies and strip everything after the first CRLF
75                let buffer_slice = if let Some(idx) =
76                    (0..(buffer.len() - 1)).find(|idx| &buffer[*idx..*idx + 2] == b"\r\n")
77                {
78                    &buffer[..idx + 2]
79                } else {
80                    &buffer
81                };
82                let response = "HTTP/1.1 200 OK\r\n\r\n";
83                stream.write(response.as_bytes()).map_err(to_rv)?;
84                stream.flush().map_err(to_rv)?;
85                let res = handle_connection(buffer_slice);
86                let path = res?;
87                Ok(path)
88            };
89            // send recieved path
90            if let Ok(p) = to_rv_or_send(&tx_from_server, stream_processing_result) {
91                match p {
92                    HandleResult::Terminate => {
93                        info!("terminating httpserver");
94                        return Ok(());
95                    }
96                    HandleResult::Path(p_) => {
97                        info!("tcp listener sending result...");
98                        let send_result = tx_from_server.send(Ok(p_));
99                        info!("done. {send_result:?}");
100                        to_rv_or_send(&tx_from_server, send_result)?;
101                    }
102                }
103            }
104            info!("tcp listener waiting for new input");
105        }
106        Ok(())
107    });
108    info!("...done");
109    Ok((handle, rx_from_server))
110}
111
112fn increase_port(address: &str) -> RvResult<String> {
113    let address_wo_port = address.split(':').next();
114    let port = address.split(':').next_back();
115    if let Some(port) = port {
116        if let Some(address_wo_port) = address_wo_port {
117            Ok(format!(
118                "{}:{}",
119                address_wo_port,
120                (port.parse::<usize>().map_err(to_rv)? + 1)
121            ))
122        } else {
123            Err(rverr!("is address of {} missing?", address))
124        }
125    } else {
126        Err(rverr!("is port of address {} missing?", address))
127    }
128}
129
130pub fn restart_with_increased_port(
131    http_addr: &str,
132) -> RvResult<(String, Option<Receiver<RvResult<String>>>)> {
133    let http_addr = increase_port(http_addr)?;
134
135    info!("restarting http server with increased port");
136    Ok(if let Ok((_, rx)) = launch(http_addr.clone()) {
137        (http_addr, Some(rx))
138    } else {
139        (http_addr, None)
140    })
141}
142#[test]
143fn test_handler() -> RvResult<()> {
144    let buffer = b"garbage";
145    assert!(handle_connection(buffer.as_slice()).is_err());
146
147    let buffer = b"GET /index.html HTTP/1.1\r\nHost:";
148    assert_eq!(
149        handle_connection(buffer.as_slice()),
150        Ok(HandleResult::Path("index.html".to_string()))
151    );
152
153    let buffer = b"GET /folder%20name/file%20name.png HTTP/1.1\r\nHost:";
154    assert_eq!(
155        handle_connection(buffer.as_slice()),
156        Ok(HandleResult::Path("folder name/file name.png".to_string()))
157    );
158    let buffer = b"GET /TERMINATE HTTP/1.1\r\nHost:";
159    assert_eq!(
160        handle_connection(buffer.as_slice()),
161        Ok(HandleResult::Terminate)
162    );
163
164    Ok(())
165}
166#[cfg(test)]
167use std::{net::TcpStream, time::Duration};
168#[test]
169fn test_launch() -> RvResult<()> {
170    let address = "127.0.0.1:7942";
171    println!("launching server...");
172    let (handle, rx) = launch(address.to_string())?;
173    thread::sleep(Duration::from_millis(10));
174    assert!(!handle.is_finished());
175    println!("...done");
176
177    let send_request = |req| -> RvResult<()> {
178        let mut stream = TcpStream::connect(address).map_err(to_rv)?;
179        stream.write(req).map_err(to_rv)?;
180        stream.flush().map_err(to_rv)?;
181        Ok(())
182    };
183    println!("writing to stream...");
184    let input_stream = b"GET /some_path.png HTTP/1.1\r\nHost:";
185    send_request(input_stream.as_slice())?;
186    println!("...done");
187    println!("writing to stream...");
188    let input_stream = b"GET /some_other_path.png HTTP/1.1\r\nHost:";
189    send_request(input_stream.as_slice())?;
190    println!("...done");
191    thread::sleep(Duration::from_millis(1500));
192    println!("checking results...");
193    let result1 = rx.recv().map_err(to_rv)?;
194    let result2 = rx.recv().map_err(to_rv)?;
195    assert_eq!(result1, Ok("some_path.png".to_string()));
196    assert_eq!(result2, Ok("some_other_path.png".to_string()));
197    println!("...done");
198    println!("terminate...");
199    let terminate_stream = b"GET /TERMINATE HTTP/1.1\r\n";
200    send_request(terminate_stream.as_slice())?;
201    thread::sleep(Duration::from_millis(500));
202    assert!(handle.is_finished());
203    println!("...done");
204    Ok(())
205}
206#[test]
207fn test_increase_port() -> RvResult<()> {
208    assert_eq!(increase_port("address:1234")?, "address:1235");
209    Ok(())
210}