enigma_node_registry/
server.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use axum::http::StatusCode;
6use axum::Router;
7use enigma_node_types::NodeInfo;
8use tokio::net::TcpListener;
9use tokio::sync::oneshot;
10use tokio::task::JoinHandle;
11use tower_http::timeout::TimeoutLayer;
12
13use crate::config::RegistryConfig;
14use crate::error::{EnigmaNodeRegistryError, Result};
15use crate::routes::{build_router, AppState};
16use crate::store::Store;
17use crate::ttl;
18
19pub struct RunningServer {
20 pub base_url: String,
21 pub shutdown: oneshot::Sender<()>,
22 pub handle: JoinHandle<Result<()>>,
23 #[cfg(test)]
24 pub store: Arc<Store>,
25}
26
27pub async fn start(cfg: RegistryConfig, initial_nodes: Vec<NodeInfo>) -> Result<RunningServer> {
28 let store = Arc::new(Store::new(cfg.max_nodes));
29 store.add_nodes(initial_nodes).await?;
30 let state = AppState {
31 store: store.clone(),
32 };
33 let router = build_router(state);
34 let router = apply_timeout(router, cfg.request_timeout_ms);
35 let addr: SocketAddr = cfg
36 .bind_addr
37 .parse()
38 .map_err(|_| EnigmaNodeRegistryError::InvalidInput("bind_addr"))?;
39 let listener = TcpListener::bind(addr).await?;
40 let bound_addr = listener.local_addr()?;
41 let base_url = format!("http://{}", bound_addr);
42 let (shutdown_tx, shutdown_rx) = oneshot::channel();
43 let (purger_tx, purger_rx) = oneshot::channel();
44 let purger_cfg = cfg.clone();
45 let purger_store = store.clone();
46 let purger_task = tokio::spawn(async move {
47 ttl::run_purger(purger_store, purger_cfg, purger_rx).await;
48 });
49 let server = axum::serve(listener, router.into_make_service()).with_graceful_shutdown(async {
50 let _ = shutdown_rx.await;
51 let _ = purger_tx.send(());
52 });
53 let handle = tokio::spawn(async move {
54 let result = server.await;
55 let _ = purger_task.await;
56 match result {
57 Ok(_) => Ok(()),
58 Err(_) => Err(EnigmaNodeRegistryError::Transport),
59 }
60 });
61 Ok(RunningServer {
62 base_url,
63 shutdown: shutdown_tx,
64 handle,
65 #[cfg(test)]
66 store,
67 })
68}
69
70fn apply_timeout(router: Router, request_timeout_ms: u64) -> Router {
71 if request_timeout_ms == 0 {
72 return router;
73 }
74 let timeout = TimeoutLayer::with_status_code(
75 StatusCode::REQUEST_TIMEOUT,
76 Duration::from_millis(request_timeout_ms),
77 );
78 router.layer(timeout)
79}