edc_dataplane_core/
web.rs

1use std::net::IpAddr;
2
3use axum::Router;
4use std::net::TcpStream;
5use tokio::{
6    net::TcpListener,
7    sync::watch::{Receiver, Sender},
8};
9
10pub async fn start_server<T: Clone + Send + Sync + 'static>(
11    bind: IpAddr,
12    port: u16,
13    app: Router<T>,
14    state: T,
15    name: &'static str,
16) -> anyhow::Result<ServerHandle> {
17    let app = app.with_state(state);
18    let listener = TcpListener::bind((bind, port)).await?;
19    let server_addr = listener.local_addr()?;
20
21    let (shutdown_trigger, shutdown_receiver) = tokio::sync::watch::channel(());
22    let (shutdown_notifier, shutdown_listener) = tokio::sync::watch::channel(());
23
24    tokio::task::spawn(async move {
25        tracing::debug!("Launching {} on {}", name, listener.local_addr().unwrap());
26        axum::serve(listener, app)
27            .with_graceful_shutdown(shutdown_signal(shutdown_receiver))
28            .await
29            .unwrap();
30
31        shutdown_notifier.send(()).unwrap();
32    });
33
34    wait_for_server(server_addr).await;
35
36    Ok(ServerHandle::new(shutdown_trigger, shutdown_listener))
37}
38
39pub struct ServerHandle {
40    shutdown: Sender<()>,
41    waiter: Receiver<()>,
42}
43
44impl ServerHandle {
45    pub fn new(shutdown: Sender<()>, waiter: Receiver<()>) -> Self {
46        Self { shutdown, waiter }
47    }
48
49    pub async fn shutdown(self) {
50        self.shutdown.send(()).unwrap();
51    }
52
53    pub async fn wait(&mut self) -> anyhow::Result<()> {
54        self.waiter.changed().await.map(Ok)?
55    }
56}
57
58async fn shutdown_signal(mut receiver: Receiver<()>) {
59    let _ = receiver.changed().await;
60}
61
62pub async fn wait_for_server(socket: std::net::SocketAddr) {
63    for _ in 0..10 {
64        if TcpStream::connect_timeout(&socket, std::time::Duration::from_millis(25)).is_ok() {
65            break;
66        }
67        tokio::time::sleep(std::time::Duration::from_millis(25)).await;
68    }
69}