fbc_starter/
state.rs

1use chrono::Local;
2#[cfg(feature = "redis")]
3use deadpool_redis::Pool;
4#[cfg(feature = "mysql")]
5use sqlx::mysql::MySqlPool;
6#[cfg(feature = "postgres")]
7use sqlx::postgres::PgPool;
8#[cfg(feature = "sqlite")]
9use sqlx::sqlite::SqlitePool;
10#[cfg(any(feature = "mysql", feature = "postgres", feature = "sqlite"))]
11use std::sync::Arc;
12
13/// 应用状态(包含数据库连接池、Redis 客户端、Kafka 消息生产者/消费者等)
14#[derive(Clone)]
15pub struct AppState {
16    pub start_time: chrono::DateTime<Local>,
17    #[cfg(feature = "mysql")]
18    pub mysql: Option<Arc<MySqlPool>>,
19    #[cfg(feature = "postgres")]
20    pub postgres: Option<Arc<PgPool>>,
21    #[cfg(feature = "sqlite")]
22    pub sqlite: Option<Arc<SqlitePool>>,
23    #[cfg(feature = "redis")]
24    pub redis: Option<Pool>,
25    #[cfg(feature = "producer")]
26    pub message_producer: Option<crate::messaging::MessageProducerType>,
27    #[cfg(feature = "consumer")]
28    pub message_consumer: Option<crate::messaging::MessageConsumerType>,
29}
30
31impl AppState {
32    pub fn new() -> Self {
33        Self {
34            start_time: Local::now(),
35            #[cfg(feature = "mysql")]
36            mysql: None,
37            #[cfg(feature = "postgres")]
38            postgres: None,
39            #[cfg(feature = "sqlite")]
40            sqlite: None,
41            #[cfg(feature = "redis")]
42            redis: None,
43            #[cfg(feature = "producer")]
44            message_producer: None,
45            #[cfg(feature = "consumer")]
46            message_consumer: None,
47        }
48    }
49
50    /// 设置 MySQL 连接池
51    #[cfg(feature = "mysql")]
52    pub fn with_mysql(mut self, pool: Arc<MySqlPool>) -> Self {
53        self.mysql = Some(pool);
54        self
55    }
56
57    /// 设置 Postgres 连接池
58    #[cfg(feature = "postgres")]
59    pub fn with_postgres(mut self, pool: Arc<PgPool>) -> Self {
60        self.postgres = Some(pool);
61        self
62    }
63
64    /// 设置 SQLite 连接池
65    #[cfg(feature = "sqlite")]
66    pub fn with_sqlite(mut self, pool: Arc<SqlitePool>) -> Self {
67        self.sqlite = Some(pool);
68        self
69    }
70
71    /// 批量设置三个数据库连接池
72    #[cfg(any(feature = "mysql", feature = "postgres", feature = "sqlite"))]
73    pub fn with_database_pools(mut self, pools: crate::database::DatabasePools) -> Self {
74        #[cfg(feature = "mysql")]
75        if let Some(pool) = pools.mysql {
76            self.mysql = Some(Arc::new(pool));
77        }
78        #[cfg(feature = "postgres")]
79        if let Some(pool) = pools.postgres {
80            self.postgres = Some(Arc::new(pool));
81        }
82        #[cfg(feature = "sqlite")]
83        if let Some(pool) = pools.sqlite {
84            self.sqlite = Some(Arc::new(pool));
85        }
86        self
87    }
88
89    /// 设置 Redis 连接池
90    #[cfg(feature = "redis")]
91    pub fn with_redis(mut self, pool: Pool) -> Self {
92        self.redis = Some(pool);
93        self
94    }
95
96    /// 设置 Kafka 消息生产者
97    #[cfg(feature = "producer")]
98    pub fn with_message_producer(
99        mut self,
100        message_producer: crate::messaging::MessageProducerType,
101    ) -> Self {
102        self.message_producer = Some(message_producer);
103        self
104    }
105
106    /// 设置 Kafka 消息消费者
107    #[cfg(feature = "consumer")]
108    pub fn with_message_consumer(
109        mut self,
110        message_consumer: crate::messaging::MessageConsumerType,
111    ) -> Self {
112        self.message_consumer = Some(message_consumer);
113        self
114    }
115
116    /// 获取 MySQL 连接池
117    #[cfg(feature = "mysql")]
118    pub fn mysql(&self) -> crate::error::AppResult<Arc<MySqlPool>> {
119        self.mysql
120            .clone()
121            .ok_or(crate::error::AppError::DatabaseNotInitialized)
122    }
123
124    /// 获取 Postgres 连接池
125    #[cfg(feature = "postgres")]
126    pub fn postgres(&self) -> crate::error::AppResult<Arc<PgPool>> {
127        self.postgres
128            .clone()
129            .ok_or(crate::error::AppError::DatabaseNotInitialized)
130    }
131
132    /// 获取 SQLite 连接池
133    #[cfg(feature = "sqlite")]
134    pub fn sqlite(&self) -> crate::error::AppResult<Arc<SqlitePool>> {
135        self.sqlite
136            .clone()
137            .ok_or(crate::error::AppError::DatabaseNotInitialized)
138    }
139
140    /// 获取 Redis 连接
141    ///
142    /// 每次调用都会从连接池获取一个新的连接,支持并发操作
143    #[cfg(feature = "redis")]
144    pub async fn redis(&self) -> crate::error::AppResult<deadpool_redis::Connection> {
145        self.redis
146            .as_ref()
147            .ok_or(crate::error::AppError::RedisNotInitialized)?
148            .get()
149            .await
150            .map_err(|e| {
151                crate::error::AppError::Internal(anyhow::anyhow!("获取 Redis 连接失败: {}", e))
152            })
153    }
154
155    /// 获取 Kafka 消息生产者
156    #[cfg(feature = "producer")]
157    pub fn message_producer(
158        &self,
159    ) -> crate::error::AppResult<&crate::messaging::MessageProducerType> {
160        self.message_producer.as_ref().ok_or_else(|| {
161            crate::error::AppError::Internal(anyhow::anyhow!("Kafka 消息生产者未初始化"))
162        })
163    }
164
165    /// 获取 Kafka 消息消费者
166    #[cfg(feature = "consumer")]
167    pub fn message_consumer(
168        &self,
169    ) -> crate::error::AppResult<&crate::messaging::MessageConsumerType> {
170        self.message_consumer.as_ref().ok_or_else(|| {
171            crate::error::AppError::Internal(anyhow::anyhow!("Kafka 消息消费者未初始化"))
172        })
173    }
174}
175
176impl Default for AppState {
177    fn default() -> Self {
178        Self::new()
179    }
180}