1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use crate::{
resp::{Command, Value},
sleep, Config, Error, Result, RoleResult, SentinelCommands, SentinelConfig,
ServerCommands, StandaloneConnection, RetryReason,
};
use log::debug;
/// Connection obtained through Sentinel service discovery.
///
/// It wraps the [`StandaloneConnection`] that `SentinelConnection::connect`
/// opened to the instance which answered `ROLE` as a master.
pub struct SentinelConnection {
/// Underlying standalone connection; `write_batch`, `read` and `reconnect`
/// are forwarded to it.
pub inner_connection: StandaloneConnection,
}
impl SentinelConnection {
pub async fn write_batch(&mut self, commands: impl Iterator<Item = &Command>, retry_reasons: &[RetryReason]) -> Result<()> {
self.inner_connection.write_batch(commands, retry_reasons).await
}
pub async fn read(&mut self) -> Option<Result<Value>> {
self.inner_connection.read().await
}
pub async fn reconnect(&mut self) -> Result<()> {
self.inner_connection.reconnect().await
}
/// Follow `Redis service discovery via Sentinel` documentation
/// #See <https://redis.io/docs/reference/sentinel-clients/#redis-service-discovery-via-sentinel>
///
/// # Remark
/// this function must be desugared because of async recursion:
/// <https://doc.rust-lang.org/error-index.html#E0733>
pub async fn connect(
sentinel_config: &SentinelConfig,
config: &Config,
) -> Result<SentinelConnection> {
let mut restart = false;
let mut unreachable_sentinel = true;
loop {
for sentinel_instance in &sentinel_config.instances {
// Step 1: connecting to Sentinel
let (host, port) = sentinel_instance;
match StandaloneConnection::connect(host, *port, config).await {
Ok(mut sentinel_connection) => {
// Step 2: ask for master address
let result: Result<Option<(String, u16)>> = sentinel_connection
.sentinel_get_master_addr_by_name(sentinel_config.service_name.clone())
.await;
match result {
Ok(result) => {
match result {
Some((master_host, master_port)) => {
// Step 3: call the ROLE command in the target instance
let mut master_connection = StandaloneConnection::connect(
&master_host,
master_port,
config,
)
.await?;
let role: RoleResult = master_connection.role().await?;
if let RoleResult::Master {
master_replication_offset: _,
replica_infos: _,
} = role
{
return Ok(SentinelConnection {
inner_connection: master_connection,
});
} else {
sleep(sentinel_config.wait_beetween_failures).await;
// restart from the beginning
restart = true;
break;
}
}
None => {
debug!(
"Sentinel {}:{} does not know master `{}`",
*host, *port, sentinel_config.service_name
);
unreachable_sentinel = false;
continue;
}
}
}
Err(e) => {
debug!("Cannot execute command `SENTINEL get-master-addr-by-name` with Sentinel {}:{}: {}", *host, *port, e);
continue;
}
}
}
Err(e) => {
debug!("Cannot connect to Sentinel {}:{} : {}", *host, *port, e);
continue;
}
}
}
if !restart {
break;
} else {
restart = false;
}
}
if unreachable_sentinel {
Err(Error::Sentinel(
"All Sentinel instances are unreachable".to_owned(),
))
} else {
Err(Error::Sentinel(format!(
"master {} is unknown by all Sentinel instances",
sentinel_config.service_name
)))
}
}
}