streambed_test/
server.rs

1/// Taken from https://github.com/seanmonstar/reqwest/blob/master/tests/support/server.rs
2use std::convert::Infallible;
3use std::future::Future;
4use std::net;
5use std::sync::mpsc as std_mpsc;
6use std::thread;
7use std::time::Duration;
8
9use tokio::sync::oneshot;
10
11pub use http::Response;
12use tokio::runtime;
13
14pub struct Server {
15    addr: net::SocketAddr,
16    panic_rx: std_mpsc::Receiver<()>,
17    shutdown_tx: Option<oneshot::Sender<()>>,
18}
19
20impl Server {
21    pub fn addr(&self) -> net::SocketAddr {
22        self.addr
23    }
24}
25
26impl Drop for Server {
27    fn drop(&mut self) {
28        if let Some(tx) = self.shutdown_tx.take() {
29            let _ = tx.send(());
30        }
31
32        if !::std::thread::panicking() {
33            self.panic_rx
34                .recv_timeout(Duration::from_secs(3))
35                .expect("test server should not panic");
36        }
37    }
38}
39
40#[allow(clippy::async_yields_async)]
41pub fn http<F, Fut>(func: F) -> Server
42where
43    F: Fn(http::Request<hyper::Body>) -> Fut + Clone + Send + 'static,
44    Fut: Future<Output = http::Response<hyper::Body>> + Send + 'static,
45{
46    //Spawn new runtime in thread to prevent reactor execution context conflict
47    thread::spawn(move || {
48        let rt = runtime::Builder::new_current_thread()
49            .enable_all()
50            .build()
51            .expect("new rt");
52        let srv = rt.block_on(async move {
53            hyper::Server::bind(&([127, 0, 0, 1], 0).into()).serve(hyper::service::make_service_fn(
54                move |_| {
55                    let func = func.clone();
56                    async move {
57                        Ok::<_, Infallible>(hyper::service::service_fn(move |req| {
58                            let fut = func(req);
59                            async move { Ok::<_, Infallible>(fut.await) }
60                        }))
61                    }
62                },
63            ))
64        });
65
66        let addr = srv.local_addr();
67        let (shutdown_tx, shutdown_rx) = oneshot::channel();
68        let srv = srv.with_graceful_shutdown(async move {
69            let _ = shutdown_rx.await;
70        });
71
72        let (panic_tx, panic_rx) = std_mpsc::channel();
73        let tname = format!(
74            "test({})-support-server",
75            thread::current().name().unwrap_or("<unknown>")
76        );
77        thread::Builder::new()
78            .name(tname)
79            .spawn(move || {
80                rt.block_on(srv).unwrap();
81                let _ = panic_tx.send(());
82            })
83            .expect("thread spawn");
84
85        Server {
86            addr,
87            panic_rx,
88            shutdown_tx: Some(shutdown_tx),
89        }
90    })
91    .join()
92    .unwrap()
93}