Skip to main content

tibba_cache/
lib.rs

// Copyright 2025 Tree xie.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use serde::Deserialize;
16use snafu::{ResultExt, Snafu};
17use std::time::Duration;
18use tibba_config::Config;
19use tibba_error::Error as BaseError;
20use tibba_util::parse_uri;
21use validator::Validate;
22
23#[derive(Debug, Snafu)]
24pub enum Error {
25    #[snafu(display("config error: {source}"))]
26    Config {
27        #[snafu(source(from(tibba_config::Error, Box::new)))]
28        source: Box<tibba_config::Error>,
29    },
30    #[snafu(display("parse uri error: {source}"))]
31    ParseUri {
32        #[snafu(source(from(tibba_util::Error, Box::new)))]
33        source: Box<tibba_util::Error>,
34    },
35    #[snafu(display("single connect error: {source}"))]
36    SingleConnect { source: deadpool_redis::PoolError },
37    #[snafu(display("cluster connect error: {source}"))]
38    ClusterConnect {
39        source: deadpool_redis::cluster::PoolError,
40    },
41    #[snafu(display("{source}"))]
42    SingleBuild { source: deadpool_redis::BuildError },
43    #[snafu(display("{source}"))]
44    ClusterBuild {
45        source: deadpool_redis::cluster::CreatePoolError,
46    },
47    #[snafu(display("category: {category}, {source}"))]
48    Redis {
49        category: String,
50        source: deadpool_redis::redis::RedisError,
51    },
52    #[snafu(display("{source}"))]
53    Compression { source: tibba_util::Error },
54    #[snafu(display("{source}"))]
55    SerdeJson { source: serde_json::Error },
56    #[snafu(display("category: {category}, {source}"))]
57    Url {
58        category: String,
59        source: url::ParseError,
60    },
61    #[snafu(display("category: {category}, {source}"))]
62    Validate {
63        category: String,
64        #[snafu(source(from(validator::ValidationErrors, Box::new)))]
65        source: Box<validator::ValidationErrors>,
66    },
67}
68
69type Result<T> = std::result::Result<T, Error>;
70
71// Redis 连接配置,含校验规则
72#[derive(Debug, Clone, Default, Validate)]
73pub struct RedisConfig {
74    // Redis 节点列表
75    #[validate(length(min = 1))]
76    pub nodes: Vec<String>,
77    // 连接池大小
78    pub pool_size: u32,
79    // 建立连接的超时时间
80    pub connection_timeout: Duration,
81    // 等待连接的超时时间
82    pub wait_timeout: Duration,
83    // 回收连接时的健康检测超时时间
84    pub recycle_timeout: Duration,
85    // 连接空闲超时时间
86    pub idle_timeout: Duration,
87    // 认证密码
88    pub password: Option<String>,
89    // 连接最大存活时间
90    pub max_conn_age: Duration,
91}
92
/// Pool size used when the configuration does not specify one.
fn default_pool_size() -> u32 {
    const DEFAULT_POOL_SIZE: u32 = 10;
    DEFAULT_POOL_SIZE
}
96
97#[derive(Deserialize, Debug, Clone)]
98struct RedisParams {
99    #[serde(default = "default_pool_size")]
100    pool_size: u32,
101    #[serde(default)]
102    #[serde(with = "humantime_serde")]
103    connection_timeout: Option<Duration>,
104    #[serde(default)]
105    #[serde(with = "humantime_serde")]
106    wait_timeout: Option<Duration>,
107    #[serde(default)]
108    #[serde(with = "humantime_serde")]
109    recycle_timeout: Option<Duration>,
110    #[serde(default)]
111    #[serde(with = "humantime_serde")]
112    max_conn_age: Option<Duration>,
113    #[serde(default)]
114    #[serde(with = "humantime_serde")]
115    idle_timeout: Option<Duration>,
116    password: Option<String>,
117}
118
119// 从配置中解析并构建 RedisConfig
120fn new_redis_config(config: &Config) -> Result<RedisConfig> {
121    let uri = config.get_string("uri").context(ConfigSnafu)?;
122    let parsed = parse_uri::<RedisParams>(&uri).context(ParseUriSnafu)?;
123    let nodes = parsed
124        .host_strings()
125        .iter()
126        .map(|item| format!("redis://{item}"))
127        .collect();
128    let query = parsed.query;
129    let redis_config = RedisConfig {
130        nodes,
131        pool_size: query.pool_size,
132        connection_timeout: query.connection_timeout.unwrap_or(Duration::from_secs(3)),
133        wait_timeout: query.wait_timeout.unwrap_or(Duration::from_secs(3)),
134        // 检测请求是否可用的超时时间,默认300ms
135        recycle_timeout: query.recycle_timeout.unwrap_or(Duration::from_millis(300)),
136        max_conn_age: query.max_conn_age.unwrap_or(Duration::from_secs(24 * 3600)),
137        // 由于pool本身没有idle timeout处理,因此现在的模块在复用前判断,需要根据redis server设置调整,默认10分钟
138        idle_timeout: query.idle_timeout.unwrap_or(Duration::from_secs(10 * 60)),
139        password: query.password,
140    };
141    redis_config
142        .validate()
143        .context(ValidateSnafu { category: "redis" })?;
144    Ok(redis_config)
145}
146
147impl From<Error> for BaseError {
148    fn from(val: Error) -> Self {
149        // 基础设施错误(Redis 不可达等)→ 500 + 异常标记
150        fn infra(err: BaseError) -> BaseError {
151            err.with_status(500).with_exception(true)
152        }
153        let err = match val {
154            Error::Config { source } => BaseError::new(*source).with_sub_category("config"),
155            Error::ParseUri { source } => BaseError::new(*source).with_sub_category("parse_uri"),
156            Error::SingleConnect { source } => {
157                infra(BaseError::new(source).with_sub_category("single_connect"))
158            }
159            Error::ClusterConnect { source } => {
160                infra(BaseError::new(source).with_sub_category("cluster_connect"))
161            }
162            Error::SingleBuild { source } => {
163                infra(BaseError::new(source).with_sub_category("single_build"))
164            }
165            Error::ClusterBuild { source } => {
166                infra(BaseError::new(source).with_sub_category("cluster_build"))
167            }
168            Error::Redis { category, source } => {
169                infra(BaseError::new(source).with_sub_category(&category))
170            }
171            Error::Compression { source } => BaseError::new(source)
172                .with_sub_category("compression")
173                .with_exception(true),
174            Error::SerdeJson { source } => BaseError::new(source)
175                .with_sub_category("serde_json")
176                .with_exception(true),
177            Error::Url { category, source } => {
178                infra(BaseError::new(source).with_sub_category(&category))
179            }
180            Error::Validate { category, source } => {
181                BaseError::new(*source).with_sub_category(&category)
182            }
183        };
184        err.with_category("cache")
185    }
186}
187
/// The tracing target used for every log event emitted by this crate.
/// Filter on it with `RUST_LOG=tibba:cache=info` (or `debug`).
pub(crate) const LOG_TARGET: &str = "tibba:cache";
191
192mod cache;
193mod pool;
194mod ttl_lru_store;
195mod two_level_store;
196
197pub use cache::*;
198pub use pool::*;
199pub use ttl_lru_store::*;
200pub use two_level_store::*;