Skip to main content

nodedb_cluster/multi_raft/
rpc_dispatch.rs

1//! Inbound RPC dispatch — look up the target group and delegate.
2//!
3//! Also holds the response handlers (`handle_append_entries_response`,
4//! `handle_request_vote_response`) and the helpers for the tick loop
5//! (`snapshot_metadata`, `advance_applied`, `match_index_for`).
6
7use nodedb_raft::{
8    AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
9    RequestVoteRequest, RequestVoteResponse,
10};
11
12use crate::error::{ClusterError, Result};
13
14use super::core::MultiRaft;
15
16impl MultiRaft {
17    /// Route an AppendEntries RPC to the correct group.
18    pub fn handle_append_entries(
19        &mut self,
20        req: &AppendEntriesRequest,
21    ) -> Result<AppendEntriesResponse> {
22        let node = self
23            .groups
24            .get_mut(&req.group_id)
25            .ok_or(ClusterError::GroupNotFound {
26                group_id: req.group_id,
27            })?;
28        Ok(node.handle_append_entries(req))
29    }
30
31    /// Route a RequestVote RPC to the correct group.
32    pub fn handle_request_vote(&mut self, req: &RequestVoteRequest) -> Result<RequestVoteResponse> {
33        let node = self
34            .groups
35            .get_mut(&req.group_id)
36            .ok_or(ClusterError::GroupNotFound {
37                group_id: req.group_id,
38            })?;
39        Ok(node.handle_request_vote(req))
40    }
41
42    /// Route an InstallSnapshot RPC to the correct group.
43    pub fn handle_install_snapshot(
44        &mut self,
45        req: &InstallSnapshotRequest,
46    ) -> Result<InstallSnapshotResponse> {
47        let node = self
48            .groups
49            .get_mut(&req.group_id)
50            .ok_or(ClusterError::GroupNotFound {
51                group_id: req.group_id,
52            })?;
53        Ok(node.handle_install_snapshot(req))
54    }
55
56    /// Get the current term and snapshot metadata for a group (for building
57    /// InstallSnapshot RPCs).
58    pub fn snapshot_metadata(&self, group_id: u64) -> Result<(u64, u64, u64)> {
59        let node = self
60            .groups
61            .get(&group_id)
62            .ok_or(ClusterError::GroupNotFound { group_id })?;
63        Ok((
64            node.current_term(),
65            node.log_snapshot_index(),
66            node.log_snapshot_term(),
67        ))
68    }
69
70    /// Handle AppendEntries response for a specific group.
71    pub fn handle_append_entries_response(
72        &mut self,
73        group_id: u64,
74        peer: u64,
75        resp: &AppendEntriesResponse,
76    ) -> Result<()> {
77        let node = self
78            .groups
79            .get_mut(&group_id)
80            .ok_or(ClusterError::GroupNotFound { group_id })?;
81        node.handle_append_entries_response(peer, resp);
82        Ok(())
83    }
84
85    /// Handle RequestVote response for a specific group.
86    pub fn handle_request_vote_response(
87        &mut self,
88        group_id: u64,
89        peer: u64,
90        resp: &RequestVoteResponse,
91    ) -> Result<()> {
92        let node = self
93            .groups
94            .get_mut(&group_id)
95            .ok_or(ClusterError::GroupNotFound { group_id })?;
96        node.handle_request_vote_response(peer, resp);
97        Ok(())
98    }
99
100    /// Advance applied index for a group after processing committed entries.
101    pub fn advance_applied(&mut self, group_id: u64, applied_to: u64) -> Result<()> {
102        let node = self
103            .groups
104            .get_mut(&group_id)
105            .ok_or(ClusterError::GroupNotFound { group_id })?;
106        node.advance_applied(applied_to);
107        Ok(())
108    }
109
110    /// Query a peer's match_index from a specific Raft group's leader state.
111    pub fn match_index_for(&self, group_id: u64, peer: u64) -> Option<u64> {
112        self.groups.get(&group_id)?.match_index_for(peer)
113    }
114}