1use axum::{
2 http::StatusCode,
3 response::Json,
4 routing::{get, Router},
5 Router as AxumRouter,
6};
7use serde_json::json;
8use std::sync::Arc;
9use tokio::signal;
10use tower::ServiceBuilder;
11use tower_http::compression::CompressionLayer;
12use tracing::{info, warn};
13
14use crate::config::Config;
15use crate::http::{create_cors_layer, create_trace_layer, health_check, root};
16use crate::state::AppState;
17
18#[cfg(feature = "grpc")]
19use http_body_util::BodyExt;
20
/// A fully configured server, produced by [`ServerBuilder`] and consumed by
/// `start_internal`.
pub struct Server {
    /// Configuration read for the listen address, CORS layer and context path.
    config: Config,
    /// Shared application state attached to the base router.
    app_state: Arc<AppState>,
    /// Optional caller-supplied routes, merged into the base router.
    http_router: Option<AxumRouter>,
    /// Optional tonic router, mounted as the axum fallback service.
    #[cfg(feature = "grpc")]
    grpc_router: Option<tonic::transport::server::Router>,
}
29
/// Collects the optional pieces (routers, Kafka handlers) that the
/// `configure` closure passed to `Server::run` may supply.
pub struct ServerBuilder {
    /// Configuration the server will be built with.
    config: Config,
    /// Application state exposed through `app_state()` / `state()`.
    app_state: Arc<AppState>,
    /// Optional caller-supplied HTTP routes.
    http_router: Option<AxumRouter>,
    /// Optional tonic router carrying the gRPC services.
    #[cfg(feature = "grpc")]
    grpc_router: Option<tonic::transport::server::Router>,
    /// Kafka message handlers registered by the caller.
    #[cfg(feature = "consumer")]
    kafka_handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
}
40
41impl ServerBuilder {
42 #[warn(dead_code)]
44 fn new(config: Config, app_state: Arc<AppState>) -> Self {
45 Self {
46 config,
47 app_state,
48 http_router: None,
49 #[cfg(feature = "grpc")]
50 grpc_router: None,
51 #[cfg(feature = "consumer")]
52 kafka_handlers: Vec::new(),
53 }
54 }
55
56 pub fn app_state(&self) -> &Arc<AppState> {
58 &self.app_state
59 }
60
61 #[cfg(feature = "consumer")]
75 pub fn with_kafka_handler(
76 mut self,
77 handler: Arc<dyn crate::messaging::KafkaMessageHandler>,
78 ) -> Self {
79 self.kafka_handlers.push(handler);
80 self
81 }
82
83 #[cfg(feature = "consumer")]
102 pub fn with_kafka_handlers(
103 mut self,
104 handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
105 ) -> Self {
106 self.kafka_handlers.extend(handlers);
107 self
108 }
109
110 pub fn http_router(mut self, router: AxumRouter) -> Self {
112 self.http_router = Some(router);
113 self
114 }
115
116 #[cfg(feature = "grpc")]
118 pub fn grpc_router(mut self, router: tonic::transport::server::Router) -> Self {
119 self.grpc_router = Some(router);
120 self
121 }
122
123 pub fn config(&self) -> &Config {
125 &self.config
126 }
127
128 pub fn state(&self) -> &Arc<AppState> {
130 &self.app_state
131 }
132
133 fn build(self) -> Server {
135 Server {
136 config: self.config,
137 app_state: self.app_state,
138 http_router: self.http_router,
139 #[cfg(feature = "grpc")]
140 grpc_router: self.grpc_router,
141 }
142 }
143}
144
145impl Server {
146 pub async fn run<F>(configure: F) -> Result<(), anyhow::Error>
179 where
180 F: FnOnce(ServerBuilder) -> ServerBuilder,
181 {
182 let config = Config::from_env().unwrap_or_else(|e| {
184 eprintln!("⚠ 警告: 无法从环境变量加载配置: {}, 使用默认配置", e);
185 Config::default()
186 });
187
188 Self::run_with_config(config, configure).await
189 }
190
191 pub async fn run_with_config<F>(config: Config, configure: F) -> Result<(), anyhow::Error>
197 where
198 F: FnOnce(ServerBuilder) -> ServerBuilder,
199 {
200 crate::init_logging(&config).map_err(|e| anyhow::anyhow!("日志初始化失败: {}", e))?;
202
203 tracing::info!(
204 "启动 {} v{}",
205 env!("CARGO_PKG_NAME"),
206 env!("CARGO_PKG_VERSION")
207 );
208
209 #[cfg(feature = "nacos")]
211 if let Some(ref nacos_config) = config.nacos {
212 crate::nacos::init_nacos(nacos_config)
213 .await
214 .map_err(|e| anyhow::anyhow!("Nacos 初始化失败: {}", e))?;
215
216 crate::nacos::register_service(nacos_config, &config.server)
218 .await
219 .map_err(|e| anyhow::anyhow!("Nacos 服务注册失败: {}", e))?;
220
221 crate::nacos::subscribe_services(nacos_config)
223 .await
224 .map_err(|e| anyhow::anyhow!("Nacos 服务订阅失败: {}", e))?;
225
226 crate::nacos::subscribe_configs(nacos_config)
228 .await
229 .map_err(|e| anyhow::anyhow!("Nacos 配置订阅失败: {}", e))?;
230 }
231
232 let mut builder = ServerBuilder::new(config.clone(), Arc::new(AppState::new()));
234
235 builder = configure(builder);
237
238 #[cfg(feature = "consumer")]
240 let app_state = Self::init_app_state(&builder.config, &builder.kafka_handlers).await?;
241
242 #[cfg(not(feature = "consumer"))]
243 let app_state = Self::init_app_state(&builder.config).await?;
244
245 builder.app_state = Arc::new(app_state);
247
248 let server = builder.build();
249
250 server.start_internal().await
252 }
253
/// Builds the application state for builds with the `consumer` feature.
///
/// Each resource is initialised only when its cargo feature is enabled and
/// the matching section is present in `config`:
/// - `database`: connection pool via `crate::database::init_database`;
/// - `redis`: connection via `crate::cache::redis::init_redis`;
/// - `kafka`: producer (with the `producer` feature) and consumer.
///
/// When a consumer is configured and `kafka_handlers` is non-empty, the
/// handlers are wrapped in a `KafkaMessageRouter` and the consumer subscribes
/// to the topics the router reports; every received message is dispatched on
/// its own spawned task. Handlers that cannot be wired up because the Kafka
/// or consumer configuration is missing are reported with a warning instead
/// of being dropped silently.
///
/// # Errors
/// Returns an error if any enabled resource fails to initialise or the
/// topic subscription fails.
#[cfg(feature = "consumer")]
// `config` / `kafka_handlers` are unused when the resource features are off.
#[allow(unused_variables)]
async fn init_app_state(
    config: &Config,
    kafka_handlers: &[Arc<dyn crate::messaging::KafkaMessageHandler>],
) -> Result<AppState, anyhow::Error> {
    let app_state = AppState::new();

    #[cfg(feature = "database")]
    let app_state = if let Some(ref db_config) = config.database {
        let pool = crate::database::init_database(
            &db_config.url,
            db_config.max_connections,
            db_config.min_connections,
        )
        .await
        .map_err(|e| anyhow::anyhow!("数据库初始化失败: {}", e))?;
        app_state.with_database(pool)
    } else {
        app_state
    };

    #[cfg(feature = "redis")]
    let app_state = if let Some(ref redis_config) = config.redis {
        let conn = crate::cache::redis::init_redis(
            &redis_config.url,
            redis_config.password.as_deref(),
            redis_config.pool_size,
        )
        .await
        .map_err(|e| anyhow::anyhow!("Redis 初始化失败: {}", e))?;
        app_state.with_redis(conn)
    } else {
        app_state
    };

    #[cfg(feature = "kafka")]
    let app_state = if let Some(ref kafka_config) = config.kafka {
        let mut state = app_state;

        #[cfg(feature = "producer")]
        {
            if let Some(ref producer_config) = kafka_config.producer {
                use crate::messaging::kafka::KafkaProducer;

                let producer = Arc::new(
                    KafkaProducer::new(&kafka_config.brokers, producer_config)
                        .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
                )
                    as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
                state = state.with_message_producer(producer);
                tracing::info!("✅ Kafka Producer 初始化成功");
            }
        }

        // This function only exists with the `consumer` feature, so the
        // consumer setup needs no additional feature gate.
        if let Some(ref consumer_config) = kafka_config.consumer {
            use crate::messaging::{kafka::KafkaConsumer, KafkaMessageRouter};

            let consumer = Arc::new(
                KafkaConsumer::new(&kafka_config.brokers, consumer_config)
                    .map_err(|e| anyhow::anyhow!("Kafka 消费者初始化失败: {}", e))?,
            )
                as Arc<dyn crate::messaging::MessageConsumer + Send + Sync>;

            if !kafka_handlers.is_empty() {
                let router = Arc::new(KafkaMessageRouter::new(kafka_handlers.to_vec()));
                let subscribe_topics = router.get_subscribe_topics();

                if !subscribe_topics.is_empty() {
                    let router_clone = router.clone();
                    // Each message is dispatched on its own task so the
                    // consumer callback returns immediately.
                    let handler = Arc::new(move |message: crate::messaging::Message| {
                        let router = router_clone.clone();
                        tokio::spawn(async move {
                            router.dispatch(message).await;
                        });
                    });

                    consumer
                        .subscribe_topics(subscribe_topics.clone(), handler)
                        .await
                        .map_err(|e| anyhow::anyhow!("Kafka 订阅失败: {}", e))?;

                    tracing::info!(
                        "✅ Kafka Consumer 初始化成功 (group: {}, 自动订阅 {} 个 topics: {:?})",
                        consumer_config.group_id,
                        subscribe_topics.len(),
                        subscribe_topics
                    );
                } else {
                    tracing::warn!(
                        "⚠️ Kafka Consumer 已初始化,但没有 handler 注册任何 topics"
                    );
                }
            } else {
                tracing::info!(
                    "✅ Kafka Consumer 初始化成功 (group: {},未注册 handler,需手动订阅)",
                    consumer_config.group_id
                );
            }

            state = state.with_message_consumer(consumer);
        } else if !kafka_handlers.is_empty() {
            // Handlers were registered but there is no consumer to feed them.
            tracing::warn!(
                "⚠️ 已注册 {} 个 Kafka handler,但未配置 Kafka Consumer,handler 不会被订阅",
                kafka_handlers.len()
            );
        }

        state
    } else {
        if !kafka_handlers.is_empty() {
            // Handlers were registered but Kafka is not configured at all.
            tracing::warn!(
                "⚠️ 已注册 {} 个 Kafka handler,但未配置 Kafka,handler 不会被订阅",
                kafka_handlers.len()
            );
        }
        app_state
    };

    Ok(app_state)
}
374
375 #[cfg(not(feature = "consumer"))]
377 #[allow(unused_variables)]
378 async fn init_app_state(config: &Config) -> Result<AppState, anyhow::Error> {
379 let app_state = AppState::new();
380
381 #[cfg(feature = "database")]
383 let app_state = if let Some(ref db_config) = config.database {
384 let pool = crate::database::init_database(
385 &db_config.url,
386 db_config.max_connections,
387 db_config.min_connections,
388 )
389 .await
390 .map_err(|e| anyhow::anyhow!("数据库初始化失败: {}", e))?;
391 app_state.with_database(pool)
392 } else {
393 app_state
394 };
395
396 #[cfg(feature = "redis")]
398 let app_state = if let Some(ref redis_config) = config.redis {
399 let conn = crate::cache::redis::init_redis(
400 &redis_config.url,
401 redis_config.password.as_deref(),
402 redis_config.pool_size,
403 )
404 .await
405 .map_err(|e| anyhow::anyhow!("Redis 初始化失败: {}", e))?;
406 app_state.with_redis(conn)
407 } else {
408 app_state
409 };
410
411 #[cfg(all(feature = "kafka", feature = "producer"))]
413 let app_state = if let Some(ref kafka_config) = config.kafka {
414 use std::sync::Arc;
415 let mut state = app_state;
416
417 if let Some(ref producer_config) = kafka_config.producer {
418 use crate::messaging::kafka::KafkaProducer;
419
420 let producer = Arc::new(
421 KafkaProducer::new(&kafka_config.brokers, producer_config)
422 .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
423 )
424 as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
425 state = state.with_message_producer(producer);
426 tracing::info!("✅ Kafka Producer 初始化成功");
427 }
428
429 state
430 } else {
431 app_state
432 };
433
434 Ok(app_state)
435 }
436
437 fn create_base_router(&self) -> AxumRouter {
439 let state = self.app_state.clone();
440
441 Router::new()
442 .route("/", get(root))
443 .route("/health", get(health_check))
444 .fallback(|| async {
445 (
446 StatusCode::NOT_FOUND,
447 Json(json!({
448 "error": "Not found",
449 "status": 404
450 })),
451 )
452 })
453 .layer(
454 ServiceBuilder::new()
455 .layer(CompressionLayer::new())
456 .into_inner(),
457 )
458 .layer(create_cors_layer(&self.config))
459 .layer(create_trace_layer())
460 .with_state(state)
461 }
462
463 async fn create_router(&mut self) -> AxumRouter {
465 let base_router = self.create_base_router();
466
467 let router = if let Some(custom) = self.http_router.take() {
469 base_router.merge(custom)
470 } else {
471 base_router
472 };
473
474 #[cfg(feature = "grpc")]
476 let router = self.add_grpc_services(router).await;
477
478 if let Some(ref context_path) = self.config.server.context_path {
480 use std::borrow::Cow;
481 let path: Cow<'_, str> = if context_path.starts_with('/') {
483 Cow::Borrowed(context_path.as_str())
484 } else {
485 tracing::warn!("上下文路径 '{}' 应该以 '/' 开头,已自动修正", context_path);
487 Cow::Owned(format!("/{}", context_path))
488 };
489 Router::new().nest(path.as_ref(), router)
490 } else {
491 router
492 }
493 }
494
/// Mounts the tonic gRPC service as the fallback service of `router`.
///
/// Takes the gRPC router out of `self`; when none was configured `router`
/// is returned unchanged. Otherwise requests that reach the fallback are
/// converted from an axum request into the body type the tonic service
/// expects, forwarded, and the response body is wrapped back into an axum
/// body. A service error is answered with an empty 500 response.
///
/// Note: `fallback_service` is called on `router`, so it takes the place of
/// whatever fallback `router` had before.
#[cfg(feature = "grpc")]
async fn add_grpc_services(&mut self, router: AxumRouter) -> AxumRouter {
    use tower::Service;

    let Some(grpc_router) = self.grpc_router.take() else {
        return router;
    };

    tracing::info!("集成 gRPC 服务到 axum");

    let grpc_service = grpc_router.into_service();

    let grpc_adapter = tower::service_fn(move |req: axum::http::Request<axum::body::Body>| {
        // `call` needs `&mut`, so each request works on its own clone.
        let mut svc = grpc_service.clone();
        async move {
            // axum body -> boxed body with `tonic::Status` as the error type.
            let (head, payload) = req.into_parts();
            let payload = payload
                .map_err(|e| tonic::Status::internal(e.to_string()))
                .boxed_unsync();
            let grpc_request = axum::http::Request::from_parts(head, payload);

            let response = match svc.call(grpc_request).await {
                Ok(res) => {
                    let (head, payload) = res.into_parts();
                    axum::http::Response::from_parts(head, axum::body::Body::new(payload))
                }
                Err(_) => axum::http::Response::builder()
                    .status(StatusCode::INTERNAL_SERVER_ERROR)
                    .body(axum::body::Body::empty())
                    .unwrap(),
            };

            Ok::<_, std::convert::Infallible>(response)
        }
    });

    router.fallback_service(grpc_adapter)
}
548
549 async fn start_internal(mut self) -> Result<(), anyhow::Error> {
551 let addr = self.config.server.socket_addr()?;
552 let app = self.create_router().await;
553
554 let base_url = if let Some(ref context_path) = self.config.server.context_path {
555 format!("http://{}{}", addr, context_path)
556 } else {
557 format!("http://{}", addr)
558 };
559
560 info!("服务器启动在 {}", base_url);
561 info!("健康检查: {}/health", base_url);
562
563 #[cfg(feature = "grpc")]
564 if self.grpc_router.is_some() {
565 info!("gRPC 服务已启用");
566 }
567
568 let listener = tokio::net::TcpListener::bind(&addr).await?;
569
570 axum::serve(listener, app)
571 .with_graceful_shutdown(shutdown_signal())
572 .await?;
573
574 Ok(())
575 }
576}
577
578async fn shutdown_signal() {
580 let ctrl_c = async {
581 signal::ctrl_c().await.expect("无法安装 Ctrl+C 信号处理器");
582 info!("收到 Ctrl+C 信号,开始优雅关闭...");
583 };
584
585 #[cfg(unix)]
586 let terminate = async {
587 signal::unix::signal(signal::unix::SignalKind::terminate())
588 .expect("无法安装 SIGTERM 信号处理器")
589 .recv()
590 .await;
591 warn!("收到 SIGTERM 信号,开始优雅关闭...");
592 };
593
594 #[cfg(not(unix))]
595 let terminate = std::future::pending::<()>();
596
597 tokio::select! {
598 _ = ctrl_c => {},
599 _ = terminate => {},
600 }
601
602 info!("服务器正在关闭...");
603}