use futures::StreamExt;
use std::sync::Arc;
#[cfg(feature = "failpoints")]
use tsoracle_consensus::ConsensusError;
use tsoracle_consensus::LeaderState;
use crate::server::{Server, ServerError, ServingState};
/// Drives the server's serving state from the consensus layer's leadership events.
///
/// For every event yielded by `server.consensus.leadership_events()`:
///
/// * `LeaderState::Leader { epoch }` — runs the leadership fence:
///   1. Publishes `ServingState::NotServing` and revokes the allocator's leadership.
///   2. Takes the extension gate's write guard and loads the persisted high-water mark.
///   3. Persists a new high-water mark tagged with `epoch` and installs the resulting
///      window in the allocator.
///   4. Only then publishes `ServingState::Serving` and releases the gate.
/// * `LeaderState::Follower { leader_endpoint }` — revokes the allocator's
///   leadership and publishes `NotServing` carrying the reported leader endpoint.
/// * `LeaderState::Unknown` — revokes the allocator's leadership and publishes
///   `NotServing` with no leader endpoint.
///
/// Returns `Ok(())` once the event stream ends.
///
/// # Errors
///
/// Returns the first error from `load_high_water`, `persist_high_water`, or
/// `try_on_leadership_gained`, and the watch loop stops there. Each of these
/// happens before `Serving` is published, so the last state sent for that fence
/// is `NotServing`. The extension-gate guard is dropped on the early return.
pub(crate) async fn run_leader_watch(server: Arc<Server>) -> Result<(), ServerError> {
    let mut stream = server.consensus.leadership_events();
    while let Some(evt) = stream.next().await {
        #[cfg(feature = "metrics")]
        metrics::counter!("tsoracle.leader_transition.total").increment(1);
        match evt {
            LeaderState::Leader { epoch } => {
                #[cfg(feature = "metrics")]
                let fence_started_at = std::time::Instant::now();
                // Stop serving and revoke the allocator's leadership before fencing.
                // `Serving` is re-published only once the new high-water mark has
                // been persisted and installed. Send failures are deliberately ignored.
                let _ = server.state_tx.send(ServingState::NotServing {
                    leader_endpoint: None,
                });
                server.allocator.lock().on_leadership_lost();
                // Held exclusively until `Serving` is published. NOTE(review): presumably
                // this excludes concurrent extension work during the fence; confirm.
                let drain_guard = server.extension_gate.write().await;
                let prior_max = server.consensus.load_high_water().await?;
                crate::failpoint!(
                    "server::fence::after_load_before_persist",
                    |arg: Option<String>| -> Result<(), ServerError> {
                        let _ = arg;
                        Err(ServerError::Consensus(ConsensusError::TransientDriver(
                            Box::new(std::io::Error::other(
                                "failpoint: server::fence::after_load_before_persist",
                            )),
                        )))
                    }
                );
                let now = server.clock.now_ms();
                // The new serving floor is strictly above the previously persisted
                // high-water mark and never earlier than the local clock reading.
                let serving_floor = core::cmp::max(prior_max.saturating_add(1), now);
                // `Duration::as_millis()` returns a u128. An `as u64` cast would
                // silently truncate an oversized advance to an arbitrary smaller
                // value, so saturate at u64::MAX instead. This matches the
                // saturating arithmetic used for the floor and the request.
                let advance_ms =
                    u64::try_from(server.failover_advance.as_millis()).unwrap_or(u64::MAX);
                let requested = serving_floor.saturating_add(advance_ms);
                let actual = server
                    .consensus
                    .persist_high_water(requested, epoch)
                    .await?;
                crate::failpoint!("server::fence::after_persist_before_publish");
                // The allocator receives both the floor and the persisted value
                // returned by the consensus layer, which may differ from `requested`.
                server
                    .allocator
                    .lock()
                    .try_on_leadership_gained(serving_floor, actual, epoch)?;
                let _ = server.state_tx.send(ServingState::Serving);
                drop(drain_guard);
                #[cfg(feature = "metrics")]
                metrics::histogram!("tsoracle.leader_transition.fence_latency")
                    .record(fence_started_at.elapsed().as_secs_f64());
                crate::failpoint!("server::fence::after_serving_published");
            }
            LeaderState::Follower { leader_endpoint } => {
                server.allocator.lock().on_leadership_lost();
                let _ = server
                    .state_tx
                    .send(ServingState::NotServing { leader_endpoint });
            }
            LeaderState::Unknown => {
                server.allocator.lock().on_leadership_lost();
                let _ = server.state_tx.send(ServingState::NotServing {
                    leader_endpoint: None,
                });
            }
        }
    }
    Ok(())
}