1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
//! KIP-853 reconfiguration coordinator: single-voter add/remove/update with safety guards.
use crabka_metadata::{Voter, VoterSet};
use crate::{NodeId, RaftError};
/// Request to promote a single node to voter.
///
/// The candidate is expected to already be a caught-up observer;
/// [`Coordinator::add_voter`] refuses to promote it while it lags the leader
/// by more than the configured bound.
#[derive(Debug, Clone)]
pub struct AddVoter {
    /// The voter to add (id, directory id, endpoints and kraft version).
    pub voter: Voter,
}
/// Request to drop a single voter from the voter set.
///
/// The voter is addressed by both its node id and its directory id, so a
/// request aimed at an older incarnation of a node does not remove the
/// voter that currently holds the same id.
#[derive(Debug, Clone)]
pub struct RemoveVoter {
    /// Node id of the voter to remove.
    pub id: NodeId,
    /// Directory id of the targeted voter; it must match the current voter's
    /// directory id for the removal to take effect.
    pub directory_id: uuid::Uuid,
}
/// Request to replace one existing voter's endpoints / supported version range.
#[derive(Debug, Clone)]
pub struct UpdateVoter {
    /// The new description of the voter; its `id` selects which entry of the
    /// current voter set is replaced.
    pub voter: Voter,
}
/// Non-error result of an add / remove / update voter request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconfigOutcome {
    /// The request was applied, or it was already satisfied and treated as an
    /// idempotent no-op.
    Committed,
    /// This node is not the leader, so nothing was attempted.
    NotLeader {
        /// The leader to retry against, if one is known.
        leader: Option<NodeId>,
    },
}
/// Raft-side operations the [`Coordinator`] drives. Implemented by `ControllerHandle`.
#[async_trait::async_trait]
pub trait ReconfigOps: Send + Sync {
    /// The voter set the coordinator bases each reconfiguration on.
    fn current_voters(&self) -> VoterSet;

    /// The leader's id, if one is known; reported back in
    /// [`ReconfigOutcome::NotLeader`].
    fn leader(&self) -> Option<NodeId>;

    /// Whether this node is the leader. The coordinator attempts nothing when
    /// this returns `false`.
    fn is_leader(&self) -> bool;

    /// Highest log index the leader has; used for observer-lag checks.
    fn leader_last_index(&self) -> u64;

    /// Last replicated index for an observer/learner, or `None` if not known.
    fn observer_index(&self, id: NodeId) -> Option<u64>;

    /// Registers `id`, described by `node`, as a learner.
    async fn add_learner(&self, id: NodeId, node: crate::Node) -> Result<(), RaftError>;

    /// Changes the raft membership to `ids`. The coordinator always passes the
    /// ids of the complete next voter set.
    async fn change_membership(
        &self,
        ids: std::collections::BTreeSet<NodeId>,
    ) -> Result<(), RaftError>;

    /// Submits metadata records. The coordinator uses this to write the
    /// `V1Voters` record describing the new voter set.
    async fn submit_records(
        &self,
        records: Vec<crabka_metadata::MetadataRecord>,
    ) -> Result<(), RaftError>;
}
/// Drives single-voter reconfigurations against a [`ReconfigOps`] implementation.
///
/// The coordinator owns nothing: it borrows the raft handle and the mutex
/// that keeps reconfigurations from overlapping.
pub struct Coordinator<'a, O: ReconfigOps> {
    /// Raft operations used to inspect and change the voter set.
    ops: &'a O,
    /// Taken with `try_lock` for the duration of each reconfiguration.
    lock: &'a tokio::sync::Mutex<()>,
    /// Largest number of log entries a candidate may trail the leader by and
    /// still be promoted to voter.
    observer_lag_bound: u64,
}
impl<'a, O: ReconfigOps> Coordinator<'a, O> {
pub fn new(ops: &'a O, lock: &'a tokio::sync::Mutex<()>, observer_lag_bound: u64) -> Self {
Self {
ops,
lock,
observer_lag_bound,
}
}
/// Add a single voter. The candidate is first registered as a learner and
/// must be caught up within `observer_lag_bound` before promotion. On
/// success the new membership is committed and an authoritative
/// `V1Voters` record is written.
///
/// # Errors
///
/// - [`RaftError::ReconfigInProgress`] if another reconfiguration holds the lock.
/// - [`RaftError::VoterNotCaughtUp`] if the candidate observer lags too far.
/// - Any error surfaced by the underlying raft operations.
pub async fn add_voter(&self, req: AddVoter) -> Result<ReconfigOutcome, RaftError> {
if !self.ops.is_leader() {
return Ok(ReconfigOutcome::NotLeader {
leader: self.ops.leader(),
});
}
let _guard = self
.lock
.try_lock()
.map_err(|_| RaftError::ReconfigInProgress)?;
let current = self.ops.current_voters();
if current.contains(req.voter.id) {
return Ok(ReconfigOutcome::Committed); // idempotent
}
let node = crate::Node {
directory_id: req.voter.directory_id,
endpoints: req.voter.endpoints.clone(),
kraft_version: req.voter.kraft_version,
};
self.ops.add_learner(req.voter.id, node).await?;
let lag = self
.ops
.leader_last_index()
.saturating_sub(self.ops.observer_index(req.voter.id).unwrap_or(0));
if lag > self.observer_lag_bound {
return Err(RaftError::VoterNotCaughtUp {
id: req.voter.id,
lag,
});
}
let next = current.with_voter(req.voter.clone());
self.ops.change_membership(next.ids()).await?;
self.ops
.submit_records(vec![crabka_metadata::MetadataRecord::V1Voters(
crabka_metadata::VotersRecord { voters: next },
)])
.await?;
Ok(ReconfigOutcome::Committed)
}
/// Remove a single voter, refusing to drop the last one.
///
/// # Errors
///
/// - [`RaftError::ReconfigInProgress`] if another reconfiguration holds the lock.
/// - [`RaftError::ReconfigRejected`] if the removal would leave no voters.
/// - Any error surfaced by the underlying raft operations.
pub async fn remove_voter(&self, req: RemoveVoter) -> Result<ReconfigOutcome, RaftError> {
if !self.ops.is_leader() {
return Ok(ReconfigOutcome::NotLeader {
leader: self.ops.leader(),
});
}
let _guard = self
.lock
.try_lock()
.map_err(|_| RaftError::ReconfigInProgress)?;
let current = self.ops.current_voters();
match current.get(req.id) {
// No voter with this id: already absent, idempotent no-op.
None => return Ok(ReconfigOutcome::Committed),
// A voter with this id exists, but it is a different incarnation than
// the one targeted (e.g. the node rejoined under a new directory_id
// after a restart). Do not remove the current voter on a stale request.
Some(v) if v.directory_id != req.directory_id => {
return Ok(ReconfigOutcome::Committed);
}
Some(_) => {}
}
let next = current.without_voter(req.id);
if next.is_empty() {
return Err(RaftError::ReconfigRejected(
"cannot remove the last voter".into(),
));
}
self.ops.change_membership(next.ids()).await?;
self.ops
.submit_records(vec![crabka_metadata::MetadataRecord::V1Voters(
crabka_metadata::VotersRecord { voters: next },
)])
.await?;
Ok(ReconfigOutcome::Committed)
}
/// Update an existing voter's endpoints / supported version range.
///
/// # Errors
///
/// - [`RaftError::ReconfigInProgress`] if another reconfiguration holds the lock.
/// - [`RaftError::ReconfigRejected`] if the voter id is unknown.
/// - Any error surfaced by the underlying raft operations.
pub async fn update_voter(&self, req: UpdateVoter) -> Result<ReconfigOutcome, RaftError> {
if !self.ops.is_leader() {
return Ok(ReconfigOutcome::NotLeader {
leader: self.ops.leader(),
});
}
let _guard = self
.lock
.try_lock()
.map_err(|_| RaftError::ReconfigInProgress)?;
let current = self.ops.current_voters();
if !current.contains(req.voter.id) {
return Err(RaftError::ReconfigRejected("unknown voter".into()));
}
let next = current.with_voter(req.voter);
self.ops
.submit_records(vec![crabka_metadata::MetadataRecord::V1Voters(
crabka_metadata::VotersRecord { voters: next },
)])
.await?;
Ok(ReconfigOutcome::Committed)
}
}