use crate::server::app_state::AppState;
use axum::Router;
use color_eyre::Result;
use convertor::config::ConvertorConfig;
use std::net::{SocketAddr, SocketAddrV4};
use tokio::net::{TcpListener, TcpSocket};
use tokio::signal;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
pub mod app_state;
pub mod query;
pub mod response;
pub mod router;
pub mod service;
/// Starts the HTTP server on `listen_addr` and runs it until it stops.
///
/// When `config.redis` is set, a Redis client and a connection manager are
/// created first and handed to the application state; otherwise the server
/// starts without Redis.
///
/// The function returns once the serve task has finished, either because
/// `shutdown_signal` fired (graceful shutdown is then requested and awaited)
/// or because the serve task ended on its own.
///
/// # Errors
///
/// Returns an error if the Redis client or connection manager cannot be
/// created, if the listening socket cannot be set up, if the serve task
/// panics or is cancelled, or if `axum::serve` finishes with an I/O error.
pub async fn start_server(listen_addr: SocketAddrV4, config: ConvertorConfig) -> Result<()> {
    let (redis_client, connection_manager) = match config.redis.as_ref() {
        Some(redis_config) => {
            info!("+──────────────────────────────────────────────+");
            info!("│ 初始化 Redis 连接... │");
            info!("+──────────────────────────────────────────────+");
            let redis_client = redis_config.build_redis_client()?;
            tracing::debug!("等待 connection_manager 就绪...");
            let connection_manager = redis::aio::ConnectionManager::new_with_config(
                redis_client.clone(),
                redis::aio::ConnectionManagerConfig::new()
                    .set_number_of_retries(5)
                    .set_max_delay(2000),
            )
            .await?;
            info!("Redis 连接就绪");
            (Some(redis_client), Some(connection_manager))
        }
        None => (None, None),
    };

    info!("+──────────────────────────────────────────────+");
    info!("│ 启动服务... │");
    info!("+──────────────────────────────────────────────+");

    // Token used to ask the serve task to begin its graceful shutdown.
    let stop_this = CancellationToken::new();

    // The Redis handles are not needed locally any more, so they are moved
    // into the application state instead of being cloned.
    let state = AppState::new(config, redis_client, connection_manager);
    let app: Router = router::router(state);
    let listener = bind_once(listen_addr)?;

    let mut serve_handle = tokio::spawn({
        let stop_this = stop_this.clone();
        async move {
            warn!("使用 Ctrl+C 或 SIGTERM 关闭服务, 建议使用 nginx 等网关进行反向代理, 以开启 HTTPS 支持");
            info!("服务启动, 监听于: {listen_addr}");
            axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
                .with_graceful_shutdown(async move { stop_this.cancelled().await })
                .await
        }
    });

    // Race the shutdown signal against the serve task itself. Without the
    // second branch a server that dies early would leave this function
    // waiting for a signal forever and its error would never surface.
    let early_exit = tokio::select! {
        _ = shutdown_signal() => None,
        joined = &mut serve_handle => Some(joined),
    };

    let joined = match early_exit {
        // The task already completed; its handle must not be awaited again.
        Some(joined) => {
            warn!("服务任务在收到退出信号前已结束");
            joined
        }
        None => {
            info!("收到退出信号,准备关闭服务…");
            stop_this.cancel();
            serve_handle.await
        }
    };

    info!("服务关闭");
    // The first `?` reports a panicked or cancelled task, the second the
    // I/O result of `axum::serve`.
    joined??;
    Ok(())
}
/// Creates an IPv4 TCP socket with address reuse enabled, binds it to `addr`
/// and converts it into a listener with a backlog of 1024.
///
/// # Errors
///
/// Returns an error if creating, configuring, binding or listening on the
/// socket fails.
fn bind_once(addr: SocketAddrV4) -> Result<TcpListener> {
    let socket = TcpSocket::new_v4()?;
    socket.set_reuseaddr(true)?;
    socket.bind(SocketAddr::from(addr))?;
    let listener = socket.listen(1024)?;
    Ok(listener)
}
/// Completes as soon as the process receives Ctrl+C or, on Unix, SIGTERM.
///
/// On non-Unix targets the terminate branch never resolves, so only Ctrl+C
/// ends the wait.
///
/// # Panics
///
/// Panics if either signal handler cannot be installed.
async fn shutdown_signal() {
    let interrupt = async {
        signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler");
        sigterm.recv().await;
    };

    // Placeholder future that never completes where SIGTERM is unavailable.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // Whichever signal arrives first ends the wait.
    tokio::select! {
        _ = interrupt => {},
        _ = terminate => {},
    }
}