use std::fmt::Debug;
use std::io;
use std::ops::RangeBounds;
use std::ops::RangeInclusive;
use futures_util::Stream;
use openraft_macros::add_async_trait;
use openraft_macros::since;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::base::BoxStream;
use crate::engine::LogIdList;
use crate::errors::LeaderChanged;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::type_config::alias::EntryOf;
use crate::type_config::alias::LeaderIdOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::VoteOf;
use crate::vote::RaftVote;
#[derive(Debug, thiserror::Error)]
pub enum LeaderBoundedStreamError<C>
where C: RaftTypeConfig
{
#[error(transparent)]
LeaderChanged(#[from] LeaderChanged<C>),
#[error(transparent)]
IoError(#[from] io::Error),
}
pub type LeaderBoundedStreamResult<C> = Result<EntryOf<C>, LeaderBoundedStreamError<C>>;
pub type EntriesStreamResult<C> = Result<EntryOf<C>, io::Error>;
#[add_async_trait]
pub trait RaftLogReader<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
#[since(version = "0.10.0")]
async fn leader_bounded_stream<RB>(
&mut self,
leader: LeaderIdOf<C>,
range: RB,
) -> impl Stream<Item = LeaderBoundedStreamResult<C>> + OptionalSend
where
RB: RangeBounds<u64> + Clone + Debug + OptionalSend,
{
use futures_util::stream;
let changed_err = |leader, vote| {
let err = LeaderChanged::new(leader, vote);
LeaderBoundedStreamError::LeaderChanged(err)
};
let fu = async move {
let vote = self.read_vote().await?;
let Some(vote) = vote else {
return Ok::<BoxStream<_>, _>(Box::pin(stream::iter([Err(changed_err(leader, None))])));
};
if vote.leader_id() == &leader && vote.is_committed() {
} else {
return Ok::<BoxStream<_>, _>(Box::pin(stream::iter([Err(changed_err(leader, Some(vote)))])));
}
let entries = self.try_get_log_entries(range).await?;
let current_vote = self.read_vote().await?;
if current_vote.as_ref() != Some(&vote) {
return Ok::<BoxStream<_>, _>(Box::pin(stream::iter([Err(changed_err(leader, current_vote))])));
}
let stream = stream::iter(entries.into_iter().map(Ok));
Ok::<BoxStream<_>, io::Error>(Box::pin(stream))
};
match fu.await {
Ok(strm) => strm,
Err(io_err) => Box::pin(stream::iter([Err(LeaderBoundedStreamError::IoError(io_err))])),
}
}
#[since(version = "0.10.0")]
async fn entries_stream<RB>(&mut self, range: RB) -> impl Stream<Item = EntriesStreamResult<C>> + OptionalSend
where RB: RangeBounds<u64> + Clone + Debug + OptionalSend {
use futures_util::stream;
let fu = async move {
let entries = self.try_get_log_entries(range).await?;
let stream = stream::iter(entries.into_iter().map(Ok));
Ok::<BoxStream<_>, io::Error>(Box::pin(stream))
};
match fu.await {
Ok(strm) => strm,
Err(io_err) => Box::pin(stream::iter([Err(io_err)])),
}
}
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, io::Error>;
async fn read_vote(&mut self) -> Result<Option<VoteOf<C>>, io::Error>;
#[since(version = "0.10.0")]
async fn limited_get_log_entries(&mut self, start: u64, end: u64) -> Result<Vec<C::Entry>, io::Error> {
self.try_get_log_entries(start..end).await
}
#[since(version = "0.10.0")]
async fn get_key_log_ids(&mut self, range: RangeInclusive<LogIdOf<C>>) -> Result<Vec<LogIdOf<C>>, io::Error> {
LogIdList::<CommittedLeaderIdOf<C>>::get_key_log_ids(range, self)
.await
.map_err(|e| io::Error::other(e.to_string()))
}
}