1use axum::{
2 http::StatusCode,
3 response::Json,
4 routing::{get, Router},
5 Router as AxumRouter,
6};
7use serde_json::json;
8use std::sync::Arc;
9use tokio::signal;
10use tower::ServiceBuilder;
11use tower_http::compression::CompressionLayer;
12use tracing::{info, warn};
13
14use crate::http::{create_cors_layer, health_check, request_logging_middleware, root};
15#[cfg(feature = "grpc")]
18#[allow(unused_imports)]
19use crate::http::{grpc_log_request, grpc_log_response};
20use crate::state::AppState;
21use crate::{config::Config, AppResult};
22
/// A fully configured server, produced by [`ServerBuilder`] and consumed when
/// it is started.
pub struct Server {
    /// Configuration the server runs with (bind address, context path, ...).
    config: Config,
    /// Application state handed to the built-in HTTP routes.
    app_state: Arc<AppState>,
    /// Optional caller-supplied HTTP routes, merged onto the built-in routes.
    http_router: Option<AxumRouter>,
    /// Optional gRPC routes; only present when the `grpc` feature is enabled.
    #[cfg(feature = "grpc")]
    grpc_router: Option<tonic::service::Routes>,
}
31
/// Collects the pieces needed to assemble a [`Server`]: configuration, shared
/// state, optional routers and (feature-gated) Kafka message handlers.
pub struct ServerBuilder {
    /// Configuration that will be moved into the server.
    config: Config,
    /// Application state that will be moved into the server.
    app_state: Arc<AppState>,
    /// Caller-supplied HTTP routes, if any.
    http_router: Option<AxumRouter>,
    /// Caller-supplied gRPC routes, if any (`grpc` feature only).
    #[cfg(feature = "grpc")]
    grpc_router: Option<tonic::service::Routes>,
    /// Kafka message handlers registered by the caller (`consumer` feature only).
    #[cfg(feature = "consumer")]
    kafka_handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
}
42
43impl ServerBuilder {
44 #[warn(dead_code)]
46 fn new(config: Config, app_state: Arc<AppState>) -> Self {
47 Self {
48 config,
49 app_state,
50 http_router: None,
51 #[cfg(feature = "grpc")]
52 grpc_router: None,
53 #[cfg(feature = "consumer")]
54 kafka_handlers: Vec::new(),
55 }
56 }
57
58 pub fn app_state(&self) -> &Arc<AppState> {
60 &self.app_state
61 }
62
63 #[cfg(feature = "consumer")]
77 pub fn with_kafka_handler(
78 mut self,
79 handler: Arc<dyn crate::messaging::KafkaMessageHandler>,
80 ) -> Self {
81 self.kafka_handlers.push(handler);
82 self
83 }
84
85 #[cfg(feature = "consumer")]
104 pub fn with_kafka_handlers(
105 mut self,
106 handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
107 ) -> Self {
108 self.kafka_handlers.extend(handlers);
109 self
110 }
111
112 pub fn http_router(mut self, router: AxumRouter) -> Self {
114 self.http_router = Some(router);
115 self
116 }
117
118 #[cfg(feature = "grpc")]
120 pub fn grpc_router(mut self, router: tonic::service::Routes) -> Self {
121 self.grpc_router = Some(router);
122 self
123 }
124
125 pub fn config(&self) -> &Config {
127 &self.config
128 }
129
130 pub fn state(&self) -> &Arc<AppState> {
132 &self.app_state
133 }
134
135 fn build(self) -> Server {
137 Server {
138 config: self.config,
139 app_state: self.app_state,
140 http_router: self.http_router,
141 #[cfg(feature = "grpc")]
142 grpc_router: self.grpc_router,
143 }
144 }
145}
146
147impl Server {
148 pub async fn run<F>(configure: F) -> AppResult<()>
181 where
182 F: FnOnce(ServerBuilder) -> ServerBuilder,
183 {
184 let config = Config::from_env().unwrap_or_else(|e| {
186 eprintln!("⚠ 警告: 无法从环境变量加载配置: {}, 使用默认配置", e);
187 Config::default()
188 });
189
190 Self::run_with_config(config, configure).await
191 }
192
193 pub async fn run_with_config<F>(config: Config, configure: F) -> AppResult<()>
199 where
200 F: FnOnce(ServerBuilder) -> ServerBuilder,
201 {
202 crate::init_logging(&config).map_err(|e| anyhow::anyhow!("日志初始化失败: {}", e))?;
204
205 tracing::info!(
206 "启动 {} v{}",
207 env!("CARGO_PKG_NAME"),
208 env!("CARGO_PKG_VERSION")
209 );
210
211 #[cfg(feature = "nacos")]
213 if let Some(ref nacos_config) = config.nacos {
214 crate::nacos::init_nacos(nacos_config)
215 .await
216 .map_err(|e| anyhow::anyhow!("Nacos 初始化失败: {}", e))?;
217
218 crate::nacos::register_service(nacos_config, &config.server)
220 .await
221 .map_err(|e| anyhow::anyhow!("Nacos 服务注册失败: {}", e))?;
222
223 crate::nacos::subscribe_services(nacos_config)
225 .await
226 .map_err(|e| anyhow::anyhow!("Nacos 服务订阅失败: {}", e))?;
227
228 crate::nacos::subscribe_configs(nacos_config)
230 .await
231 .map_err(|e| anyhow::anyhow!("Nacos 配置订阅失败: {}", e))?;
232 }
233 let app_state = Self::init_app_state(&config).await?;
235
236 let mut builder = ServerBuilder::new(config.clone(), Arc::new(app_state));
238
239 builder = configure(builder);
241
242 #[cfg(feature = "consumer")]
244 {
245 builder.app_state = Arc::new(
246 Self::init_kafka_consumers(
247 &config,
248 &builder.kafka_handlers,
249 builder.app_state.clone(),
250 )
251 .await?,
252 );
253 }
254
255 let server = builder.build();
256
257 server.start_internal().await
259 }
260
261 #[allow(unused_variables)] async fn init_app_state(config: &Config) -> AppResult<AppState> {
266 let app_state = AppState::new();
267
268 #[cfg(any(feature = "mysql", feature = "postgres", feature = "sqlite"))]
270 let app_state = if let Some(ref db_config) = config.database {
271 let pools = crate::database::init_database(db_config).await?;
272 app_state.with_database_pools(pools)
273 } else {
274 app_state
275 };
276
277 #[cfg(feature = "redis")]
279 let app_state = if let Some(ref redis_config) = config.redis {
280 let pool = crate::cache::redis::init_redis(
281 &redis_config.url,
282 redis_config.password.as_deref(),
283 redis_config.pool_size,
284 )
285 .await?;
286 app_state.with_redis(pool)
287 } else {
288 app_state
289 };
290
291 #[cfg(feature = "kafka")]
293 let app_state = if let Some(ref kafka_config) = config.kafka {
294 #[cfg(feature = "producer")]
296 let state = if let Some(ref producer_config) = kafka_config.producer {
297 use crate::messaging::kafka::KafkaProducer;
298
299 let producer = Arc::new(
300 KafkaProducer::new(&kafka_config.brokers, producer_config)
301 .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
302 )
303 as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
304 app_state.with_message_producer(producer)
305 } else {
306 app_state
307 };
308 #[cfg(not(feature = "producer"))]
309 let state = app_state;
310
311 state
312 } else {
313 app_state
314 };
315
316 Ok(app_state)
317 }
318
/// Creates Kafka consumers for the registered handlers and subscribes them.
///
/// Nothing is set up (and a copy of the incoming state is returned) when the
/// config has no Kafka consumer section, when no handlers are registered, or
/// when the handlers declare no topics. Otherwise one consumer per group is
/// subscribed to the topics the message router knows about, and the first
/// consumer that subscribed is stored in the returned state.
///
/// # Errors
/// Fails if consumers cannot be created or a subscription fails.
#[cfg(feature = "consumer")]
async fn init_kafka_consumers(
    config: &Config,
    kafka_handlers: &[Arc<dyn crate::messaging::KafkaMessageHandler>],
    app_state: Arc<AppState>,
) -> AppResult<AppState> {
    use crate::messaging::{kafka, KafkaMessageRouter};
    use std::sync::Arc;

    let Some(ref kafka_config) = config.kafka else {
        return Ok((*app_state).clone());
    };
    let Some(ref consumer_config) = kafka_config.consumer else {
        return Ok((*app_state).clone());
    };

    if kafka_handlers.is_empty() {
        tracing::info!("ℹ️ 未注册 handlers,跳过 Kafka Consumer 初始化");
        return Ok((*app_state).clone());
    }

    let router = Arc::new(KafkaMessageRouter::new(kafka_handlers.to_vec()));
    let subscribe_topics = router.get_subscribe_topics();

    if subscribe_topics.is_empty() {
        tracing::warn!(
            "⚠️ Kafka Consumer 已初始化,但没有 handler 注册任何 topics"
        );
        return Ok((*app_state).clone());
    }

    let consumers_by_group = kafka::create_consumers_from_handlers(
        &kafka_config.brokers,
        consumer_config,
        kafka_handlers,
    )?;

    // Each incoming message is dispatched on its own spawned task.
    let dispatcher = router.clone();
    let handler = Arc::new(move |message: crate::messaging::Message| {
        let router = dispatcher.clone();
        tokio::spawn(async move {
            router.dispatch(message).await;
        });
    });

    let mut first_consumer: Option<Arc<dyn crate::messaging::MessageConsumer + Send + Sync>> =
        None;

    for (group_id, (consumer, topics)) in consumers_by_group {
        let topics_to_subscribe: Vec<String> = topics
            .into_iter()
            .filter(|t| subscribe_topics.contains(t))
            .collect();

        if topics_to_subscribe.is_empty() {
            continue;
        }

        let consumer_arc =
            Arc::new(consumer) as Arc<dyn crate::messaging::MessageConsumer + Send + Sync>;

        // Only the first subscribing consumer is kept for the state.
        if first_consumer.is_none() {
            first_consumer = Some(Arc::clone(&consumer_arc));
        }

        consumer_arc
            .subscribe_topics(topics_to_subscribe.clone(), handler.clone())
            .await
            .map_err(|e| anyhow::anyhow!("Kafka 订阅失败 (group: {}): {}", group_id, e))?;

        tracing::info!(
            "✅ Kafka Consumer 初始化成功 (group: {}, 订阅 {} 个 topics: {:?})",
            group_id,
            topics_to_subscribe.len(),
            topics_to_subscribe
        );
    }

    let state = (*app_state).clone();
    Ok(match first_consumer {
        Some(consumer) => state.with_message_consumer(consumer),
        None => state,
    })
}
413
414 fn create_base_router(&self) -> AxumRouter {
416 let state = self.app_state.clone();
417
418 Router::new()
419 .route("/", get(root))
420 .route("/health", get(health_check))
421 .fallback(|| async {
422 (
423 StatusCode::NOT_FOUND,
424 Json(json!({
425 "error": "Not found",
426 "status": 404
427 })),
428 )
429 })
430 .layer(
431 ServiceBuilder::new()
432 .layer(CompressionLayer::new())
433 .into_inner(),
434 )
435 .layer(create_cors_layer(&self.config))
436 .with_state(state)
437 }
438
439 async fn create_router(&mut self) -> AxumRouter {
441 let base_router = self.create_base_router();
442
443 let router = if let Some(custom) = self.http_router.take() {
445 base_router.merge(custom)
446 } else {
447 base_router
448 };
449
450 #[cfg(feature = "grpc")]
452 let router = self.add_grpc_services(router).await;
453
454 let router = router
458 .layer(axum::middleware::from_fn(request_logging_middleware))
459 .layer(axum::middleware::from_fn(
460 crate::auth::user_context_middleware,
461 ));
462
463 if let Some(ref context_path) = self.config.server.context_path {
465 use std::borrow::Cow;
466 let path: Cow<'_, str> = if context_path.starts_with('/') {
467 Cow::Borrowed(context_path.as_str())
468 } else {
469 tracing::warn!("上下文路径 '{}' 应该以 '/' 开头,已自动修正", context_path);
470 Cow::Owned(format!("/{}", context_path))
471 };
472 Router::new().nest(path.as_ref(), router)
473 } else {
474 router
475 }
476 }
477
/// Attaches the gRPC routes, if any were supplied, as the router's fallback
/// service; otherwise returns `router` unchanged.
///
/// The gRPC routes are moved out of `self`, leaving `None` behind.
// NOTE(review): setting a fallback service here presumably supersedes the JSON
// 404 fallback installed on the base router — confirm that is intended.
#[cfg(feature = "grpc")]
async fn add_grpc_services(&mut self, router: AxumRouter) -> AxumRouter {
    let Some(routes) = self.grpc_router.take() else {
        return router;
    };

    tracing::info!("集成 gRPC 服务到 axum");

    router.fallback_service(routes.into_axum_router())
}
497
498 async fn start_internal(mut self) -> AppResult<()> {
500 let addr = self.config.server.socket_addr()?;
501 let app = self.create_router().await;
502
503 let base_url = if let Some(ref context_path) = self.config.server.context_path {
504 format!("http://{}{}", addr, context_path)
505 } else {
506 format!("http://{}", addr)
507 };
508
509 info!("服务器启动在 {}", base_url);
510 info!("健康检查: {}/health", base_url);
511
512 #[cfg(feature = "grpc")]
513 if self.grpc_router.is_some() {
514 info!("gRPC 服务已启用");
515 }
516
517 let listener = tokio::net::TcpListener::bind(&addr).await?;
518
519 #[cfg(feature = "nacos")]
521 let shutdown_config = self.config.clone();
522
523 axum::serve(
525 listener,
526 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
527 )
528 .with_graceful_shutdown(shutdown_signal())
529 .await?;
530
531 #[cfg(feature = "nacos")]
533 if let Some(ref nacos_config) = shutdown_config.nacos {
534 info!("正在从 Nacos 注销服务...");
535 match crate::nacos::deregister_service(nacos_config, &shutdown_config.server).await {
536 Ok(_) => info!("Nacos 服务注销成功"),
537 Err(e) => warn!("Nacos 服务注销失败(将由心跳超时自动下线): {}", e),
538 }
539 }
540
541 info!("服务器已关闭");
542 Ok(())
543 }
544}
545
546async fn shutdown_signal() {
548 let ctrl_c = async {
549 signal::ctrl_c().await.expect("无法安装 Ctrl+C 信号处理器");
550 info!("收到 Ctrl+C 信号,开始优雅关闭...");
551 };
552
553 #[cfg(unix)]
554 let terminate = async {
555 signal::unix::signal(signal::unix::SignalKind::terminate())
556 .expect("无法安装 SIGTERM 信号处理器")
557 .recv()
558 .await;
559 warn!("收到 SIGTERM 信号,开始优雅关闭...");
560 };
561
562 #[cfg(not(unix))]
563 let terminate = std::future::pending::<()>();
564
565 tokio::select! {
566 _ = ctrl_c => {},
567 _ = terminate => {},
568 }
569
570 info!("服务器正在关闭...");
571}