use std::time::Duration;
use tracing::{info, warn};
use super::election::{Election, ElectionError, FlagshipSignal};
/// Role a process holds after (or before) taking part in a flagship election.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlagshipRole {
    /// This process won the election.
    Flagship,
    /// Another process holds the flagship slot.
    Escort,
    /// No election outcome has been recorded for this process.
    Disabled,
}

impl std::fmt::Display for FlagshipRole {
    /// Writes the lowercase label of the role (`flagship`, `escort`, `disabled`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Flagship => "flagship",
            Self::Escort => "escort",
            Self::Disabled => "disabled",
        };
        // Written verbatim: formatter width/fill flags are not applied.
        f.write_str(label)
    }
}
/// Lifecycle state tracked by a coordinator while it drives the election and
/// the prelaunch handshake.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlagshipState {
    /// Nothing has happened yet.
    Starting,
    /// An election attempt is in progress.
    Electing,
    /// The flagship has announced that its prelaunch work is running.
    RunningPrelaunch,
    /// An escort is waiting for the flagship's signal.
    WaitingForSignal,
    /// The ready signal was sent (flagship) or observed (escort).
    Ready,
    /// An abort was signaled or received, or coordination otherwise failed.
    Failed,
}

impl std::fmt::Display for FlagshipState {
    /// Writes the snake_case label of the state (for example `waiting_for_signal`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Starting => "starting",
            Self::Electing => "electing",
            Self::RunningPrelaunch => "running_prelaunch",
            Self::WaitingForSignal => "waiting_for_signal",
            Self::Ready => "ready",
            Self::Failed => "failed",
        };
        // Written verbatim: formatter width/fill flags are not applied.
        f.write_str(label)
    }
}
/// Drives a single application's flagship election and the follow-up
/// ready/abort handshake on top of an [`Election`] backend.
pub struct Coordinator<E>
where
    E: Election,
{
    /// Backend used to acquire the flagship slot and exchange signals.
    election: E,
    /// Name passed to every backend call and attached to every log line.
    app_name: String,
    /// Timeout handed to the backend when an escort waits for a signal.
    prelaunch_timeout: Duration,
    /// Role recorded by the most recent election.
    role: FlagshipRole,
    /// Current lifecycle state.
    state: FlagshipState,
}
impl<E: Election> Coordinator<E> {
    /// Creates a coordinator for `app_name` on top of `election`.
    ///
    /// The coordinator starts with role [`FlagshipRole::Disabled`] and state
    /// [`FlagshipState::Starting`]; nothing is sent to the backend until
    /// [`elect`](Self::elect) is called. `prelaunch_timeout` is the timeout
    /// passed to the backend by [`wait_for_ready`](Self::wait_for_ready).
    pub fn new(election: E, app_name: String, prelaunch_timeout: Duration) -> Self {
        Self {
            election,
            app_name,
            prelaunch_timeout,
            role: FlagshipRole::Disabled,
            state: FlagshipState::Starting,
        }
    }

    /// Returns the role recorded by the most recent election.
    pub fn role(&self) -> FlagshipRole {
        self.role
    }

    /// Returns the current lifecycle state.
    pub fn state(&self) -> FlagshipState {
        self.state
    }

    /// Records that coordination failed and hands the error back so the
    /// caller can return it.
    fn fail(&mut self, err: ElectionError) -> ElectionError {
        self.state = FlagshipState::Failed;
        err
    }

    /// Tries to acquire the flagship slot and records the resulting role.
    ///
    /// Returns [`FlagshipRole::Flagship`] when the backend reports the slot
    /// as acquired and [`FlagshipRole::Escort`] otherwise.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `try_acquire`. In that case
    /// the role is left untouched and the state becomes
    /// [`FlagshipState::Failed`] instead of staying at `Electing`.
    pub async fn elect(&mut self) -> Result<FlagshipRole, ElectionError> {
        self.state = FlagshipState::Electing;
        info!(app = %self.app_name, "attempting flagship election");

        let acquired = match self.election.try_acquire(&self.app_name).await {
            Ok(acquired) => acquired,
            // Do not leave the state machine stuck in `Electing`.
            Err(err) => return Err(self.fail(err)),
        };

        if acquired {
            self.role = FlagshipRole::Flagship;
            info!(app = %self.app_name, "elected as flagship");
        } else {
            self.role = FlagshipRole::Escort;
            info!(app = %self.app_name, "running as escort");
        }
        Ok(self.role)
    }

    /// Announces [`FlagshipSignal::Running`] through the backend.
    ///
    /// Does nothing and returns `Ok(())` unless this process is the flagship.
    /// The state is set to [`FlagshipState::RunningPrelaunch`] before the
    /// signal is sent and is not rolled back if sending fails.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `signal`.
    pub async fn signal_running(&mut self) -> Result<(), ElectionError> {
        if self.role != FlagshipRole::Flagship {
            return Ok(());
        }
        self.state = FlagshipState::RunningPrelaunch;
        self.election
            .signal(&self.app_name, FlagshipSignal::Running)
            .await
    }

    /// Announces [`FlagshipSignal::Ready`] through the backend.
    ///
    /// Does nothing and returns `Ok(())` unless this process is the flagship.
    /// The state is set to [`FlagshipState::Ready`] before the signal is sent
    /// and is not rolled back if sending fails.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `signal`.
    pub async fn signal_ready(&mut self) -> Result<(), ElectionError> {
        if self.role != FlagshipRole::Flagship {
            return Ok(());
        }
        self.state = FlagshipState::Ready;
        info!(app = %self.app_name, "signaling ready to escorts");
        self.election
            .signal(&self.app_name, FlagshipSignal::Ready)
            .await
    }

    /// Announces [`FlagshipSignal::Abort`] through the backend.
    ///
    /// Does nothing and returns `Ok(())` unless this process is the flagship.
    /// The state is set to [`FlagshipState::Failed`] before the signal is
    /// sent.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `signal`.
    pub async fn signal_abort(&mut self) -> Result<(), ElectionError> {
        if self.role != FlagshipRole::Flagship {
            return Ok(());
        }
        self.state = FlagshipState::Failed;
        warn!(app = %self.app_name, "signaling abort to escorts");
        self.election
            .signal(&self.app_name, FlagshipSignal::Abort)
            .await
    }

    /// Waits, as an escort, until the flagship signals ready or abort.
    ///
    /// Does nothing and returns `Ok(())` unless this process is an escort.
    /// The currently stored signal is checked first so a ready/abort that was
    /// posted before the call is honoured immediately; otherwise the backend
    /// is asked to wait for up to the configured prelaunch timeout.
    ///
    /// On success the state becomes [`FlagshipState::Ready`]. Every error
    /// path leaves the state at [`FlagshipState::Failed`].
    ///
    /// # Errors
    ///
    /// * [`ElectionError::Aborted`] when the flagship signaled abort.
    /// * [`ElectionError::Timeout`] when the wait returns a `Running` signal.
    /// * Any error returned by the backend's `get_signal` or
    ///   `wait_for_signal`.
    pub async fn wait_for_ready(&mut self) -> Result<(), ElectionError> {
        if self.role != FlagshipRole::Escort {
            return Ok(());
        }
        self.state = FlagshipState::WaitingForSignal;
        info!(
            app = %self.app_name,
            timeout_secs = self.prelaunch_timeout.as_secs(),
            "waiting for flagship signal"
        );

        let already_posted = match self.election.get_signal(&self.app_name).await {
            Ok(signal) => signal,
            Err(err) => return Err(self.fail(err)),
        };
        match already_posted {
            Some(FlagshipSignal::Ready) => {
                info!(app = %self.app_name, "flagship already signaled ready");
                self.state = FlagshipState::Ready;
                return Ok(());
            }
            Some(FlagshipSignal::Abort) => {
                warn!(app = %self.app_name, "flagship already signaled abort");
                self.state = FlagshipState::Failed;
                return Err(ElectionError::Aborted);
            }
            // Prelaunch still in progress, or nothing posted yet: go wait.
            Some(FlagshipSignal::Running) | None => {}
        }

        let waited = self
            .election
            .wait_for_signal(&self.app_name, self.prelaunch_timeout)
            .await;
        let signal = match waited {
            Ok(signal) => signal,
            // Backend errors must not leave the state at `WaitingForSignal`.
            Err(err) => return Err(self.fail(err)),
        };

        match signal {
            FlagshipSignal::Ready => {
                info!(app = %self.app_name, "received ready signal from flagship");
                self.state = FlagshipState::Ready;
                Ok(())
            }
            FlagshipSignal::Abort => {
                warn!(app = %self.app_name, "received abort signal from flagship");
                self.state = FlagshipState::Failed;
                Err(ElectionError::Aborted)
            }
            FlagshipSignal::Running => {
                warn!(app = %self.app_name, "unexpected running signal");
                self.state = FlagshipState::Failed;
                Err(ElectionError::Timeout)
            }
        }
    }

    /// Releases the flagship slot through the backend.
    ///
    /// Only the flagship calls the backend; for any other role this is a
    /// no-op. Neither the recorded role nor the state is changed.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `release`.
    pub async fn release(&mut self) -> Result<(), ElectionError> {
        if self.role == FlagshipRole::Flagship {
            self.election.release(&self.app_name).await?;
        }
        Ok(())
    }

    /// Returns `true` when this process is the flagship.
    pub fn should_run_prelaunch(&self) -> bool {
        self.role == FlagshipRole::Flagship
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flagship::StaticElection;

    /// Application name shared by every test coordinator.
    const APP_NAME: &str = "test-app";
    /// Prelaunch timeout shared by every test coordinator.
    const PRELAUNCH_TIMEOUT: Duration = Duration::from_secs(30);

    /// Builds a coordinator over a `StaticElection` constructed with
    /// `acquires`.
    fn coordinator_with(acquires: bool) -> Coordinator<StaticElection> {
        Coordinator::new(
            StaticElection::new(acquires),
            APP_NAME.to_string(),
            PRELAUNCH_TIMEOUT,
        )
    }

    /// A successful acquisition yields the flagship role and enables prelaunch.
    #[tokio::test]
    async fn test_coordinator_flagship() {
        let mut coordinator = coordinator_with(true);

        let elected = coordinator.elect().await.unwrap();

        assert_eq!(elected, FlagshipRole::Flagship);
        assert!(coordinator.should_run_prelaunch());
    }

    /// A failed acquisition yields the escort role and disables prelaunch.
    #[tokio::test]
    async fn test_coordinator_escort() {
        let mut coordinator = coordinator_with(false);

        let elected = coordinator.elect().await.unwrap();

        assert_eq!(elected, FlagshipRole::Escort);
        assert!(!coordinator.should_run_prelaunch());
    }

    /// A flagship that signals running and then ready ends in the ready state.
    #[tokio::test]
    async fn test_coordinator_signal_ready() {
        let mut coordinator = coordinator_with(true);

        coordinator.elect().await.unwrap();
        coordinator.signal_running().await.unwrap();
        coordinator.signal_ready().await.unwrap();

        assert_eq!(coordinator.state(), FlagshipState::Ready);
    }
}