use std::ops::Deref;
use std::time::Duration;
use openraft_macros::since;
use crate::Raft;
use crate::RaftTypeConfig;
use crate::async_runtime::watch::WatchReceiver;
use crate::errors::Fatal;
use crate::metrics::WaitError;
use crate::raft::linearizable_read::LinearizeState;
use crate::type_config::alias::LogIdOf;
/// Handle for a linearizable read: a thin wrapper around a [`LinearizeState`].
///
/// The value is `#[must_use]`: holding it does not by itself wait for anything. Call
/// `try_await_ready()` (or `await_ready()`) to wait until the wrapped state is reported ready on
/// the local node before serving the read.
///
/// The wrapped state is reachable read-only through `Deref`.
#[since(version = "0.10.0")]
#[must_use = "call `try_await_ready()` to ensure linearizability"]
#[derive(Debug, Clone)]
pub struct Linearizer<C>
where C: RaftTypeConfig
{
// The linearize state that the waiting methods inspect, update and finally hand back.
state: LinearizeState<C>,
}
/// Exposes the wrapped [`LinearizeState`] read-only, so its accessors can be called directly on a
/// [`Linearizer`].
impl<C> Deref for Linearizer<C>
where
    C: RaftTypeConfig,
{
    type Target = LinearizeState<C>;

    /// Borrows the inner linearize state.
    fn deref(&self) -> &LinearizeState<C> {
        let Self { state } = self;
        state
    }
}
impl<C> Linearizer<C>
where C: RaftTypeConfig
{
    /// Creates a linearizer by forwarding all three arguments to [`LinearizeState::new`].
    ///
    /// NOTE(review): presumably `node_id` is the node the read was issued on, `read_log_id` is the
    /// log id that has to be applied before the read may be served, and `applied` is the last
    /// applied log id known at creation time — confirm against `LinearizeState`.
    #[since(version = "0.10.0")]
    pub fn new(node_id: C::NodeId, read_log_id: LogIdOf<C>, applied: Option<LogIdOf<C>>) -> Self {
        Self {
            state: LinearizeState::new(node_id, read_log_id, applied),
        }
    }

    /// Waits, without a caller-supplied timeout, until the state is ready on the local node and
    /// returns it.
    ///
    /// This is [`Self::try_await_ready`] called with `timeout = None`.
    ///
    /// # Errors
    ///
    /// Returns the [`Fatal`] error produced by [`Self::try_await_ready`].
    ///
    /// # Panics
    ///
    /// Panics if [`Self::try_await_ready`] reports a not-ready state. Since no timeout is passed,
    /// that branch is not expected to be taken.
    #[since(version = "0.10.0")]
    pub async fn await_ready<SM>(self, raft: &Raft<C, SM>) -> Result<LinearizeState<C>, Fatal<C>> {
        let ready = self
            .try_await_ready(raft, None)
            .await?
            .expect("Linearizer::await_ready: waiting without a timeout ended before the state became ready");
        Ok(ready)
    }

    /// Waits until the local node has applied at least the index of the read log id, giving up
    /// after `timeout` if one is provided.
    ///
    /// Returns:
    /// - `Ok(Ok(state))`: the state was already ready, the wait completed, or the wait timed out
    ///   but a fresh look at the metrics shows the state is ready after all.
    /// - `Ok(Err(state))`: the wait timed out and the state, updated with the latest applied log
    ///   id, is still not ready on this node.
    ///
    /// # Errors
    ///
    /// Returns the [`Fatal`] error obtained from `get_core_stop_error()` when the wait reports
    /// that the node is shutting down.
    #[since(version = "0.10.0")]
    pub async fn try_await_ready<SM>(
        self,
        raft: &Raft<C, SM>,
        timeout: Option<Duration>,
    ) -> Result<Result<LinearizeState<C>, LinearizeState<C>>, Fatal<C>> {
        let this_id = raft.inner.id();

        // Fast path: nothing to wait for.
        if self.state.is_ready_on_node(this_id) {
            return Ok(Ok(self.state));
        }

        let expected = Some(self.state.read_log_id().index());
        let res = raft.inner.wait(timeout).applied_index_at_least(expected, "Linearizer::try_await_ready").await;

        match res {
            Ok(metrics) => Ok(Ok(self.state.with_applied(this_id.clone(), metrics.last_applied))),
            Err(WaitError::Timeout(_, _)) => {
                // The wait gave up, but the applied log may have advanced in the meantime:
                // re-read the current metrics before deciding.
                let metrics_rx = raft.metrics();
                let watched = metrics_rx.borrow_watched();
                let applied = watched.last_applied.clone();
                // Release the watch borrow as soon as the value is copied out; a watch borrow may
                // hold a lock on the channel and should be kept as short as possible.
                drop(watched);

                let state = self.state.with_applied(this_id.clone(), applied);
                if state.is_ready_on_node(this_id) {
                    Ok(Ok(state))
                } else {
                    Ok(Err(state))
                }
            }
            Err(WaitError::ShuttingDown) => Err(raft.inner.get_core_stop_error().await),
        }
    }
}