use std::time::Duration;
use openraft::error::{
InstallSnapshotError, NetworkError, RPCError, RaftError, ReplicationClosed, StreamingError,
Unreachable,
};
use openraft::network::{Backoff, RPCOption, RaftNetwork, RaftNetworkFactory};
use openraft::raft::{
AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
SnapshotResponse, VoteRequest, VoteResponse,
};
use openraft::{AnyError, Snapshot, Vote};
use super::types::{YantrikNode, YantrikNodeId, YantrikRaftTypeConfig};
pub struct StubRaftNetwork {
target: YantrikNodeId,
target_addr: String,
}
impl StubRaftNetwork {
fn unreachable<E>(&self, op: &'static str) -> RPCError<YantrikNodeId, YantrikNode, E>
where
E: std::error::Error,
{
RPCError::Unreachable(Unreachable::new(&NetworkError::new(&AnyError::error(
format!(
"stub network: {op} to {} ({}) — PR-4-d-b will ship real HTTP transport",
self.target, self.target_addr
),
))))
}
}
impl RaftNetwork<YantrikRaftTypeConfig> for StubRaftNetwork {
async fn append_entries(
&mut self,
_rpc: AppendEntriesRequest<YantrikRaftTypeConfig>,
_option: RPCOption,
) -> Result<
AppendEntriesResponse<YantrikNodeId>,
RPCError<YantrikNodeId, YantrikNode, RaftError<YantrikNodeId>>,
> {
Err(self.unreachable("append_entries"))
}
async fn install_snapshot(
&mut self,
_rpc: InstallSnapshotRequest<YantrikRaftTypeConfig>,
_option: RPCOption,
) -> Result<
InstallSnapshotResponse<YantrikNodeId>,
RPCError<YantrikNodeId, YantrikNode, RaftError<YantrikNodeId, InstallSnapshotError>>,
> {
Err(self.unreachable("install_snapshot"))
}
async fn vote(
&mut self,
_rpc: VoteRequest<YantrikNodeId>,
_option: RPCOption,
) -> Result<
VoteResponse<YantrikNodeId>,
RPCError<YantrikNodeId, YantrikNode, RaftError<YantrikNodeId>>,
> {
Err(self.unreachable("vote"))
}
async fn full_snapshot(
&mut self,
_vote: Vote<YantrikNodeId>,
_snapshot: Snapshot<YantrikRaftTypeConfig>,
_cancel: impl std::future::Future<Output = ReplicationClosed> + Send + 'static,
_option: RPCOption,
) -> Result<
SnapshotResponse<YantrikNodeId>,
StreamingError<YantrikRaftTypeConfig, openraft::error::Fatal<YantrikNodeId>>,
> {
Err(StreamingError::Network(NetworkError::new(
&AnyError::error(format!(
"stub network: full_snapshot to {} ({}) — PR-4-d-b will ship real HTTP transport",
self.target, self.target_addr
)),
)))
}
fn backoff(&self) -> Backoff {
Backoff::new(std::iter::repeat(Duration::from_millis(100)))
}
}
#[derive(Default, Clone)]
pub struct StubRaftNetworkFactory;
impl RaftNetworkFactory<YantrikRaftTypeConfig> for StubRaftNetworkFactory {
type Network = StubRaftNetwork;
async fn new_client(&mut self, target: YantrikNodeId, node: &YantrikNode) -> Self::Network {
StubRaftNetwork {
target,
target_addr: node.addr.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use openraft::raft::AppendEntriesRequest;
use openraft::LeaderId;
#[tokio::test]
async fn factory_creates_clients_per_peer() {
let mut f = StubRaftNetworkFactory;
let n1 = f
.new_client(YantrikNodeId::new(1), &YantrikNode::new("http://n1"))
.await;
let n2 = f
.new_client(YantrikNodeId::new(2), &YantrikNode::new("http://n2"))
.await;
assert_eq!(n1.target, YantrikNodeId::new(1));
assert_eq!(n2.target, YantrikNodeId::new(2));
assert_eq!(n1.target_addr, "http://n1");
assert_eq!(n2.target_addr, "http://n2");
}
#[tokio::test]
async fn append_entries_returns_unreachable() {
let mut net = StubRaftNetwork {
target: YantrikNodeId::new(7),
target_addr: "http://nowhere".into(),
};
let rpc = AppendEntriesRequest {
vote: Vote::new(0, YantrikNodeId::new(0)),
prev_log_id: None,
entries: Vec::new(),
leader_commit: None,
};
let err = net
.append_entries(rpc, RPCOption::new(Duration::from_millis(100)))
.await
.unwrap_err();
match err {
RPCError::Unreachable(_) => {}
other => panic!("expected Unreachable, got {other:?}"),
}
}
#[tokio::test]
async fn vote_rpc_returns_unreachable() {
let mut net = StubRaftNetwork {
target: YantrikNodeId::new(7),
target_addr: "http://nowhere".into(),
};
let rpc = VoteRequest {
vote: Vote::new(1, YantrikNodeId::new(7)),
last_log_id: Some(openraft::LogId::new(
LeaderId::new(0, YantrikNodeId::new(0)),
0,
)),
};
let err = net
.vote(rpc, RPCOption::new(Duration::from_millis(100)))
.await
.unwrap_err();
assert!(matches!(err, RPCError::Unreachable(_)));
}
#[test]
fn backoff_is_tight_for_tests() {
let net = StubRaftNetwork {
target: YantrikNodeId::new(1),
target_addr: "http://x".into(),
};
let b = net.backoff();
let first = b.into_iter().next().unwrap();
assert_eq!(first, Duration::from_millis(100));
}
}