Skip to main content

nodedb_cluster/multi_raft/
rpc_dispatch.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Inbound RPC dispatch — look up the target group and delegate.
4//!
5//! Also holds the response handlers (`handle_append_entries_response`,
6//! `handle_request_vote_response`), the helpers for the tick loop (`snapshot_metadata`,
7//! `advance_applied`, `match_index_for`), and the applied-index accessors (`last_applied`, `applied_indices`).
8
9use nodedb_raft::{
10    AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
11    RequestVoteRequest, RequestVoteResponse,
12};
13
14use crate::error::{ClusterError, Result};
15
16use super::core::MultiRaft;
17
18impl MultiRaft {
19    /// Route an AppendEntries RPC to the correct group.
20    pub fn handle_append_entries(
21        &mut self,
22        req: &AppendEntriesRequest,
23    ) -> Result<AppendEntriesResponse> {
24        let node = self
25            .groups
26            .get_mut(&req.group_id)
27            .ok_or(ClusterError::GroupNotFound {
28                group_id: req.group_id,
29            })?;
30        Ok(node.handle_append_entries(req))
31    }
32
33    /// Route a RequestVote RPC to the correct group.
34    pub fn handle_request_vote(&mut self, req: &RequestVoteRequest) -> Result<RequestVoteResponse> {
35        let node = self
36            .groups
37            .get_mut(&req.group_id)
38            .ok_or(ClusterError::GroupNotFound {
39                group_id: req.group_id,
40            })?;
41        Ok(node.handle_request_vote(req))
42    }
43
44    /// Route an InstallSnapshot RPC to the correct group.
45    pub fn handle_install_snapshot(
46        &mut self,
47        req: &InstallSnapshotRequest,
48    ) -> Result<InstallSnapshotResponse> {
49        let node = self
50            .groups
51            .get_mut(&req.group_id)
52            .ok_or(ClusterError::GroupNotFound {
53                group_id: req.group_id,
54            })?;
55        Ok(node.handle_install_snapshot(req))
56    }
57
58    /// Get the current term and snapshot metadata for a group (for building
59    /// InstallSnapshot RPCs).
60    pub fn snapshot_metadata(&self, group_id: u64) -> Result<(u64, u64, u64)> {
61        let node = self
62            .groups
63            .get(&group_id)
64            .ok_or(ClusterError::GroupNotFound { group_id })?;
65        Ok((
66            node.current_term(),
67            node.log_snapshot_index(),
68            node.log_snapshot_term(),
69        ))
70    }
71
72    /// Handle AppendEntries response for a specific group.
73    pub fn handle_append_entries_response(
74        &mut self,
75        group_id: u64,
76        peer: u64,
77        resp: &AppendEntriesResponse,
78    ) -> Result<()> {
79        let node = self
80            .groups
81            .get_mut(&group_id)
82            .ok_or(ClusterError::GroupNotFound { group_id })?;
83        node.handle_append_entries_response(peer, resp);
84        Ok(())
85    }
86
87    /// Handle RequestVote response for a specific group.
88    pub fn handle_request_vote_response(
89        &mut self,
90        group_id: u64,
91        peer: u64,
92        resp: &RequestVoteResponse,
93    ) -> Result<()> {
94        let node = self
95            .groups
96            .get_mut(&group_id)
97            .ok_or(ClusterError::GroupNotFound { group_id })?;
98        node.handle_request_vote_response(peer, resp);
99        Ok(())
100    }
101
102    /// Advance applied index for a group after processing committed entries.
103    pub fn advance_applied(&mut self, group_id: u64, applied_to: u64) -> Result<()> {
104        let node = self
105            .groups
106            .get_mut(&group_id)
107            .ok_or(ClusterError::GroupNotFound { group_id })?;
108        node.advance_applied(applied_to);
109        Ok(())
110    }
111
112    /// Query a peer's match_index from a specific Raft group's leader state.
113    pub fn match_index_for(&self, group_id: u64, peer: u64) -> Option<u64> {
114        self.groups.get(&group_id)?.match_index_for(peer)
115    }
116
117    /// Read the locally-applied index for a Raft group hosted on this
118    /// node. Returns `None` if the group is not mounted here.
119    ///
120    /// Used by the tick loop to mirror `last_applied` into the
121    /// per-group [`crate::applied_watcher::AppliedIndexWatcher`] —
122    /// covers both the regular apply path and the snapshot-install
123    /// path (which sets `last_applied = last_included_index`
124    /// directly without producing committed entries).
125    pub fn last_applied(&self, group_id: u64) -> Option<u64> {
126        self.groups.get(&group_id).map(|n| n.last_applied())
127    }
128
129    /// `(group_id, last_applied)` pairs for every locally-mounted
130    /// group. Cheap O(groups) snapshot — groups are few (one
131    /// metadata + handful of vshard groups per node).
132    pub fn applied_indices(&self) -> Vec<(u64, u64)> {
133        self.groups
134            .iter()
135            .map(|(gid, node)| (*gid, node.last_applied()))
136            .collect()
137    }
138}