fbc_starter/
server.rs

1use axum::{
2    http::StatusCode,
3    response::Json,
4    routing::{get, Router},
5    Router as AxumRouter,
6};
7use serde_json::json;
8use std::sync::Arc;
9use tokio::signal;
10use tower::ServiceBuilder;
11use tower_http::compression::CompressionLayer;
12use tracing::{info, warn};
13
14use crate::config::Config;
15use crate::http::{create_cors_layer, create_trace_layer, health_check, root};
16use crate::state::AppState;
17
18#[cfg(feature = "grpc")]
19use http_body_util::BodyExt;
20
21/// Web 服务器
22pub struct Server {
23    config: Config,
24    app_state: Arc<AppState>,
25    http_router: Option<AxumRouter>,
26    #[cfg(feature = "grpc")]
27    grpc_router: Option<tonic::transport::server::Router>,
28}
29
30/// 服务器构建器,用于在闭包中配置服务器
31pub struct ServerBuilder {
32    config: Config,
33    app_state: Arc<AppState>,
34    http_router: Option<AxumRouter>,
35    #[cfg(feature = "grpc")]
36    grpc_router: Option<tonic::transport::server::Router>,
37    #[cfg(feature = "consumer")]
38    kafka_handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
39}
40
41impl ServerBuilder {
42    /// 创建新的服务器构建器
43    #[warn(dead_code)]
44    fn new(config: Config, app_state: Arc<AppState>) -> Self {
45        Self {
46            config,
47            app_state,
48            http_router: None,
49            #[cfg(feature = "grpc")]
50            grpc_router: None,
51            #[cfg(feature = "consumer")]
52            kafka_handlers: Vec::new(),
53        }
54    }
55
56    /// 获取应用状态的引用
57    pub fn app_state(&self) -> &Arc<AppState> {
58        &self.app_state
59    }
60
61    /// 注册 Kafka 消息处理器
62    ///
63    /// # 参数
64    /// - `handler`: 实现了 KafkaMessageHandler trait 的处理器
65    ///
66    /// # 示例
67    /// ```ignore
68    /// Server::run(|builder| {
69    ///     builder
70    ///         .with_kafka_handler(Arc::new(MyHandler::new()))
71    ///         .http_router(routes)
72    /// })
73    /// ```
74    #[cfg(feature = "consumer")]
75    pub fn with_kafka_handler(
76        mut self,
77        handler: Arc<dyn crate::messaging::KafkaMessageHandler>,
78    ) -> Self {
79        self.kafka_handlers.push(handler);
80        self
81    }
82
83    /// 批量注册 Kafka 消息处理器
84    ///
85    /// # 示例
86    ///
87    /// ```no_run
88    /// use fbc_starter::Server;
89    /// use std::sync::Arc;
90    ///
91    /// Server::run(|builder| {
92    ///     let handlers = vec![
93    ///         Arc::new(Handler1::new()) as Arc<dyn fbc_starter::KafkaMessageHandler>,
94    ///         Arc::new(Handler2::new()) as Arc<dyn fbc_starter::KafkaMessageHandler>,
95    ///     ];
96    ///     builder
97    ///         .with_kafka_handlers(handlers)
98    ///         .http_router(routes)
99    /// })
100    /// ```
101    #[cfg(feature = "consumer")]
102    pub fn with_kafka_handlers(
103        mut self,
104        handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
105    ) -> Self {
106        self.kafka_handlers.extend(handlers);
107        self
108    }
109
110    /// 设置 HTTP 路由
111    pub fn http_router(mut self, router: AxumRouter) -> Self {
112        self.http_router = Some(router);
113        self
114    }
115
116    /// 设置 gRPC 路由(仅在启用 grpc 特性时可用)
117    #[cfg(feature = "grpc")]
118    pub fn grpc_router(mut self, router: tonic::transport::server::Router) -> Self {
119        self.grpc_router = Some(router);
120        self
121    }
122
123    /// 获取配置的引用
124    pub fn config(&self) -> &Config {
125        &self.config
126    }
127
128    /// 获取应用状态的引用
129    pub fn state(&self) -> &Arc<AppState> {
130        &self.app_state
131    }
132
133    /// 构建 Server
134    fn build(self) -> Server {
135        Server {
136            config: self.config,
137            app_state: self.app_state,
138            http_router: self.http_router,
139            #[cfg(feature = "grpc")]
140            grpc_router: self.grpc_router,
141        }
142    }
143}
144
145impl Server {
146    /// 启动服务器(使用闭包配置方式)
147    ///
148    /// # 参数
149    /// - `configure`: 配置闭包,接收 ServerBuilder 用于配置路由
150    ///
151    /// # 示例
152    /// ```rust,no_run
153    /// use fbc_starter::{Config, Server};
154    /// use axum::{routing::get, Router};
155    ///
156    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
157    /// Server::run(|builder| {
158    ///     // 获取配置
159    ///     let config = builder.config();
160    ///     
161    ///     // 配置 HTTP 路由
162    ///     let http_router = Router::new()
163    ///         .route("/api/users", get(|| async { "users" }));
164    ///     
165    ///     // 配置 gRPC 路由(如果启用)
166    ///     #[cfg(feature = "grpc")]
167    ///     let grpc_router = tonic::transport::Server::builder()
168    ///         .add_service(MyServiceServer::new(MyServiceImpl));
169    ///     
170    ///     builder
171    ///         .http_router(http_router)
172    ///         #[cfg(feature = "grpc")]
173    ///         .grpc_router(grpc_router)
174    /// }).await?;
175    /// # Ok(())
176    /// # }
177    /// ```
178    pub async fn run<F>(configure: F) -> Result<(), anyhow::Error>
179    where
180        F: FnOnce(ServerBuilder) -> ServerBuilder,
181    {
182        // 加载配置
183        let config = Config::from_env().unwrap_or_else(|e| {
184            eprintln!("⚠ 警告: 无法从环境变量加载配置: {}, 使用默认配置", e);
185            Config::default()
186        });
187
188        Self::run_with_config(config, configure).await
189    }
190
191    /// 使用指定配置启动服务器
192    ///
193    /// # 参数
194    /// - `config`: 服务器配置
195    /// - `configure`: 配置闭包,接收 ServerBuilder 用于配置路由
196    pub async fn run_with_config<F>(config: Config, configure: F) -> Result<(), anyhow::Error>
197    where
198        F: FnOnce(ServerBuilder) -> ServerBuilder,
199    {
200        // 初始化日志
201        crate::init_logging(&config).map_err(|e| anyhow::anyhow!("日志初始化失败: {}", e))?;
202
203        tracing::info!(
204            "启动 {} v{}",
205            env!("CARGO_PKG_NAME"),
206            env!("CARGO_PKG_VERSION")
207        );
208
209        // 初始化 Nacos(如果配置了且启用了 nacos 特性)
210        #[cfg(feature = "nacos")]
211        if let Some(ref nacos_config) = config.nacos {
212            crate::nacos::init_nacos(nacos_config)
213                .await
214                .map_err(|e| anyhow::anyhow!("Nacos 初始化失败: {}", e))?;
215
216            // 注册服务
217            crate::nacos::register_service(nacos_config, &config.server)
218                .await
219                .map_err(|e| anyhow::anyhow!("Nacos 服务注册失败: {}", e))?;
220
221            // 订阅服务
222            crate::nacos::subscribe_services(nacos_config)
223                .await
224                .map_err(|e| anyhow::anyhow!("Nacos 服务订阅失败: {}", e))?;
225
226            // 订阅配置
227            crate::nacos::subscribe_configs(nacos_config)
228                .await
229                .map_err(|e| anyhow::anyhow!("Nacos 配置订阅失败: {}", e))?;
230        }
231        // 初始化应用状态(不包括消费者组,因为需要在闭包中注册 handlers 后才能初始化)
232        let app_state = Self::init_app_state(&config).await?;
233
234        // 创建初始的 builder(不初始化 app_state)
235        let mut builder = ServerBuilder::new(config.clone(), Arc::new(app_state));
236
237        // 使用闭包配置服务器(此时可以注册 Kafka handlers)
238        builder = configure(builder);
239
240        // 初始化 Kafka 消费者组(需要在 handlers 注册后执行)
241        #[cfg(feature = "consumer")]
242        {
243            builder.app_state = Arc::new(
244                Self::init_kafka_consumers(
245                    &config,
246                    &builder.kafka_handlers,
247                    builder.app_state.clone(),
248                )
249                .await?,
250            );
251        }
252
253        let server = builder.build();
254
255        // 启动服务器
256        server.start_internal().await
257    }
258
259    /// 初始化应用状态(不包括 Kafka 消费者组)
260    ///
261    /// Kafka 消费者组需要在 handlers 注册后才能初始化,因此单独处理
262    #[allow(unused_variables)] // config 在条件编译中可能看起来未使用,但实际上被使用了
263    async fn init_app_state(config: &Config) -> Result<AppState, anyhow::Error> {
264        let app_state = AppState::new();
265
266        // 初始化数据库(如果配置了且启用了 mysql/postgres/sqlite 任一特性)
267        #[cfg(any(feature = "mysql", feature = "postgres", feature = "sqlite"))]
268        let app_state = if let Some(ref db_config) = config.database {
269            if db_config.has_any() {
270                let pools = crate::database::init_database(db_config)
271                    .await
272                    .map_err(|e| anyhow::anyhow!("数据库初始化失败: {}", e))?;
273                app_state.with_database_pools(pools)
274            } else {
275                app_state
276            }
277        } else {
278            app_state
279        };
280
281        // 初始化 Redis(如果配置了且启用了 redis 特性)
282        #[cfg(feature = "redis")]
283        let app_state = if let Some(ref redis_config) = config.redis {
284            let pool = crate::cache::redis::init_redis(
285                &redis_config.url,
286                redis_config.password.as_deref(),
287                redis_config.pool_size,
288            )
289            .await
290            .map_err(|e| anyhow::anyhow!("Redis 初始化失败: {}", e))?;
291            app_state.with_redis(pool)
292        } else {
293            app_state
294        };
295
296        // Kafka 消息代理初始化(按需初始化 producer 和 consumer)
297        #[cfg(feature = "kafka")]
298        let app_state = if let Some(ref kafka_config) = config.kafka {
299            use std::sync::Arc;
300            let mut state = app_state;
301
302            // 按需初始化 Producer(仅当启用 producer 特性且配置了 producer 时)
303            #[cfg(feature = "producer")]
304            {
305                if let Some(ref producer_config) = kafka_config.producer {
306                    use crate::messaging::kafka::KafkaProducer;
307
308                    let producer = Arc::new(
309                        KafkaProducer::new(&kafka_config.brokers, producer_config)
310                            .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
311                    )
312                        as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
313                    state = state.with_message_producer(producer);
314                }
315            }
316
317            state
318        } else {
319            app_state
320        };
321
322        Ok(app_state)
323    }
324
325    /// 初始化 Kafka 消费者组
326    ///
327    /// 此方法需要在 handlers 注册后才能调用,因为需要根据 handlers 来创建消费者
328    #[cfg(feature = "consumer")]
329    async fn init_kafka_consumers(
330        config: &Config,
331        kafka_handlers: &[Arc<dyn crate::messaging::KafkaMessageHandler>],
332        mut app_state: Arc<AppState>,
333    ) -> Result<AppState, anyhow::Error> {
334        use crate::messaging::{kafka, KafkaMessageRouter};
335        use std::sync::Arc;
336
337        // 检查是否配置了 Kafka 和 Consumer
338        if let Some(ref kafka_config) = config.kafka {
339            if let Some(ref consumer_config) = kafka_config.consumer {
340                // 如果有注册的 handlers,自动订阅并设置路由
341                if !kafka_handlers.is_empty() {
342                    let router = Arc::new(KafkaMessageRouter::new(kafka_handlers.to_vec()));
343                    let subscribe_topics = router.get_subscribe_topics();
344
345                    if !subscribe_topics.is_empty() {
346                        // 根据 handlers 创建多个按 group_id 分组的 consumer
347                        let consumers_by_group = kafka::create_consumers_from_handlers(
348                            &kafka_config.brokers,
349                            consumer_config,
350                            kafka_handlers,
351                        )
352                        .map_err(|e| anyhow::anyhow!("Kafka 消费者初始化失败: {}", e))?;
353
354                        let router_clone = router.clone();
355                        let handler = Arc::new(move |message: crate::messaging::Message| {
356                            let router = router_clone.clone();
357                            tokio::spawn(async move {
358                                router.dispatch(message).await;
359                            });
360                        });
361
362                        // 为每个 group_id 的 consumer 订阅对应的 topics
363                        let mut first_consumer: Option<
364                            Arc<dyn crate::messaging::MessageConsumer + Send + Sync>,
365                        > = None;
366                        for (group_id, (consumer, topics)) in consumers_by_group {
367                            // 过滤出该 consumer 需要订阅的 topics(与 handlers 注册的 topics 的交集)
368                            let topics_to_subscribe: Vec<String> = topics
369                                .into_iter()
370                                .filter(|t| subscribe_topics.contains(t))
371                                .collect();
372
373                            if !topics_to_subscribe.is_empty() {
374                                let consumer_arc = Arc::new(consumer)
375                                    as Arc<dyn crate::messaging::MessageConsumer + Send + Sync>;
376
377                                // 保存第一个 consumer 用于 state(向后兼容)
378                                if first_consumer.is_none() {
379                                    first_consumer = Some(consumer_arc.clone());
380                                }
381
382                                consumer_arc
383                                    .subscribe_topics(topics_to_subscribe.clone(), handler.clone())
384                                    .await
385                                    .map_err(|e| {
386                                        anyhow::anyhow!(
387                                            "Kafka 订阅失败 (group: {}): {}",
388                                            group_id,
389                                            e
390                                        )
391                                    })?;
392
393                                tracing::info!(
394                                    "✅ Kafka Consumer 初始化成功 (group: {}, 订阅 {} 个 topics: {:?})",
395                                    group_id,
396                                    topics_to_subscribe.len(),
397                                    topics_to_subscribe
398                                );
399                            }
400                        }
401
402                        // 向后兼容:将第一个 consumer 设置到 state 中
403                        if let Some(consumer) = first_consumer {
404                            app_state =
405                                Arc::new((*app_state).clone().with_message_consumer(consumer));
406                        }
407                    } else {
408                        tracing::warn!(
409                            "⚠️ Kafka Consumer 已初始化,但没有 handler 注册任何 topics"
410                        );
411                    }
412                } else {
413                    tracing::info!("ℹ️ 未注册 handlers,跳过 Kafka Consumer 初始化");
414                }
415            }
416        }
417
418        Ok((*app_state).clone())
419    }
420
421    /// 创建基础路由(包含健康检查等系统路由)
422    fn create_base_router(&self) -> AxumRouter {
423        let state = self.app_state.clone();
424
425        Router::new()
426            .route("/", get(root))
427            .route("/health", get(health_check))
428            .fallback(|| async {
429                (
430                    StatusCode::NOT_FOUND,
431                    Json(json!({
432                        "error": "Not found",
433                        "status": 404
434                    })),
435                )
436            })
437            .layer(
438                ServiceBuilder::new()
439                    .layer(CompressionLayer::new())
440                    .into_inner(),
441            )
442            .layer(create_cors_layer(&self.config))
443            .layer(create_trace_layer())
444            .with_state(state)
445    }
446
447    /// 创建完整路由(合并基础路由、用户自定义路由和 gRPC 服务)
448    async fn create_router(&mut self) -> AxumRouter {
449        let base_router = self.create_base_router();
450
451        // 合并用户提供的 HTTP 路由
452        let router = if let Some(custom) = self.http_router.take() {
453            base_router.merge(custom)
454        } else {
455            base_router
456        };
457
458        // 如果启用了 gRPC 特性,添加 gRPC 服务
459        #[cfg(feature = "grpc")]
460        let router = self.add_grpc_services(router).await;
461
462        // 如果配置了上下文路径,则嵌套到该路径下
463        if let Some(ref context_path) = self.config.server.context_path {
464            use std::borrow::Cow;
465            // 确保路径以 / 开头
466            let path: Cow<'_, str> = if context_path.starts_with('/') {
467                Cow::Borrowed(context_path.as_str())
468            } else {
469                // 如果用户没有以 / 开头,自动添加并记录警告
470                tracing::warn!("上下文路径 '{}' 应该以 '/' 开头,已自动修正", context_path);
471                Cow::Owned(format!("/{}", context_path))
472            };
473            Router::new().nest(path.as_ref(), router)
474        } else {
475            router
476        }
477    }
478
479    /// 添加 gRPC 服务到路由(仅在启用 grpc 特性时可用)
480    ///
481    /// 使用 axum 的 fallback_service 来托管 gRPC 服务
482    /// HTTP 路由优先匹配,未匹配的请求由 gRPC 服务处理
483    #[cfg(feature = "grpc")]
484    async fn add_grpc_services(&mut self, router: AxumRouter) -> AxumRouter {
485        if let Some(grpc_router) = self.grpc_router.take() {
486            tracing::info!("集成 gRPC 服务到 axum");
487
488            // 将 gRPC Router 转换为 tower::Service
489            let grpc_service = grpc_router.into_service();
490
491            // 创建一个适配器,将 axum Body 转换为 tonic Body
492            let grpc_adapter =
493                tower::service_fn(move |req: axum::http::Request<axum::body::Body>| {
494                    let mut grpc_service = grpc_service.clone();
495                    async move {
496                        // 转换请求体:axum Body -> tonic BoxBody
497                        let (parts, body) = req.into_parts();
498                        let body = body
499                            .map_err(|e| tonic::Status::internal(e.to_string()))
500                            .boxed_unsync();
501                        let req = axum::http::Request::from_parts(parts, body);
502
503                        // 调用 gRPC 服务
504                        use tower::Service;
505                        let res = match grpc_service.call(req).await {
506                            Ok(r) => r,
507                            Err(_) => {
508                                // gRPC 服务错误,返回 500
509                                return Ok(axum::http::Response::builder()
510                                    .status(StatusCode::INTERNAL_SERVER_ERROR)
511                                    .body(axum::body::Body::empty())
512                                    .unwrap());
513                            }
514                        };
515
516                        // 转换响应体:tonic BoxBody -> axum Body
517                        let (parts, body) = res.into_parts();
518                        let body = axum::body::Body::new(body);
519                        Ok::<_, std::convert::Infallible>(axum::http::Response::from_parts(
520                            parts, body,
521                        ))
522                    }
523                });
524
525            // 使用 fallback_service 托管 gRPC 服务
526            // HTTP 路由优先处理,未匹配的请求由 gRPC 服务处理
527            router.fallback_service(grpc_adapter)
528        } else {
529            router
530        }
531    }
532
533    /// 内部启动方法(被 start_internal 调用)
534    async fn start_internal(mut self) -> Result<(), anyhow::Error> {
535        let addr = self.config.server.socket_addr()?;
536        let app = self.create_router().await;
537
538        let base_url = if let Some(ref context_path) = self.config.server.context_path {
539            format!("http://{}{}", addr, context_path)
540        } else {
541            format!("http://{}", addr)
542        };
543
544        info!("服务器启动在 {}", base_url);
545        info!("健康检查: {}/health", base_url);
546
547        #[cfg(feature = "grpc")]
548        if self.grpc_router.is_some() {
549            info!("gRPC 服务已启用");
550        }
551
552        let listener = tokio::net::TcpListener::bind(&addr).await?;
553
554        axum::serve(listener, app)
555            .with_graceful_shutdown(shutdown_signal())
556            .await?;
557
558        Ok(())
559    }
560}
561
562/// 优雅关闭信号处理
563async fn shutdown_signal() {
564    let ctrl_c = async {
565        signal::ctrl_c().await.expect("无法安装 Ctrl+C 信号处理器");
566        info!("收到 Ctrl+C 信号,开始优雅关闭...");
567    };
568
569    #[cfg(unix)]
570    let terminate = async {
571        signal::unix::signal(signal::unix::SignalKind::terminate())
572            .expect("无法安装 SIGTERM 信号处理器")
573            .recv()
574            .await;
575        warn!("收到 SIGTERM 信号,开始优雅关闭...");
576    };
577
578    #[cfg(not(unix))]
579    let terminate = std::future::pending::<()>();
580
581    tokio::select! {
582        _ = ctrl_c => {},
583        _ = terminate => {},
584    }
585
586    info!("服务器正在关闭...");
587}