1use axum::{
2 http::StatusCode,
3 response::Json,
4 routing::{get, Router},
5 Router as AxumRouter,
6};
7use serde_json::json;
8use std::sync::Arc;
9use tokio::signal;
10use tower::ServiceBuilder;
11use tower_http::compression::CompressionLayer;
12use tracing::{info, warn};
13
14use crate::http::{create_cors_layer, create_trace_layer, health_check, root};
15use crate::state::AppState;
16use crate::{config::Config, AppResult};
17
18#[cfg(feature = "grpc")]
19use http_body_util::BodyExt;
20
/// Fully configured server produced by `ServerBuilder::build`.
///
/// The optional routers are taken out of the struct when the final router is
/// assembled, so they are `None` afterwards.
pub struct Server {
    /// Configuration read for the CORS layer, the context path and the listen address.
    config: Config,
    /// Shared application state attached to the built-in HTTP routes.
    app_state: Arc<AppState>,
    /// Optional caller-supplied routes merged into the built-in router.
    http_router: Option<AxumRouter>,
    /// Optional gRPC router, installed as the HTTP router's fallback service.
    #[cfg(feature = "grpc")]
    grpc_router: Option<tonic::transport::server::Router>,
}
29
/// Builder handed to the user's `configure` closure before the server starts.
///
/// It collects optional routers and Kafka handlers and is then converted into
/// a `Server`.
pub struct ServerBuilder {
    /// Configuration the server will run with.
    config: Config,
    /// Shared application state; replaced after Kafka consumers are initialised
    /// when the `consumer` feature is enabled.
    app_state: Arc<AppState>,
    /// Optional caller-supplied HTTP routes.
    http_router: Option<AxumRouter>,
    /// Optional gRPC router.
    #[cfg(feature = "grpc")]
    grpc_router: Option<tonic::transport::server::Router>,
    /// Kafka message handlers registered by the caller.
    #[cfg(feature = "consumer")]
    kafka_handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
}
40
41impl ServerBuilder {
42 #[warn(dead_code)]
44 fn new(config: Config, app_state: Arc<AppState>) -> Self {
45 Self {
46 config,
47 app_state,
48 http_router: None,
49 #[cfg(feature = "grpc")]
50 grpc_router: None,
51 #[cfg(feature = "consumer")]
52 kafka_handlers: Vec::new(),
53 }
54 }
55
56 pub fn app_state(&self) -> &Arc<AppState> {
58 &self.app_state
59 }
60
61 #[cfg(feature = "consumer")]
75 pub fn with_kafka_handler(
76 mut self,
77 handler: Arc<dyn crate::messaging::KafkaMessageHandler>,
78 ) -> Self {
79 self.kafka_handlers.push(handler);
80 self
81 }
82
83 #[cfg(feature = "consumer")]
102 pub fn with_kafka_handlers(
103 mut self,
104 handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
105 ) -> Self {
106 self.kafka_handlers.extend(handlers);
107 self
108 }
109
110 pub fn http_router(mut self, router: AxumRouter) -> Self {
112 self.http_router = Some(router);
113 self
114 }
115
116 #[cfg(feature = "grpc")]
118 pub fn grpc_router(mut self, router: tonic::transport::server::Router) -> Self {
119 self.grpc_router = Some(router);
120 self
121 }
122
123 pub fn config(&self) -> &Config {
125 &self.config
126 }
127
128 pub fn state(&self) -> &Arc<AppState> {
130 &self.app_state
131 }
132
133 fn build(self) -> Server {
135 Server {
136 config: self.config,
137 app_state: self.app_state,
138 http_router: self.http_router,
139 #[cfg(feature = "grpc")]
140 grpc_router: self.grpc_router,
141 }
142 }
143}
144
145impl Server {
146 pub async fn run<F>(configure: F) -> AppResult<()>
179 where
180 F: FnOnce(ServerBuilder) -> ServerBuilder,
181 {
182 let config = Config::from_env().unwrap_or_else(|e| {
184 eprintln!("⚠ 警告: 无法从环境变量加载配置: {}, 使用默认配置", e);
185 Config::default()
186 });
187
188 Self::run_with_config(config, configure).await
189 }
190
191 pub async fn run_with_config<F>(config: Config, configure: F) -> AppResult<()>
197 where
198 F: FnOnce(ServerBuilder) -> ServerBuilder,
199 {
200 crate::init_logging(&config).map_err(|e| anyhow::anyhow!("日志初始化失败: {}", e))?;
202
203 tracing::info!(
204 "启动 {} v{}",
205 env!("CARGO_PKG_NAME"),
206 env!("CARGO_PKG_VERSION")
207 );
208
209 #[cfg(feature = "nacos")]
211 if let Some(ref nacos_config) = config.nacos {
212 crate::nacos::init_nacos(nacos_config)
213 .await
214 .map_err(|e| anyhow::anyhow!("Nacos 初始化失败: {}", e))?;
215
216 crate::nacos::register_service(nacos_config, &config.server)
218 .await
219 .map_err(|e| anyhow::anyhow!("Nacos 服务注册失败: {}", e))?;
220
221 crate::nacos::subscribe_services(nacos_config)
223 .await
224 .map_err(|e| anyhow::anyhow!("Nacos 服务订阅失败: {}", e))?;
225
226 crate::nacos::subscribe_configs(nacos_config)
228 .await
229 .map_err(|e| anyhow::anyhow!("Nacos 配置订阅失败: {}", e))?;
230 }
231 let app_state = Self::init_app_state(&config).await?;
233
234 let mut builder = ServerBuilder::new(config.clone(), Arc::new(app_state));
236
237 builder = configure(builder);
239
240 #[cfg(feature = "consumer")]
242 {
243 builder.app_state = Arc::new(
244 Self::init_kafka_consumers(
245 &config,
246 &builder.kafka_handlers,
247 builder.app_state.clone(),
248 )
249 .await?,
250 );
251 }
252
253 let server = builder.build();
254
255 server.start_internal().await
257 }
258
259 #[allow(unused_variables)] async fn init_app_state(config: &Config) -> AppResult<AppState> {
264 let app_state = AppState::new();
265
266 #[cfg(any(feature = "mysql", feature = "postgres", feature = "sqlite"))]
268 let app_state = if let Some(ref db_config) = config.database {
269 let pools = crate::database::init_database(db_config).await?;
270 app_state.with_database_pools(pools)
271 } else {
272 app_state
273 };
274
275 #[cfg(feature = "redis")]
277 let app_state = if let Some(ref redis_config) = config.redis {
278 let pool = crate::cache::redis::init_redis(
279 &redis_config.url,
280 redis_config.password.as_deref(),
281 redis_config.pool_size,
282 )
283 .await?;
284 app_state.with_redis(pool)
285 } else {
286 app_state
287 };
288
289 #[cfg(feature = "kafka")]
291 let app_state = if let Some(ref kafka_config) = config.kafka {
292 #[cfg(feature = "producer")]
294 let state = if let Some(ref producer_config) = kafka_config.producer {
295 use crate::messaging::kafka::KafkaProducer;
296
297 let producer = Arc::new(
298 KafkaProducer::new(&kafka_config.brokers, producer_config)
299 .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
300 )
301 as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
302 app_state.with_message_producer(producer)
303 } else {
304 app_state
305 };
306 #[cfg(not(feature = "producer"))]
307 let state = app_state;
308
309 state
310 } else {
311 app_state
312 };
313
314 Ok(app_state)
315 }
316
317 #[cfg(feature = "consumer")]
321 async fn init_kafka_consumers(
322 config: &Config,
323 kafka_handlers: &[Arc<dyn crate::messaging::KafkaMessageHandler>],
324 mut app_state: Arc<AppState>,
325 ) -> AppResult<AppState> {
326 use crate::messaging::{kafka, KafkaMessageRouter};
327 use std::sync::Arc;
328
329 if let Some(ref kafka_config) = config.kafka {
331 if let Some(ref consumer_config) = kafka_config.consumer {
332 if !kafka_handlers.is_empty() {
334 let router = Arc::new(KafkaMessageRouter::new(kafka_handlers.to_vec()));
335 let subscribe_topics = router.get_subscribe_topics();
336
337 if !subscribe_topics.is_empty() {
338 let consumers_by_group = kafka::create_consumers_from_handlers(
340 &kafka_config.brokers,
341 consumer_config,
342 kafka_handlers,
343 )?;
344
345 let router_clone = router.clone();
346 let handler = Arc::new(move |message: crate::messaging::Message| {
347 let router = router_clone.clone();
348 tokio::spawn(async move {
349 router.dispatch(message).await;
350 });
351 });
352
353 let mut first_consumer: Option<
355 Arc<dyn crate::messaging::MessageConsumer + Send + Sync>,
356 > = None;
357 for (group_id, (consumer, topics)) in consumers_by_group {
358 let topics_to_subscribe: Vec<String> = topics
360 .into_iter()
361 .filter(|t| subscribe_topics.contains(t))
362 .collect();
363
364 if !topics_to_subscribe.is_empty() {
365 let consumer_arc = Arc::new(consumer)
366 as Arc<dyn crate::messaging::MessageConsumer + Send + Sync>;
367
368 if first_consumer.is_none() {
370 first_consumer = Some(consumer_arc.clone());
371 }
372
373 consumer_arc
374 .subscribe_topics(topics_to_subscribe.clone(), handler.clone())
375 .await
376 .map_err(|e| {
377 anyhow::anyhow!(
378 "Kafka 订阅失败 (group: {}): {}",
379 group_id,
380 e
381 )
382 })?;
383
384 tracing::info!(
385 "✅ Kafka Consumer 初始化成功 (group: {}, 订阅 {} 个 topics: {:?})",
386 group_id,
387 topics_to_subscribe.len(),
388 topics_to_subscribe
389 );
390 }
391 }
392
393 if let Some(consumer) = first_consumer {
395 app_state =
396 Arc::new((*app_state).clone().with_message_consumer(consumer));
397 }
398 } else {
399 tracing::warn!(
400 "⚠️ Kafka Consumer 已初始化,但没有 handler 注册任何 topics"
401 );
402 }
403 } else {
404 tracing::info!("ℹ️ 未注册 handlers,跳过 Kafka Consumer 初始化");
405 }
406 }
407 }
408
409 Ok((*app_state).clone())
410 }
411
412 fn create_base_router(&self) -> AxumRouter {
414 let state = self.app_state.clone();
415
416 Router::new()
417 .route("/", get(root))
418 .route("/health", get(health_check))
419 .fallback(|| async {
420 (
421 StatusCode::NOT_FOUND,
422 Json(json!({
423 "error": "Not found",
424 "status": 404
425 })),
426 )
427 })
428 .layer(
429 ServiceBuilder::new()
430 .layer(CompressionLayer::new())
431 .into_inner(),
432 )
433 .layer(create_cors_layer(&self.config))
434 .layer(create_trace_layer())
435 .with_state(state)
436 }
437
438 async fn create_router(&mut self) -> AxumRouter {
440 let base_router = self.create_base_router();
441
442 let router = if let Some(custom) = self.http_router.take() {
444 base_router.merge(custom)
445 } else {
446 base_router
447 };
448
449 #[cfg(feature = "grpc")]
451 let router = self.add_grpc_services(router).await;
452
453 if let Some(ref context_path) = self.config.server.context_path {
455 use std::borrow::Cow;
456 let path: Cow<'_, str> = if context_path.starts_with('/') {
458 Cow::Borrowed(context_path.as_str())
459 } else {
460 tracing::warn!("上下文路径 '{}' 应该以 '/' 开头,已自动修正", context_path);
462 Cow::Owned(format!("/{}", context_path))
463 };
464 Router::new().nest(path.as_ref(), router)
465 } else {
466 router
467 }
468 }
469
470 #[cfg(feature = "grpc")]
475 async fn add_grpc_services(&mut self, router: AxumRouter) -> AxumRouter {
476 if let Some(grpc_router) = self.grpc_router.take() {
477 tracing::info!("集成 gRPC 服务到 axum");
478
479 let grpc_service = grpc_router.into_service();
481
482 let grpc_adapter =
484 tower::service_fn(move |req: axum::http::Request<axum::body::Body>| {
485 let mut grpc_service = grpc_service.clone();
486 async move {
487 let (parts, body) = req.into_parts();
489 let body = body
490 .map_err(|e| tonic::Status::internal(e.to_string()))
491 .boxed_unsync();
492 let req = axum::http::Request::from_parts(parts, body);
493
494 use tower::Service;
496 let res = match grpc_service.call(req).await {
497 Ok(r) => r,
498 Err(_) => {
499 return Ok(axum::http::Response::builder()
501 .status(StatusCode::INTERNAL_SERVER_ERROR)
502 .body(axum::body::Body::empty())
503 .unwrap());
504 }
505 };
506
507 let (parts, body) = res.into_parts();
509 let body = axum::body::Body::new(body);
510 Ok::<_, std::convert::Infallible>(axum::http::Response::from_parts(
511 parts, body,
512 ))
513 }
514 });
515
516 router.fallback_service(grpc_adapter)
519 } else {
520 router
521 }
522 }
523
524 async fn start_internal(mut self) -> AppResult<()> {
526 let addr = self.config.server.socket_addr()?;
527 let app = self.create_router().await;
528
529 let base_url = if let Some(ref context_path) = self.config.server.context_path {
530 format!("http://{}{}", addr, context_path)
531 } else {
532 format!("http://{}", addr)
533 };
534
535 info!("服务器启动在 {}", base_url);
536 info!("健康检查: {}/health", base_url);
537
538 #[cfg(feature = "grpc")]
539 if self.grpc_router.is_some() {
540 info!("gRPC 服务已启用");
541 }
542
543 let listener = tokio::net::TcpListener::bind(&addr).await?;
544
545 axum::serve(listener, app)
546 .with_graceful_shutdown(shutdown_signal())
547 .await?;
548
549 Ok(())
550 }
551}
552
553async fn shutdown_signal() {
555 let ctrl_c = async {
556 signal::ctrl_c().await.expect("无法安装 Ctrl+C 信号处理器");
557 info!("收到 Ctrl+C 信号,开始优雅关闭...");
558 };
559
560 #[cfg(unix)]
561 let terminate = async {
562 signal::unix::signal(signal::unix::SignalKind::terminate())
563 .expect("无法安装 SIGTERM 信号处理器")
564 .recv()
565 .await;
566 warn!("收到 SIGTERM 信号,开始优雅关闭...");
567 };
568
569 #[cfg(not(unix))]
570 let terminate = std::future::pending::<()>();
571
572 tokio::select! {
573 _ = ctrl_c => {},
574 _ = terminate => {},
575 }
576
577 info!("服务器正在关闭...");
578}