fbc_starter/
server.rs

1use axum::{
2    http::StatusCode,
3    response::Json,
4    routing::{get, Router},
5    Router as AxumRouter,
6};
7use serde_json::json;
8use std::sync::Arc;
9use tokio::signal;
10use tower::ServiceBuilder;
11use tower_http::compression::CompressionLayer;
12use tracing::{info, warn};
13
14use crate::config::Config;
15use crate::http::{create_cors_layer, create_trace_layer, health_check, root};
16use crate::state::AppState;
17
18#[cfg(feature = "grpc")]
19use http_body_util::BodyExt;
20
/// Web server assembled by [`ServerBuilder`] and started through [`Server::run`]
/// or [`Server::run_with_config`].
pub struct Server {
    /// Configuration the server runs with (listen address, context path, ...).
    config: Config,
    /// Shared application state handed to the base router.
    app_state: Arc<AppState>,
    /// User-supplied HTTP routes; taken (left as `None`) when the final router is built.
    http_router: Option<AxumRouter>,
    /// User-supplied gRPC routes; taken (left as `None`) when the final router is built.
    #[cfg(feature = "grpc")]
    grpc_router: Option<tonic::transport::server::Router>,
}
29
/// Server builder handed to the configuration closure so callers can register
/// routes (and, with the `consumer` feature, Kafka handlers) before startup.
pub struct ServerBuilder {
    /// Configuration the server will run with.
    config: Config,
    /// Application state; replaced by `run_with_config` after the closure returns.
    app_state: Arc<AppState>,
    /// HTTP routes registered through `http_router`, if any.
    http_router: Option<AxumRouter>,
    /// gRPC routes registered through `grpc_router`, if any.
    #[cfg(feature = "grpc")]
    grpc_router: Option<tonic::transport::server::Router>,
    /// Kafka handlers registered so far; read during state initialisation and
    /// not carried over into [`Server`].
    #[cfg(feature = "consumer")]
    kafka_handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
}
40
41impl ServerBuilder {
42    /// 创建新的服务器构建器
43    #[warn(dead_code)]
44    fn new(config: Config, app_state: Arc<AppState>) -> Self {
45        Self {
46            config,
47            app_state,
48            http_router: None,
49            #[cfg(feature = "grpc")]
50            grpc_router: None,
51            #[cfg(feature = "consumer")]
52            kafka_handlers: Vec::new(),
53        }
54    }
55
56    /// 获取应用状态的引用
57    pub fn app_state(&self) -> &Arc<AppState> {
58        &self.app_state
59    }
60
61    /// 注册 Kafka 消息处理器
62    ///
63    /// # 参数
64    /// - `handler`: 实现了 KafkaMessageHandler trait 的处理器
65    ///
66    /// # 示例
67    /// ```ignore
68    /// Server::run(|builder| {
69    ///     builder
70    ///         .with_kafka_handler(Arc::new(MyHandler::new()))
71    ///         .http_router(routes)
72    /// })
73    /// ```
74    #[cfg(feature = "consumer")]
75    pub fn with_kafka_handler(
76        mut self,
77        handler: Arc<dyn crate::messaging::KafkaMessageHandler>,
78    ) -> Self {
79        self.kafka_handlers.push(handler);
80        self
81    }
82
83    /// 批量注册 Kafka 消息处理器
84    ///
85    /// # 示例
86    ///
87    /// ```no_run
88    /// use fbc_starter::Server;
89    /// use std::sync::Arc;
90    ///
91    /// Server::run(|builder| {
92    ///     let handlers = vec![
93    ///         Arc::new(Handler1::new()) as Arc<dyn fbc_starter::KafkaMessageHandler>,
94    ///         Arc::new(Handler2::new()) as Arc<dyn fbc_starter::KafkaMessageHandler>,
95    ///     ];
96    ///     builder
97    ///         .with_kafka_handlers(handlers)
98    ///         .http_router(routes)
99    /// })
100    /// ```
101    #[cfg(feature = "consumer")]
102    pub fn with_kafka_handlers(
103        mut self,
104        handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
105    ) -> Self {
106        self.kafka_handlers.extend(handlers);
107        self
108    }
109
110    /// 设置 HTTP 路由
111    pub fn http_router(mut self, router: AxumRouter) -> Self {
112        self.http_router = Some(router);
113        self
114    }
115
116    /// 设置 gRPC 路由(仅在启用 grpc 特性时可用)
117    #[cfg(feature = "grpc")]
118    pub fn grpc_router(mut self, router: tonic::transport::server::Router) -> Self {
119        self.grpc_router = Some(router);
120        self
121    }
122
123    /// 获取配置的引用
124    pub fn config(&self) -> &Config {
125        &self.config
126    }
127
128    /// 获取应用状态的引用
129    pub fn state(&self) -> &Arc<AppState> {
130        &self.app_state
131    }
132
133    /// 构建 Server
134    fn build(self) -> Server {
135        Server {
136            config: self.config,
137            app_state: self.app_state,
138            http_router: self.http_router,
139            #[cfg(feature = "grpc")]
140            grpc_router: self.grpc_router,
141        }
142    }
143}
144
145impl Server {
146    /// 启动服务器(使用闭包配置方式)
147    ///
148    /// # 参数
149    /// - `configure`: 配置闭包,接收 ServerBuilder 用于配置路由
150    ///
151    /// # 示例
152    /// ```rust,no_run
153    /// use fbc_starter::{Config, Server};
154    /// use axum::{routing::get, Router};
155    ///
156    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
157    /// Server::run(|builder| {
158    ///     // 获取配置
159    ///     let config = builder.config();
160    ///     
161    ///     // 配置 HTTP 路由
162    ///     let http_router = Router::new()
163    ///         .route("/api/users", get(|| async { "users" }));
164    ///     
165    ///     // 配置 gRPC 路由(如果启用)
166    ///     #[cfg(feature = "grpc")]
167    ///     let grpc_router = tonic::transport::Server::builder()
168    ///         .add_service(MyServiceServer::new(MyServiceImpl));
169    ///     
170    ///     builder
171    ///         .http_router(http_router)
172    ///         #[cfg(feature = "grpc")]
173    ///         .grpc_router(grpc_router)
174    /// }).await?;
175    /// # Ok(())
176    /// # }
177    /// ```
178    pub async fn run<F>(configure: F) -> Result<(), anyhow::Error>
179    where
180        F: FnOnce(ServerBuilder) -> ServerBuilder,
181    {
182        // 加载配置
183        let config = Config::from_env().unwrap_or_else(|e| {
184            eprintln!("⚠ 警告: 无法从环境变量加载配置: {}, 使用默认配置", e);
185            Config::default()
186        });
187
188        Self::run_with_config(config, configure).await
189    }
190
191    /// 使用指定配置启动服务器
192    ///
193    /// # 参数
194    /// - `config`: 服务器配置
195    /// - `configure`: 配置闭包,接收 ServerBuilder 用于配置路由
196    pub async fn run_with_config<F>(config: Config, configure: F) -> Result<(), anyhow::Error>
197    where
198        F: FnOnce(ServerBuilder) -> ServerBuilder,
199    {
200        // 初始化日志
201        crate::init_logging(&config).map_err(|e| anyhow::anyhow!("日志初始化失败: {}", e))?;
202
203        tracing::info!(
204            "启动 {} v{}",
205            env!("CARGO_PKG_NAME"),
206            env!("CARGO_PKG_VERSION")
207        );
208
209        // 初始化 Nacos(如果配置了且启用了 nacos 特性)
210        #[cfg(feature = "nacos")]
211        if let Some(ref nacos_config) = config.nacos {
212            crate::nacos::init_nacos(nacos_config)
213                .await
214                .map_err(|e| anyhow::anyhow!("Nacos 初始化失败: {}", e))?;
215
216            // 注册服务
217            crate::nacos::register_service(nacos_config, &config.server)
218                .await
219                .map_err(|e| anyhow::anyhow!("Nacos 服务注册失败: {}", e))?;
220
221            // 订阅服务
222            crate::nacos::subscribe_services(nacos_config)
223                .await
224                .map_err(|e| anyhow::anyhow!("Nacos 服务订阅失败: {}", e))?;
225
226            // 订阅配置
227            crate::nacos::subscribe_configs(nacos_config)
228                .await
229                .map_err(|e| anyhow::anyhow!("Nacos 配置订阅失败: {}", e))?;
230        }
231
232        // 创建初始的 builder(不初始化 app_state)
233        let mut builder = ServerBuilder::new(config.clone(), Arc::new(AppState::new()));
234
235        // 使用闭包配置服务器
236        builder = configure(builder);
237
238        // 初始化应用状态(使用builder中的kafka_handlers)
239        #[cfg(feature = "consumer")]
240        let app_state = Self::init_app_state(&builder.config, &builder.kafka_handlers).await?;
241
242        #[cfg(not(feature = "consumer"))]
243        let app_state = Self::init_app_state(&builder.config).await?;
244
245        // 更新 app_state
246        builder.app_state = Arc::new(app_state);
247
248        let server = builder.build();
249
250        // 启动服务器
251        server.start_internal().await
252    }
253
254    /// 初始化应用状态
255    #[cfg(feature = "consumer")]
256    async fn init_app_state(
257        config: &Config,
258        kafka_handlers: &[Arc<dyn crate::messaging::KafkaMessageHandler>],
259    ) -> Result<AppState, anyhow::Error> {
260        let app_state = AppState::new();
261
262        // 初始化数据库(如果配置了且启用了 database 特性)
263        #[cfg(feature = "database")]
264        let app_state = if let Some(ref db_config) = config.database {
265            let pool = crate::database::init_database(
266                &db_config.url,
267                db_config.max_connections,
268                db_config.min_connections,
269            )
270            .await
271            .map_err(|e| anyhow::anyhow!("数据库初始化失败: {}", e))?;
272            app_state.with_database(pool)
273        } else {
274            app_state
275        };
276
277        // 初始化 Redis(如果配置了且启用了 redis 特性)
278        #[cfg(feature = "redis")]
279        let app_state = if let Some(ref redis_config) = config.redis {
280            let conn = crate::cache::redis::init_redis(
281                &redis_config.url,
282                redis_config.password.as_deref(),
283                redis_config.pool_size,
284            )
285            .await
286            .map_err(|e| anyhow::anyhow!("Redis 初始化失败: {}", e))?;
287            app_state.with_redis(conn)
288        } else {
289            app_state
290        };
291
292        // Kafka 消息代理初始化(按需初始化 producer 和 consumer)
293        #[cfg(feature = "kafka")]
294        let app_state = if let Some(ref kafka_config) = config.kafka {
295            use std::sync::Arc;
296            let mut state = app_state;
297
298            // 按需初始化 Producer(仅当启用 producer 特性且配置了 producer 时)
299            #[cfg(feature = "producer")]
300            {
301                if let Some(ref producer_config) = kafka_config.producer {
302                    use crate::messaging::kafka::KafkaProducer;
303
304                    let producer = Arc::new(
305                        KafkaProducer::new(&kafka_config.brokers, producer_config)
306                            .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
307                    )
308                        as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
309                    state = state.with_message_producer(producer);
310                    tracing::info!("✅ Kafka Producer 初始化成功");
311                }
312            }
313
314            // 按需初始化 Consumer(仅当启用 consumer 特性且配置了 consumer 时)
315            #[cfg(feature = "consumer")]
316            {
317                if let Some(ref consumer_config) = kafka_config.consumer {
318                    use crate::messaging::{kafka::KafkaConsumer, KafkaMessageRouter};
319
320                    let consumer = Arc::new(
321                        KafkaConsumer::new(&kafka_config.brokers, consumer_config)
322                            .map_err(|e| anyhow::anyhow!("Kafka 消费者初始化失败: {}", e))?,
323                    )
324                        as Arc<dyn crate::messaging::MessageConsumer + Send + Sync>;
325
326                    // 如果有注册的 handlers,自动订阅并设置路由
327                    if !kafka_handlers.is_empty() {
328                        let router = Arc::new(KafkaMessageRouter::new(kafka_handlers.to_vec()));
329                        let subscribe_topics = router.get_subscribe_topics();
330
331                        if !subscribe_topics.is_empty() {
332                            let router_clone = router.clone();
333                            let handler = Arc::new(move |message: crate::messaging::Message| {
334                                let router = router_clone.clone();
335                                tokio::spawn(async move {
336                                    router.dispatch(message).await;
337                                });
338                            });
339
340                            consumer
341                                .subscribe_topics(subscribe_topics.clone(), handler)
342                                .await
343                                .map_err(|e| anyhow::anyhow!("Kafka 订阅失败: {}", e))?;
344
345                            tracing::info!(
346                                "✅ Kafka Consumer 初始化成功 (group: {}, 自动订阅 {} 个 topics: {:?})",
347                                consumer_config.group_id,
348                                subscribe_topics.len(),
349                                subscribe_topics
350                            );
351                        } else {
352                            tracing::warn!(
353                                "⚠️ Kafka Consumer 已初始化,但没有 handler 注册任何 topics"
354                            );
355                        }
356                    } else {
357                        tracing::info!(
358                            "✅ Kafka Consumer 初始化成功 (group: {},未注册 handler,需手动订阅)",
359                            consumer_config.group_id
360                        );
361                    }
362
363                    state = state.with_message_consumer(consumer);
364                }
365            }
366
367            state
368        } else {
369            app_state
370        };
371
372        Ok(app_state)
373    }
374
375    /// 初始化应用状态(无 consumer 特性版本)
376    #[cfg(not(feature = "consumer"))]
377    #[allow(unused_variables)]
378    async fn init_app_state(config: &Config) -> Result<AppState, anyhow::Error> {
379        let app_state = AppState::new();
380
381        // 初始化数据库(如果配置了且启用了 database 特性)
382        #[cfg(feature = "database")]
383        let app_state = if let Some(ref db_config) = config.database {
384            let pool = crate::database::init_database(
385                &db_config.url,
386                db_config.max_connections,
387                db_config.min_connections,
388            )
389            .await
390            .map_err(|e| anyhow::anyhow!("数据库初始化失败: {}", e))?;
391            app_state.with_database(pool)
392        } else {
393            app_state
394        };
395
396        // 初始化 Redis(如果配置了且启用了 redis 特性)
397        #[cfg(feature = "redis")]
398        let app_state = if let Some(ref redis_config) = config.redis {
399            let conn = crate::cache::redis::init_redis(
400                &redis_config.url,
401                redis_config.password.as_deref(),
402                redis_config.pool_size,
403            )
404            .await
405            .map_err(|e| anyhow::anyhow!("Redis 初始化失败: {}", e))?;
406            app_state.with_redis(conn)
407        } else {
408            app_state
409        };
410
411        // Kafka Producer 初始化(仅当启用 producer 特性)
412        #[cfg(all(feature = "kafka", feature = "producer"))]
413        let app_state = if let Some(ref kafka_config) = config.kafka {
414            use std::sync::Arc;
415            let mut state = app_state;
416
417            if let Some(ref producer_config) = kafka_config.producer {
418                use crate::messaging::kafka::KafkaProducer;
419
420                let producer = Arc::new(
421                    KafkaProducer::new(&kafka_config.brokers, producer_config)
422                        .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
423                )
424                    as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
425                state = state.with_message_producer(producer);
426                tracing::info!("✅ Kafka Producer 初始化成功");
427            }
428
429            state
430        } else {
431            app_state
432        };
433
434        Ok(app_state)
435    }
436
437    /// 创建基础路由(包含健康检查等系统路由)
438    fn create_base_router(&self) -> AxumRouter {
439        let state = self.app_state.clone();
440
441        Router::new()
442            .route("/", get(root))
443            .route("/health", get(health_check))
444            .fallback(|| async {
445                (
446                    StatusCode::NOT_FOUND,
447                    Json(json!({
448                        "error": "Not found",
449                        "status": 404
450                    })),
451                )
452            })
453            .layer(
454                ServiceBuilder::new()
455                    .layer(CompressionLayer::new())
456                    .into_inner(),
457            )
458            .layer(create_cors_layer(&self.config))
459            .layer(create_trace_layer())
460            .with_state(state)
461    }
462
463    /// 创建完整路由(合并基础路由、用户自定义路由和 gRPC 服务)
464    async fn create_router(&mut self) -> AxumRouter {
465        let base_router = self.create_base_router();
466
467        // 合并用户提供的 HTTP 路由
468        let router = if let Some(custom) = self.http_router.take() {
469            base_router.merge(custom)
470        } else {
471            base_router
472        };
473
474        // 如果启用了 gRPC 特性,添加 gRPC 服务
475        #[cfg(feature = "grpc")]
476        let router = self.add_grpc_services(router).await;
477
478        // 如果配置了上下文路径,则嵌套到该路径下
479        if let Some(ref context_path) = self.config.server.context_path {
480            use std::borrow::Cow;
481            // 确保路径以 / 开头
482            let path: Cow<'_, str> = if context_path.starts_with('/') {
483                Cow::Borrowed(context_path.as_str())
484            } else {
485                // 如果用户没有以 / 开头,自动添加并记录警告
486                tracing::warn!("上下文路径 '{}' 应该以 '/' 开头,已自动修正", context_path);
487                Cow::Owned(format!("/{}", context_path))
488            };
489            Router::new().nest(path.as_ref(), router)
490        } else {
491            router
492        }
493    }
494
495    /// 添加 gRPC 服务到路由(仅在启用 grpc 特性时可用)
496    ///
497    /// 使用 axum 的 fallback_service 来托管 gRPC 服务
498    /// HTTP 路由优先匹配,未匹配的请求由 gRPC 服务处理
499    #[cfg(feature = "grpc")]
500    async fn add_grpc_services(&mut self, router: AxumRouter) -> AxumRouter {
501        if let Some(grpc_router) = self.grpc_router.take() {
502            tracing::info!("集成 gRPC 服务到 axum");
503
504            // 将 gRPC Router 转换为 tower::Service
505            let grpc_service = grpc_router.into_service();
506
507            // 创建一个适配器,将 axum Body 转换为 tonic Body
508            let grpc_adapter =
509                tower::service_fn(move |req: axum::http::Request<axum::body::Body>| {
510                    let mut grpc_service = grpc_service.clone();
511                    async move {
512                        // 转换请求体:axum Body -> tonic BoxBody
513                        let (parts, body) = req.into_parts();
514                        let body = body
515                            .map_err(|e| tonic::Status::internal(e.to_string()))
516                            .boxed_unsync();
517                        let req = axum::http::Request::from_parts(parts, body);
518
519                        // 调用 gRPC 服务
520                        use tower::Service;
521                        let res = match grpc_service.call(req).await {
522                            Ok(r) => r,
523                            Err(_) => {
524                                // gRPC 服务错误,返回 500
525                                return Ok(axum::http::Response::builder()
526                                    .status(StatusCode::INTERNAL_SERVER_ERROR)
527                                    .body(axum::body::Body::empty())
528                                    .unwrap());
529                            }
530                        };
531
532                        // 转换响应体:tonic BoxBody -> axum Body
533                        let (parts, body) = res.into_parts();
534                        let body = axum::body::Body::new(body);
535                        Ok::<_, std::convert::Infallible>(axum::http::Response::from_parts(
536                            parts, body,
537                        ))
538                    }
539                });
540
541            // 使用 fallback_service 托管 gRPC 服务
542            // HTTP 路由优先处理,未匹配的请求由 gRPC 服务处理
543            router.fallback_service(grpc_adapter)
544        } else {
545            router
546        }
547    }
548
549    /// 内部启动方法(被 start_internal 调用)
550    async fn start_internal(mut self) -> Result<(), anyhow::Error> {
551        let addr = self.config.server.socket_addr()?;
552        let app = self.create_router().await;
553
554        let base_url = if let Some(ref context_path) = self.config.server.context_path {
555            format!("http://{}{}", addr, context_path)
556        } else {
557            format!("http://{}", addr)
558        };
559
560        info!("服务器启动在 {}", base_url);
561        info!("健康检查: {}/health", base_url);
562
563        #[cfg(feature = "grpc")]
564        if self.grpc_router.is_some() {
565            info!("gRPC 服务已启用");
566        }
567
568        let listener = tokio::net::TcpListener::bind(&addr).await?;
569
570        axum::serve(listener, app)
571            .with_graceful_shutdown(shutdown_signal())
572            .await?;
573
574        Ok(())
575    }
576}
577
578/// 优雅关闭信号处理
579async fn shutdown_signal() {
580    let ctrl_c = async {
581        signal::ctrl_c().await.expect("无法安装 Ctrl+C 信号处理器");
582        info!("收到 Ctrl+C 信号,开始优雅关闭...");
583    };
584
585    #[cfg(unix)]
586    let terminate = async {
587        signal::unix::signal(signal::unix::SignalKind::terminate())
588            .expect("无法安装 SIGTERM 信号处理器")
589            .recv()
590            .await;
591        warn!("收到 SIGTERM 信号,开始优雅关闭...");
592    };
593
594    #[cfg(not(unix))]
595    let terminate = std::future::pending::<()>();
596
597    tokio::select! {
598        _ = ctrl_c => {},
599        _ = terminate => {},
600    }
601
602    info!("服务器正在关闭...");
603}