1use net2::TcpBuilder;
2use std::net::SocketAddr;
3use tokio::net::TcpListener;
4use tokio::net::TcpStream;
5
6pub use failure::Error;
7
8use std::collections::{BTreeMap, BTreeSet};
9use std::env;
10use std::fs::File;
11use std::io::Read;
12use std::num;
13use std::path::Path;
14
15pub mod meta;
16
17pub const ENV_ASTER_DEFAULT_THREADS: &str = "ASTER_DEFAULT_THREAD";
18
19#[derive(Debug, Fail)]
20pub enum AsError {
21 #[fail(display = "config is bad for fields {}", _0)]
22 BadConfig(String),
23 #[fail(display = "fail to parse int in config")]
24 StrParseIntError(num::ParseIntError),
25
26 #[fail(display = "invalid message")]
27 BadMessage,
28
29 #[fail(display = "message is ok but request bad or not allowed")]
30 BadReqeust,
31
32 #[fail(display = "request not supported")]
33 RequestNotSupport,
34
35 #[fail(display = "inline request don't support multi keys")]
36 RequestInlineWithMultiKeys,
37
38 #[fail(display = "message reply is bad")]
39 BadReply,
40
41 #[fail(display = "proxy fail")]
42 ProxyFail,
43
44 #[fail(display = "fail due retry send, reached limit")]
45 RequestReachMaxCycle,
46
47 #[fail(display = "fail to parse integer {}", _0)]
48 ParseIntError(btoi::ParseIntegerError),
49
50 #[fail(display = "CLUSTER SLOTS must be replied with array")]
51 WrongClusterSlotsReplyType,
52
53 #[fail(display = "CLUSTER SLOTS must contains slot info")]
54 WrongClusterSlotsReplySlot,
55
56 #[fail(display = "cluster fail to proxy command")]
57 ClusterFailDispatch,
58
59 #[fail(display = "unexpected io error {}", _0)]
60 IoError(tokio::io::Error), #[fail(display = "remote connection has been active closed: {}", _0)]
63 BackendClosedError(String),
64
65 #[fail(display = "fail to redirect command")]
66 RedirectFailError,
67
68 #[fail(display = "fail to init cluster {} due to all seed nodes is die", _0)]
69 ClusterAllSeedsDie(String),
70
71 #[fail(display = "fail to load config toml error {}", _0)]
72 ConfigError(toml::de::Error), #[fail(display = "fail to load system info")]
75 SystemError,
76
77 #[fail(display = "there is nothing happening")]
78 None,
79
80 #[fail(display = "{}", exclusive)]
81 RetryRandom { exclusive: String },
82}
83
84impl PartialEq for AsError {
85 fn eq(&self, other: &AsError) -> bool {
86 match (self, other) {
87 (Self::None, Self::None) => true,
88 (Self::BadMessage, Self::BadMessage) => true,
89 (Self::BadReqeust, Self::BadReqeust) => true,
90 (Self::RequestNotSupport, Self::RequestNotSupport) => true,
91 (Self::RequestInlineWithMultiKeys, Self::RequestInlineWithMultiKeys) => true,
92 (Self::BadReply, Self::BadReply) => true,
93 (Self::ProxyFail, Self::ProxyFail) => true,
94 (Self::RequestReachMaxCycle, Self::RequestReachMaxCycle) => true,
95 (Self::ParseIntError(inner), Self::ParseIntError(other_inner)) => inner == other_inner,
96 (Self::WrongClusterSlotsReplyType, Self::WrongClusterSlotsReplyType) => true,
97 (Self::WrongClusterSlotsReplySlot, Self::WrongClusterSlotsReplySlot) => true,
98 (Self::ClusterFailDispatch, Self::ClusterFailDispatch) => true,
99 (Self::RedirectFailError, Self::RedirectFailError) => true,
100 (Self::BackendClosedError(inner), Self::BackendClosedError(other_inner)) => {
101 inner == other_inner
102 }
103 (Self::StrParseIntError(inner), Self::StrParseIntError(other_inner)) => {
104 inner == other_inner
105 }
106 (Self::ClusterAllSeedsDie(inner), Self::ClusterAllSeedsDie(other_inner)) => {
107 inner == other_inner
108 }
109
110 (Self::IoError(inner), Self::IoError(other_inner)) => {
111 inner.kind() == other_inner.kind()
112 }
113 (Self::ConfigError(_), Self::ConfigError(_)) => true,
114 (Self::SystemError, Self::SystemError) => true,
115 (
116 Self::RetryRandom { exclusive: ex },
117 Self::RetryRandom {
118 exclusive: other_ex,
119 },
120 ) => ex == other_ex,
121 _ => false,
122 }
123 }
124}
125
126impl From<tokio::io::Error> for AsError {
127 fn from(oe: tokio::io::Error) -> AsError {
128 AsError::IoError(oe)
129 }
130}
131
132impl From<btoi::ParseIntegerError> for AsError {
133 fn from(oe: btoi::ParseIntegerError) -> AsError {
134 AsError::ParseIntError(oe)
135 }
136}
137
138impl From<toml::de::Error> for AsError {
139 fn from(oe: toml::de::Error) -> AsError {
140 AsError::ConfigError(oe)
141 }
142}
143
144impl From<num::ParseIntError> for AsError {
145 fn from(oe: num::ParseIntError) -> AsError {
146 AsError::StrParseIntError(oe)
147 }
148}
149
150#[derive(Deserialize, Debug, Clone)]
151pub struct Config {
152 #[serde(default)]
153 pub clusters: Vec<ClusterConfig>,
154}
155
156impl Config {
157 pub fn cluster(&self, name: &str) -> Option<ClusterConfig> {
158 for cluster in &self.clusters {
159 if cluster.name == name {
160 return Some(cluster.clone());
161 }
162 }
163 None
164 }
165
166 fn servers_map(&self) -> BTreeMap<String, BTreeSet<String>> {
167 self.clusters
168 .iter()
169 .map(|x| {
170 (
171 x.name.clone(),
172 x.servers.iter().map(|x| x.to_string()).collect(),
173 )
174 })
175 .collect()
176 }
177
178 pub fn reload_equals(&self, other: &Config) -> bool {
179 let equals_map = self.servers_map();
180 let others_map = other.servers_map();
181 equals_map == others_map
182 }
183
184 pub fn valid(&self) -> Result<(), AsError> {
185 Ok(())
186 }
187
188 pub fn load<P: AsRef<Path>>(p: P) -> Result<Config, AsError> {
189 let path = p.as_ref();
190 let mut data = String::new();
191 let mut fd = File::open(path)?;
192 fd.read_to_string(&mut data)?;
193 let mut cfg: Config = toml::from_str(&data)?;
194 let thread = Config::load_thread_from_env();
195 for cluster in &mut cfg.clusters[..] {
196 if cluster.thread.is_none() {
197 cluster.thread = Some(thread);
198 }
199 }
200 Ok(cfg)
201 }
202
203 fn load_thread_from_env() -> usize {
204 let thread_str = env::var(ENV_ASTER_DEFAULT_THREADS).unwrap_or("4".to_string());
205 let thread = thread_str.parse::<usize>().unwrap_or(4);
206 thread
207 }
208}
209
210#[derive(Deserialize, Debug, Clone, Copy)]
211pub enum CacheType {
212 #[serde(rename = "redis")]
213 Redis,
214 #[serde(rename = "memcache")]
215 Memcache,
216 #[serde(rename = "memcache_binary")]
217 MemcacheBinary,
218 #[serde(rename = "redis_cluster")]
219 RedisCluster,
220}
221
222impl Default for CacheType {
223 fn default() -> CacheType {
224 CacheType::RedisCluster
225 }
226}
227
228#[derive(Clone, Debug, Deserialize, Default)]
229pub struct ClusterConfig {
230 pub name: String,
231 pub listen_addr: String,
232 pub hash_tag: Option<String>,
233
234 pub thread: Option<usize>,
235 pub cache_type: CacheType,
236
237 pub read_timeout: Option<u64>,
238 pub write_timeout: Option<u64>,
239
240 #[serde(default)]
241 pub servers: Vec<String>,
242
243 pub fetch_interval: Option<u64>,
245 pub read_from_slave: Option<bool>,
246
247 pub ping_fail_limit: Option<u8>,
249 pub ping_interval: Option<u64>,
250 pub ping_succ_interval: Option<u64>,
251
252 pub dial_timeout: Option<u64>,
256 pub listen_proto: Option<String>,
258
259 pub node_connections: Option<usize>,
261}
262
263#[cfg(windows)]
264pub(crate) fn create_reuse_port_listener(addr: &SocketAddr) -> Result<TcpListener, std::io::Error> {
265 let builder = TcpBuilder::new_v4()?;
266 let std_listener = builder
267 .reuse_address(true)
268 .expect("os not support SO_REUSEADDR")
269 .bind(addr)?
270 .listen(std::i32::MAX)?;
271 let hd = tokio::reactor::Handle::current();
272 TcpListener::from_std(std_listener, &hd)
273}
274
275#[cfg(not(windows))]
276pub(crate) fn create_reuse_port_listener(addr: &SocketAddr) -> Result<TcpListener, std::io::Error> {
277 use net2::unix::UnixTcpBuilderExt;
278
279 let builder = TcpBuilder::new_v4()?;
280 let std_listener = builder
281 .reuse_address(true)
282 .expect("os not support SO_REUSEADDR")
283 .reuse_port(true)
284 .expect("os not support SO_REUSEPORT")
285 .bind(addr)?
286 .listen(std::i32::MAX)?;
287 let hd = tokio::reactor::Handle::default();
288 TcpListener::from_std(std_listener, &hd)
289}
290
291#[cfg(not(linux))]
292#[inline]
293pub fn set_read_write_timeout(
294 sock: TcpStream,
295 _rt: Option<u64>,
296 _wt: Option<u64>,
297) -> Result<TcpStream, AsError> {
298 Ok(sock)
299}
300
301#[cfg(linux)]
302#[inline]
303pub fn set_read_write_timeout(
304 mut sock: TcpStream,
305 rt: Option<u64>,
306 wt: Option<u64>,
307) -> Result<TcpStream, AsError> {
308 use std::os::unix::AsRawFd;
309 use std::os::unix::FromRawFd;
310 use std::time::Duration;
311
312 let nrt = rt.map(Duration::from_millis);
313 let nwt = wt.map(Duration::from_millis);
314 let fd = sock.as_raw_fd();
315 let mut nsock = unsafe { std::net::TcpStream::from_raw_fd(fd) };
316 nsock.set_read_timeout(nrt)?;
317 nsock.set_write_timeout(nwt)?;
318 let hd = tokio::reactor::Handle::default();
319 let stream = TcpStream::from_std(nsock, &hd)?;
320 return Ok(stream);
321}