use crate::ext::WatchDebounceExt;
use crate::server::app_state::AppState;
use axum::Router;
use color_eyre::eyre::WrapErr;
use color_eyre::{Report, Result};
use convertor::common::config::ConvertorConfig;
use convertor::common::config::provider_config::Provider;
use convertor::common::redis::REDIS_CONVERTOR_CONFIG_PUBLISH_CHANNEL;
use convertor::provider_api::ProviderApi;
use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4};
use std::path::Path;
use tokio::net::{TcpListener, TcpSocket};
use tokio::signal;
use tokio::sync::watch;
use tokio::sync::watch::Receiver;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
// Submodules of the `server` module. `app_state` provides `AppState` and `router`
// provides the `router()` constructor, both of which `start_server` uses below.
pub mod app_state;
pub mod error;
pub mod query;
pub mod router;
pub mod service;
pub async fn start_server(
listen_addr: SocketAddrV4,
config: ConvertorConfig,
api_map: HashMap<Provider, ProviderApi>,
base_dir: impl AsRef<Path>,
client: redis::Client,
) -> Result<()> {
info!("工作环境: {}", base_dir.as_ref().display());
info!("监听中: {}", &listen_addr);
warn!("建议使用 nginx 等网关进行反向代理,以开启 HTTPS 支持");
let cancel_token = CancellationToken::new();
let mut config_receiver = start_sub_config(client, config);
let mut current_config = config_receiver.borrow().clone();
loop {
let stop_this = cancel_token.child_token();
let state = AppState::new(current_config, api_map.clone());
let app: Router = router::router(state);
let listener = bind_once(listen_addr)?;
let serve_handle = tokio::spawn({
let stop_this = stop_this.clone();
async move {
info!("服务启动,使用 Ctrl+C 或 SIGTERM 关闭服务");
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
.with_graceful_shutdown(async move { stop_this.cancelled().await })
.await
}
});
tokio::select! {
_ = shutdown_signal() => {
info!("收到退出信号,准备关闭服务…");
stop_this.cancel();
let _ = serve_handle.await;
break;
}
_ = cancel_token.cancelled() => {
stop_this.cancel();
let _ = serve_handle.await;
break;
}
Ok(config) = config_receiver.recv_debounced_distinct(tokio::time::Duration::from_secs(1), |_| true) => {
info!("收到配置更新通知,准备重启服务…");
info!("停止旧的服务…");
stop_this.cancel();
let _ = serve_handle.await;
info!("更新配置…");
current_config = config.clone();
continue;
}
}
}
info!("服务关闭");
Ok(())
}
/// Creates an IPv4 TCP listener on `addr` with `SO_REUSEADDR` enabled and a backlog of 1024.
///
/// NOTE(review): `SO_REUSEADDR` is presumably set so the same address can be re-bound when
/// the server is restarted after a configuration change.
///
/// # Errors
///
/// Returns an error if the socket cannot be created, configured, bound or put into
/// listening mode.
fn bind_once(addr: SocketAddrV4) -> Result<TcpListener> {
    let socket = TcpSocket::new_v4()?;
    socket.set_reuseaddr(true)?;
    socket.bind(SocketAddr::from(addr))?;
    let listener = socket.listen(1024)?;
    Ok(listener)
}
fn start_sub_config(client: redis::Client, config: ConvertorConfig) -> Receiver<ConvertorConfig> {
let (tx, rx) = watch::channel(config);
tokio::spawn(async move {
info!("启动 Redis subscribe");
let connection = client.get_multiplexed_async_connection().await?;
let mut sub = client.get_async_pubsub().await?;
sub.subscribe(REDIS_CONVERTOR_CONFIG_PUBLISH_CHANNEL).await?;
let mut stream = sub.into_on_message();
while let Some(msg) = stream.next().await {
info!("Redis sub 接收到: {msg:?}");
if msg.get_channel_name() == REDIS_CONVERTOR_CONFIG_PUBLISH_CHANNEL {
if let Err(e) = ConvertorConfig::from_redis(connection.clone())
.await
.and_then(|config| tx.send(config).wrap_err("无法发送更新配置"))
{
error!("{e:?}");
}
}
}
Ok::<(), Report>(())
});
rx
}
/// Completes when the process receives Ctrl+C or, on Unix, SIGTERM.
///
/// On non-Unix targets the SIGTERM side is a future that never resolves, so only Ctrl+C can
/// complete this function there.
///
/// # Panics
///
/// Panics if either signal handler cannot be installed.
async fn shutdown_signal() {
    let interrupt = async {
        signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let sigterm = async {
        let mut stream = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler");
        stream.recv().await;
    };

    // No SIGTERM outside Unix: use a placeholder that stays pending forever.
    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    // Whichever signal arrives first ends the wait.
    tokio::select! {
        () = interrupt => {},
        () = sigterm => {},
    }
}