use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;
use tsoracle_consensus::{ConsensusError, LeaderState};
use tsoracle_core::Epoch;
use crate::persist_disposition::{PersistDisposition, classify};
use crate::server::{Server, ServerError};
/// Delay before the first retry of a fence attempt that failed with a transient
/// consensus error; the fence loop doubles it on every further consecutive retry.
const FENCE_RETRY_BASE: Duration = Duration::from_millis(25);
/// Ceiling applied to the doubled transient-retry backoff.
const FENCE_RETRY_CAP: Duration = Duration::from_millis(250);
/// Consecutive transient retries after which the first "fence still retrying"
/// warning is emitted.
#[cfg(feature = "tracing")]
const FENCE_TRANSIENT_RETRY_WARN_AFTER: u32 = 8;
/// Spacing, in further retries, between repeated "fence still retrying" warnings.
#[cfg(feature = "tracing")]
const FENCE_TRANSIENT_RETRY_WARN_INTERVAL: u32 = 20;
/// Decides whether a fence stuck on transient errors should log a warning now.
///
/// Fires exactly when `transient_retries` equals `FENCE_TRANSIENT_RETRY_WARN_AFTER`, and then
/// again every `FENCE_TRANSIENT_RETRY_WARN_INTERVAL` retries after that. Counts below the
/// threshold never warn.
#[cfg(feature = "tracing")]
fn warn_on_stuck_fence(transient_retries: u32) -> bool {
    match transient_retries.checked_sub(FENCE_TRANSIENT_RETRY_WARN_AFTER) {
        // At or past the threshold: warn on every interval boundary.
        Some(past_threshold) => past_threshold % FENCE_TRANSIENT_RETRY_WARN_INTERVAL == 0,
        // Still below the threshold.
        None => false,
    }
}
/// Checks (in debug builds) that leadership events never move the observed epoch backwards.
///
/// The epoch carried by `evt` is taken from the event itself:
/// * `Leader { epoch }` carries `epoch`;
/// * `Follower { leader_epoch, .. }` carries `leader_epoch`, which may be `None`;
/// * `Unknown` carries none.
///
/// Events without an epoch leave `last_epoch` untouched. Otherwise the carried epoch is
/// compared against `last_epoch` with a `debug_assert!`, which panics on a regression in
/// debug builds. It is then stored as the new `last_epoch`. The bookkeeping runs in release
/// builds too; only the assertion is compiled out there.
fn debug_assert_leader_state_contract(evt: &LeaderState, last_epoch: &mut Option<Epoch>) {
    let carried: Option<Epoch> = match evt {
        LeaderState::Leader { epoch } => Some(*epoch),
        LeaderState::Follower { leader_epoch, .. } => *leader_epoch,
        LeaderState::Unknown => None,
    };
    let Some(epoch) = carried else {
        return;
    };
    if let Some(prev) = *last_epoch {
        debug_assert!(
            epoch >= prev,
            "consensus driver surfaced a leader_epoch that regressed \
             ({epoch:?} < {prev:?}); LeaderState epochs must be non-decreasing — \
             the client's monotone-forward leader cache drops lower-epoch redirects",
        );
    }
    *last_epoch = Some(epoch);
}
/// Drives this node's serving state from the consensus layer's leadership events.
///
/// Consumes `server.consensus.leadership_events()` one event at a time.
///
/// * `LeaderState::Leader { epoch }` enters fencing and runs the fence. While holding the
///   core's drain-barrier write guard, the fence:
///   1. loads the persisted high-water mark;
///   2. persists an advanced one under `epoch`;
///   3. seeds the core with the serving floor and the persisted value;
///   4. publishes serving.
///
///   If the fence fails:
///   * a transient consensus error is retried with exponential backoff
///     (`FENCE_RETRY_BASE`, doubling, capped at `FENCE_RETRY_CAP`);
///   * a stepped-down disposition publishes not-serving and waits for the next event;
///   * a leadership event that arrives during backoff abandons the current fence and is
///     handled next.
/// * `LeaderState::Follower { .. }` and `LeaderState::Unknown` step the core down.
///
/// Every event is first passed to `debug_assert_leader_state_contract`.
///
/// # Returns
///
/// `Ok(())` once `cancel` resolves. The core is stepped down before returning.
///
/// # Errors
///
/// * `ServerError::WatchStreamClosed` if the leadership stream ends.
/// * `ServerError::Consensus(ConsensusError::PermanentDriver(_))` if a fence attempt fails
///   with an error that `classify` deems permanent.
/// * Any non-consensus `ServerError` from a fence attempt (e.g. from
///   `seed_on_leadership_gained`), propagated unchanged.
pub(crate) async fn run_leader_watch(
    server: Arc<Server>,
    cancel: impl std::future::Future<Output = ()>,
) -> Result<(), ServerError> {
    let mut stream = server.consensus.leadership_events();
    // Event pulled off the stream while a fence was backing off; it is processed before the
    // stream is polled again.
    let mut pending: Option<LeaderState> = None;
    let mut last_epoch: Option<Epoch> = None;
    tokio::pin!(cancel);
    loop {
        let evt = match pending.take() {
            Some(evt) => evt,
            None => {
                tokio::select! {
                    // Cancellation is checked before new leadership events.
                    biased;
                    _ = &mut cancel => {
                        server.core.step_down(None, None);
                        return Ok(());
                    }
                    next = stream.next() => match next {
                        Some(evt) => evt,
                        None => break,
                    },
                }
            }
        };
        debug_assert_leader_state_contract(&evt, &mut last_epoch);
        match evt {
            LeaderState::Leader { epoch } => {
                let fence_started_at = std::time::Instant::now();
                server.core.enter_fencing();
                server.reporter.leader_transitions.increment(1);
                server.reporter.last_leader_transition.touch_now();
                let mut transient_retries: u32 = 0;
                loop {
                    let attempt: Result<(), ServerError> = async {
                        // The write side of the drain barrier is held from the high-water
                        // load until serving is published. (Presumably this excludes
                        // in-flight requests while the core is re-seeded — TODO confirm.)
                        let drain_guard = server.core.drain_barrier_write().await;
                        let prior_max = server.consensus.load_high_water().await?;
                        tsoracle_failpoint::failpoint!(
                            "server::fence::after_load_before_persist",
                            |arg: Option<String>| -> Result<(), ServerError> {
                                let _ = arg;
                                Err(ServerError::Consensus(ConsensusError::TransientDriver(
                                    Box::new(std::io::Error::other(
                                        "failpoint: server::fence::after_load_before_persist",
                                    )),
                                )))
                            }
                        );
                        tsoracle_yieldpoint::yieldpoint!(
                            "server::fence::after_load_before_persist"
                        );
                        let now = server.clock.now_ms();
                        // Serve strictly above anything previously persisted, and never
                        // below the local clock.
                        let serving_floor = core::cmp::max(prior_max.saturating_add(1), now);
                        // `as_millis` yields a u128; saturate rather than truncate so an
                        // oversized advance cannot wrap around to a small one.
                        let advance_ms = u64::try_from(server.failover_advance.as_millis())
                            .unwrap_or(u64::MAX);
                        let requested = serving_floor.saturating_add(advance_ms);
                        let actual = server
                            .consensus
                            .persist_high_water(requested, epoch)
                            .await?;
                        tsoracle_failpoint::failpoint!(
                            "server::fence::after_persist_before_publish"
                        );
                        server
                            .core
                            .seed_on_leadership_gained(serving_floor, actual, epoch)?;
                        server.core.publish_serving();
                        drop(drain_guard);
                        tsoracle_failpoint::failpoint!("server::fence::after_serving_published");
                        Ok(())
                    }
                    .await;
                    match attempt {
                        Ok(()) => {
                            server
                                .reporter
                                .fence_latency
                                .record(fence_started_at.elapsed().as_secs_f64());
                            break;
                        }
                        Err(ServerError::Consensus(consensus_error)) => {
                            match classify(consensus_error) {
                                PersistDisposition::SteppedDown { .. } => {
                                    server.core.publish_not_serving(None, None);
                                    break;
                                }
                                PersistDisposition::Transient(_source) => {
                                    // Saturate so an endless retry loop cannot overflow-panic
                                    // in debug builds.
                                    transient_retries = transient_retries.saturating_add(1);
                                    server.reporter.fence_transient_retries.increment(1);
                                    #[cfg(feature = "tracing")]
                                    if warn_on_stuck_fence(transient_retries) {
                                        tracing::warn!(
                                            error = %_source,
                                            retries = transient_retries,
                                            "fence still retrying a transient consensus error; serving is paused while this node remains leader"
                                        );
                                    }
                                    // BASE * 2^(retries - 1), shift clamped to 16, then capped.
                                    let backoff = core::cmp::min(
                                        FENCE_RETRY_BASE.saturating_mul(
                                            1u32 << (transient_retries - 1).min(16),
                                        ),
                                        FENCE_RETRY_CAP,
                                    );
                                    tokio::select! {
                                        // Priority: cancellation, then a newer leadership
                                        // event, then the backoff timer. Polling the stream
                                        // before the timer means an already-queued event
                                        // supersedes this fence rather than losing to a
                                        // retry made under a possibly stale `epoch`.
                                        biased;
                                        _ = &mut cancel => {
                                            server.core.step_down(None, None);
                                            return Ok(());
                                        }
                                        next = stream.next() => {
                                            match next {
                                                Some(evt) => {
                                                    pending = Some(evt);
                                                    break;
                                                }
                                                None => return Err(ServerError::WatchStreamClosed),
                                            }
                                        }
                                        _ = tokio::time::sleep(backoff) => {}
                                    }
                                }
                                PersistDisposition::Permanent(source) => {
                                    return Err(ServerError::Consensus(
                                        ConsensusError::PermanentDriver(source),
                                    ));
                                }
                            }
                        }
                        Err(e) => return Err(e),
                    }
                }
            }
            LeaderState::Follower {
                leader_endpoint,
                leader_epoch,
            } => {
                server.core.step_down(leader_endpoint, leader_epoch);
                server.reporter.leader_transitions.increment(1);
                server.reporter.last_leader_transition.touch_now();
            }
            LeaderState::Unknown => {
                server.core.step_down(None, None);
                server.reporter.leader_transitions.increment(1);
                server.reporter.last_leader_transition.touch_now();
            }
        }
    }
    Err(ServerError::WatchStreamClosed)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tsoracle_core::{Epoch, PeerEndpoint};

    /// Builds a `Follower` event that names the fixture leader endpoint and `epoch`.
    fn follower_at(epoch: Epoch) -> LeaderState {
        LeaderState::Follower {
            leader_endpoint: Some(PeerEndpoint::try_from("leader:9000").unwrap()),
            leader_epoch: Some(epoch),
        }
    }

    /// A non-decreasing sequence, including epoch-less events, passes the guard and leaves
    /// the highest epoch recorded.
    #[test]
    fn monotone_sequence_with_bare_endpoints_passes_guard() {
        let events = [
            LeaderState::Unknown,
            LeaderState::Leader { epoch: Epoch(5) },
            follower_at(Epoch(5)),
            LeaderState::Follower {
                leader_endpoint: None,
                leader_epoch: None,
            },
            LeaderState::Leader { epoch: Epoch(6) },
        ];
        let mut last_epoch = None;
        events
            .iter()
            .for_each(|evt| debug_assert_leader_state_contract(evt, &mut last_epoch));
        assert_eq!(last_epoch, Some(Epoch(6)));
    }

    /// A follower epoch lower than a previously seen leader epoch trips the debug assertion.
    #[cfg(debug_assertions)]
    #[test]
    #[should_panic(expected = "leader_epoch that regressed")]
    fn regressing_epoch_trips_guard() {
        let mut last_epoch = None;
        debug_assert_leader_state_contract(
            &LeaderState::Leader { epoch: Epoch(7) },
            &mut last_epoch,
        );
        debug_assert_leader_state_contract(&follower_at(Epoch(6)), &mut last_epoch);
    }
}