use futures_util::Stream;
use futures_util::StreamExt;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::base::BoxFuture;
use crate::base::BoxStream;
use crate::errors::RPCError;
use crate::network::NetAppend;
use crate::network::RPCOption;
use crate::raft::AppendEntriesRequest;
use crate::raft::StreamAppendResult;
/// Network interface that accepts a stream of append-entries requests and
/// produces a stream of results.
///
/// Implementors must satisfy `OptionalSend + OptionalSync + 'static`.
// NOTE(review): the relationship between requests and results (e.g. one result
// per request, ordering) is not fixed by this signature — confirm against the
// implementations. `stream_append_sequential` in this file yields one result
// per request it sends.
pub trait NetStreamAppend<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Starts a streaming append over `input`.
///
/// - `input`: stream of `AppendEntriesRequest<C>` values; it must be `Unpin`
///   and `'static`.
/// - `option`: RPC options for the call.
///
/// Returns a boxed future borrowing `self` for `'s`. It resolves either to a
/// boxed stream of `Result<StreamAppendResult<C>, RPCError<C>>` items (also
/// bounded by `'s`), or to an `RPCError<C>` when the stream could not be
/// produced.
fn stream_append<'s, S>(
&'s mut self,
input: S,
option: RPCOption,
) -> BoxFuture<'s, Result<BoxStream<'s, Result<StreamAppendResult<C>, RPCError<C>>>, RPCError<C>>>
where
S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + Unpin + 'static;
}
/// Sends every request from `input` through `network.append_entries`, one at a
/// time, and exposes the outcomes as a stream.
///
/// The returned future performs no RPC itself: it only builds the result stream
/// and resolves to `Ok`. Requests are pulled from `input` and sent lazily, as
/// the result stream is polled.
///
/// The result stream yields one item per request sent and ends after the first
/// of the following:
/// - `input` is exhausted (nothing further is yielded);
/// - `append_entries` returns an error (yielded as `Err`, then the stream ends);
/// - the converted stream result reports an error (yielded inside `Ok`, then
///   the stream ends);
/// - the response carries a partial-success value that differs from the `last`
///   end of the request's log id range (yielded, then the stream ends).
///
/// `option` is cloned once per request.
pub fn stream_append_sequential<'s, C, N, S>(
    network: &'s mut N,
    input: S,
    option: RPCOption,
) -> BoxFuture<'s, Result<BoxStream<'s, Result<StreamAppendResult<C>, RPCError<C>>>, RPCError<C>>>
where
    C: RaftTypeConfig,
    N: NetAppend<C> + ?Sized,
    S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + Unpin + 'static,
{
    Box::pin(async move {
        // The unfold state is `Some((network, input))` while more requests may be
        // sent, and `None` once the stream must terminate on its next poll.
        let replies = futures_util::stream::unfold(Some((network, input)), move |carry| {
            let rpc_option = option.clone();

            async move {
                let (net, mut requests) = carry?;
                let request = requests.next().await?;
                let span = request.log_id_range();

                // A transport-level failure is reported once and ends the stream.
                let response = match net.append_entries(request, rpc_option).await {
                    Ok(response) => response,
                    Err(rpc_err) => return Some((Err(rpc_err), None)),
                };

                // Read the partial-success marker before `response` is consumed.
                let partial = response.get_partial_success().cloned();
                let outcome = response.into_stream_result(span.prev, span.last.clone());

                // Keep sending only when the outcome is not an error and any
                // partial-success value matches the end of the requested range.
                let proceed = !outcome.is_err()
                    && match partial {
                        Some(accepted) => accepted == span.last,
                        None => true,
                    };

                let carry_next = if proceed { Some((net, requests)) } else { None };
                Some((Ok(outcome), carry_next))
            }
        });

        let boxed: BoxStream<'s, _> = Box::pin(replies);
        Ok(boxed)
    })
}