sorock/process/raft_process/
cluster.rs

1use super::*;
2
3impl RaftProcess {
4    /// Process configuration change if the command contains configuration.
5    /// Configuration should be applied as soon as it is inserted into the log because doing so
6    /// guarantees that majority of the servers move to the configuration when the entry is committed.
7    /// Without this property, servers may still be in some old configuration which may cause split-brain
8    /// by electing two leaders in a single term which is not allowed in Raft.
9    pub(crate) async fn process_configuration_command(
10        &self,
11        command: &[u8],
12        index: Index,
13    ) -> Result<()> {
14        let config0 = match Command::deserialize(command) {
15            Command::Snapshot { membership } => Some(membership),
16            Command::ClusterConfiguration { membership } => Some(membership),
17            _ => None,
18        };
19        if let Some(config) = config0 {
20            self.peers
21                .set_membership(config, index, Ref(self.voter.clone()))
22                .await?;
23        }
24        Ok(())
25    }
26
27    /// Forming a new cluster with a single node is called "cluster bootstrapping".
28    /// Raft algorith doesn't define adding node when the cluster is empty.
29    /// We need to handle this special case.
30    async fn bootstrap_cluster(&self) -> Result<()> {
31        let mut membership = HashSet::new();
32        membership.insert(self.driver.self_node_id());
33
34        let init_command = Command::serialize(Command::Snapshot {
35            membership: membership.clone(),
36        });
37        let snapshot = Entry {
38            prev_clock: Clock { term: 0, index: 0 },
39            this_clock: Clock { term: 0, index: 1 },
40            command: init_command.clone(),
41        };
42
43        self.command_log.insert_snapshot(snapshot).await?;
44        self.process_configuration_command(&init_command, 1).await?;
45
46        // After this function is called
47        // this server should immediately become the leader by self-vote and advance commit index.
48        // Consequently, when initial install_snapshot is called this server is already the leader.
49        let conn = self.driver.connect(self.driver.self_node_id());
50        conn.send_timeout_now().await?;
51
52        Ok(())
53    }
54
55    pub(crate) async fn add_server(&self, req: request::AddServer) -> Result<()> {
56        if self.peers.read_membership().is_empty() && req.server_id == self.driver.self_node_id() {
57            self.bootstrap_cluster().await?;
58        } else {
59            let msg = kern_message::KernRequest::AddServer(req.server_id);
60            let req = request::KernRequest {
61                message: msg.serialize(),
62            };
63            let conn = self.driver.connect(self.driver.self_node_id());
64            conn.process_kern_request(req).await?;
65        }
66        Ok(())
67    }
68
69    pub(crate) async fn remove_server(&self, req: request::RemoveServer) -> Result<()> {
70        let msg = kern_message::KernRequest::RemoveServer(req.server_id);
71        let req = request::KernRequest {
72            message: msg.serialize(),
73        };
74        let conn = self.driver.connect(self.driver.self_node_id());
75        conn.process_kern_request(req).await?;
76        Ok(())
77    }
78}