use core::time::Duration;
use metrics_exporter_prometheus::BuildError;
use tokio::net::TcpListener;
/// Errors surfaced while starting or running the HTTP server.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ServerError {
/// An I/O error, displayed exactly as the wrapped [`std::io::Error`].
///
/// `#[from]` lets `?` convert any `std::io::Error` into this variant; in
/// `serve` that covers binding the listener, reading its local address and
/// the result of running the server.
#[error(transparent)]
Listen(#[from] std::io::Error),
/// The Prometheus metrics recorder could not be initialized.
///
/// `serve` produces this when `crate::metrics::install_global` fails.
#[error("failed to initialize Prometheus metrics recorder: {0}")]
MetricsInit(#[source] BuildError),
/// An I/O error reported as a failure to build the tokio runtime.
///
/// NOTE(review): nothing in this file constructs this variant; presumably
/// the binary entry point does — confirm against callers.
#[error("failed to build tokio runtime: {0}")]
RuntimeBuild(#[source] std::io::Error),
}
/// Network settings the server binds to.
///
/// Derives the common traits so a configuration can be logged with `{:?}`,
/// duplicated, and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Host name or IP address to listen on.
    pub host: String,
    /// TCP port to listen on.
    pub port: u16,
}
impl ServerConfig {
    /// Builds a configuration from anything convertible into an owned host
    /// string together with the port number.
    #[inline]
    #[must_use]
    pub fn new<Host>(host: Host, port: u16) -> Self
    where
        Host: Into<String>,
    {
        // Convert up front so the struct literal can use field shorthand.
        let host: String = host.into();
        Self { host, port }
    }
}
/// Binds a TCP listener to the host and port given in `config`.
///
/// # Errors
///
/// Returns whatever [`std::io::Error`] `TcpListener::bind` reports.
#[inline]
pub async fn bind(config: &ServerConfig) -> std::io::Result<TcpListener> {
    let address = (config.host.as_str(), config.port);
    TcpListener::bind(address).await
}
#[inline]
pub async fn serve(config: ServerConfig) -> Result<(), ServerError> {
let listener = bind(&config).await?;
let bound_addr = listener.local_addr()?;
let handle = crate::metrics::install_global().map_err(ServerError::MetricsInit)?;
let upkeep_handle = handle.clone();
let upkeep_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
upkeep_handle.run_upkeep();
}
});
tracing::info!(addr = %bound_addr, "docspec-http listening");
let server_result = axum::serve(listener, crate::router::router_with_metrics(handle))
.with_graceful_shutdown(shutdown_signal())
.await;
upkeep_task.abort();
match upkeep_task.await {
Err(error) if error.is_cancelled() => {}
Err(error) => tracing::error!(%error, "metrics upkeep task failed during shutdown"),
Ok(()) => {}
}
server_result?;
Ok(())
}
/// Completes once the process is asked to shut down.
///
/// Waits for Ctrl+C on every platform and, on Unix, additionally for the
/// SIGTERM stream to yield. If a handler cannot be installed the failure is
/// logged and that branch never completes, leaving the other one to trigger
/// shutdown.
// Kept as a separate function for readability even though `serve` is its only
// caller in this file.
#[allow(clippy::single_call_fn)]
#[inline]
async fn shutdown_signal() {
    use core::future;
    use tokio::signal;

    let interrupt = async {
        match signal::ctrl_c().await {
            Ok(()) => {}
            Err(error) => {
                tracing::error!(%error, "failed to install Ctrl+C handler");
                future::pending::<()>().await;
            }
        }
    };

    #[cfg(unix)]
    let terminate = async {
        let mut sigterm = match signal::unix::signal(signal::unix::SignalKind::terminate()) {
            Ok(stream) => stream,
            Err(error) => {
                tracing::error!(%error, "failed to install SIGTERM handler");
                return future::pending::<()>().await;
            }
        };
        sigterm.recv().await;
    };
    #[cfg(not(unix))]
    let terminate = future::pending::<()>();

    // Whichever branch finishes first ends the wait; both lead to the same log.
    tokio::select! {
        () = interrupt => {},
        () = terminate => {},
    }
    tracing::info!("shutdown signal received, draining");
}