use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, Weak};
use tokio::sync::watch;
use super::error::StartupError;
use super::phase::StartupPhase;
use super::startup_sequencer::SequencerState;
/// Opaque numeric identifier for a single gate.
///
/// A `ReadyGate` carries one of these and hands it to the sequencer state
/// when it fires, so the state can tell which gate reported in.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub(super) struct GateId(pub(super) u64);
/// Read-side handle for observing startup progress.
///
/// Wraps a `tokio::sync::watch` receiver of [`SequencerSnapshot`] values;
/// cloning the gate clones the receiver.
#[derive(Debug, Clone)]
pub struct StartupGate {
/// Receiver from which the latest sequencer snapshot is read.
pub(super) rx: watch::Receiver<SequencerSnapshot>,
}
/// Value published over the watch channel describing sequencer progress.
#[derive(Debug, Clone)]
pub struct SequencerSnapshot {
/// Phase reported by the sequencer at the time of this snapshot.
pub phase: StartupPhase,
/// `Some` once a startup failure has been recorded; the error is shared
/// behind an `Arc` so snapshots stay cheap to clone.
pub failed: Option<Arc<StartupError>>,
}
impl StartupGate {
pub(super) fn new(rx: watch::Receiver<SequencerSnapshot>) -> Self {
Self { rx }
}
pub fn pre_fired() -> Arc<Self> {
let (tx, rx) = watch::channel(SequencerSnapshot {
phase: StartupPhase::GatewayEnable,
failed: None,
});
let gate = Arc::new(Self { rx });
drop(tx);
gate
}
pub async fn await_phase(&self, phase: StartupPhase) -> Result<(), StartupError> {
let mut rx = self.rx.clone();
loop {
let snap = rx.borrow_and_update().clone();
if let Some(err) = snap.failed {
return Err((*err).clone());
}
if snap.phase >= phase {
return Ok(());
}
if rx.changed().await.is_err() {
return Err(StartupError::AlreadyTerminated);
}
}
}
pub fn current_phase(&self) -> StartupPhase {
self.rx.borrow().phase
}
pub fn is_failed(&self) -> Option<Arc<StartupError>> {
self.rx.borrow().failed.clone()
}
}
/// Write-side handle through which one subsystem reports its startup outcome.
///
/// The gate is resolved by `fire` (success) or `fail` (failure); dropping it
/// unresolved is reported to the sequencer as a failure by the `Drop` impl.
pub struct ReadyGate {
/// Identifier passed to the sequencer state when the gate fires.
pub(super) id: GateId,
/// Startup phase this gate is associated with.
pub(super) phase: StartupPhase,
/// Subsystem name, used in log output and in error values.
pub(super) subsystem: String,
/// Weak reference to the shared sequencer state; if the state has already
/// been dropped, gate operations become no-ops.
pub(super) sequencer: Weak<Mutex<SequencerState>>,
/// Set once the gate has been fired, failed, or handled in `drop`.
pub(super) fired: AtomicBool,
/// Sender handed to the sequencer state when reporting an outcome.
pub(super) phase_tx: Arc<watch::Sender<SequencerSnapshot>>,
}
/// Manual `Debug` that prints the identifying fields and the current value of
/// the `fired` flag; the `sequencer` and `phase_tx` handles are left out and
/// the output is marked non-exhaustive.
impl std::fmt::Debug for ReadyGate {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Relaxed is enough: this is a diagnostic read only.
        let has_fired = self.fired.load(Ordering::Relaxed);

        let mut out = f.debug_struct("ReadyGate");
        out.field("id", &self.id);
        out.field("phase", &self.phase);
        out.field("subsystem", &self.subsystem);
        out.field("fired", &has_fired);
        out.finish_non_exhaustive()
    }
}
impl ReadyGate {
    /// Reports this gate as ready to the sequencer.
    ///
    /// Only the first call has any effect: the `fired` flag is swapped to
    /// `true` and later calls return immediately. If the sequencer state has
    /// already been dropped nothing further happens. A poisoned state mutex
    /// is logged and then recovered so the gate is still delivered.
    pub fn fire(&self) {
        let already_fired = self.fired.swap(true, Ordering::AcqRel);
        if already_fired {
            return;
        }

        if let Some(shared) = self.sequencer.upgrade() {
            let mut guard = shared.lock().unwrap_or_else(|poisoned| {
                tracing::error!(
                    subsystem = %self.subsystem,
                    "StartupSequencer mutex poisoned when firing gate — proceeding with recovery"
                );
                poisoned.into_inner()
            });
            guard.fire_gate(self.id, self.phase, &self.phase_tx);
        }
    }

    /// Reports that this gate's subsystem failed to start.
    ///
    /// Marks the gate as fired unconditionally (so `Drop` will not report it
    /// again), builds a [`StartupError::SubsystemFailed`] from this gate's
    /// phase and subsystem plus `reason`, and hands it to the sequencer state.
    /// If the sequencer state has already been dropped the error is
    /// discarded. A poisoned state mutex is logged and then recovered.
    pub fn fail(&self, reason: impl Into<String>) {
        self.fired.store(true, Ordering::Release);

        let failure = StartupError::SubsystemFailed {
            phase: self.phase,
            subsystem: self.subsystem.clone(),
            reason: reason.into(),
        };

        if let Some(shared) = self.sequencer.upgrade() {
            let mut guard = shared.lock().unwrap_or_else(|poisoned| {
                tracing::error!(
                    subsystem = %self.subsystem,
                    "StartupSequencer mutex poisoned when failing gate"
                );
                poisoned.into_inner()
            });
            guard.set_failed(failure, &self.phase_tx);
        }
    }
}
/// Reports a gate that is dropped without having been fired or failed.
///
/// An unresolved gate would otherwise never report in, so the drop is
/// recorded as [`StartupError::GateDroppedWithoutFire`] on the sequencer
/// state.
impl Drop for ReadyGate {
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so the flag can be read and set
        // directly without an atomic round-trip.
        if std::mem::replace(self.fired.get_mut(), true) {
            return;
        }

        let err = StartupError::GateDroppedWithoutFire {
            phase: self.phase,
            subsystem: self.subsystem.clone(),
        };
        tracing::error!(
            subsystem = %self.subsystem,
            phase = ?self.phase,
            "ReadyGate dropped without firing — startup sequencer transitioning to Failed"
        );

        // Nothing to notify if the sequencer state is already gone.
        let Some(state_arc) = self.sequencer.upgrade() else {
            return;
        };

        // Recover from a poisoned mutex the same way `fire` and `fail` do.
        // Returning silently here would leave the failure unrecorded even
        // though it was just logged as a transition to Failed.
        let mut state = state_arc.lock().unwrap_or_else(|poisoned| {
            tracing::error!(
                subsystem = %self.subsystem,
                "StartupSequencer mutex poisoned when dropping unfired gate — proceeding with recovery"
            );
            poisoned.into_inner()
        });
        state.set_failed(err, &self.phase_tx);
    }
}