use futures::StreamExt;
use std::sync::Arc;
use tsoracle_consensus::LeaderState;
use crate::server::{Server, ServerError, ServingState};
pub(crate) async fn run_leader_watch(server: Arc<Server>) -> Result<(), ServerError> {
let mut stream = server.consensus.leadership_events();
while let Some(evt) = stream.next().await {
match evt {
LeaderState::Leader { epoch } => {
let _ = server.state_tx.send(ServingState::NotServing {
leader_endpoint: None,
});
server.allocator.lock().on_leadership_lost();
let drain_guard = server.extension_gate.write().await;
let prior_max = server.consensus.load_high_water().await?;
let now = server.clock.now_ms();
let serving_floor = core::cmp::max(prior_max.saturating_add(1), now);
let requested =
serving_floor.saturating_add(server.failover_advance.as_millis() as u64);
let actual = server
.consensus
.persist_high_water(requested, epoch)
.await?;
server
.allocator
.lock()
.try_on_leadership_gained(serving_floor, actual, epoch)?;
let _ = server.state_tx.send(ServingState::Serving);
drop(drain_guard);
}
LeaderState::Follower { leader_endpoint } => {
server.allocator.lock().on_leadership_lost();
let _ = server
.state_tx
.send(ServingState::NotServing { leader_endpoint });
}
LeaderState::Unknown => {
server.allocator.lock().on_leadership_lost();
let _ = server.state_tx.send(ServingState::NotServing {
leader_endpoint: None,
});
}
}
}
Ok(())
}