1#![forbid(unsafe_code)]
25use beatrice::reexport::{safina_executor, safina_timer};
26use beatrice::{print_log_response, socket_addr_127_0_0_1, HttpServerBuilder, Request, Response};
27use std::io::Read;
28use std::sync::atomic::{AtomicUsize, Ordering};
29use std::sync::Arc;
30use temp_dir::TempDir;
31
/// State shared by all request handlers of this server.
///
/// `Default` yields the same value as [`State::new`]: a counter at zero.
#[derive(Debug, Default)]
pub struct State {
    /// Number of uploads that have been counted so far.
    upload_count: AtomicUsize,
}

impl State {
    /// Creates a state whose upload counter starts at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
44
45fn put(state: &Arc<State>, req: &Request) -> Result<Response, Response> {
46 if req.body.is_pending() {
47 return Ok(Response::get_body_and_reprocess(1024 * 1024));
48 }
49 let body_len = req.body.reader()?.bytes().count();
50 state.upload_count.fetch_add(1, Ordering::AcqRel);
51 Ok(Response::text(
52 200,
53 format!(
54 "Upload received, body_len={}, upload_count={}\n",
55 body_len,
56 state.upload_count.load(Ordering::Acquire)
57 ),
58 ))
59}
60
61fn handle_req(state: &Arc<State>, req: &Request) -> Result<Response, Response> {
62 match (req.method(), req.url().path()) {
63 ("GET", "/health") => Ok(Response::text(200, "ok")),
64 ("PUT", "/upload") => put(state, req),
65 (_, "/upload") => Ok(Response::method_not_allowed_405(&["PUT"])),
66 _ => Ok(Response::text(404, "Not found")),
67 }
68}
69
70pub fn main() {
71 println!("Access the server at http://127.0.0.1:8000/upload");
72 safina_timer::start_timer_thread();
73 let executor = safina_executor::Executor::default();
74 let cache_dir = TempDir::new().unwrap();
75 let state = Arc::new(State::new());
76 let request_handler = move |req: Request| print_log_response(&req, handle_req(&state, &req));
77 executor
78 .block_on(
79 HttpServerBuilder::new()
80 .listen_addr(socket_addr_127_0_0_1(8000))
81 .max_conns(100)
82 .small_body_len(64 * 1024)
83 .receive_large_bodies(cache_dir.path())
84 .spawn_and_join(request_handler),
85 )
86 .unwrap();
87}