edc_dataplane_core/
web.rs1use std::net::IpAddr;
2
3use axum::Router;
4use std::net::TcpStream;
5use tokio::{
6 net::TcpListener,
7 sync::watch::{Receiver, Sender},
8};
9
10pub async fn start_server<T: Clone + Send + Sync + 'static>(
11 bind: IpAddr,
12 port: u16,
13 app: Router<T>,
14 state: T,
15 name: &'static str,
16) -> anyhow::Result<ServerHandle> {
17 let app = app.with_state(state);
18 let listener = TcpListener::bind((bind, port)).await?;
19 let server_addr = listener.local_addr()?;
20
21 let (shutdown_trigger, shutdown_receiver) = tokio::sync::watch::channel(());
22 let (shutdown_notifier, shutdown_listener) = tokio::sync::watch::channel(());
23
24 tokio::task::spawn(async move {
25 tracing::debug!("Launching {} on {}", name, listener.local_addr().unwrap());
26 axum::serve(listener, app)
27 .with_graceful_shutdown(shutdown_signal(shutdown_receiver))
28 .await
29 .unwrap();
30
31 shutdown_notifier.send(()).unwrap();
32 });
33
34 wait_for_server(server_addr).await;
35
36 Ok(ServerHandle::new(shutdown_trigger, shutdown_listener))
37}
38
39pub struct ServerHandle {
40 shutdown: Sender<()>,
41 waiter: Receiver<()>,
42}
43
44impl ServerHandle {
45 pub fn new(shutdown: Sender<()>, waiter: Receiver<()>) -> Self {
46 Self { shutdown, waiter }
47 }
48
49 pub async fn shutdown(self) {
50 self.shutdown.send(()).unwrap();
51 }
52
53 pub async fn wait(&mut self) -> anyhow::Result<()> {
54 self.waiter.changed().await.map(Ok)?
55 }
56}
57
58async fn shutdown_signal(mut receiver: Receiver<()>) {
59 let _ = receiver.changed().await;
60}
61
62pub async fn wait_for_server(socket: std::net::SocketAddr) {
63 for _ in 0..10 {
64 if TcpStream::connect_timeout(&socket, std::time::Duration::from_millis(25)).is_ok() {
65 break;
66 }
67 tokio::time::sleep(std::time::Duration::from_millis(25)).await;
68 }
69}