Skip to main content

libaster/
com.rs

1use net2::TcpBuilder;
2use std::net::SocketAddr;
3use tokio::net::TcpListener;
4use tokio::net::TcpStream;
5
6pub use failure::Error;
7
8use std::collections::{BTreeMap, BTreeSet};
9use std::env;
10use std::fs::File;
11use std::io::Read;
12use std::num;
13use std::path::Path;
14
15pub mod meta;
16
/// Name of the environment variable that `Config::load_thread_from_env` reads
/// to obtain the default thread count for clusters that do not set `thread`.
///
/// NOTE(review): the constant's name is plural but the variable it names is the
/// singular `ASTER_DEFAULT_THREAD` — confirm which spelling deployments rely on
/// before changing either.
pub const ENV_ASTER_DEFAULT_THREADS: &str = "ASTER_DEFAULT_THREAD";
18
/// Error type used throughout this module.
///
/// The human-readable text of each variant is taken from its
/// `#[fail(display = ...)]` attribute.
#[derive(Debug, Fail)]
pub enum AsError {
    /// The configuration is invalid; the payload is printed as the offending fields.
    #[fail(display = "config is bad for fields {}", _0)]
    BadConfig(String),
    /// An integer in the configuration failed to parse
    /// (built by the `From<num::ParseIntError>` impl).
    #[fail(display = "fail to parse int in config")]
    StrParseIntError(num::ParseIntError),

    /// A message could not be accepted as valid.
    #[fail(display = "invalid message")]
    BadMessage,

    /// The message itself is well-formed but the request is bad or not allowed.
    ///
    /// NOTE(review): the name is a misspelling of `BadRequest`; it is public, so
    /// renaming it would break callers.
    #[fail(display = "message is ok but request bad or not allowed")]
    BadReqeust,

    /// The request is not supported.
    #[fail(display = "request not supported")]
    RequestNotSupport,

    /// An inline request carried more than one key.
    #[fail(display = "inline request don't support multi keys")]
    RequestInlineWithMultiKeys,

    /// A reply message is malformed.
    #[fail(display = "message reply is bad")]
    BadReply,

    /// Generic proxy failure.
    #[fail(display = "proxy fail")]
    ProxyFail,

    /// Re-sending a request hit the retry limit.
    #[fail(display = "fail due retry send, reached limit")]
    RequestReachMaxCycle,

    /// Integer parsing via `btoi` failed
    /// (built by the `From<btoi::ParseIntegerError>` impl).
    #[fail(display = "fail to parse integer {}", _0)]
    ParseIntError(btoi::ParseIntegerError),

    /// The reply to `CLUSTER SLOTS` was not an array.
    #[fail(display = "CLUSTER SLOTS must be replied with array")]
    WrongClusterSlotsReplyType,

    /// The reply to `CLUSTER SLOTS` lacked slot information.
    #[fail(display = "CLUSTER SLOTS must contains slot info")]
    WrongClusterSlotsReplySlot,

    /// The cluster failed to proxy a command.
    #[fail(display = "cluster fail to proxy command")]
    ClusterFailDispatch,

    /// An unexpected I/O error (built by the `From<tokio::io::Error>` impl).
    #[fail(display = "unexpected io error {}", _0)]
    IoError(tokio::io::Error), // wraps the underlying I/O error

    /// The remote connection was closed; the payload is appended to the message.
    #[fail(display = "remote connection has been active closed: {}", _0)]
    BackendClosedError(String),

    /// A command could not be redirected.
    #[fail(display = "fail to redirect command")]
    RedirectFailError,

    /// Cluster initialisation failed because every seed node is down; the
    /// payload is printed in the cluster position of the message.
    #[fail(display = "fail to init cluster {} due to all seed nodes is die", _0)]
    ClusterAllSeedsDie(String),

    /// The TOML configuration could not be deserialized
    /// (built by the `From<toml::de::Error>` impl).
    #[fail(display = "fail to load config toml error {}", _0)]
    ConfigError(toml::de::Error), // wraps the TOML deserialization error

    /// System information could not be loaded.
    #[fail(display = "fail to load system info")]
    SystemError,

    /// Placeholder variant meaning that nothing happened.
    #[fail(display = "there is nothing happening")]
    None,

    /// Displays as the contents of `exclusive`.
    ///
    /// NOTE(review): the meaning of `exclusive` is not visible in this file —
    /// confirm against the code that constructs this variant.
    #[fail(display = "{}", exclusive)]
    RetryRandom { exclusive: String },
}
83
84impl PartialEq for AsError {
85    fn eq(&self, other: &AsError) -> bool {
86        match (self, other) {
87            (Self::None, Self::None) => true,
88            (Self::BadMessage, Self::BadMessage) => true,
89            (Self::BadReqeust, Self::BadReqeust) => true,
90            (Self::RequestNotSupport, Self::RequestNotSupport) => true,
91            (Self::RequestInlineWithMultiKeys, Self::RequestInlineWithMultiKeys) => true,
92            (Self::BadReply, Self::BadReply) => true,
93            (Self::ProxyFail, Self::ProxyFail) => true,
94            (Self::RequestReachMaxCycle, Self::RequestReachMaxCycle) => true,
95            (Self::ParseIntError(inner), Self::ParseIntError(other_inner)) => inner == other_inner,
96            (Self::WrongClusterSlotsReplyType, Self::WrongClusterSlotsReplyType) => true,
97            (Self::WrongClusterSlotsReplySlot, Self::WrongClusterSlotsReplySlot) => true,
98            (Self::ClusterFailDispatch, Self::ClusterFailDispatch) => true,
99            (Self::RedirectFailError, Self::RedirectFailError) => true,
100            (Self::BackendClosedError(inner), Self::BackendClosedError(other_inner)) => {
101                inner == other_inner
102            }
103            (Self::StrParseIntError(inner), Self::StrParseIntError(other_inner)) => {
104                inner == other_inner
105            }
106            (Self::ClusterAllSeedsDie(inner), Self::ClusterAllSeedsDie(other_inner)) => {
107                inner == other_inner
108            }
109
110            (Self::IoError(inner), Self::IoError(other_inner)) => {
111                inner.kind() == other_inner.kind()
112            }
113            (Self::ConfigError(_), Self::ConfigError(_)) => true,
114            (Self::SystemError, Self::SystemError) => true,
115            (
116                Self::RetryRandom { exclusive: ex },
117                Self::RetryRandom {
118                    exclusive: other_ex,
119                },
120            ) => ex == other_ex,
121            _ => false,
122        }
123    }
124}
125
126impl From<tokio::io::Error> for AsError {
127    fn from(oe: tokio::io::Error) -> AsError {
128        AsError::IoError(oe)
129    }
130}
131
132impl From<btoi::ParseIntegerError> for AsError {
133    fn from(oe: btoi::ParseIntegerError) -> AsError {
134        AsError::ParseIntError(oe)
135    }
136}
137
138impl From<toml::de::Error> for AsError {
139    fn from(oe: toml::de::Error) -> AsError {
140        AsError::ConfigError(oe)
141    }
142}
143
144impl From<num::ParseIntError> for AsError {
145    fn from(oe: num::ParseIntError) -> AsError {
146        AsError::StrParseIntError(oe)
147    }
148}
149
/// Top-level configuration, deserialized from TOML by `Config::load`.
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
    /// Configured clusters; an absent `clusters` key deserializes to an empty list.
    #[serde(default)]
    pub clusters: Vec<ClusterConfig>,
}
155
156impl Config {
157    pub fn cluster(&self, name: &str) -> Option<ClusterConfig> {
158        for cluster in &self.clusters {
159            if cluster.name == name {
160                return Some(cluster.clone());
161            }
162        }
163        None
164    }
165
166    fn servers_map(&self) -> BTreeMap<String, BTreeSet<String>> {
167        self.clusters
168            .iter()
169            .map(|x| {
170                (
171                    x.name.clone(),
172                    x.servers.iter().map(|x| x.to_string()).collect(),
173                )
174            })
175            .collect()
176    }
177
178    pub fn reload_equals(&self, other: &Config) -> bool {
179        let equals_map = self.servers_map();
180        let others_map = other.servers_map();
181        equals_map == others_map
182    }
183
184    pub fn valid(&self) -> Result<(), AsError> {
185        Ok(())
186    }
187
188    pub fn load<P: AsRef<Path>>(p: P) -> Result<Config, AsError> {
189        let path = p.as_ref();
190        let mut data = String::new();
191        let mut fd = File::open(path)?;
192        fd.read_to_string(&mut data)?;
193        let mut cfg: Config = toml::from_str(&data)?;
194        let thread = Config::load_thread_from_env();
195        for cluster in &mut cfg.clusters[..] {
196            if cluster.thread.is_none() {
197                cluster.thread = Some(thread);
198            }
199        }
200        Ok(cfg)
201    }
202
203    fn load_thread_from_env() -> usize {
204        let thread_str = env::var(ENV_ASTER_DEFAULT_THREADS).unwrap_or("4".to_string());
205        let thread = thread_str.parse::<usize>().unwrap_or(4);
206        thread
207    }
208}
209
210#[derive(Deserialize, Debug, Clone, Copy)]
211pub enum CacheType {
212    #[serde(rename = "redis")]
213    Redis,
214    #[serde(rename = "memcache")]
215    Memcache,
216    #[serde(rename = "memcache_binary")]
217    MemcacheBinary,
218    #[serde(rename = "redis_cluster")]
219    RedisCluster,
220}
221
222impl Default for CacheType {
223    fn default() -> CacheType {
224        CacheType::RedisCluster
225    }
226}
227
/// Configuration of a single cluster entry in `Config::clusters`.
#[derive(Clone, Debug, Deserialize, Default)]
pub struct ClusterConfig {
    /// Cluster name; used by `Config::cluster` for lookup.
    pub name: String,
    /// Address the cluster listens on.
    /// NOTE(review): the expected format (presumably `host:port`) is not visible here.
    pub listen_addr: String,
    /// Optional hash tag.
    /// NOTE(review): how the tag is interpreted is not visible here — confirm with the consumer.
    pub hash_tag: Option<String>,

    /// Thread count; `Config::load` fills it from the environment default when unset.
    pub thread: Option<usize>,
    /// Backend kind of this cluster.
    pub cache_type: CacheType,

    /// Optional read timeout.
    /// NOTE(review): unit is not shown in this struct — presumably milliseconds; confirm.
    pub read_timeout: Option<u64>,
    /// Optional write timeout.
    /// NOTE(review): unit is not shown in this struct — presumably milliseconds; confirm.
    pub write_timeout: Option<u64>,

    /// Backend server entries; an absent key deserializes to an empty list.
    #[serde(default)]
    pub servers: Vec<String>,

    // Options specific to cluster mode.
    pub fetch_interval: Option<u64>,
    pub read_from_slave: Option<bool>,

    // Options specific to proxy mode.
    pub ping_fail_limit: Option<u8>,
    pub ping_interval: Option<u64>,
    pub ping_succ_interval: Option<u64>,

    // Dead options: still declared here, but described as unused.

    // Not supported at the moment.
    pub dial_timeout: Option<u64>,
    // Dead option: no other protocol is supported.
    pub listen_proto: Option<String>,

    // Dead option: the value is always 1.
    pub node_connections: Option<usize>,
}
262
/// Creates a listening socket on `addr` with `SO_REUSEADDR` enabled and
/// registers it with the default tokio reactor (Windows variant; Windows has
/// no `SO_REUSEPORT`).
///
/// The socket family follows `addr`, so both IPv4 and IPv6 addresses can be
/// bound (previously an IPv4 socket was always created).
///
/// # Errors
///
/// Returns the underlying `std::io::Error` when the socket cannot be created,
/// configured, bound or put into listening state. Failing to set
/// `SO_REUSEADDR` is reported as an error instead of panicking.
#[cfg(windows)]
pub(crate) fn create_reuse_port_listener(addr: &SocketAddr) -> Result<TcpListener, std::io::Error> {
    let builder = match addr {
        SocketAddr::V4(_) => TcpBuilder::new_v4()?,
        SocketAddr::V6(_) => TcpBuilder::new_v6()?,
    };
    let std_listener = builder
        .reuse_address(true)?
        .bind(addr)?
        .listen(std::i32::MAX)?;
    // `Handle::default()` matches the non-Windows variant of this function.
    let hd = tokio::reactor::Handle::default();
    TcpListener::from_std(std_listener, &hd)
}
274
275#[cfg(not(windows))]
276pub(crate) fn create_reuse_port_listener(addr: &SocketAddr) -> Result<TcpListener, std::io::Error> {
277    use net2::unix::UnixTcpBuilderExt;
278
279    let builder = TcpBuilder::new_v4()?;
280    let std_listener = builder
281        .reuse_address(true)
282        .expect("os not support SO_REUSEADDR")
283        .reuse_port(true)
284        .expect("os not support SO_REUSEPORT")
285        .bind(addr)?
286        .listen(std::i32::MAX)?;
287    let hd = tokio::reactor::Handle::default();
288    TcpListener::from_std(std_listener, &hd)
289}
290
291#[cfg(not(linux))]
292#[inline]
293pub fn set_read_write_timeout(
294    sock: TcpStream,
295    _rt: Option<u64>,
296    _wt: Option<u64>,
297) -> Result<TcpStream, AsError> {
298    Ok(sock)
299}
300
301#[cfg(linux)]
302#[inline]
303pub fn set_read_write_timeout(
304    mut sock: TcpStream,
305    rt: Option<u64>,
306    wt: Option<u64>,
307) -> Result<TcpStream, AsError> {
308    use std::os::unix::AsRawFd;
309    use std::os::unix::FromRawFd;
310    use std::time::Duration;
311
312    let nrt = rt.map(Duration::from_millis);
313    let nwt = wt.map(Duration::from_millis);
314    let fd = sock.as_raw_fd();
315    let mut nsock = unsafe { std::net::TcpStream::from_raw_fd(fd) };
316    nsock.set_read_timeout(nrt)?;
317    nsock.set_write_timeout(nwt)?;
318    let hd = tokio::reactor::Handle::default();
319    let stream = TcpStream::from_std(nsock, &hd)?;
320    return Ok(stream);
321}