1use serde::Deserialize;
16use snafu::Snafu;
17use std::time::Duration;
18use tibba_config::Config;
19use tibba_error::Error as BaseError;
20use tibba_util::parse_multi_host_uri;
21use validator::Validate;
22
23#[derive(Debug, Snafu)]
24pub enum Error {
25 #[snafu(display("category: {category}, {message}"))]
26 Common { category: String, message: String },
27 #[snafu(display("{source}"))]
28 SingleBuild { source: deadpool_redis::BuildError },
29 #[snafu(display("{source}"))]
30 ClusterBuild {
31 source: deadpool_redis::cluster::CreatePoolError,
32 },
33 #[snafu(display("category: {category}, {source}"))]
34 Redis {
35 category: String,
36 source: deadpool_redis::redis::RedisError,
37 },
38 #[snafu(display("{source}"))]
39 Compression { source: tibba_util::Error },
40 #[snafu(display("category: {category}, {source}"))]
41 Url {
42 category: String,
43 source: url::ParseError,
44 },
45 #[snafu(display("category: {category}, {source}"))]
46 Validate {
47 category: String,
48 source: validator::ValidationErrors,
49 },
50}
51
52type Result<T> = std::result::Result<T, Error>;
53
54#[derive(Debug, Clone, Default, Validate)]
57pub struct RedisConfig {
58 #[validate(length(min = 1))]
60 pub nodes: Vec<String>,
61 pub pool_size: u32,
63 pub connection_timeout: Duration,
65 pub wait_timeout: Duration,
67 pub recycle_timeout: Duration,
69 pub password: Option<String>,
71}
72
73fn default_pool_size() -> u32 {
74 10
75}
76
77#[derive(Deserialize, Debug, Clone)]
78struct RedisParams {
79 #[serde(default = "default_pool_size")]
80 pool_size: u32,
81 #[serde(default)]
82 #[serde(with = "humantime_serde")]
83 connection_timeout: Option<Duration>,
84 #[serde(default)]
85 #[serde(with = "humantime_serde")]
86 wait_timeout: Option<Duration>,
87 #[serde(default)]
88 #[serde(with = "humantime_serde")]
89 recycle_timeout: Option<Duration>,
90 password: Option<String>,
91}
92
93fn new_redis_config(config: &Config) -> Result<RedisConfig> {
96 let uri = config.get_str("uri", "");
97 let parsed = parse_multi_host_uri::<RedisParams>(&uri).map_err(|e| Error::Common {
98 category: "redis".to_string(),
99 message: e.to_string(),
100 })?;
101 let nodes = parsed
102 .host_strings()
103 .iter()
104 .map(|item| format!("redis://{item}"))
105 .collect();
106 let query = parsed.query;
107 let redis_config = RedisConfig {
108 nodes,
109 pool_size: query.pool_size,
110 connection_timeout: query.connection_timeout.unwrap_or(Duration::from_secs(3)),
111 wait_timeout: query.wait_timeout.unwrap_or(Duration::from_secs(3)),
112 recycle_timeout: query.recycle_timeout.unwrap_or(Duration::from_secs(60)),
113 password: query.password,
114 };
115 redis_config.validate().map_err(|e| Error::Validate {
116 category: "redis".to_string(),
117 source: e,
118 })?;
119 Ok(redis_config)
120}
121
122impl From<Error> for BaseError {
123 fn from(val: Error) -> Self {
124 let err = match val {
125 Error::Common { category, message } => {
126 BaseError::new(message).with_sub_category(&category)
127 }
128 Error::SingleBuild { source } => BaseError::new(source)
129 .with_sub_category("single_build")
130 .with_status(500)
131 .with_exception(true),
132 Error::ClusterBuild { source } => BaseError::new(source)
133 .with_sub_category("cluster_build")
134 .with_status(500)
135 .with_exception(true),
136 Error::Redis { category, source } => BaseError::new(source)
137 .with_sub_category(&category)
138 .with_status(500)
139 .with_exception(true),
140 Error::Compression { source } => BaseError::new(source)
141 .with_sub_category("compression")
142 .with_exception(true),
143 Error::Url { category, source } => BaseError::new(source)
144 .with_sub_category(&category)
145 .with_status(500)
146 .with_exception(true),
147 Error::Validate { category, source } => {
148 BaseError::new(source).with_sub_category(&category)
149 }
150 };
151 err.with_category("cache")
152 }
153}
154
155mod cache;
156mod pool;
157mod ttl_lru_store;
158mod two_level_store;
159
160pub use cache::*;
161pub use pool::*;
162pub use ttl_lru_store::*;
163pub use two_level_store::*;