fbc_starter/
server.rs

1use axum::{
2    http::StatusCode,
3    response::Json,
4    routing::{get, Router},
5    Router as AxumRouter,
6};
7use serde_json::json;
8use std::sync::Arc;
9use tokio::signal;
10use tower::ServiceBuilder;
11use tower_http::compression::CompressionLayer;
12use tracing::{info, warn};
13
14use crate::http::{create_cors_layer, create_trace_layer, health_check, root};
15use crate::state::AppState;
16use crate::{config::Config, AppResult};
17
18#[cfg(feature = "grpc")]
19use http_body_util::BodyExt;
20
21/// Web 服务器
22pub struct Server {
23    config: Config,
24    app_state: Arc<AppState>,
25    http_router: Option<AxumRouter>,
26    #[cfg(feature = "grpc")]
27    grpc_router: Option<tonic::transport::server::Router>,
28}
29
30/// 服务器构建器,用于在闭包中配置服务器
31pub struct ServerBuilder {
32    config: Config,
33    app_state: Arc<AppState>,
34    http_router: Option<AxumRouter>,
35    #[cfg(feature = "grpc")]
36    grpc_router: Option<tonic::transport::server::Router>,
37    #[cfg(feature = "consumer")]
38    kafka_handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
39}
40
41impl ServerBuilder {
42    /// 创建新的服务器构建器
43    #[warn(dead_code)]
44    fn new(config: Config, app_state: Arc<AppState>) -> Self {
45        Self {
46            config,
47            app_state,
48            http_router: None,
49            #[cfg(feature = "grpc")]
50            grpc_router: None,
51            #[cfg(feature = "consumer")]
52            kafka_handlers: Vec::new(),
53        }
54    }
55
56    /// 获取应用状态的引用
57    pub fn app_state(&self) -> &Arc<AppState> {
58        &self.app_state
59    }
60
61    /// 注册 Kafka 消息处理器
62    ///
63    /// # 参数
64    /// - `handler`: 实现了 KafkaMessageHandler trait 的处理器
65    ///
66    /// # 示例
67    /// ```ignore
68    /// Server::run(|builder| {
69    ///     builder
70    ///         .with_kafka_handler(Arc::new(MyHandler::new()))
71    ///         .http_router(routes)
72    /// })
73    /// ```
74    #[cfg(feature = "consumer")]
75    pub fn with_kafka_handler(
76        mut self,
77        handler: Arc<dyn crate::messaging::KafkaMessageHandler>,
78    ) -> Self {
79        self.kafka_handlers.push(handler);
80        self
81    }
82
83    /// 批量注册 Kafka 消息处理器
84    ///
85    /// # 示例
86    ///
87    /// ```no_run
88    /// use fbc_starter::Server;
89    /// use std::sync::Arc;
90    ///
91    /// Server::run(|builder| {
92    ///     let handlers = vec![
93    ///         Arc::new(Handler1::new()) as Arc<dyn fbc_starter::KafkaMessageHandler>,
94    ///         Arc::new(Handler2::new()) as Arc<dyn fbc_starter::KafkaMessageHandler>,
95    ///     ];
96    ///     builder
97    ///         .with_kafka_handlers(handlers)
98    ///         .http_router(routes)
99    /// })
100    /// ```
101    #[cfg(feature = "consumer")]
102    pub fn with_kafka_handlers(
103        mut self,
104        handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
105    ) -> Self {
106        self.kafka_handlers.extend(handlers);
107        self
108    }
109
110    /// 设置 HTTP 路由
111    pub fn http_router(mut self, router: AxumRouter) -> Self {
112        self.http_router = Some(router);
113        self
114    }
115
116    /// 设置 gRPC 路由(仅在启用 grpc 特性时可用)
117    #[cfg(feature = "grpc")]
118    pub fn grpc_router(mut self, router: tonic::transport::server::Router) -> Self {
119        self.grpc_router = Some(router);
120        self
121    }
122
123    /// 获取配置的引用
124    pub fn config(&self) -> &Config {
125        &self.config
126    }
127
128    /// 获取应用状态的引用
129    pub fn state(&self) -> &Arc<AppState> {
130        &self.app_state
131    }
132
133    /// 构建 Server
134    fn build(self) -> Server {
135        Server {
136            config: self.config,
137            app_state: self.app_state,
138            http_router: self.http_router,
139            #[cfg(feature = "grpc")]
140            grpc_router: self.grpc_router,
141        }
142    }
143}
144
145impl Server {
146    /// 启动服务器(使用闭包配置方式)
147    ///
148    /// # 参数
149    /// - `configure`: 配置闭包,接收 ServerBuilder 用于配置路由
150    ///
151    /// # 示例
152    /// ```rust,no_run
153    /// use fbc_starter::{Config, Server};
154    /// use axum::{routing::get, Router};
155    ///
156    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
157    /// Server::run(|builder| {
158    ///     // 获取配置
159    ///     let config = builder.config();
160    ///     
161    ///     // 配置 HTTP 路由
162    ///     let http_router = Router::new()
163    ///         .route("/api/users", get(|| async { "users" }));
164    ///     
165    ///     // 配置 gRPC 路由(如果启用)
166    ///     #[cfg(feature = "grpc")]
167    ///     let grpc_router = tonic::transport::Server::builder()
168    ///         .add_service(MyServiceServer::new(MyServiceImpl));
169    ///     
170    ///     builder
171    ///         .http_router(http_router)
172    ///         #[cfg(feature = "grpc")]
173    ///         .grpc_router(grpc_router)
174    /// }).await?;
175    /// # Ok(())
176    /// # }
177    /// ```
178    pub async fn run<F>(configure: F) -> AppResult<()>
179    where
180        F: FnOnce(ServerBuilder) -> ServerBuilder,
181    {
182        // 加载配置
183        let config = Config::from_env().unwrap_or_else(|e| {
184            eprintln!("⚠ 警告: 无法从环境变量加载配置: {}, 使用默认配置", e);
185            Config::default()
186        });
187
188        Self::run_with_config(config, configure).await
189    }
190
191    /// 使用指定配置启动服务器
192    ///
193    /// # 参数
194    /// - `config`: 服务器配置
195    /// - `configure`: 配置闭包,接收 ServerBuilder 用于配置路由
196    pub async fn run_with_config<F>(config: Config, configure: F) -> AppResult<()>
197    where
198        F: FnOnce(ServerBuilder) -> ServerBuilder,
199    {
200        // 初始化日志
201        crate::init_logging(&config).map_err(|e| anyhow::anyhow!("日志初始化失败: {}", e))?;
202
203        tracing::info!(
204            "启动 {} v{}",
205            env!("CARGO_PKG_NAME"),
206            env!("CARGO_PKG_VERSION")
207        );
208
209        // 初始化 Nacos(如果配置了且启用了 nacos 特性)
210        #[cfg(feature = "nacos")]
211        if let Some(ref nacos_config) = config.nacos {
212            crate::nacos::init_nacos(nacos_config)
213                .await
214                .map_err(|e| anyhow::anyhow!("Nacos 初始化失败: {}", e))?;
215
216            // 注册服务
217            crate::nacos::register_service(nacos_config, &config.server)
218                .await
219                .map_err(|e| anyhow::anyhow!("Nacos 服务注册失败: {}", e))?;
220
221            // 订阅服务
222            crate::nacos::subscribe_services(nacos_config)
223                .await
224                .map_err(|e| anyhow::anyhow!("Nacos 服务订阅失败: {}", e))?;
225
226            // 订阅配置
227            crate::nacos::subscribe_configs(nacos_config)
228                .await
229                .map_err(|e| anyhow::anyhow!("Nacos 配置订阅失败: {}", e))?;
230        }
231        // 初始化应用状态(不包括消费者组,因为需要在闭包中注册 handlers 后才能初始化)
232        let app_state = Self::init_app_state(&config).await?;
233
234        // 创建初始的 builder(不初始化 app_state)
235        let mut builder = ServerBuilder::new(config.clone(), Arc::new(app_state));
236
237        // 使用闭包配置服务器(此时可以注册 Kafka handlers)
238        builder = configure(builder);
239
240        // 初始化 Kafka 消费者组(需要在 handlers 注册后执行)
241        #[cfg(feature = "consumer")]
242        {
243            builder.app_state = Arc::new(
244                Self::init_kafka_consumers(
245                    &config,
246                    &builder.kafka_handlers,
247                    builder.app_state.clone(),
248                )
249                .await?,
250            );
251        }
252
253        let server = builder.build();
254
255        // 启动服务器
256        server.start_internal().await
257    }
258
259    /// 初始化应用状态(不包括 Kafka 消费者组)
260    ///
261    /// Kafka 消费者组需要在 handlers 注册后才能初始化,因此单独处理
262    #[allow(unused_variables)] // config 在条件编译中可能看起来未使用,但实际上被使用了
263    async fn init_app_state(config: &Config) -> AppResult<AppState> {
264        let app_state = AppState::new();
265
266        // 初始化数据库(如果配置了且启用了 mysql/postgres/sqlite 任一特性)
267        #[cfg(any(feature = "mysql", feature = "postgres", feature = "sqlite"))]
268        let app_state = if let Some(ref db_config) = config.database {
269            let pools = crate::database::init_database(db_config).await?;
270            app_state.with_database_pools(pools)
271        } else {
272            app_state
273        };
274
275        // 初始化 Redis(如果配置了且启用了 redis 特性)
276        #[cfg(feature = "redis")]
277        let app_state = if let Some(ref redis_config) = config.redis {
278            let pool = crate::cache::redis::init_redis(
279                &redis_config.url,
280                redis_config.password.as_deref(),
281                redis_config.pool_size,
282            )
283            .await?;
284            app_state.with_redis(pool)
285        } else {
286            app_state
287        };
288
289        // Kafka 消息代理初始化(按需初始化 producer 和 consumer)
290        #[cfg(feature = "kafka")]
291        let app_state = if let Some(ref kafka_config) = config.kafka {
292            // 按需初始化 Producer(仅当启用 producer 特性且配置了 producer 时)
293            #[cfg(feature = "producer")]
294            let state = if let Some(ref producer_config) = kafka_config.producer {
295                use crate::messaging::kafka::KafkaProducer;
296
297                let producer = Arc::new(
298                    KafkaProducer::new(&kafka_config.brokers, producer_config)
299                        .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
300                )
301                    as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
302                app_state.with_message_producer(producer)
303            } else {
304                app_state
305            };
306            #[cfg(not(feature = "producer"))]
307            let state = app_state;
308
309            state
310        } else {
311            app_state
312        };
313
314        Ok(app_state)
315    }
316
317    /// 初始化 Kafka 消费者组
318    ///
319    /// 此方法需要在 handlers 注册后才能调用,因为需要根据 handlers 来创建消费者
320    #[cfg(feature = "consumer")]
321    async fn init_kafka_consumers(
322        config: &Config,
323        kafka_handlers: &[Arc<dyn crate::messaging::KafkaMessageHandler>],
324        mut app_state: Arc<AppState>,
325    ) -> AppResult<AppState> {
326        use crate::messaging::{kafka, KafkaMessageRouter};
327        use std::sync::Arc;
328
329        // 检查是否配置了 Kafka 和 Consumer
330        if let Some(ref kafka_config) = config.kafka {
331            if let Some(ref consumer_config) = kafka_config.consumer {
332                // 如果有注册的 handlers,自动订阅并设置路由
333                if !kafka_handlers.is_empty() {
334                    let router = Arc::new(KafkaMessageRouter::new(kafka_handlers.to_vec()));
335                    let subscribe_topics = router.get_subscribe_topics();
336
337                    if !subscribe_topics.is_empty() {
338                        // 根据 handlers 创建多个按 group_id 分组的 consumer
339                        let consumers_by_group = kafka::create_consumers_from_handlers(
340                            &kafka_config.brokers,
341                            consumer_config,
342                            kafka_handlers,
343                        )?;
344
345                        let router_clone = router.clone();
346                        let handler = Arc::new(move |message: crate::messaging::Message| {
347                            let router = router_clone.clone();
348                            tokio::spawn(async move {
349                                router.dispatch(message).await;
350                            });
351                        });
352
353                        // 为每个 group_id 的 consumer 订阅对应的 topics
354                        let mut first_consumer: Option<
355                            Arc<dyn crate::messaging::MessageConsumer + Send + Sync>,
356                        > = None;
357                        for (group_id, (consumer, topics)) in consumers_by_group {
358                            // 过滤出该 consumer 需要订阅的 topics(与 handlers 注册的 topics 的交集)
359                            let topics_to_subscribe: Vec<String> = topics
360                                .into_iter()
361                                .filter(|t| subscribe_topics.contains(t))
362                                .collect();
363
364                            if !topics_to_subscribe.is_empty() {
365                                let consumer_arc = Arc::new(consumer)
366                                    as Arc<dyn crate::messaging::MessageConsumer + Send + Sync>;
367
368                                // 保存第一个 consumer 用于 state(向后兼容)
369                                if first_consumer.is_none() {
370                                    first_consumer = Some(consumer_arc.clone());
371                                }
372
373                                consumer_arc
374                                    .subscribe_topics(topics_to_subscribe.clone(), handler.clone())
375                                    .await
376                                    .map_err(|e| {
377                                        anyhow::anyhow!(
378                                            "Kafka 订阅失败 (group: {}): {}",
379                                            group_id,
380                                            e
381                                        )
382                                    })?;
383
384                                tracing::info!(
385                                    "✅ Kafka Consumer 初始化成功 (group: {}, 订阅 {} 个 topics: {:?})",
386                                    group_id,
387                                    topics_to_subscribe.len(),
388                                    topics_to_subscribe
389                                );
390                            }
391                        }
392
393                        // 向后兼容:将第一个 consumer 设置到 state 中
394                        if let Some(consumer) = first_consumer {
395                            app_state =
396                                Arc::new((*app_state).clone().with_message_consumer(consumer));
397                        }
398                    } else {
399                        tracing::warn!(
400                            "⚠️ Kafka Consumer 已初始化,但没有 handler 注册任何 topics"
401                        );
402                    }
403                } else {
404                    tracing::info!("ℹ️ 未注册 handlers,跳过 Kafka Consumer 初始化");
405                }
406            }
407        }
408
409        Ok((*app_state).clone())
410    }
411
412    /// 创建基础路由(包含健康检查等系统路由)
413    fn create_base_router(&self) -> AxumRouter {
414        let state = self.app_state.clone();
415
416        Router::new()
417            .route("/", get(root))
418            .route("/health", get(health_check))
419            .fallback(|| async {
420                (
421                    StatusCode::NOT_FOUND,
422                    Json(json!({
423                        "error": "Not found",
424                        "status": 404
425                    })),
426                )
427            })
428            .layer(
429                ServiceBuilder::new()
430                    .layer(CompressionLayer::new())
431                    .into_inner(),
432            )
433            .layer(create_cors_layer(&self.config))
434            .layer(create_trace_layer())
435            .with_state(state)
436    }
437
438    /// 创建完整路由(合并基础路由、用户自定义路由和 gRPC 服务)
439    async fn create_router(&mut self) -> AxumRouter {
440        let base_router = self.create_base_router();
441
442        // 合并用户提供的 HTTP 路由
443        let router = if let Some(custom) = self.http_router.take() {
444            base_router.merge(custom)
445        } else {
446            base_router
447        };
448
449        // 如果启用了 gRPC 特性,添加 gRPC 服务
450        #[cfg(feature = "grpc")]
451        let router = self.add_grpc_services(router).await;
452
453        // 如果配置了上下文路径,则嵌套到该路径下
454        if let Some(ref context_path) = self.config.server.context_path {
455            use std::borrow::Cow;
456            // 确保路径以 / 开头
457            let path: Cow<'_, str> = if context_path.starts_with('/') {
458                Cow::Borrowed(context_path.as_str())
459            } else {
460                // 如果用户没有以 / 开头,自动添加并记录警告
461                tracing::warn!("上下文路径 '{}' 应该以 '/' 开头,已自动修正", context_path);
462                Cow::Owned(format!("/{}", context_path))
463            };
464            Router::new().nest(path.as_ref(), router)
465        } else {
466            router
467        }
468    }
469
470    /// 添加 gRPC 服务到路由(仅在启用 grpc 特性时可用)
471    ///
472    /// 使用 axum 的 fallback_service 来托管 gRPC 服务
473    /// HTTP 路由优先匹配,未匹配的请求由 gRPC 服务处理
474    #[cfg(feature = "grpc")]
475    async fn add_grpc_services(&mut self, router: AxumRouter) -> AxumRouter {
476        if let Some(grpc_router) = self.grpc_router.take() {
477            tracing::info!("集成 gRPC 服务到 axum");
478
479            // 将 gRPC Router 转换为 tower::Service
480            let grpc_service = grpc_router.into_service();
481
482            // 创建一个适配器,将 axum Body 转换为 tonic Body
483            let grpc_adapter =
484                tower::service_fn(move |req: axum::http::Request<axum::body::Body>| {
485                    let mut grpc_service = grpc_service.clone();
486                    async move {
487                        // 转换请求体:axum Body -> tonic BoxBody
488                        let (parts, body) = req.into_parts();
489                        let body = body
490                            .map_err(|e| tonic::Status::internal(e.to_string()))
491                            .boxed_unsync();
492                        let req = axum::http::Request::from_parts(parts, body);
493
494                        // 调用 gRPC 服务
495                        use tower::Service;
496                        let res = match grpc_service.call(req).await {
497                            Ok(r) => r,
498                            Err(_) => {
499                                // gRPC 服务错误,返回 500
500                                return Ok(axum::http::Response::builder()
501                                    .status(StatusCode::INTERNAL_SERVER_ERROR)
502                                    .body(axum::body::Body::empty())
503                                    .unwrap());
504                            }
505                        };
506
507                        // 转换响应体:tonic BoxBody -> axum Body
508                        let (parts, body) = res.into_parts();
509                        let body = axum::body::Body::new(body);
510                        Ok::<_, std::convert::Infallible>(axum::http::Response::from_parts(
511                            parts, body,
512                        ))
513                    }
514                });
515
516            // 使用 fallback_service 托管 gRPC 服务
517            // HTTP 路由优先处理,未匹配的请求由 gRPC 服务处理
518            router.fallback_service(grpc_adapter)
519        } else {
520            router
521        }
522    }
523
524    /// 内部启动方法(被 start_internal 调用)
525    async fn start_internal(mut self) -> AppResult<()> {
526        let addr = self.config.server.socket_addr()?;
527        let app = self.create_router().await;
528
529        let base_url = if let Some(ref context_path) = self.config.server.context_path {
530            format!("http://{}{}", addr, context_path)
531        } else {
532            format!("http://{}", addr)
533        };
534
535        info!("服务器启动在 {}", base_url);
536        info!("健康检查: {}/health", base_url);
537
538        #[cfg(feature = "grpc")]
539        if self.grpc_router.is_some() {
540            info!("gRPC 服务已启用");
541        }
542
543        let listener = tokio::net::TcpListener::bind(&addr).await?;
544
545        axum::serve(listener, app)
546            .with_graceful_shutdown(shutdown_signal())
547            .await?;
548
549        Ok(())
550    }
551}
552
553/// 优雅关闭信号处理
554async fn shutdown_signal() {
555    let ctrl_c = async {
556        signal::ctrl_c().await.expect("无法安装 Ctrl+C 信号处理器");
557        info!("收到 Ctrl+C 信号,开始优雅关闭...");
558    };
559
560    #[cfg(unix)]
561    let terminate = async {
562        signal::unix::signal(signal::unix::SignalKind::terminate())
563            .expect("无法安装 SIGTERM 信号处理器")
564            .recv()
565            .await;
566        warn!("收到 SIGTERM 信号,开始优雅关闭...");
567    };
568
569    #[cfg(not(unix))]
570    let terminate = std::future::pending::<()>();
571
572    tokio::select! {
573        _ = ctrl_c => {},
574        _ = terminate => {},
575    }
576
577    info!("服务器正在关闭...");
578}