use std::future::Future;
use std::time::Duration;
use anyerror::AnyError;
use futures_util::Stream;
use openraft_macros::add_async_trait;
use openraft_macros::since;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::base::BoxFuture;
use crate::base::BoxStream;
use crate::errors::RPCError;
use crate::errors::ReplicationClosed;
use crate::errors::StreamingError;
use crate::errors::Unreachable;
use crate::network::Backoff;
use crate::network::NetAppend;
use crate::network::RPCOption;
use crate::network::stream_append_sequential;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::SnapshotResponse;
use crate::raft::StreamAppendResult;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::message::TransferLeaderRequest;
use crate::type_config::alias::SnapshotOf;
use crate::type_config::alias::VoteOf;
#[since(version = "0.10.0")]
#[add_async_trait]
pub trait RaftNetworkV2<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
async fn append_entries(
&mut self,
rpc: AppendEntriesRequest<C>,
option: RPCOption,
) -> Result<AppendEntriesResponse<C>, RPCError<C>>;
#[since(version = "0.10.0")]
fn stream_append<'s, S>(
&'s mut self,
input: S,
option: RPCOption,
) -> BoxFuture<'s, Result<BoxStream<'s, Result<StreamAppendResult<C>, RPCError<C>>>, RPCError<C>>>
where
S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + Unpin + 'static,
{
stream_append_sequential(self, input, option)
}
async fn vote(&mut self, rpc: VoteRequest<C>, option: RPCOption) -> Result<VoteResponse<C>, RPCError<C>>;
async fn full_snapshot(
&mut self,
vote: VoteOf<C>,
snapshot: SnapshotOf<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
) -> Result<SnapshotResponse<C>, StreamingError<C>>;
#[since(version = "0.10.0")]
async fn transfer_leader(&mut self, _req: TransferLeaderRequest<C>, _option: RPCOption) -> Result<(), RPCError<C>> {
Err(RPCError::Unreachable(Unreachable::new(&AnyError::error(
"transfer_leader not implemented",
))))
}
fn backoff(&self) -> Backoff {
Backoff::new(std::iter::repeat(Duration::from_millis(200)))
}
}
use crate::network::NetBackoff;
use crate::network::NetSnapshot;
use crate::network::NetStreamAppend;
use crate::network::NetTransferLeader;
use crate::network::NetVote;
/// Blanket implementation: any `RaftNetworkV2<C>` (sized or not) is also a `NetAppend<C>`.
///
/// The call is forwarded unchanged to `RaftNetworkV2::append_entries`.
#[allow(clippy::manual_async_fn)]
impl<C, T> NetAppend<C> for T
where
    C: RaftTypeConfig,
    T: RaftNetworkV2<C> + ?Sized,
{
    async fn append_entries(
        &mut self,
        rpc: AppendEntriesRequest<C>,
        option: RPCOption,
    ) -> Result<AppendEntriesResponse<C>, RPCError<C>> {
        // Fully-qualified path: selects the `RaftNetworkV2` method rather than this
        // same-named `NetAppend` method.
        <T as RaftNetworkV2<C>>::append_entries(self, rpc, option).await
    }
}
/// Blanket implementation: any `RaftNetworkV2<C>` (sized or not) is also a `NetBackoff<C>`.
///
/// The call is forwarded unchanged to `RaftNetworkV2::backoff`.
impl<C, T> NetBackoff<C> for T
where
    C: RaftTypeConfig,
    T: RaftNetworkV2<C> + ?Sized,
{
    fn backoff(&self) -> Backoff {
        // Fully-qualified path: selects the `RaftNetworkV2` method rather than this
        // same-named `NetBackoff` method.
        <T as RaftNetworkV2<C>>::backoff(self)
    }
}
/// Blanket implementation: any `RaftNetworkV2<C>` (sized or not) is also a `NetVote<C>`.
///
/// The call is forwarded unchanged to `RaftNetworkV2::vote`.
#[allow(clippy::manual_async_fn)]
impl<C, T> NetVote<C> for T
where
    C: RaftTypeConfig,
    T: RaftNetworkV2<C> + ?Sized,
{
    async fn vote(&mut self, rpc: VoteRequest<C>, option: RPCOption) -> Result<VoteResponse<C>, RPCError<C>> {
        // Fully-qualified path: selects the `RaftNetworkV2` method rather than this
        // same-named `NetVote` method.
        <T as RaftNetworkV2<C>>::vote(self, rpc, option).await
    }
}
/// Blanket implementation: any `RaftNetworkV2<C>` (sized or not) is also a `NetSnapshot<C>`.
///
/// The call is forwarded unchanged to `RaftNetworkV2::full_snapshot`.
#[allow(clippy::manual_async_fn)]
impl<C, T> NetSnapshot<C> for T
where
    C: RaftTypeConfig,
    T: RaftNetworkV2<C> + ?Sized,
{
    async fn full_snapshot(
        &mut self,
        vote: VoteOf<C>,
        snapshot: SnapshotOf<C>,
        cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
        option: RPCOption,
    ) -> Result<SnapshotResponse<C>, StreamingError<C>> {
        // Fully-qualified path: selects the `RaftNetworkV2` method rather than this
        // same-named `NetSnapshot` method.
        <T as RaftNetworkV2<C>>::full_snapshot(self, vote, snapshot, cancel, option).await
    }
}
/// Blanket implementation: any `RaftNetworkV2<C>` (sized or not) is also a
/// `NetTransferLeader<C>`.
///
/// The call is forwarded unchanged to `RaftNetworkV2::transfer_leader`.
#[allow(clippy::manual_async_fn)]
impl<C, T> NetTransferLeader<C> for T
where
    C: RaftTypeConfig,
    T: RaftNetworkV2<C> + ?Sized,
{
    async fn transfer_leader(&mut self, req: TransferLeaderRequest<C>, option: RPCOption) -> Result<(), RPCError<C>> {
        // Fully-qualified path: selects the `RaftNetworkV2` method rather than this
        // same-named `NetTransferLeader` method.
        <T as RaftNetworkV2<C>>::transfer_leader(self, req, option).await
    }
}
/// Blanket implementation: any `RaftNetworkV2<C>` (sized or not) is also a
/// `NetStreamAppend<C>`.
///
/// The call is forwarded unchanged to `RaftNetworkV2::stream_append`; the boxed future it
/// returns is passed back to the caller as-is.
impl<C, T> NetStreamAppend<C> for T
where
    C: RaftTypeConfig,
    T: RaftNetworkV2<C> + ?Sized,
{
    fn stream_append<'s, S>(
        &'s mut self,
        input: S,
        option: RPCOption,
    ) -> BoxFuture<'s, Result<BoxStream<'s, Result<StreamAppendResult<C>, RPCError<C>>>, RPCError<C>>>
    where
        S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + Unpin + 'static,
    {
        // Fully-qualified path: selects the `RaftNetworkV2` method rather than this
        // same-named `NetStreamAppend` method.
        <T as RaftNetworkV2<C>>::stream_append(self, input, option)
    }
}