use crate::error::{ClusterError, Result};
use crate::forward::RequestForwarder;
use crate::health;
use crate::rpc_codec::RaftRpc;
use crate::transport::RaftRpcHandler;
use super::loop_core::{CommitApplier, RaftLoop};
pub(super) const TOPOLOGY_GROUP_ID: u64 = 0;
#[derive(Debug, PartialEq, Eq)]
pub(super) enum JoinDecision {
Admit,
Redirect { leader_addr: String },
}
pub(super) fn decide_join(
group0_leader: u64,
self_node_id: u64,
leader_addr: Option<String>,
) -> JoinDecision {
if group0_leader == 0 || group0_leader == self_node_id {
JoinDecision::Admit
} else {
JoinDecision::Redirect {
leader_addr: leader_addr.unwrap_or_default(),
}
}
}
impl<A: CommitApplier, F: RequestForwarder> RaftRpcHandler for RaftLoop<A, F> {
async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
match rpc {
RaftRpc::AppendEntriesRequest(req) => {
let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
let resp = mr.handle_append_entries(&req)?;
Ok(RaftRpc::AppendEntriesResponse(resp))
}
RaftRpc::RequestVoteRequest(req) => {
let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
let resp = mr.handle_request_vote(&req)?;
Ok(RaftRpc::RequestVoteResponse(resp))
}
RaftRpc::InstallSnapshotRequest(req) => {
let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
let resp = mr.handle_install_snapshot(&req)?;
Ok(RaftRpc::InstallSnapshotResponse(resp))
}
RaftRpc::JoinRequest(req) => Ok(RaftRpc::JoinResponse(self.join_flow(req).await)),
RaftRpc::Ping(req) => {
let topo_version = {
let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
topo.version()
};
Ok(health::handle_ping(self.node_id, topo_version, &req))
}
RaftRpc::TopologyUpdate(update) => {
let (updated, ack) =
health::handle_topology_update(self.node_id, &self.topology, &update);
if updated {
for node in &update.nodes {
if node.node_id == self.node_id {
continue;
}
match node.addr.parse::<std::net::SocketAddr>() {
Ok(addr) => self.transport.register_peer(node.node_id, addr),
Err(e) => tracing::warn!(
node_id = node.node_id,
addr = %node.addr,
error = %e,
"topology update contains unparseable peer address; skipping register_peer"
),
}
}
if let Some(catalog) = self.catalog.as_ref() {
let snap = self
.topology
.read()
.unwrap_or_else(|p| p.into_inner())
.clone();
if let Err(e) = catalog.save_topology(&snap) {
tracing::warn!(error = %e, "failed to persist topology update to catalog");
}
}
}
Ok(ack)
}
RaftRpc::ForwardRequest(req) => {
let resp = self.forwarder.execute_forwarded(req).await;
Ok(RaftRpc::ForwardResponse(resp))
}
RaftRpc::MetadataProposeRequest(req) => {
let resp = match self.propose_to_metadata_group(req.bytes) {
Ok(log_index) => crate::rpc_codec::MetadataProposeResponse::ok(log_index),
Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
leader_hint,
})) => {
crate::rpc_codec::MetadataProposeResponse::err("not leader", leader_hint)
}
Err(e) => crate::rpc_codec::MetadataProposeResponse::err(e.to_string(), None),
};
Ok(RaftRpc::MetadataProposeResponse(resp))
}
RaftRpc::VShardEnvelope(bytes) => {
if let Some(ref handler) = self.vshard_handler {
let response_bytes = handler(bytes).await?;
Ok(RaftRpc::VShardEnvelope(response_bytes))
} else {
Err(ClusterError::Transport {
detail: "VShardEnvelope handler not configured".into(),
})
}
}
other => Err(ClusterError::Transport {
detail: format!("unexpected request type in RPC handler: {other:?}"),
}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::multi_raft::MultiRaft;
use crate::routing::RoutingTable;
use crate::topology::{ClusterTopology, NodeInfo, NodeState};
use crate::transport::NexarTransport;
use nodedb_raft::message::LogEntry;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
struct NoopApplier;
impl CommitApplier for NoopApplier {
fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
entries.last().map(|e| e.index).unwrap_or(0)
}
}
fn make_transport(node_id: u64) -> Arc<NexarTransport> {
Arc::new(NexarTransport::new(node_id, "127.0.0.1:0".parse().unwrap()).unwrap())
}
#[tokio::test]
async fn rpc_handler_routes_append_entries() {
let dir = tempfile::tempdir().unwrap();
let transport = make_transport(1);
let rt = RoutingTable::uniform(1, &[1], 1);
let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
mr.add_group(0, vec![]).unwrap();
for node in mr.groups_mut().values_mut() {
node.election_deadline_override(Instant::now() - Duration::from_millis(1));
}
let topo = Arc::new(RwLock::new(ClusterTopology::new()));
let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);
raft_loop.do_tick();
tokio::time::sleep(Duration::from_millis(20)).await;
let req = RaftRpc::AppendEntriesRequest(nodedb_raft::AppendEntriesRequest {
term: 99,
leader_id: 2,
prev_log_index: 0,
prev_log_term: 0,
entries: vec![],
leader_commit: 0,
group_id: 0,
});
let resp = raft_loop.handle_rpc(req).await.unwrap();
match resp {
RaftRpc::AppendEntriesResponse(r) => {
assert!(r.success);
assert_eq!(r.term, 99);
}
other => panic!("expected AppendEntriesResponse, got {other:?}"),
}
}
#[tokio::test]
async fn rpc_handler_routes_request_vote() {
let dir = tempfile::tempdir().unwrap();
let transport = make_transport(1);
let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);
let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
mr.add_group(0, vec![2, 3]).unwrap();
let topo = Arc::new(RwLock::new(ClusterTopology::new()));
let raft_loop = RaftLoop::new(mr, transport, topo, NoopApplier);
let req = RaftRpc::RequestVoteRequest(nodedb_raft::RequestVoteRequest {
term: 1,
candidate_id: 2,
last_log_index: 0,
last_log_term: 0,
group_id: 0,
});
let resp = raft_loop.handle_rpc(req).await.unwrap();
match resp {
RaftRpc::RequestVoteResponse(r) => {
assert!(r.vote_granted);
assert_eq!(r.term, 1);
}
other => panic!("expected RequestVoteResponse, got {other:?}"),
}
}
#[tokio::test]
async fn rpc_handler_accepts_join_on_bootstrap_seed() {
let dir = tempfile::tempdir().unwrap();
let transport = make_transport(1);
let rt = RoutingTable::uniform(2, &[1], 1);
let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
mr.add_group(0, vec![]).unwrap();
mr.add_group(1, vec![]).unwrap();
for node in mr.groups_mut().values_mut() {
node.election_deadline_override(Instant::now() - Duration::from_millis(1));
}
let mut topology = ClusterTopology::new();
topology.add_node(NodeInfo::new(
1,
"127.0.0.1:9400".parse().unwrap(),
NodeState::Active,
));
let topo = Arc::new(RwLock::new(topology));
let raft_loop = RaftLoop::new(mr, transport, topo.clone(), NoopApplier);
raft_loop.do_tick();
tokio::time::sleep(Duration::from_millis(20)).await;
let req = RaftRpc::JoinRequest(crate::rpc_codec::JoinRequest {
node_id: 2,
listen_addr: "127.0.0.1:9401".into(),
wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
});
let resp = raft_loop.handle_rpc(req).await.unwrap();
match resp {
RaftRpc::JoinResponse(r) => {
assert!(
r.success,
"join should succeed on bootstrap seed: {}",
r.error
);
assert_eq!(r.nodes.len(), 2);
assert_eq!(r.groups.len(), 2);
assert_eq!(r.vshard_to_group.len(), 1024);
for g in &r.groups {
assert!(
g.learners.contains(&2),
"expected node 2 as learner in group {}, got learners={:?} members={:?}",
g.group_id,
g.learners,
g.members
);
}
}
other => panic!("expected JoinResponse, got {other:?}"),
}
let topo_guard = topo.read().unwrap();
assert_eq!(topo_guard.node_count(), 2);
assert!(topo_guard.contains(2));
}
#[test]
fn decide_join_self_leader_admits() {
assert_eq!(
decide_join(7, 7, Some("10.0.0.7:9400".into())),
JoinDecision::Admit
);
}
#[test]
fn decide_join_no_leader_yet_admits() {
assert_eq!(decide_join(0, 7, None), JoinDecision::Admit);
}
#[test]
fn decide_join_other_leader_redirects() {
assert_eq!(
decide_join(1, 7, Some("10.0.0.1:9400".into())),
JoinDecision::Redirect {
leader_addr: "10.0.0.1:9400".into()
}
);
}
#[test]
fn decide_join_other_leader_unknown_addr_still_redirects() {
assert_eq!(
decide_join(1, 7, None),
JoinDecision::Redirect {
leader_addr: String::new()
}
);
}
}