1use httparse::{Request, EMPTY_HEADER};
2use rvimage_domain::{rverr, to_rv, RvResult};
3use std::{
4 fmt::Debug,
5 io::{prelude::*, Read},
6 net::TcpListener,
7 str,
8 sync::mpsc::{self, Receiver, Sender},
9 thread::{self, JoinHandle},
10};
11use tracing::info;
12
13#[derive(Debug, PartialEq)]
14enum HandleResult {
15 Path(String),
16 Terminate,
17}
18
19fn handle_connection(buffer: &[u8]) -> RvResult<HandleResult> {
20 let mut headers = [EMPTY_HEADER; 64];
21
22 let mut req = Request::new(&mut headers);
23 let res = req.parse(buffer);
24 res.map_err(to_rv)?;
25 match req.path {
26 Some(p) => Ok(if p == "/TERMINATE" {
27 HandleResult::Terminate
28 } else {
29 HandleResult::Path(
30 percent_encoding::percent_decode_str(&p[1..])
31 .decode_utf8()
32 .map_err(to_rv)?
33 .to_string(),
34 )
35 }),
36 None => {
37 let msg = "could not find path in stream";
38 match str::from_utf8(buffer) {
39 Ok(b) => Err(rverr!("{} '{}'", msg, b)),
40 Err(e) => Err(rverr!("{}, {:?}", msg, e)),
41 }
42 }
43 }
44}
45
46fn to_rv_or_send<T, E>(tx: &Sender<RvResult<String>>, x: Result<T, E>) -> RvResult<T>
47where
48 E: Debug,
49{
50 match x {
51 Ok(r) => Ok(r),
52 Err(e) => {
53 let error_str = format!("{e:?}");
54 match tx.send(Err(to_rv(e))) {
55 Ok(()) => Err(rverr!("error in http server, {}", error_str)),
56 Err(e) => Err(rverr!("error {}, send error {:?}", error_str, e)),
57 }
58 }
59 }
60}
61pub type LaunchResultType = RvResult<(JoinHandle<RvResult<()>>, Receiver<RvResult<String>>)>;
62pub fn launch(address: String) -> LaunchResultType {
63 info!("spawning httpserver at {address}");
64 let (tx_from_server, rx_from_server) = mpsc::channel();
65 let handle = thread::spawn(move || -> RvResult<()> {
66 let bind_result = TcpListener::bind(address);
67 let listener = to_rv_or_send(&tx_from_server, bind_result)?;
68 let mut buffer = vec![0; 4096];
69 for stream in listener.incoming() {
71 let stream_processing_result: RvResult<HandleResult> = {
72 let mut stream = stream.map_err(to_rv)?;
73 stream.read(&mut buffer).map_err(to_rv)?;
74 let buffer_slice = if let Some(idx) =
76 (0..(buffer.len() - 1)).find(|idx| &buffer[*idx..*idx + 2] == b"\r\n")
77 {
78 &buffer[..idx + 2]
79 } else {
80 &buffer
81 };
82 let response = "HTTP/1.1 200 OK\r\n\r\n";
83 stream.write(response.as_bytes()).map_err(to_rv)?;
84 stream.flush().map_err(to_rv)?;
85 let res = handle_connection(buffer_slice);
86 let path = res?;
87 Ok(path)
88 };
89 if let Ok(p) = to_rv_or_send(&tx_from_server, stream_processing_result) {
91 match p {
92 HandleResult::Terminate => {
93 info!("terminating httpserver");
94 return Ok(());
95 }
96 HandleResult::Path(p_) => {
97 info!("tcp listener sending result...");
98 let send_result = tx_from_server.send(Ok(p_));
99 info!("done. {send_result:?}");
100 to_rv_or_send(&tx_from_server, send_result)?;
101 }
102 }
103 }
104 info!("tcp listener waiting for new input");
105 }
106 Ok(())
107 });
108 info!("...done");
109 Ok((handle, rx_from_server))
110}
111
112fn increase_port(address: &str) -> RvResult<String> {
113 let address_wo_port = address.split(':').next();
114 let port = address.split(':').next_back();
115 if let Some(port) = port {
116 if let Some(address_wo_port) = address_wo_port {
117 Ok(format!(
118 "{}:{}",
119 address_wo_port,
120 (port.parse::<usize>().map_err(to_rv)? + 1)
121 ))
122 } else {
123 Err(rverr!("is address of {} missing?", address))
124 }
125 } else {
126 Err(rverr!("is port of address {} missing?", address))
127 }
128}
129
130pub fn restart_with_increased_port(
131 http_addr: &str,
132) -> RvResult<(String, Option<Receiver<RvResult<String>>>)> {
133 let http_addr = increase_port(http_addr)?;
134
135 info!("restarting http server with increased port");
136 Ok(if let Ok((_, rx)) = launch(http_addr.clone()) {
137 (http_addr, Some(rx))
138 } else {
139 (http_addr, None)
140 })
141}
142#[test]
143fn test_handler() -> RvResult<()> {
144 let buffer = b"garbage";
145 assert!(handle_connection(buffer.as_slice()).is_err());
146
147 let buffer = b"GET /index.html HTTP/1.1\r\nHost:";
148 assert_eq!(
149 handle_connection(buffer.as_slice()),
150 Ok(HandleResult::Path("index.html".to_string()))
151 );
152
153 let buffer = b"GET /folder%20name/file%20name.png HTTP/1.1\r\nHost:";
154 assert_eq!(
155 handle_connection(buffer.as_slice()),
156 Ok(HandleResult::Path("folder name/file name.png".to_string()))
157 );
158 let buffer = b"GET /TERMINATE HTTP/1.1\r\nHost:";
159 assert_eq!(
160 handle_connection(buffer.as_slice()),
161 Ok(HandleResult::Terminate)
162 );
163
164 Ok(())
165}
166#[cfg(test)]
167use std::{net::TcpStream, time::Duration};
168#[test]
169fn test_launch() -> RvResult<()> {
170 let address = "127.0.0.1:7942";
171 println!("launching server...");
172 let (handle, rx) = launch(address.to_string())?;
173 thread::sleep(Duration::from_millis(10));
174 assert!(!handle.is_finished());
175 println!("...done");
176
177 let send_request = |req| -> RvResult<()> {
178 let mut stream = TcpStream::connect(address).map_err(to_rv)?;
179 stream.write(req).map_err(to_rv)?;
180 stream.flush().map_err(to_rv)?;
181 Ok(())
182 };
183 println!("writing to stream...");
184 let input_stream = b"GET /some_path.png HTTP/1.1\r\nHost:";
185 send_request(input_stream.as_slice())?;
186 println!("...done");
187 println!("writing to stream...");
188 let input_stream = b"GET /some_other_path.png HTTP/1.1\r\nHost:";
189 send_request(input_stream.as_slice())?;
190 println!("...done");
191 thread::sleep(Duration::from_millis(1500));
192 println!("checking results...");
193 let result1 = rx.recv().map_err(to_rv)?;
194 let result2 = rx.recv().map_err(to_rv)?;
195 assert_eq!(result1, Ok("some_path.png".to_string()));
196 assert_eq!(result2, Ok("some_other_path.png".to_string()));
197 println!("...done");
198 println!("terminate...");
199 let terminate_stream = b"GET /TERMINATE HTTP/1.1\r\n";
200 send_request(terminate_stream.as_slice())?;
201 thread::sleep(Duration::from_millis(500));
202 assert!(handle.is_finished());
203 println!("...done");
204 Ok(())
205}
206#[test]
207fn test_increase_port() -> RvResult<()> {
208 assert_eq!(increase_port("address:1234")?, "address:1235");
209 Ok(())
210}