use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;
use tsoracle_consensus::{ConsensusError, LeaderState};
use crate::server::{Server, ServerError, ServingState};
/// How many times the leadership fence in `run_leader_watch` is retried after
/// `ConsensusError::TransientDriver` failures before it gives up and waits for the next
/// leadership event.
const FENCE_MAX_TRANSIENT_RETRIES: u32 = 8;
/// Backoff before the first transient-failure retry of the fence; doubled on each
/// subsequent retry (up to `FENCE_RETRY_CAP`).
const FENCE_RETRY_BASE: Duration = Duration::from_millis(25);
/// Upper bound on a single fence retry backoff.
const FENCE_RETRY_CAP: Duration = Duration::from_millis(250);
pub(crate) async fn run_leader_watch(server: Arc<Server>) -> Result<(), ServerError> {
let mut stream = server.consensus.leadership_events();
while let Some(evt) = stream.next().await {
#[cfg(feature = "metrics")]
metrics::counter!("tsoracle.leader_transition.total").increment(1);
match evt {
LeaderState::Leader { epoch } => {
#[cfg(feature = "metrics")]
let fence_started_at = std::time::Instant::now();
let _ = server.state_tx.send(ServingState::NotServing {
leader_endpoint: None,
});
server.allocator.lock().on_leadership_lost();
let mut transient_retries: u32 = 0;
loop {
let attempt: Result<(), ServerError> = async {
let drain_guard = server.extension_gate.write().await;
let prior_max = server.consensus.load_high_water().await?;
crate::failpoint!(
"server::fence::after_load_before_persist",
|arg: Option<String>| -> Result<(), ServerError> {
let _ = arg;
Err(ServerError::Consensus(ConsensusError::TransientDriver(
Box::new(std::io::Error::other(
"failpoint: server::fence::after_load_before_persist",
)),
)))
}
);
tsoracle_yieldpoint::yieldpoint!(
"server::fence::after_load_before_persist"
);
let now = server.clock.now_ms();
let serving_floor = core::cmp::max(prior_max.saturating_add(1), now);
let requested = serving_floor
.saturating_add(server.failover_advance.as_millis() as u64);
let actual = server
.consensus
.persist_high_water(requested, epoch)
.await?;
crate::failpoint!("server::fence::after_persist_before_publish");
server.allocator.lock().try_on_leadership_gained(
serving_floor,
actual,
epoch,
)?;
let _ = server.state_tx.send(ServingState::Serving);
drop(drain_guard);
crate::failpoint!("server::fence::after_serving_published");
Ok(())
}
.await;
match attempt {
Ok(()) => {
#[cfg(feature = "metrics")]
metrics::histogram!("tsoracle.leader_transition.fence_latency")
.record(fence_started_at.elapsed().as_secs_f64());
break;
}
Err(ServerError::Consensus(
ConsensusError::NotLeader { .. } | ConsensusError::Fenced { .. },
)) => {
let _ = server.state_tx.send(ServingState::NotServing {
leader_endpoint: None,
});
break;
}
Err(ServerError::Consensus(ConsensusError::TransientDriver(_source))) => {
transient_retries += 1;
if transient_retries > FENCE_MAX_TRANSIENT_RETRIES {
#[cfg(feature = "tracing")]
tracing::warn!(
error = %_source,
retries = transient_retries,
"fence exhausted transient retries; awaiting next leadership event"
);
let _ = server.state_tx.send(ServingState::NotServing {
leader_endpoint: None,
});
break;
}
let backoff = core::cmp::min(
FENCE_RETRY_BASE
.saturating_mul(1u32 << (transient_retries - 1).min(16)),
FENCE_RETRY_CAP,
);
tokio::time::sleep(backoff).await;
}
Err(e) => return Err(e),
}
}
}
LeaderState::Follower { leader_endpoint } => {
server.allocator.lock().on_leadership_lost();
let _ = server
.state_tx
.send(ServingState::NotServing { leader_endpoint });
}
LeaderState::Unknown => {
server.allocator.lock().on_leadership_lost();
let _ = server.state_tx.send(ServingState::NotServing {
leader_endpoint: None,
});
}
}
}
Err(ServerError::WatchStreamClosed)
}