1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#[path = "../tests/common.rs"]
mod common;
use common::{ConsulServer, ConsulServerHelper};
use consulrs::{
api::{
features::Blocking,
kv::requests::{ReadKeyRequest, SetKeyRequest},
Features,
},
kv, session,
};
use futures::future::*;
use serde::{Deserialize, Serialize};
use tokio::time::{sleep, Duration};
/// The data structure stored in the `lead` key. In this case only the node name
// is being stored so followers can see who the leader is - more advanced usage
// would put all relevant information here (i.e. the lead's IP address).
#[derive(Debug, Deserialize, Serialize)]
struct Node {
pub name: String,
}
fn main() {
// Setup a test environment. The `test.run()` method is responsible for
// bringing up a Consul Docker container to use in the example.
let test = common::new_test();
test.run(|instance| async move {
// A wrapper around the Docker container to make interacting with it
// easier.
let server: ConsulServer = instance.server();
// Create three futures, each acting as a node to represent multiple
// services in a micro-service architecture. Each node will attempt to
// become the leader when it initializes.
let a = (1..4)
.map(|i| {
let server = &server;
async move {
let node = Node {
name: format!("Node {}", i),
};
println!("{}: starting up", node.name);
// The `server.client()` method is just a simple wrapper
// that returns a `ConsulClient` already configured to talk
// to our Docker container.
let client = server.client();
// A session is roughly equivalent to a lock. Each node
// must have its own lock in order to uniquely hold the
// `leader` key.
let session = session::create(&client, None).await.unwrap().response.id;
println!("{}: created session {}", node.name, session);
// Here is where election happens. By specifying the
// `acquire` parameter with our write request we ask Consul
// to attempt to lock this key for our unique session. The
// actual content we're writing is the `Node` serialized
// into a JSON string.
let res = kv::set_json(
&client,
"lead",
&node,
Some(SetKeyRequest::builder().acquire(&session)),
)
.await
.unwrap();
// Consul returns `true` is the lock was successfully
// acquired and `false` if the key was already locked.
let is_lead = res.response;
if is_lead {
println!("{}: elected the leader", node.name);
} else {
println!("{}: following", node.name);
}
// We can confirm who the leader is by querying the `lead`
// key.
let res = kv::read_json::<Node, _>(&client, "lead", None)
.await
.unwrap();
println!(
"{}: the current leader is {}",
node.name, res.response.value.name
);
// To simulate a dropped service the leader will now drop
// its lock.
if is_lead {
sleep(Duration::from_secs(2)).await;
println!("{}: dropping lead", node.name);
// Releasing the lock is as simple as writing to the key
// and specifying the `release` parameter with the
// session.
kv::set(
&client,
"lead",
b"",
Some(SetKeyRequest::builder().release(&session)),
)
.await
.unwrap();
println!("{}: dropped!", node.name);
} else {
// All nodes should be watching the `lead` key for
// changes in order to determine if a new election is
// needed. This includes the leader, but for the sake of
// this example, only the followers will watch the key.
println!("{} is watching for changes...", node.name);
// Watching is done through using the blocking feature
// of the KV endpoint. When the key changes the index
// value also changes. Below we pass the index we got
// from our last read which will cause the HTP request
// to hang until either a new index is created (because
// something happened to the key) or our timeout of five
// seconds is reached.
kv::read(
&client,
"lead",
Some(
ReadKeyRequest::builder().features(
Features::builder()
.blocking(Blocking {
index: res.index.unwrap().parse::<u64>().unwrap(),
wait: Some("5s".into()),
})
.build()
.unwrap(),
),
),
)
.await
.unwrap();
// In our example we can assume that if we reached this
// point the leader has dropped its lock. Real use-cases
// would have all of this logic on loop.
println!("{}: attempting to become leader", node.name);
let res = kv::set_json(
&client,
"lead",
&node,
Some(SetKeyRequest::builder().acquire(session)),
)
.await
.unwrap();
let is_lead = res.response;
if is_lead {
println!("{}: elected the leader", node.name);
} else {
println!("{}: following", node.name);
}
}
}
})
.collect::<Vec<_>>();
join_all(a).await;
});
}