enigma_node_registry/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use axum::http::StatusCode;
6use axum::Router;
7use enigma_node_types::NodeInfo;
8use tokio::net::TcpListener;
9use tokio::sync::oneshot;
10use tokio::task::JoinHandle;
11use tower_http::timeout::TimeoutLayer;
12
13use crate::config::RegistryConfig;
14use crate::error::{EnigmaNodeRegistryError, Result};
15use crate::routes::{build_router, AppState};
16use crate::store::Store;
17use crate::ttl;
18
19pub struct RunningServer {
20    pub base_url: String,
21    pub shutdown: oneshot::Sender<()>,
22    pub handle: JoinHandle<Result<()>>,
23    #[cfg(test)]
24    pub store: Arc<Store>,
25}
26
27pub async fn start(cfg: RegistryConfig, initial_nodes: Vec<NodeInfo>) -> Result<RunningServer> {
28    let store = Arc::new(Store::new(cfg.max_nodes));
29    store.add_nodes(initial_nodes).await?;
30    let state = AppState {
31        store: store.clone(),
32    };
33    let router = build_router(state);
34    let router = apply_timeout(router, cfg.request_timeout_ms);
35    let addr: SocketAddr = cfg
36        .bind_addr
37        .parse()
38        .map_err(|_| EnigmaNodeRegistryError::InvalidInput("bind_addr"))?;
39    let listener = TcpListener::bind(addr).await?;
40    let bound_addr = listener.local_addr()?;
41    let base_url = format!("http://{}", bound_addr);
42    let (shutdown_tx, shutdown_rx) = oneshot::channel();
43    let (purger_tx, purger_rx) = oneshot::channel();
44    let purger_cfg = cfg.clone();
45    let purger_store = store.clone();
46    let purger_task = tokio::spawn(async move {
47        ttl::run_purger(purger_store, purger_cfg, purger_rx).await;
48    });
49    let server = axum::serve(listener, router.into_make_service()).with_graceful_shutdown(async {
50        let _ = shutdown_rx.await;
51        let _ = purger_tx.send(());
52    });
53    let handle = tokio::spawn(async move {
54        let result = server.await;
55        let _ = purger_task.await;
56        match result {
57            Ok(_) => Ok(()),
58            Err(_) => Err(EnigmaNodeRegistryError::Transport),
59        }
60    });
61    Ok(RunningServer {
62        base_url,
63        shutdown: shutdown_tx,
64        handle,
65        #[cfg(test)]
66        store,
67    })
68}
69
70fn apply_timeout(router: Router, request_timeout_ms: u64) -> Router {
71    if request_timeout_ms == 0 {
72        return router;
73    }
74    let timeout = TimeoutLayer::with_status_code(
75        StatusCode::REQUEST_TIMEOUT,
76        Duration::from_millis(request_timeout_ms),
77    );
78    router.layer(timeout)
79}