// sorock/process/raft_process/cluster.rs
use super::*;

3impl RaftProcess {
4 pub(crate) async fn process_configuration_command(
10 &self,
11 command: &[u8],
12 index: Index,
13 ) -> Result<()> {
14 let config0 = match Command::deserialize(command) {
15 Command::Snapshot { membership } => Some(membership),
16 Command::ClusterConfiguration { membership } => Some(membership),
17 _ => None,
18 };
19 if let Some(config) = config0 {
20 self.peers
21 .set_membership(config, index, Ref(self.voter.clone()))
22 .await?;
23 }
24 Ok(())
25 }
26
27 async fn bootstrap_cluster(&self) -> Result<()> {
31 let mut membership = HashSet::new();
32 membership.insert(self.driver.self_node_id());
33
34 let init_command = Command::serialize(Command::Snapshot {
35 membership: membership.clone(),
36 });
37 let snapshot = Entry {
38 prev_clock: Clock { term: 0, index: 0 },
39 this_clock: Clock { term: 0, index: 1 },
40 command: init_command.clone(),
41 };
42
43 self.command_log.insert_snapshot(snapshot).await?;
44 self.process_configuration_command(&init_command, 1).await?;
45
46 let conn = self.driver.connect(self.driver.self_node_id());
50 conn.send_timeout_now().await?;
51
52 Ok(())
53 }
54
55 pub(crate) async fn add_server(&self, req: request::AddServer) -> Result<()> {
56 if self.peers.read_membership().is_empty() && req.server_id == self.driver.self_node_id() {
57 self.bootstrap_cluster().await?;
58 } else {
59 let msg = kern_message::KernRequest::AddServer(req.server_id);
60 let req = request::KernRequest {
61 message: msg.serialize(),
62 };
63 let conn = self.driver.connect(self.driver.self_node_id());
64 conn.process_kern_request(req).await?;
65 }
66 Ok(())
67 }
68
69 pub(crate) async fn remove_server(&self, req: request::RemoveServer) -> Result<()> {
70 let msg = kern_message::KernRequest::RemoveServer(req.server_id);
71 let req = request::KernRequest {
72 message: msg.serialize(),
73 };
74 let conn = self.driver.connect(self.driver.self_node_id());
75 conn.process_kern_request(req).await?;
76 Ok(())
77 }
78}