use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;
use tsoracle_consensus::{ConsensusError, LeaderState};
use crate::server::{Server, ServerError, ServingState};
/// Delay before the first retry of a fence attempt that failed with a transient consensus
/// error; the delay doubles on each further retry of the same fence.
const FENCE_RETRY_BASE: Duration = Duration::from_millis(25);
/// Upper bound applied to the delay between fence retries.
const FENCE_RETRY_CAP: Duration = Duration::from_millis(250);
/// Transient-retry count at which the first "fence still retrying" warning is logged.
#[cfg(feature = "tracing")]
const FENCE_TRANSIENT_RETRY_WARN_AFTER: u32 = 8;
/// Once the first warning has fired, another is logged every this many further retries.
#[cfg(feature = "tracing")]
const FENCE_TRANSIENT_RETRY_WARN_INTERVAL: u32 = 20;
/// Decides whether the given transient-retry count should emit a "stuck fence" warning.
///
/// Returns `true` when `transient_retries` equals `FENCE_TRANSIENT_RETRY_WARN_AFTER`, and
/// again on every `FENCE_TRANSIENT_RETRY_WARN_INTERVAL`-th retry after that; `false` for
/// every other count, including all counts below the threshold.
#[cfg(feature = "tracing")]
fn warn_on_stuck_fence(transient_retries: u32) -> bool {
    // `checked_sub` yields `None` below the threshold, which maps straight to "don't warn".
    transient_retries
        .checked_sub(FENCE_TRANSIENT_RETRY_WARN_AFTER)
        .is_some_and(|past_threshold| past_threshold % FENCE_TRANSIENT_RETRY_WARN_INTERVAL == 0)
}
/// Drives the server's serving state from the consensus layer's leadership events.
///
/// Consumes `server.consensus.leadership_events()` and handles each event in turn:
///
/// * `LeaderState::Leader { epoch }` — publishes `ServingState::NotServing`, calls
///   `on_leadership_lost()` on the allocator, then runs the fence: while holding the
///   `extension_gate` write guard it loads the persisted high-water mark, persists a new one
///   at `max(prior_max + 1, now) + failover_advance`, passes the result to the allocator via
///   `try_on_leadership_gained`, and publishes `ServingState::Serving`.
/// * `LeaderState::Follower { .. }` — calls `on_leadership_lost()` and publishes
///   `NotServing` carrying the reported leader endpoint and epoch.
/// * `LeaderState::Unknown` — calls `on_leadership_lost()` and publishes `NotServing` with
///   no leader information.
///
/// A failed fence attempt is handled by error kind:
///
/// * `ConsensusError::NotLeader` / `ConsensusError::Fenced` — publish `NotServing` and go
///   back to waiting for the next event.
/// * `ConsensusError::TransientDriver` — retry after an exponential backoff starting at
///   `FENCE_RETRY_BASE` and capped at `FENCE_RETRY_CAP`. A leadership event that arrives
///   during the backoff abandons the fence and is handled next.
/// * any other error — returned to the caller.
///
/// # Errors
///
/// Returns `ServerError::WatchStreamClosed` once the event stream ends, or the first
/// non-retryable error produced by a fence attempt. This function never returns `Ok(())`.
pub(crate) async fn run_leader_watch(server: Arc<Server>) -> Result<(), ServerError> {
    let mut stream = server.consensus.leadership_events();
    // Holds an event that interrupted a fence backoff so that it is handled before the
    // stream is polled again.
    let mut pending: Option<LeaderState> = None;
    loop {
        let evt = match pending.take() {
            Some(evt) => evt,
            None => match stream.next().await {
                Some(evt) => evt,
                None => break,
            },
        };
        #[cfg(feature = "metrics")]
        metrics::counter!("tsoracle.leader_transition.total").increment(1);
        match evt {
            LeaderState::Leader { epoch } => {
                #[cfg(feature = "metrics")]
                let fence_started_at = std::time::Instant::now();
                // Publish `NotServing` and notify the allocator before the fence starts, so
                // nothing is served until the fence below has completed.
                let _ = server.state_tx.send(ServingState::NotServing {
                    leader_endpoint: None,
                    leader_epoch: None,
                });
                server.allocator.lock().on_leadership_lost();
                let mut transient_retries: u32 = 0;
                loop {
                    let attempt: Result<(), ServerError> = async {
                        // Kept alive until after `Serving` has been published; released
                        // automatically if any step below returns early with an error.
                        let drain_guard = server.extension_gate.write().await;
                        let prior_max = server.consensus.load_high_water().await?;
                        crate::failpoint!(
                            "server::fence::after_load_before_persist",
                            |arg: Option<String>| -> Result<(), ServerError> {
                                let _ = arg;
                                Err(ServerError::Consensus(ConsensusError::TransientDriver(
                                    Box::new(std::io::Error::other(
                                        "failpoint: server::fence::after_load_before_persist",
                                    )),
                                )))
                            }
                        );
                        tsoracle_yieldpoint::yieldpoint!(
                            "server::fence::after_load_before_persist"
                        );
                        let now = server.clock.now_ms();
                        // Start strictly above the previously persisted high-water mark and
                        // never below the current clock reading.
                        let serving_floor = core::cmp::max(prior_max.saturating_add(1), now);
                        // Saturate instead of truncating: assuming `failover_advance` is a
                        // `std::time::Duration`, `as_millis()` is a `u128`, and an `as u64`
                        // cast would silently wrap an oversized value to a smaller one.
                        let advance_ms =
                            u64::try_from(server.failover_advance.as_millis()).unwrap_or(u64::MAX);
                        let requested = serving_floor.saturating_add(advance_ms);
                        let actual = server
                            .consensus
                            .persist_high_water(requested, epoch)
                            .await?;
                        crate::failpoint!("server::fence::after_persist_before_publish");
                        server.allocator.lock().try_on_leadership_gained(
                            serving_floor,
                            actual,
                            epoch,
                        )?;
                        let _ = server.state_tx.send(ServingState::Serving);
                        drop(drain_guard);
                        crate::failpoint!("server::fence::after_serving_published");
                        Ok(())
                    }
                    .await;
                    match attempt {
                        Ok(()) => {
                            #[cfg(feature = "metrics")]
                            metrics::histogram!("tsoracle.leader_transition.fence_latency")
                                .record(fence_started_at.elapsed().as_secs_f64());
                            break;
                        }
                        // Leadership was lost while fencing: stay `NotServing` and wait for
                        // the next leadership event.
                        Err(ServerError::Consensus(
                            ConsensusError::NotLeader { .. } | ConsensusError::Fenced { .. },
                        )) => {
                            let _ = server.state_tx.send(ServingState::NotServing {
                                leader_endpoint: None,
                                leader_epoch: None,
                            });
                            break;
                        }
                        Err(ServerError::Consensus(ConsensusError::TransientDriver(_source))) => {
                            transient_retries += 1;
                            #[cfg(feature = "metrics")]
                            metrics::counter!(
                                "tsoracle.leader_transition.fence_transient_retries.total"
                            )
                            .increment(1);
                            #[cfg(feature = "tracing")]
                            if warn_on_stuck_fence(transient_retries) {
                                tracing::warn!(
                                    error = %_source,
                                    retries = transient_retries,
                                    "fence still retrying a transient consensus error; serving is paused while this node remains leader"
                                );
                            }
                            // Exponential backoff: BASE * 2^(retries - 1), with the shift
                            // clamped to 16 so it cannot overflow, then capped at the ceiling.
                            let backoff = core::cmp::min(
                                FENCE_RETRY_BASE
                                    .saturating_mul(1u32 << (transient_retries - 1).min(16)),
                                FENCE_RETRY_CAP,
                            );
                            // Wait out the backoff, but let a new leadership event cut it
                            // short: that event supersedes the fence in progress.
                            tokio::select! {
                                _ = tokio::time::sleep(backoff) => {}
                                next = stream.next() => {
                                    match next {
                                        Some(evt) => {
                                            pending = Some(evt);
                                            break;
                                        }
                                        None => return Err(ServerError::WatchStreamClosed),
                                    }
                                }
                            }
                        }
                        // Anything else is not retryable here; surface it to the caller.
                        Err(e) => return Err(e),
                    }
                }
            }
            LeaderState::Follower {
                leader_endpoint,
                leader_epoch,
            } => {
                server.allocator.lock().on_leadership_lost();
                let _ = server.state_tx.send(ServingState::NotServing {
                    leader_endpoint,
                    leader_epoch,
                });
            }
            LeaderState::Unknown => {
                server.allocator.lock().on_leadership_lost();
                let _ = server.state_tx.send(ServingState::NotServing {
                    leader_endpoint: None,
                    leader_epoch: None,
                });
            }
        }
    }
    // The event stream ended; there is nothing left to watch.
    Err(ServerError::WatchStreamClosed)
}