use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::time::Duration;
use tibba_config::Config;
use tibba_error::Error as BaseError;
use tibba_util::parse_uri;
use validator::Validate;

23#[derive(Debug, Snafu)]
24pub enum Error {
25 #[snafu(display("config error: {source}"))]
26 Config {
27 #[snafu(source(from(tibba_config::Error, Box::new)))]
28 source: Box<tibba_config::Error>,
29 },
30 #[snafu(display("parse uri error: {source}"))]
31 ParseUri {
32 #[snafu(source(from(tibba_util::Error, Box::new)))]
33 source: Box<tibba_util::Error>,
34 },
35 #[snafu(display("single connect error: {source}"))]
36 SingleConnect { source: deadpool_redis::PoolError },
37 #[snafu(display("cluster connect error: {source}"))]
38 ClusterConnect {
39 source: deadpool_redis::cluster::PoolError,
40 },
41 #[snafu(display("{source}"))]
42 SingleBuild { source: deadpool_redis::BuildError },
43 #[snafu(display("{source}"))]
44 ClusterBuild {
45 source: deadpool_redis::cluster::CreatePoolError,
46 },
47 #[snafu(display("category: {category}, {source}"))]
48 Redis {
49 category: String,
50 source: deadpool_redis::redis::RedisError,
51 },
52 #[snafu(display("{source}"))]
53 Compression { source: tibba_util::Error },
54 #[snafu(display("{source}"))]
55 SerdeJson { source: serde_json::Error },
56 #[snafu(display("category: {category}, {source}"))]
57 Url {
58 category: String,
59 source: url::ParseError,
60 },
61 #[snafu(display("category: {category}, {source}"))]
62 Validate {
63 category: String,
64 #[snafu(source(from(validator::ValidationErrors, Box::new)))]
65 source: Box<validator::ValidationErrors>,
66 },
67}
68
69type Result<T> = std::result::Result<T, Error>;
70
71#[derive(Debug, Clone, Default, Validate)]
73pub struct RedisConfig {
74 #[validate(length(min = 1))]
76 pub nodes: Vec<String>,
77 pub pool_size: u32,
79 pub connection_timeout: Duration,
81 pub wait_timeout: Duration,
83 pub recycle_timeout: Duration,
85 pub idle_timeout: Duration,
87 pub password: Option<String>,
89 pub max_conn_age: Duration,
91}
92
/// Pool size used when the URI query string omits `pool_size`.
///
/// Referenced by name from the `#[serde(default = "...")]` attribute on
/// `RedisParams::pool_size`.
fn default_pool_size() -> u32 {
    10
}

97#[derive(Deserialize, Debug, Clone)]
98struct RedisParams {
99 #[serde(default = "default_pool_size")]
100 pool_size: u32,
101 #[serde(default)]
102 #[serde(with = "humantime_serde")]
103 connection_timeout: Option<Duration>,
104 #[serde(default)]
105 #[serde(with = "humantime_serde")]
106 wait_timeout: Option<Duration>,
107 #[serde(default)]
108 #[serde(with = "humantime_serde")]
109 recycle_timeout: Option<Duration>,
110 #[serde(default)]
111 #[serde(with = "humantime_serde")]
112 max_conn_age: Option<Duration>,
113 #[serde(default)]
114 #[serde(with = "humantime_serde")]
115 idle_timeout: Option<Duration>,
116 password: Option<String>,
117}
118
119fn new_redis_config(config: &Config) -> Result<RedisConfig> {
121 let uri = config.get_string("uri").context(ConfigSnafu)?;
122 let parsed = parse_uri::<RedisParams>(&uri).context(ParseUriSnafu)?;
123 let nodes = parsed
124 .host_strings()
125 .iter()
126 .map(|item| format!("redis://{item}"))
127 .collect();
128 let query = parsed.query;
129 let redis_config = RedisConfig {
130 nodes,
131 pool_size: query.pool_size,
132 connection_timeout: query.connection_timeout.unwrap_or(Duration::from_secs(3)),
133 wait_timeout: query.wait_timeout.unwrap_or(Duration::from_secs(3)),
134 recycle_timeout: query.recycle_timeout.unwrap_or(Duration::from_millis(300)),
136 max_conn_age: query.max_conn_age.unwrap_or(Duration::from_secs(24 * 3600)),
137 idle_timeout: query.idle_timeout.unwrap_or(Duration::from_secs(10 * 60)),
139 password: query.password,
140 };
141 redis_config
142 .validate()
143 .context(ValidateSnafu { category: "redis" })?;
144 Ok(redis_config)
145}
146
147impl From<Error> for BaseError {
148 fn from(val: Error) -> Self {
149 fn infra(err: BaseError) -> BaseError {
151 err.with_status(500).with_exception(true)
152 }
153 let err = match val {
154 Error::Config { source } => BaseError::new(*source).with_sub_category("config"),
155 Error::ParseUri { source } => BaseError::new(*source).with_sub_category("parse_uri"),
156 Error::SingleConnect { source } => {
157 infra(BaseError::new(source).with_sub_category("single_connect"))
158 }
159 Error::ClusterConnect { source } => {
160 infra(BaseError::new(source).with_sub_category("cluster_connect"))
161 }
162 Error::SingleBuild { source } => {
163 infra(BaseError::new(source).with_sub_category("single_build"))
164 }
165 Error::ClusterBuild { source } => {
166 infra(BaseError::new(source).with_sub_category("cluster_build"))
167 }
168 Error::Redis { category, source } => {
169 infra(BaseError::new(source).with_sub_category(&category))
170 }
171 Error::Compression { source } => BaseError::new(source)
172 .with_sub_category("compression")
173 .with_exception(true),
174 Error::SerdeJson { source } => BaseError::new(source)
175 .with_sub_category("serde_json")
176 .with_exception(true),
177 Error::Url { category, source } => {
178 infra(BaseError::new(source).with_sub_category(&category))
179 }
180 Error::Validate { category, source } => {
181 BaseError::new(*source).with_sub_category(&category)
182 }
183 };
184 err.with_category("cache")
185 }
186}
187
/// Crate-wide log target string.
// NOTE(review): presumably passed as the `target` of log/tracing calls in
// the submodules; no use site is visible in this file.
pub(crate) const LOG_TARGET: &str = "tibba:cache";

192mod cache;
193mod pool;
194mod ttl_lru_store;
195mod two_level_store;
196
197pub use cache::*;
198pub use pool::*;
199pub use ttl_lru_store::*;
200pub use two_level_store::*;