use super::*;
mod replication;
/// Replication state the local node keeps for one remote peer.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ReplicationProgress {
    /// Log index the peer is known to have matched up to.
    /// Starts at 0. It is folded into the commit-index majority calculation and
    /// used to pick a leadership-transfer target.
    pub match_index: Index,
    /// Log index replication will continue from for this peer
    /// (presumably advanced by the replicator — confirm in `replication`).
    pub next_index: Index,
    /// Starts at 1. NOTE(review): presumably caps how many entries are sent
    /// in the next round; it is updated outside this file — confirm.
    pub next_max_cnt: Index,
}

impl ReplicationProgress {
    /// Returns a fresh progress record.
    ///
    /// Nothing is matched yet (`match_index == 0`), replication starts at
    /// `init_next_index`, and `next_max_cnt` begins at 1.
    pub fn new(init_next_index: Index) -> Self {
        let next_index = init_next_index;
        Self {
            next_index,
            next_max_cnt: 1,
            match_index: 0,
        }
    }
}
/// State tracked for a single remote peer.
///
/// Created by `add_peer`, which never creates one for this node's own id.
#[derive(Clone)]
pub struct PeerContexts {
    /// Replication progress of this peer.
    /// Reset by `reset_progress`; its `match_index` is read by
    /// `find_new_commit_index` and `transfer_leadership`.
    progress: ReplicationProgress,
}
/// Handles of the two background threads `add_peer` spawns for one peer.
// The fields are never read. The struct only owns the handles until
// `remove_peer` drops them, hence `dead_code` is allowed.
// NOTE(review): presumably dropping a `ThreadHandle` stops its thread —
// confirm in the `thread` module.
#[allow(dead_code)]
struct ThreadHandles {
    /// Handle returned by `thread::replication::new` for this peer.
    replicator_handle: thread::ThreadHandle,
    /// Handle returned by `thread::heartbeat::new` for this peer.
    heartbeater_handle: thread::ThreadHandle,
}
/// Shared state behind [`PeerSvc`].
pub struct Inner {
    /// Membership set as last passed to `set_membership`.
    /// It may include this node's own id.
    membership: spin::RwLock<HashSet<NodeId>>,
    /// Per-peer replication state keyed by node id.
    /// `add_peer` never inserts this node's own id.
    peer_contexts: spin::RwLock<HashMap<NodeId, PeerContexts>>,
    /// Background thread handles per peer, keyed by node id.
    peer_threads: spin::Mutex<HashMap<NodeId, ThreadHandles>>,
    /// Command log.
    /// Queried for the last index and membership entries; `set_membership`
    /// stores into its `membership_pointer`.
    command_log: Ref<CommandLog>,
    /// Used to obtain this node's id and to open connections to peers.
    driver: RaftDriver,
}
/// Clonable handle to the peer-management service.
///
/// Cloning only clones the inner `Arc`, so every clone shares one [`Inner`].
/// `Shrinkwrap` makes the handle deref to the wrapped `Arc<Inner>`, which is
/// why `Inner` fields are reachable as `self.field`.
#[derive(shrinkwraprs::Shrinkwrap, Clone)]
pub struct PeerSvc(pub Arc<Inner>);
impl PeerSvc {
    /// Creates a service with empty membership, no peer state and no peer threads.
    pub fn new(command_log: Ref<CommandLog>, driver: RaftDriver) -> Self {
        Self(Arc::new(Inner {
            membership: HashSet::new().into(),
            peer_contexts: HashMap::new().into(),
            peer_threads: HashMap::new().into(),
            command_log,
            driver,
        }))
    }

    /// Re-applies the most recent membership recorded in the command log.
    ///
    /// Searches for the last membership entry up to the log's last index.
    /// If one exists, it is decoded (as a `Snapshot` or a `ClusterConfiguration`
    /// command) and applied through `set_membership`. Otherwise nothing changes.
    ///
    /// # Errors
    /// Propagates command-log lookup failures and errors from `set_membership`.
    ///
    /// # Panics
    /// Panics if the entry at the found index is neither a `Snapshot` nor a
    /// `ClusterConfiguration` command.
    pub async fn restore_state(&self, voter: Ref<Voter>) -> Result<()> {
        let log_last_index = self.command_log.get_log_last_index().await?;
        let found = self
            .command_log
            .find_last_membership_index(log_last_index)
            .await?;
        let membership_index = match found {
            Some(idx) => idx,
            // No membership entry in the log: nothing to restore.
            None => return Ok(()),
        };

        let entry = self.command_log.get_entry(membership_index).await?;
        let membership = match Command::deserialize(&entry.command) {
            Command::Snapshot { membership } | Command::ClusterConfiguration { membership } => {
                membership
            }
            _ => unreachable!(),
        };

        self.set_membership(membership, membership_index, voter)
            .await?;
        Ok(())
    }
}
impl PeerSvc {
    /// Starts tracking peer `id`.
    ///
    /// Creates its replication progress (seeded from the log's last index) and
    /// spawns its replicator and heartbeat threads. This is a no-op when `id` is
    /// this node or is already tracked.
    ///
    /// # Errors
    /// Propagates a failure to read the command log's last index.
    async fn add_peer(&self, id: NodeId, voter: Ref<Voter>) -> Result<()> {
        if id == self.driver.self_node_id() {
            return Ok(());
        }
        // Cheap early exit that avoids querying the command log.
        if self.peer_contexts.read().contains_key(&id) {
            return Ok(());
        }
        let last_log_index = self.command_log.get_log_last_index().await?;
        let init_progress = ReplicationProgress::new(last_log_index);
        {
            let mut peer_contexts = self.peer_contexts.write();
            // Re-check under the write lock. The check above happened before an
            // `.await`, so a concurrent caller may have added this peer meanwhile.
            // Inserting again would reset its progress and spawn duplicate threads.
            if peer_contexts.contains_key(&id) {
                return Ok(());
            }
            peer_contexts.insert(
                id.clone(),
                PeerContexts {
                    progress: init_progress,
                },
            );
        } // Spin lock released here, before spawning threads or locking `peer_threads`.
        let thread_handles = ThreadHandles {
            replicator_handle: thread::replication::new(id.clone(), self.clone(), voter.clone()),
            heartbeater_handle: thread::heartbeat::new(id.clone(), voter),
        };
        self.peer_threads.lock().insert(id, thread_handles);
        Ok(())
    }

    /// Stops tracking peer `id`: drops its thread handles, then its progress.
    fn remove_peer(&self, id: NodeId) {
        self.peer_threads.lock().remove(&id);
        self.peer_contexts.write().remove(&id);
    }

    /// Replaces the membership with `config`, recorded at log position `index`.
    ///
    /// The steps are:
    /// 1. Members of `config` missing from the current membership are added with
    ///    `add_peer`.
    /// 2. Current members missing from `config` are removed.
    /// 3. The new set is stored and the command log's `membership_pointer` is set
    ///    to `index`.
    ///
    /// # Errors
    /// Returns the first error from `add_peer`. In that case neither the stored
    /// membership nor `membership_pointer` is updated, but peers added before
    /// the failure remain tracked.
    pub async fn set_membership(
        &self,
        config: HashSet<NodeId>,
        index: Index,
        voter: Ref<Voter>,
    ) -> Result<()> {
        let cur = self.read_membership();
        let add_peers: Vec<NodeId> = config.difference(&cur).cloned().collect();
        let remove_peers: Vec<NodeId> = cur.difference(&config).cloned().collect();
        for id in add_peers {
            self.add_peer(id, voter.clone()).await?;
        }
        for id in remove_peers {
            self.remove_peer(id);
        }
        info!("membership changed -> {:?}", config);
        *self.membership.write() = config;
        self.command_log
            .membership_pointer
            .store(index, Ordering::SeqCst);
        Ok(())
    }

    /// Returns a snapshot (clone) of the current membership set.
    pub fn read_membership(&self) -> HashSet<NodeId> {
        self.membership.read().clone()
    }

    /// Returns the highest log index held by a majority of the tracked nodes.
    ///
    /// The candidates are this node's last log index plus every tracked peer's
    /// `match_index`. Sorted in descending order, the element at `len / 2` is
    /// matched or exceeded by `len / 2 + 1` entries, which is a strict majority.
    ///
    /// # Errors
    /// Propagates a failure to read the command log's last index.
    pub async fn find_new_commit_index(&self) -> Result<Index> {
        let last_log_index = self.command_log.get_log_last_index().await?;
        let mut match_indices = vec![last_log_index];
        // The read guard is a temporary of this statement; it is taken after
        // the `.await` above and released at the `;`.
        match_indices.extend(
            self.peer_contexts
                .read()
                .values()
                .map(|peer| peer.progress.match_index),
        );
        match_indices.sort_unstable_by(|a, b| b.cmp(a));
        // Never out of bounds: `match_indices` always holds `last_log_index`.
        let mid = match_indices.len() / 2;
        Ok(match_indices[mid])
    }

    /// Resets every tracked peer's progress to
    /// `ReplicationProgress::new(init_next_index)`.
    pub fn reset_progress(&self, init_next_index: Index) {
        for peer in self.peer_contexts.write().values_mut() {
            peer.progress = ReplicationProgress::new(init_next_index);
        }
    }

    /// Sends a timeout-now request to the peer with the highest `match_index`.
    ///
    /// Presumably this is the Raft TimeoutNow step of leadership transfer. Ties
    /// resolve to the last maximum in map iteration order. Does nothing when no
    /// peers are tracked.
    ///
    /// # Errors
    /// Propagates the failure of `send_timeout_now`.
    pub async fn transfer_leadership(&self) -> Result<()> {
        // The read guard is dropped at the end of this statement, before the `.await` below.
        let new_leader = self
            .peer_contexts
            .read()
            .iter()
            .max_by_key(|(_, peer)| peer.progress.match_index)
            .map(|(id, _)| id.clone());
        if let Some(new_leader) = new_leader {
            info!("transfer leadership to {}", new_leader);
            let conn = self.driver.connect(new_leader);
            conn.send_timeout_now().await?;
        }
        Ok(())
    }
}