nodedb_cluster/multi_raft/
rpc_dispatch.rs1use nodedb_raft::{
10 AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
11 RequestVoteRequest, RequestVoteResponse,
12};
13
14use crate::error::{ClusterError, Result};
15
16use super::core::MultiRaft;
17
18impl MultiRaft {
19 pub fn handle_append_entries(
21 &mut self,
22 req: &AppendEntriesRequest,
23 ) -> Result<AppendEntriesResponse> {
24 let node = self
25 .groups
26 .get_mut(&req.group_id)
27 .ok_or(ClusterError::GroupNotFound {
28 group_id: req.group_id,
29 })?;
30 Ok(node.handle_append_entries(req))
31 }
32
33 pub fn handle_request_vote(&mut self, req: &RequestVoteRequest) -> Result<RequestVoteResponse> {
35 let node = self
36 .groups
37 .get_mut(&req.group_id)
38 .ok_or(ClusterError::GroupNotFound {
39 group_id: req.group_id,
40 })?;
41 Ok(node.handle_request_vote(req))
42 }
43
44 pub fn handle_install_snapshot(
46 &mut self,
47 req: &InstallSnapshotRequest,
48 ) -> Result<InstallSnapshotResponse> {
49 let node = self
50 .groups
51 .get_mut(&req.group_id)
52 .ok_or(ClusterError::GroupNotFound {
53 group_id: req.group_id,
54 })?;
55 Ok(node.handle_install_snapshot(req))
56 }
57
58 pub fn snapshot_metadata(&self, group_id: u64) -> Result<(u64, u64, u64)> {
61 let node = self
62 .groups
63 .get(&group_id)
64 .ok_or(ClusterError::GroupNotFound { group_id })?;
65 Ok((
66 node.current_term(),
67 node.log_snapshot_index(),
68 node.log_snapshot_term(),
69 ))
70 }
71
72 pub fn handle_append_entries_response(
74 &mut self,
75 group_id: u64,
76 peer: u64,
77 resp: &AppendEntriesResponse,
78 ) -> Result<()> {
79 let node = self
80 .groups
81 .get_mut(&group_id)
82 .ok_or(ClusterError::GroupNotFound { group_id })?;
83 node.handle_append_entries_response(peer, resp);
84 Ok(())
85 }
86
87 pub fn handle_request_vote_response(
89 &mut self,
90 group_id: u64,
91 peer: u64,
92 resp: &RequestVoteResponse,
93 ) -> Result<()> {
94 let node = self
95 .groups
96 .get_mut(&group_id)
97 .ok_or(ClusterError::GroupNotFound { group_id })?;
98 node.handle_request_vote_response(peer, resp);
99 Ok(())
100 }
101
102 pub fn advance_applied(&mut self, group_id: u64, applied_to: u64) -> Result<()> {
104 let node = self
105 .groups
106 .get_mut(&group_id)
107 .ok_or(ClusterError::GroupNotFound { group_id })?;
108 node.advance_applied(applied_to);
109 Ok(())
110 }
111
112 pub fn match_index_for(&self, group_id: u64, peer: u64) -> Option<u64> {
114 self.groups.get(&group_id)?.match_index_for(peer)
115 }
116
117 pub fn last_applied(&self, group_id: u64) -> Option<u64> {
126 self.groups.get(&group_id).map(|n| n.last_applied())
127 }
128
129 pub fn applied_indices(&self) -> Vec<(u64, u64)> {
133 self.groups
134 .iter()
135 .map(|(gid, node)| (*gid, node.last_applied()))
136 .collect()
137 }
138}