Skip to main content

fbc_starter/
server.rs

1use axum::{
2    http::StatusCode,
3    response::Json,
4    routing::{get, Router},
5    Router as AxumRouter,
6};
7use serde_json::json;
8use std::sync::Arc;
9use tokio::signal;
10use tower::ServiceBuilder;
11use tower_http::compression::CompressionLayer;
12use tracing::{info, warn};
13
14use crate::http::{create_cors_layer, health_check, request_logging_middleware, root};
15// gRPC 专用日志工具函数,预留给需要对 gRPC 请求/响应做细粒度日志记录的场景
16// (当前全局 request_logging_middleware 已覆盖基本的 gRPC fallback 日志)
17#[cfg(feature = "grpc")]
18#[allow(unused_imports)]
19use crate::http::{grpc_log_request, grpc_log_response};
20use crate::state::AppState;
21use crate::{config::Config, AppResult};
22
23/// Web 服务器
24pub struct Server {
25    config: Config,
26    app_state: Arc<AppState>,
27    http_router: Option<AxumRouter>,
28    #[cfg(feature = "grpc")]
29    grpc_router: Option<tonic::service::Routes>,
30}
31
32/// 服务器构建器,用于在闭包中配置服务器
33pub struct ServerBuilder {
34    config: Config,
35    app_state: Arc<AppState>,
36    http_router: Option<AxumRouter>,
37    #[cfg(feature = "grpc")]
38    grpc_router: Option<tonic::service::Routes>,
39    #[cfg(feature = "consumer")]
40    kafka_handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
41}
42
43impl ServerBuilder {
44    /// 创建新的服务器构建器
45    #[warn(dead_code)]
46    fn new(config: Config, app_state: Arc<AppState>) -> Self {
47        Self {
48            config,
49            app_state,
50            http_router: None,
51            #[cfg(feature = "grpc")]
52            grpc_router: None,
53            #[cfg(feature = "consumer")]
54            kafka_handlers: Vec::new(),
55        }
56    }
57
58    /// 获取应用状态的引用
59    pub fn app_state(&self) -> &Arc<AppState> {
60        &self.app_state
61    }
62
63    /// 注册 Kafka 消息处理器
64    ///
65    /// # 参数
66    /// - `handler`: 实现了 KafkaMessageHandler trait 的处理器
67    ///
68    /// # 示例
69    /// ```ignore
70    /// Server::run(|builder| {
71    ///     builder
72    ///         .with_kafka_handler(Arc::new(MyHandler::new()))
73    ///         .http_router(routes)
74    /// })
75    /// ```
76    #[cfg(feature = "consumer")]
77    pub fn with_kafka_handler(
78        mut self,
79        handler: Arc<dyn crate::messaging::KafkaMessageHandler>,
80    ) -> Self {
81        self.kafka_handlers.push(handler);
82        self
83    }
84
85    /// 批量注册 Kafka 消息处理器
86    ///
87    /// # 示例
88    ///
89    /// ```no_run
90    /// use fbc_starter::Server;
91    /// use std::sync::Arc;
92    ///
93    /// Server::run(|builder| {
94    ///     let handlers = vec![
95    ///         Arc::new(Handler1::new()) as Arc<dyn fbc_starter::KafkaMessageHandler>,
96    ///         Arc::new(Handler2::new()) as Arc<dyn fbc_starter::KafkaMessageHandler>,
97    ///     ];
98    ///     builder
99    ///         .with_kafka_handlers(handlers)
100    ///         .http_router(routes)
101    /// })
102    /// ```
103    #[cfg(feature = "consumer")]
104    pub fn with_kafka_handlers(
105        mut self,
106        handlers: Vec<Arc<dyn crate::messaging::KafkaMessageHandler>>,
107    ) -> Self {
108        self.kafka_handlers.extend(handlers);
109        self
110    }
111
112    /// 设置 HTTP 路由
113    pub fn http_router(mut self, router: AxumRouter) -> Self {
114        self.http_router = Some(router);
115        self
116    }
117
118    /// 设置 gRPC 路由(仅在启用 grpc 特性时可用)
119    #[cfg(feature = "grpc")]
120    pub fn grpc_router(mut self, router: tonic::service::Routes) -> Self {
121        self.grpc_router = Some(router);
122        self
123    }
124
125    /// 获取配置的引用
126    pub fn config(&self) -> &Config {
127        &self.config
128    }
129
130    /// 获取应用状态的引用
131    pub fn state(&self) -> &Arc<AppState> {
132        &self.app_state
133    }
134
135    /// 构建 Server
136    fn build(self) -> Server {
137        Server {
138            config: self.config,
139            app_state: self.app_state,
140            http_router: self.http_router,
141            #[cfg(feature = "grpc")]
142            grpc_router: self.grpc_router,
143        }
144    }
145}
146
147impl Server {
148    /// 启动服务器(使用闭包配置方式)
149    ///
150    /// # 参数
151    /// - `configure`: 配置闭包,接收 ServerBuilder 用于配置路由
152    ///
153    /// # 示例
154    /// ```rust,no_run
155    /// use fbc_starter::{Config, Server};
156    /// use axum::{routing::get, Router};
157    ///
158    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
159    /// Server::run(|builder| {
160    ///     // 获取配置
161    ///     let config = builder.config();
162    ///     
163    ///     // 配置 HTTP 路由
164    ///     let http_router = Router::new()
165    ///         .route("/api/users", get(|| async { "users" }));
166    ///     
167    ///     // 配置 gRPC 路由(如果启用)
168    ///     #[cfg(feature = "grpc")]
169    ///     let grpc_router = tonic::transport::Server::builder()
170    ///         .add_service(MyServiceServer::new(MyServiceImpl));
171    ///     
172    ///     builder
173    ///         .http_router(http_router)
174    ///         #[cfg(feature = "grpc")]
175    ///         .grpc_router(grpc_router)
176    /// }).await?;
177    /// # Ok(())
178    /// # }
179    /// ```
180    pub async fn run<F>(configure: F) -> AppResult<()>
181    where
182        F: FnOnce(ServerBuilder) -> ServerBuilder,
183    {
184        // 加载配置
185        let config = Config::from_env().unwrap_or_else(|e| {
186            eprintln!("⚠ 警告: 无法从环境变量加载配置: {}, 使用默认配置", e);
187            Config::default()
188        });
189
190        Self::run_with_config(config, configure).await
191    }
192
193    /// 使用指定配置启动服务器
194    ///
195    /// # 参数
196    /// - `config`: 服务器配置
197    /// - `configure`: 配置闭包,接收 ServerBuilder 用于配置路由
198    pub async fn run_with_config<F>(config: Config, configure: F) -> AppResult<()>
199    where
200        F: FnOnce(ServerBuilder) -> ServerBuilder,
201    {
202        // 初始化日志
203        crate::init_logging(&config).map_err(|e| anyhow::anyhow!("日志初始化失败: {}", e))?;
204
205        tracing::info!(
206            "启动 {} v{}",
207            env!("CARGO_PKG_NAME"),
208            env!("CARGO_PKG_VERSION")
209        );
210
211        // 初始化 Nacos(如果配置了且启用了 nacos 特性)
212        #[cfg(feature = "nacos")]
213        if let Some(ref nacos_config) = config.nacos {
214            crate::nacos::init_nacos(nacos_config)
215                .await
216                .map_err(|e| anyhow::anyhow!("Nacos 初始化失败: {}", e))?;
217
218            // 注册服务
219            crate::nacos::register_service(nacos_config, &config.server)
220                .await
221                .map_err(|e| anyhow::anyhow!("Nacos 服务注册失败: {}", e))?;
222
223            // 订阅服务
224            crate::nacos::subscribe_services(nacos_config)
225                .await
226                .map_err(|e| anyhow::anyhow!("Nacos 服务订阅失败: {}", e))?;
227
228            // 订阅配置
229            crate::nacos::subscribe_configs(nacos_config)
230                .await
231                .map_err(|e| anyhow::anyhow!("Nacos 配置订阅失败: {}", e))?;
232        }
233        // 初始化应用状态(不包括消费者组,因为需要在闭包中注册 handlers 后才能初始化)
234        let app_state = Self::init_app_state(&config).await?;
235
236        // 创建初始的 builder(不初始化 app_state)
237        let mut builder = ServerBuilder::new(config.clone(), Arc::new(app_state));
238
239        // 使用闭包配置服务器(此时可以注册 Kafka handlers)
240        builder = configure(builder);
241
242        // 初始化 Kafka 消费者组(需要在 handlers 注册后执行)
243        #[cfg(feature = "consumer")]
244        {
245            builder.app_state = Arc::new(
246                Self::init_kafka_consumers(
247                    &config,
248                    &builder.kafka_handlers,
249                    builder.app_state.clone(),
250                )
251                .await?,
252            );
253        }
254
255        let server = builder.build();
256
257        // 启动服务器
258        server.start_internal().await
259    }
260
261    /// 初始化应用状态(不包括 Kafka 消费者组)
262    ///
263    /// Kafka 消费者组需要在 handlers 注册后才能初始化,因此单独处理
264    #[allow(unused_variables)] // config 在条件编译中可能看起来未使用,但实际上被使用了
265    async fn init_app_state(config: &Config) -> AppResult<AppState> {
266        let app_state = AppState::new();
267
268        // 初始化数据库(如果配置了且启用了 mysql/postgres/sqlite 任一特性)
269        #[cfg(any(feature = "mysql", feature = "postgres", feature = "sqlite"))]
270        let app_state = if let Some(ref db_config) = config.database {
271            let pools = crate::database::init_database(db_config).await?;
272            app_state.with_database_pools(pools)
273        } else {
274            app_state
275        };
276
277        // 初始化 Redis(如果配置了且启用了 redis 特性)
278        #[cfg(feature = "redis")]
279        let app_state = if let Some(ref redis_config) = config.redis {
280            let pool = crate::cache::redis::init_redis(
281                &redis_config.url,
282                redis_config.password.as_deref(),
283                redis_config.pool_size,
284            )
285            .await?;
286            app_state.with_redis(pool)
287        } else {
288            app_state
289        };
290
291        // Kafka 消息代理初始化(按需初始化 producer 和 consumer)
292        #[cfg(feature = "kafka")]
293        let app_state = if let Some(ref kafka_config) = config.kafka {
294            // 按需初始化 Producer(仅当启用 producer 特性且配置了 producer 时)
295            #[cfg(feature = "producer")]
296            let state = if let Some(ref producer_config) = kafka_config.producer {
297                use crate::messaging::kafka::KafkaProducer;
298
299                let producer = Arc::new(
300                    KafkaProducer::new(&kafka_config.brokers, producer_config)
301                        .map_err(|e| anyhow::anyhow!("Kafka 生产者初始化失败: {}", e))?,
302                )
303                    as Arc<dyn crate::messaging::MessageProducer + Send + Sync>;
304                app_state.with_message_producer(producer)
305            } else {
306                app_state
307            };
308            #[cfg(not(feature = "producer"))]
309            let state = app_state;
310
311            state
312        } else {
313            app_state
314        };
315
316        Ok(app_state)
317    }
318
319    /// 初始化 Kafka 消费者组
320    ///
321    /// 此方法需要在 handlers 注册后才能调用,因为需要根据 handlers 来创建消费者
322    #[cfg(feature = "consumer")]
323    async fn init_kafka_consumers(
324        config: &Config,
325        kafka_handlers: &[Arc<dyn crate::messaging::KafkaMessageHandler>],
326        mut app_state: Arc<AppState>,
327    ) -> AppResult<AppState> {
328        use crate::messaging::{kafka, KafkaMessageRouter};
329        use std::sync::Arc;
330
331        // 检查是否配置了 Kafka 和 Consumer
332        if let Some(ref kafka_config) = config.kafka {
333            if let Some(ref consumer_config) = kafka_config.consumer {
334                // 如果有注册的 handlers,自动订阅并设置路由
335                if !kafka_handlers.is_empty() {
336                    let router = Arc::new(KafkaMessageRouter::new(kafka_handlers.to_vec()));
337                    let subscribe_topics = router.get_subscribe_topics();
338
339                    if !subscribe_topics.is_empty() {
340                        // 根据 handlers 创建多个按 group_id 分组的 consumer
341                        let consumers_by_group = kafka::create_consumers_from_handlers(
342                            &kafka_config.brokers,
343                            consumer_config,
344                            kafka_handlers,
345                        )?;
346
347                        let router_clone = router.clone();
348                        let handler = Arc::new(move |message: crate::messaging::Message| {
349                            let router = router_clone.clone();
350                            tokio::spawn(async move {
351                                router.dispatch(message).await;
352                            });
353                        });
354
355                        // 为每个 group_id 的 consumer 订阅对应的 topics
356                        let mut first_consumer: Option<
357                            Arc<dyn crate::messaging::MessageConsumer + Send + Sync>,
358                        > = None;
359                        for (group_id, (consumer, topics)) in consumers_by_group {
360                            // 过滤出该 consumer 需要订阅的 topics(与 handlers 注册的 topics 的交集)
361                            let topics_to_subscribe: Vec<String> = topics
362                                .into_iter()
363                                .filter(|t| subscribe_topics.contains(t))
364                                .collect();
365
366                            if !topics_to_subscribe.is_empty() {
367                                let consumer_arc = Arc::new(consumer)
368                                    as Arc<dyn crate::messaging::MessageConsumer + Send + Sync>;
369
370                                // 保存第一个 consumer 用于 state(向后兼容)
371                                if first_consumer.is_none() {
372                                    first_consumer = Some(consumer_arc.clone());
373                                }
374
375                                consumer_arc
376                                    .subscribe_topics(topics_to_subscribe.clone(), handler.clone())
377                                    .await
378                                    .map_err(|e| {
379                                        anyhow::anyhow!(
380                                            "Kafka 订阅失败 (group: {}): {}",
381                                            group_id,
382                                            e
383                                        )
384                                    })?;
385
386                                tracing::info!(
387                                    "✅ Kafka Consumer 初始化成功 (group: {}, 订阅 {} 个 topics: {:?})",
388                                    group_id,
389                                    topics_to_subscribe.len(),
390                                    topics_to_subscribe
391                                );
392                            }
393                        }
394
395                        // 向后兼容:将第一个 consumer 设置到 state 中
396                        if let Some(consumer) = first_consumer {
397                            app_state =
398                                Arc::new((*app_state).clone().with_message_consumer(consumer));
399                        }
400                    } else {
401                        tracing::warn!(
402                            "⚠️ Kafka Consumer 已初始化,但没有 handler 注册任何 topics"
403                        );
404                    }
405                } else {
406                    tracing::info!("ℹ️ 未注册 handlers,跳过 Kafka Consumer 初始化");
407                }
408            }
409        }
410
411        Ok((*app_state).clone())
412    }
413
414    /// 创建基础路由(包含健康检查等系统路由)
415    fn create_base_router(&self) -> AxumRouter {
416        let state = self.app_state.clone();
417
418        Router::new()
419            .route("/", get(root))
420            .route("/health", get(health_check))
421            .fallback(|| async {
422                (
423                    StatusCode::NOT_FOUND,
424                    Json(json!({
425                        "error": "Not found",
426                        "status": 404
427                    })),
428                )
429            })
430            .layer(
431                ServiceBuilder::new()
432                    .layer(CompressionLayer::new())
433                    .into_inner(),
434            )
435            .layer(create_cors_layer(&self.config))
436            .with_state(state)
437    }
438
439    /// 创建完整路由(合并基础路由、用户自定义路由和 gRPC 服务)
440    async fn create_router(&mut self) -> AxumRouter {
441        let base_router = self.create_base_router();
442
443        // 合并用户提供的 HTTP 路由
444        let router = if let Some(custom) = self.http_router.take() {
445            base_router.merge(custom)
446        } else {
447            base_router
448        };
449
450        // 如果启用了 gRPC 特性,添加 gRPC 服务
451        #[cfg(feature = "grpc")]
452        let router = self.add_grpc_services(router).await;
453
454        // 全局中间件:合并后添加,确保覆盖所有路由(用户路由 + 基础路由)
455        // 注意:axum 的 .merge() 不会将原 Router 的 layer 应用到被合并的路由,
456        //       所以必须在合并后统一添加。
457        let router = router
458            .layer(axum::middleware::from_fn(request_logging_middleware))
459            .layer(axum::middleware::from_fn(
460                crate::auth::user_context_middleware,
461            ));
462
463        // 如果配置了上下文路径,则嵌套到该路径下
464        if let Some(ref context_path) = self.config.server.context_path {
465            use std::borrow::Cow;
466            let path: Cow<'_, str> = if context_path.starts_with('/') {
467                Cow::Borrowed(context_path.as_str())
468            } else {
469                tracing::warn!("上下文路径 '{}' 应该以 '/' 开头,已自动修正", context_path);
470                Cow::Owned(format!("/{}", context_path))
471            };
472            Router::new().nest(path.as_ref(), router)
473        } else {
474            router
475        }
476    }
477
478    /// 添加 gRPC 服务到路由(仅在启用 grpc 特性时可用)
479    ///
480    /// 使用 axum 的 fallback_service 来托管 gRPC 服务
481    /// HTTP 路由优先匹配,未匹配的请求由 gRPC 服务处理
482    #[cfg(feature = "grpc")]
483    async fn add_grpc_services(&mut self, router: AxumRouter) -> AxumRouter {
484        if let Some(grpc_router) = self.grpc_router.take() {
485            tracing::info!("集成 gRPC 服务到 axum");
486
487            // tonic 0.13 原生支持 axum 0.8,直接转换为 axum Router
488            let grpc_axum_router = grpc_router.into_axum_router();
489
490            // 使用 fallback_service 托管 gRPC 服务
491            // HTTP 路由优先匹配,未匹配的请求由 gRPC 服务处理
492            router.fallback_service(grpc_axum_router)
493        } else {
494            router
495        }
496    }
497
498    /// 内部启动方法(被 start_internal 调用)
499    async fn start_internal(mut self) -> AppResult<()> {
500        let addr = self.config.server.socket_addr()?;
501        let app = self.create_router().await;
502
503        let base_url = if let Some(ref context_path) = self.config.server.context_path {
504            format!("http://{}{}", addr, context_path)
505        } else {
506            format!("http://{}", addr)
507        };
508
509        info!("服务器启动在 {}", base_url);
510        info!("健康检查: {}/health", base_url);
511
512        #[cfg(feature = "grpc")]
513        if self.grpc_router.is_some() {
514            info!("gRPC 服务已启用");
515        }
516
517        let listener = tokio::net::TcpListener::bind(&addr).await?;
518
519        // 保存配置副本用于优雅关闭时 Nacos 注销
520        #[cfg(feature = "nacos")]
521        let shutdown_config = self.config.clone();
522
523        // 使用 into_make_service_with_connect_info 以支持 ConnectInfo extractor
524        axum::serve(
525            listener,
526            app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
527        )
528        .with_graceful_shutdown(shutdown_signal())
529        .await?;
530
531        // 优雅关闭后:执行 Nacos 服务注销
532        #[cfg(feature = "nacos")]
533        if let Some(ref nacos_config) = shutdown_config.nacos {
534            info!("正在从 Nacos 注销服务...");
535            match crate::nacos::deregister_service(nacos_config, &shutdown_config.server).await {
536                Ok(_) => info!("Nacos 服务注销成功"),
537                Err(e) => warn!("Nacos 服务注销失败(将由心跳超时自动下线): {}", e),
538            }
539        }
540
541        info!("服务器已关闭");
542        Ok(())
543    }
544}
545
546/// 优雅关闭信号处理
547async fn shutdown_signal() {
548    let ctrl_c = async {
549        signal::ctrl_c().await.expect("无法安装 Ctrl+C 信号处理器");
550        info!("收到 Ctrl+C 信号,开始优雅关闭...");
551    };
552
553    #[cfg(unix)]
554    let terminate = async {
555        signal::unix::signal(signal::unix::SignalKind::terminate())
556            .expect("无法安装 SIGTERM 信号处理器")
557            .recv()
558            .await;
559        warn!("收到 SIGTERM 信号,开始优雅关闭...");
560    };
561
562    #[cfg(not(unix))]
563    let terminate = std::future::pending::<()>();
564
565    tokio::select! {
566        _ = ctrl_c => {},
567        _ = terminate => {},
568    }
569
570    info!("服务器正在关闭...");
571}