1use axum::{
2 http::StatusCode,
3 response::Json,
4 routing::{get, Router},
5 Router as AxumRouter,
6};
7use serde_json::json;
8use std::sync::Arc;
9use tokio::signal;
10use tower::ServiceBuilder;
11use tower_http::compression::CompressionLayer;
12use tracing::{info, warn};
13
14use crate::config::Config;
15use crate::http::{create_cors_layer, create_trace_layer, health_check, root};
16use crate::state::AppState;
17
18#[cfg(feature = "grpc")]
19use http_body_util::BodyExt;
20
21pub struct Server {
23 config: Config,
24 app_state: Arc<AppState>,
25 http_router: Option<AxumRouter>,
26 #[cfg(feature = "grpc")]
27 grpc_router: Option<tonic::transport::server::Router>,
28}
29
30pub struct ServerBuilder {
32 config: Config,
33 app_state: Arc<AppState>,
34 http_router: Option<AxumRouter>,
35 #[cfg(feature = "grpc")]
36 grpc_router: Option<tonic::transport::server::Router>,
37 #[cfg(feature = "consumer")]
38 kafka_handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
39}
40
41impl ServerBuilder {
42 #[warn(dead_code)]
44 fn new(config: Config, app_state: Arc<AppState>) -> Self {
45 Self {
46 config,
47 app_state,
48 http_router: None,
49 #[cfg(feature = "grpc")]
50 grpc_router: None,
51 #[cfg(feature = "consumer")]
52 kafka_handlers: Vec::new(),
53 }
54 }
55
56 pub fn app_state(&self) -> &Arc<AppState> {
58 &self.app_state
59 }
60
61 #[cfg(feature = "consumer")]
75 pub fn with_kafka_handler(
76 mut self,
77 handler: Arc<dyn crate::messaging::KafkaMessageHandler>,
78 ) -> Self {
79 self.kafka_handlers.push(handler);
80 self
81 }
82
83 #[cfg(feature = "consumer")]
102 pub fn with_kafka_handlers(
103 mut self,
104 handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
105 ) -> Self {
106 self.kafka_handlers.extend(handlers);
107 self
108 }
109
110 pub fn http_router(mut self, router: AxumRouter) -> Self {
112 self.http_router = Some(router);
113 self
114 }
115
116 #[cfg(feature = "grpc")]
118 pub fn grpc_router(mut self, router: tonic::transport::server::Router) -> Self {
119 self.grpc_router = Some(router);
120 self
121 }
122
123 pub fn config(&self) -> &Config {
125 &self.config
126 }
127
128 pub fn state(&self) -> &Arc<AppState> {
130 &self.app_state
131 }
132
133 fn build(self) -> Server {
135 Server {
136 config: self.config,
137 app_state: self.app_state,
138 http_router: self.http_router,
139 #[cfg(feature = "grpc")]
140 grpc_router: self.grpc_router,
141 }
142 }
143}
144
145impl Server {
146 pub async fn run<F>(configure: F) -> Result<(), anyhow::Error>
179 where
180 F: FnOnce(ServerBuilder) -> ServerBuilder,
181 {
182 let config = Config::from_env().unwrap_or_else(|e| {
184 eprintln!("⚠ 警告: 无法从环境变量加载配置: {}, 使用默认配置", e);
185 Config::default()
186 });
187
188 Self::run_with_config(config, configure).await
189 }
190
191 pub async fn run_with_config<F>(config: Config, configure: F) -> Result<(), anyhow::Error>
197 where
198 F: FnOnce(ServerBuilder) -> ServerBuilder,
199 {
200 crate::init_logging(&config).map_err(|e| anyhow::anyhow!("日志初始化失败: {}", e))?;
202
203 tracing::info!(
204 "启动 {} v{}",
205 env!("CARGO_PKG_NAME"),
206 env!("CARGO_PKG_VERSION")
207 );
208
209 #[cfg(feature = "nacos")]
211 if let Some(ref nacos_config) = config.nacos {
212 crate::nacos::init_nacos(nacos_config)
213 .await
214 .map_err(|e| anyhow::anyhow!("Nacos 初始化失败: {}", e))?;
215
216 crate::nacos::register_service(nacos_config, &config.server)
218 .await
219 .map_err(|e| anyhow::anyhow!("Nacos 服务注册失败: {}", e))?;
220
221 crate::nacos::subscribe_services(nacos_config)
223 .await
224 .map_err(|e| anyhow::anyhow!("Nacos 服务订阅失败: {}", e))?;
225
226 crate::nacos::subscribe_configs(nacos_config)
228 .await
229 .map_err(|e| anyhow::anyhow!("Nacos 配置订阅失败: {}", e))?;
230 }
231 let app_state = Self::init_app_state(&config).await?;
233
234 let mut builder = ServerBuilder::new(config.clone(), Arc::new(app_state));
236
237 builder = configure(builder);
239
240 #[cfg(feature = "consumer")]
242 {
243 builder.app_state = Arc::new(
244 Self::init_kafka_consumers(
245 &config,
246 &builder.kafka_handlers,
247 builder.app_state.clone(),
248 )
249 .await?,
250 );
251 }
252
253 let server = builder.build();
254
255 server.start_internal().await
257 }
258
259 #[allow(unused_variables)] async fn init_app_state(config: &Config) -> Result<AppState, anyhow::Error> {
264 let app_state = AppState::new();
265
266 #[cfg(any(feature = "mysql", feature = "postgres", feature = "sqlite"))]
268 let app_state = if let Some(ref db_config) = config.database {
269 if db_config.has_any() {
270 let pools = crate::database::init_database(db_config)
271 .await
272 .map_err(|e| anyhow::anyhow!("数据库初始化失败: {}", e))?;
273 app_state.with_database_pools(pools)
274 } else {
275 app_state
276 }
277 } else {
278 app_state
279 };
280
281 #[cfg(feature = "redis")]
283 let app_state = if let Some(ref redis_config) = config.redis {
284 let pool = crate::cache::redis::init_redis(
285 &redis_config.url,
286 redis_config.password.as_deref(),
287 redis_config.pool_size,
288 )
289 .await
290 .map_err(|e| anyhow::anyhow!("Redis 初始化失败: {}", e))?;
291 app_state.with_redis(pool)
292 } else {
293 app_state
294 };
295
296 #[cfg(feature = "kafka")]
298 let app_state = if let Some(ref kafka_config) = config.kafka {
299 use std::sync::Arc;
300 let mut state = app_state;
301
302 #[cfg(feature = "producer")]
304 {
305 if let Some(ref producer_config) = kafka_config.producer {
306 use crate::messaging::kafka::KafkaProducer;
307
308 let producer = Arc::new(
309 KafkaProducer::new(&kafka_config.brokers, producer_config)
310 .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
311 )
312 as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
313 state = state.with_message_producer(producer);
314 }
315 }
316
317 state
318 } else {
319 app_state
320 };
321
322 Ok(app_state)
323 }
324
325 #[cfg(feature = "consumer")]
329 async fn init_kafka_consumers(
330 config: &Config,
331 kafka_handlers: &[Arc<dyn crate::messaging::KafkaMessageHandler>],
332 mut app_state: Arc<AppState>,
333 ) -> Result<AppState, anyhow::Error> {
334 use crate::messaging::{kafka, KafkaMessageRouter};
335 use std::sync::Arc;
336
337 if let Some(ref kafka_config) = config.kafka {
339 if let Some(ref consumer_config) = kafka_config.consumer {
340 if !kafka_handlers.is_empty() {
342 let router = Arc::new(KafkaMessageRouter::new(kafka_handlers.to_vec()));
343 let subscribe_topics = router.get_subscribe_topics();
344
345 if !subscribe_topics.is_empty() {
346 let consumers_by_group = kafka::create_consumers_from_handlers(
348 &kafka_config.brokers,
349 consumer_config,
350 kafka_handlers,
351 )
352 .map_err(|e| anyhow::anyhow!("Kafka 消费者初始化失败: {}", e))?;
353
354 let router_clone = router.clone();
355 let handler = Arc::new(move |message: crate::messaging::Message| {
356 let router = router_clone.clone();
357 tokio::spawn(async move {
358 router.dispatch(message).await;
359 });
360 });
361
362 let mut first_consumer: Option<
364 Arc<dyn crate::messaging::MessageConsumer + Send + Sync>,
365 > = None;
366 for (group_id, (consumer, topics)) in consumers_by_group {
367 let topics_to_subscribe: Vec<String> = topics
369 .into_iter()
370 .filter(|t| subscribe_topics.contains(t))
371 .collect();
372
373 if !topics_to_subscribe.is_empty() {
374 let consumer_arc = Arc::new(consumer)
375 as Arc<dyn crate::messaging::MessageConsumer + Send + Sync>;
376
377 if first_consumer.is_none() {
379 first_consumer = Some(consumer_arc.clone());
380 }
381
382 consumer_arc
383 .subscribe_topics(topics_to_subscribe.clone(), handler.clone())
384 .await
385 .map_err(|e| {
386 anyhow::anyhow!(
387 "Kafka 订阅失败 (group: {}): {}",
388 group_id,
389 e
390 )
391 })?;
392
393 tracing::info!(
394 "✅ Kafka Consumer 初始化成功 (group: {}, 订阅 {} 个 topics: {:?})",
395 group_id,
396 topics_to_subscribe.len(),
397 topics_to_subscribe
398 );
399 }
400 }
401
402 if let Some(consumer) = first_consumer {
404 app_state =
405 Arc::new((*app_state).clone().with_message_consumer(consumer));
406 }
407 } else {
408 tracing::warn!(
409 "⚠️ Kafka Consumer 已初始化,但没有 handler 注册任何 topics"
410 );
411 }
412 } else {
413 tracing::info!("ℹ️ 未注册 handlers,跳过 Kafka Consumer 初始化");
414 }
415 }
416 }
417
418 Ok((*app_state).clone())
419 }
420
421 fn create_base_router(&self) -> AxumRouter {
423 let state = self.app_state.clone();
424
425 Router::new()
426 .route("/", get(root))
427 .route("/health", get(health_check))
428 .fallback(|| async {
429 (
430 StatusCode::NOT_FOUND,
431 Json(json!({
432 "error": "Not found",
433 "status": 404
434 })),
435 )
436 })
437 .layer(
438 ServiceBuilder::new()
439 .layer(CompressionLayer::new())
440 .into_inner(),
441 )
442 .layer(create_cors_layer(&self.config))
443 .layer(create_trace_layer())
444 .with_state(state)
445 }
446
447 async fn create_router(&mut self) -> AxumRouter {
449 let base_router = self.create_base_router();
450
451 let router = if let Some(custom) = self.http_router.take() {
453 base_router.merge(custom)
454 } else {
455 base_router
456 };
457
458 #[cfg(feature = "grpc")]
460 let router = self.add_grpc_services(router).await;
461
462 if let Some(ref context_path) = self.config.server.context_path {
464 use std::borrow::Cow;
465 let path: Cow<'_, str> = if context_path.starts_with('/') {
467 Cow::Borrowed(context_path.as_str())
468 } else {
469 tracing::warn!("上下文路径 '{}' 应该以 '/' 开头,已自动修正", context_path);
471 Cow::Owned(format!("/{}", context_path))
472 };
473 Router::new().nest(path.as_ref(), router)
474 } else {
475 router
476 }
477 }
478
479 #[cfg(feature = "grpc")]
484 async fn add_grpc_services(&mut self, router: AxumRouter) -> AxumRouter {
485 if let Some(grpc_router) = self.grpc_router.take() {
486 tracing::info!("集成 gRPC 服务到 axum");
487
488 let grpc_service = grpc_router.into_service();
490
491 let grpc_adapter =
493 tower::service_fn(move |req: axum::http::Request<axum::body::Body>| {
494 let mut grpc_service = grpc_service.clone();
495 async move {
496 let (parts, body) = req.into_parts();
498 let body = body
499 .map_err(|e| tonic::Status::internal(e.to_string()))
500 .boxed_unsync();
501 let req = axum::http::Request::from_parts(parts, body);
502
503 use tower::Service;
505 let res = match grpc_service.call(req).await {
506 Ok(r) => r,
507 Err(_) => {
508 return Ok(axum::http::Response::builder()
510 .status(StatusCode::INTERNAL_SERVER_ERROR)
511 .body(axum::body::Body::empty())
512 .unwrap());
513 }
514 };
515
516 let (parts, body) = res.into_parts();
518 let body = axum::body::Body::new(body);
519 Ok::<_, std::convert::Infallible>(axum::http::Response::from_parts(
520 parts, body,
521 ))
522 }
523 });
524
525 router.fallback_service(grpc_adapter)
528 } else {
529 router
530 }
531 }
532
533 async fn start_internal(mut self) -> Result<(), anyhow::Error> {
535 let addr = self.config.server.socket_addr()?;
536 let app = self.create_router().await;
537
538 let base_url = if let Some(ref context_path) = self.config.server.context_path {
539 format!("http://{}{}", addr, context_path)
540 } else {
541 format!("http://{}", addr)
542 };
543
544 info!("服务器启动在 {}", base_url);
545 info!("健康检查: {}/health", base_url);
546
547 #[cfg(feature = "grpc")]
548 if self.grpc_router.is_some() {
549 info!("gRPC 服务已启用");
550 }
551
552 let listener = tokio::net::TcpListener::bind(&addr).await?;
553
554 axum::serve(listener, app)
555 .with_graceful_shutdown(shutdown_signal())
556 .await?;
557
558 Ok(())
559 }
560}
561
562async fn shutdown_signal() {
564 let ctrl_c = async {
565 signal::ctrl_c().await.expect("无法安装 Ctrl+C 信号处理器");
566 info!("收到 Ctrl+C 信号,开始优雅关闭...");
567 };
568
569 #[cfg(unix)]
570 let terminate = async {
571 signal::unix::signal(signal::unix::SignalKind::terminate())
572 .expect("无法安装 SIGTERM 信号处理器")
573 .recv()
574 .await;
575 warn!("收到 SIGTERM 信号,开始优雅关闭...");
576 };
577
578 #[cfg(not(unix))]
579 let terminate = std::future::pending::<()>();
580
581 tokio::select! {
582 _ = ctrl_c => {},
583 _ = terminate => {},
584 }
585
586 info!("服务器正在关闭...");
587}