use std::sync::Arc;
use std::sync::Mutex;
use crate::RaftTypeConfig;
use crate::errors::RPCError;
use crate::network::Backoff;
use crate::replication::backoff_consumer::BackoffConsumer;
/// Rank that the accumulated error weight must strictly exceed before
/// `BackoffState::reconcile` installs a backoff; at or below this value
/// `reconcile` clears any installed backoff instead.
const BACKOFF_RANK_THRESHOLD: u64 = 20;
/// Tracks an accumulated error rank together with an optional [`Backoff`]
/// held in a slot that is shared with every [`BackoffConsumer`] handed out by
/// `consumer()`.
pub(crate) struct BackoffState {
/// Sum of the weights passed to `on_error`; reset to zero by `on_success`.
rank: u64,
/// Shared slot holding the currently installed backoff, or `None` when
/// backoff is disabled. Consumers receive a clone of this `Arc`.
inner: Arc<Mutex<Option<Backoff>>>,
}
impl BackoffState {
    /// Creates a state with zero rank and no backoff installed.
    pub(crate) fn new() -> Self {
        Self {
            rank: 0,
            inner: Arc::new(Mutex::new(None)),
        }
    }

    /// Returns a [`BackoffConsumer`] that shares this state's backoff slot.
    ///
    /// Later calls to `on_success` or `reconcile` on this state are visible
    /// through the returned consumer because both hold the same `Arc`.
    pub(crate) fn consumer(&self) -> BackoffConsumer {
        BackoffConsumer {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Records a success: resets the rank to zero and clears the shared
    /// backoff slot immediately, without waiting for a `reconcile` call.
    ///
    /// When the rank is already zero this returns without taking the lock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the shared slot is poisoned.
    pub(crate) fn on_success(&mut self) {
        if self.rank == 0 {
            return;
        }
        self.rank = 0;
        *self.inner.lock().unwrap() = None;
    }

    /// Records an error by adding `weight` to the accumulated rank.
    ///
    /// This only updates the rank; the shared backoff slot is not touched
    /// until `reconcile` is called.
    pub(crate) fn on_error(&mut self, weight: u64) {
        // Saturate rather than overflow: a plain `+=` would panic in debug
        // builds and wrap to a small value in release builds, which would
        // drop the rank below the threshold and disable backoff while errors
        // are still occurring.
        self.rank = self.rank.saturating_add(weight);
    }

    /// Feeds the outcome of an RPC into the state.
    ///
    /// `Ok` is forwarded to `on_success`; `Err(e)` is forwarded to `on_error`
    /// with the weight reported by `e.backoff_rank()`.
    pub(crate) fn observe<T, C>(&mut self, result: &Result<T, RPCError<C>>)
    where C: RaftTypeConfig {
        match result {
            Ok(_) => self.on_success(),
            Err(e) => self.on_error(e.backoff_rank()),
        }
    }

    /// Brings the shared backoff slot in line with the current rank.
    ///
    /// If the rank is strictly greater than `BACKOFF_RANK_THRESHOLD`, a
    /// backoff is installed by calling `backoff_factory`, unless one is
    /// already present, in which case the existing one is kept and the
    /// factory is not called. Otherwise the slot is cleared.
    ///
    /// The factory runs while the slot's mutex is held.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the shared slot is poisoned.
    pub(crate) fn reconcile(&self, backoff_factory: impl FnOnce() -> Backoff) {
        let mut inner = self.inner.lock().unwrap();
        if self.rank > BACKOFF_RANK_THRESHOLD {
            if inner.is_none() {
                *inner = Some(backoff_factory());
            }
        } else {
            *inner = None;
        }
    }

    /// Test helper: reports whether a backoff is currently installed.
    #[cfg(test)]
    fn is_enabled(&self) -> bool {
        self.inner.lock().unwrap().is_some()
    }

    /// Test helper: returns the accumulated rank.
    #[cfg(test)]
    fn rank(&self) -> u64 {
        self.rank
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Builds a `Backoff` from an endless iterator of 200ms durations.
    fn make_test_backoff() -> Backoff {
        let delays = std::iter::repeat(Duration::from_millis(200));
        Backoff::new(delays)
    }

    /// An error weight above the threshold, followed by `reconcile`, installs
    /// a backoff and leaves the rank at the accumulated weight.
    #[test]
    fn on_error_accumulates_rank_and_reconcile_enables_backoff() {
        let mut tracker = BackoffState::new();

        // A fresh state starts disabled with zero rank.
        assert!(!tracker.is_enabled());
        assert_eq!(tracker.rank(), 0);

        tracker.on_error(100);
        tracker.reconcile(make_test_backoff);

        assert!(tracker.is_enabled(), "backoff should be enabled after error > threshold");
        assert_eq!(tracker.rank(), 100);
    }

    /// `on_success` alone must reset the rank and clear the backoff.
    #[test]
    fn on_success_clears_backoff_without_external_reconcile() {
        let mut tracker = BackoffState::new();

        // Drive the state into the enabled condition first.
        tracker.on_error(100);
        tracker.reconcile(make_test_backoff);
        assert!(tracker.is_enabled(), "precondition: backoff is enabled");

        tracker.on_success();

        assert_eq!(tracker.rank(), 0, "rank must be reset on success");
        let still_enabled = tracker.is_enabled();
        assert!(
            !still_enabled,
            "backoff must be cleared on success, not left for an external reconcile call \
             that does not run during an active stream session"
        );
    }

    /// `observe` routes `Err` to `on_error` and `Ok` to `on_success`.
    #[test]
    fn observe_dispatches_to_on_success_or_on_error() {
        use crate::engine::testing::UTConfig;
        use crate::errors::Unreachable;

        let mut tracker = BackoffState::new();

        // Error arm: an `Unreachable` error contributes its weight to the rank.
        let unreachable = Unreachable::<UTConfig>::from_string("x");
        let failure: Result<(), RPCError<UTConfig>> = Err(RPCError::Unreachable(unreachable));
        tracker.observe(&failure);
        assert_eq!(tracker.rank(), 100, "err arm: rank accumulated from Unreachable weight");

        tracker.reconcile(make_test_backoff);
        assert!(tracker.is_enabled(), "precondition: backoff is enabled");

        // Success arm: rank is reset and the backoff slot is emptied.
        let success: Result<(), RPCError<UTConfig>> = Ok(());
        tracker.observe(&success);
        assert_eq!(tracker.rank(), 0, "ok arm: rank reset");
        assert!(!tracker.is_enabled(), "ok arm: inner cleared");
    }
}