// mothership 0.0.100
//
// Process supervisor with HTTP exposure - wrap, monitor, and expose your fleet
// Documentation
//! Flagship coordinator - orchestrates election and signaling
//!
//! The coordinator manages the state machine for flagship election:
//!
//! ```text
//!                     ┌──────────────────┐
//!                     │    STARTING      │
//!                     └────────┬─────────┘
//!                              │
//!                     ┌────────▼─────────┐
//!                     │  VERIFY_UPLINKS  │
//!                     └────────┬─────────┘
//!                              │
//!               ┌──────────────┴──────────────┐
//!               │                             │
//!      ┌────────▼─────────┐         ┌─────────▼────────┐
//!      │  FLAGSHIP_ELECT  │         │   ESCORT_WAIT    │
//!      └────────┬─────────┘         └─────────┬────────┘
//!               │                             │
//!      ┌────────▼─────────┐                   │
//!      │  RUN_PRELAUNCH   │                   │
//!      └────────┬─────────┘                   │
//!               │                             │
//!      ┌────────▼─────────┐         ┌─────────▼────────┐
//!      │ SIGNAL_READY     │────────►│ RECEIVED_READY   │
//!      └────────┬─────────┘         └─────────┬────────┘
//!               │                             │
//!               └──────────────┬──────────────┘
//!                              │
//!                     ┌────────▼─────────┐
//!                     │   LAUNCH_FLEET   │
//!                     └────────┬─────────┘
//!                              │
//!                     ┌────────▼─────────┐
//!                     │     RUNNING      │
//!                     └──────────────────┘
//! ```

use std::time::Duration;
use tracing::{info, warn};

use super::election::{Election, ElectionError, FlagshipSignal};

/// The part a Mothership instance plays within the fleet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlagshipRole {
    /// The flagship: the instance that runs the prelaunch jobs.
    Flagship,
    /// An escort: holds back until the flagship has signaled.
    Escort,
    /// Flagship coordination is turned off.
    Disabled,
}

/// Renders the role as its lowercase name (`flagship`, `escort`, `disabled`).
impl std::fmt::Display for FlagshipRole {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match *self {
            Self::Flagship => "flagship",
            Self::Escort => "escort",
            Self::Disabled => "disabled",
        };
        f.write_str(label)
    }
}

/// Where the flagship coordination currently stands.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlagshipState {
    /// Coordination has not begun yet.
    Starting,
    /// An attempt to acquire the flagship lock is in progress.
    Electing,
    /// Prelaunch jobs are running (flagship only).
    RunningPrelaunch,
    /// Waiting on the flagship's signal (escort only).
    WaitingForSignal,
    /// The fleet may be launched.
    Ready,
    /// Coordination failed - the deployment should abort.
    Failed,
}

/// Renders the state as its snake_case name (for example `running_prelaunch`).
impl std::fmt::Display for FlagshipState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match *self {
            Self::Starting => "starting",
            Self::Electing => "electing",
            Self::RunningPrelaunch => "running_prelaunch",
            Self::WaitingForSignal => "waiting_for_signal",
            Self::Ready => "ready",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}

/// Flagship coordinator: drives election and signaling on top of an
/// [`Election`] backend.
pub struct Coordinator<E>
where
    E: Election,
{
    /// Backend used to acquire and release the flagship lock and to exchange signals.
    election: E,
    /// Application name passed to every backend call and attached to log events.
    app_name: String,
    /// Timeout handed to the backend while an escort waits for the flagship's signal.
    prelaunch_timeout: Duration,
    /// Role of this instance.
    role: FlagshipRole,
    /// Current coordination state.
    state: FlagshipState,
}

impl<E: Election> Coordinator<E> {
    /// Create a new coordinator with the given election backend.
    ///
    /// The coordinator starts with role [`FlagshipRole::Disabled`] and state
    /// [`FlagshipState::Starting`]; call [`Coordinator::elect`] to assign a role.
    ///
    /// * `election` - backend used for locking and signaling.
    /// * `app_name` - name passed to every backend call and attached to log events.
    /// * `prelaunch_timeout` - timeout passed to the backend while an escort waits
    ///   for the flagship's signal.
    pub fn new(election: E, app_name: String, prelaunch_timeout: Duration) -> Self {
        Self {
            election,
            app_name,
            prelaunch_timeout,
            role: FlagshipRole::Disabled,
            state: FlagshipState::Starting,
        }
    }

    /// Get the current role.
    pub fn role(&self) -> FlagshipRole {
        self.role
    }

    /// Get the current state.
    pub fn state(&self) -> FlagshipState {
        self.state
    }

    /// Attempt to become the flagship.
    ///
    /// Moves the state to [`FlagshipState::Electing`] and asks the backend to
    /// acquire the lock for this app. The instance becomes
    /// [`FlagshipRole::Flagship`] if the lock was acquired and
    /// [`FlagshipRole::Escort`] otherwise.
    ///
    /// Returns the role assigned to this instance.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `try_acquire`; the role is left
    /// unchanged in that case.
    pub async fn elect(&mut self) -> Result<FlagshipRole, ElectionError> {
        self.state = FlagshipState::Electing;
        info!(app = %self.app_name, "attempting flagship election");

        let acquired = self.election.try_acquire(&self.app_name).await?;

        if acquired {
            self.role = FlagshipRole::Flagship;
            info!(app = %self.app_name, "elected as flagship");
        } else {
            self.role = FlagshipRole::Escort;
            info!(app = %self.app_name, "running as escort");
        }

        Ok(self.role)
    }

    /// Signal that prelaunch is starting (flagship only).
    ///
    /// A no-op returning `Ok(())` for any other role. Otherwise moves the state
    /// to [`FlagshipState::RunningPrelaunch`] and publishes
    /// [`FlagshipSignal::Running`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `signal`.
    pub async fn signal_running(&mut self) -> Result<(), ElectionError> {
        if self.role != FlagshipRole::Flagship {
            return Ok(());
        }
        self.state = FlagshipState::RunningPrelaunch;
        self.election
            .signal(&self.app_name, FlagshipSignal::Running)
            .await
    }

    /// Signal that prelaunch completed successfully (flagship only).
    ///
    /// A no-op returning `Ok(())` for any other role. Otherwise moves the state
    /// to [`FlagshipState::Ready`] and publishes [`FlagshipSignal::Ready`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `signal`.
    pub async fn signal_ready(&mut self) -> Result<(), ElectionError> {
        if self.role != FlagshipRole::Flagship {
            return Ok(());
        }
        self.state = FlagshipState::Ready;
        info!(app = %self.app_name, "signaling ready to escorts");
        self.election
            .signal(&self.app_name, FlagshipSignal::Ready)
            .await
    }

    /// Signal that prelaunch failed (flagship only).
    ///
    /// A no-op returning `Ok(())` for any other role. Otherwise moves the state
    /// to [`FlagshipState::Failed`] and publishes [`FlagshipSignal::Abort`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `signal`.
    pub async fn signal_abort(&mut self) -> Result<(), ElectionError> {
        if self.role != FlagshipRole::Flagship {
            return Ok(());
        }
        self.state = FlagshipState::Failed;
        warn!(app = %self.app_name, "signaling abort to escorts");
        self.election
            .signal(&self.app_name, FlagshipSignal::Abort)
            .await
    }

    /// Wait for flagship to signal ready (escort only).
    ///
    /// A no-op returning `Ok(())` for any other role. Otherwise the state moves
    /// to [`FlagshipState::WaitingForSignal`] while waiting and ends in
    /// [`FlagshipState::Ready`] on success or [`FlagshipState::Failed`] on
    /// *any* error, including errors reported by the backend itself.
    ///
    /// # Errors
    ///
    /// * [`ElectionError::Aborted`] if the flagship signaled abort.
    /// * [`ElectionError::Timeout`] if the backend's wait returned while the
    ///   flagship was still reporting `Running`.
    /// * Any error from the backend's `get_signal` or `wait_for_signal`.
    pub async fn wait_for_ready(&mut self) -> Result<(), ElectionError> {
        if self.role != FlagshipRole::Escort {
            return Ok(());
        }
        self.state = FlagshipState::WaitingForSignal;
        info!(
            app = %self.app_name,
            timeout_secs = self.prelaunch_timeout.as_secs(),
            "waiting for flagship signal"
        );

        // Derive the final state from the outcome in one place, so that errors
        // propagated from the backend can no longer leave the coordinator
        // stuck in `WaitingForSignal`.
        let outcome = self.await_flagship_verdict().await;
        self.state = if outcome.is_ok() {
            FlagshipState::Ready
        } else {
            FlagshipState::Failed
        };
        outcome
    }

    /// Resolve the flagship's verdict: `Ok(())` for ready, `Err` for abort,
    /// an unexpected signal, or a backend failure. Does not touch `self.state`.
    async fn await_flagship_verdict(&mut self) -> Result<(), ElectionError> {
        // Check for an existing signal first (late joiner).
        match self.election.get_signal(&self.app_name).await? {
            Some(FlagshipSignal::Ready) => {
                info!(app = %self.app_name, "flagship already signaled ready");
                return Ok(());
            }
            Some(FlagshipSignal::Abort) => {
                warn!(app = %self.app_name, "flagship already signaled abort");
                return Err(ElectionError::Aborted);
            }
            // Prelaunch still in progress, or nothing published yet: keep waiting.
            Some(FlagshipSignal::Running) | None => {}
        }

        let signal = self
            .election
            .wait_for_signal(&self.app_name, self.prelaunch_timeout)
            .await?;

        match signal {
            FlagshipSignal::Ready => {
                info!(app = %self.app_name, "received ready signal from flagship");
                Ok(())
            }
            FlagshipSignal::Abort => {
                warn!(app = %self.app_name, "received abort signal from flagship");
                Err(ElectionError::Aborted)
            }
            FlagshipSignal::Running => {
                // This shouldn't happen - wait_for_signal should wait until Ready/Abort
                warn!(app = %self.app_name, "unexpected running signal");
                Err(ElectionError::Timeout)
            }
        }
    }

    /// Release the flagship lock (if held).
    ///
    /// Only an instance whose role is [`FlagshipRole::Flagship`] calls the
    /// backend; every other role returns `Ok(())` without doing anything.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `release`.
    pub async fn release(&mut self) -> Result<(), ElectionError> {
        if self.role == FlagshipRole::Flagship {
            self.election.release(&self.app_name).await?;
        }
        Ok(())
    }

    /// Check if this instance should run prelaunch jobs.
    ///
    /// True only when the role is [`FlagshipRole::Flagship`].
    pub fn should_run_prelaunch(&self) -> bool {
        self.role == FlagshipRole::Flagship
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::flagship::StaticElection;

    /// Build a coordinator for "test-app" with a 30 second prelaunch timeout,
    /// backed by a `StaticElection` constructed with `flag`.
    fn coordinator(flag: bool) -> Coordinator<impl Election> {
        Coordinator::new(
            StaticElection::new(flag),
            String::from("test-app"),
            Duration::from_secs(30),
        )
    }

    /// `StaticElection::new(true)` yields the flagship role, which runs prelaunch.
    #[tokio::test]
    async fn test_coordinator_flagship() {
        let mut subject = coordinator(true);

        let assigned = subject.elect().await.unwrap();

        assert_eq!(assigned, FlagshipRole::Flagship);
        assert!(subject.should_run_prelaunch());
    }

    /// `StaticElection::new(false)` yields the escort role, which skips prelaunch.
    #[tokio::test]
    async fn test_coordinator_escort() {
        let mut subject = coordinator(false);

        let assigned = subject.elect().await.unwrap();

        assert_eq!(assigned, FlagshipRole::Escort);
        assert!(!subject.should_run_prelaunch());
    }

    /// A flagship that signals running and then ready ends in the `Ready` state.
    #[tokio::test]
    async fn test_coordinator_signal_ready() {
        let mut subject = coordinator(true);

        subject.elect().await.unwrap();
        subject.signal_running().await.unwrap();
        subject.signal_ready().await.unwrap();

        assert_eq!(subject.state(), FlagshipState::Ready);
    }
}