use std::sync::{Arc, RwLock};
use serde::{Deserialize, Serialize};
use tracing::info;
use crate::readiness;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "phase", rename_all = "snake_case")]
pub enum ClusterLifecycleState {
Starting,
Restarting,
Bootstrapping,
Joining {
attempt: u32,
},
Ready {
nodes: usize,
},
Failed {
reason: String,
},
}
impl ClusterLifecycleState {
pub fn label(&self) -> &'static str {
match self {
Self::Starting => "starting",
Self::Restarting => "restarting",
Self::Bootstrapping => "bootstrapping",
Self::Joining { .. } => "joining",
Self::Ready { .. } => "ready",
Self::Failed { .. } => "failed",
}
}
pub fn is_ready(&self) -> bool {
matches!(self, Self::Ready { .. })
}
pub fn all_labels() -> &'static [&'static str] {
&[
"starting",
"restarting",
"bootstrapping",
"joining",
"ready",
"failed",
]
}
}
#[derive(Debug, Clone)]
pub struct ClusterLifecycleTracker {
inner: Arc<RwLock<ClusterLifecycleState>>,
}
impl ClusterLifecycleTracker {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(ClusterLifecycleState::Starting)),
}
}
pub fn current(&self) -> ClusterLifecycleState {
self.inner.read().unwrap_or_else(|p| p.into_inner()).clone()
}
pub fn is_ready(&self) -> bool {
self.current().is_ready()
}
pub fn to_restarting(&self) {
self.transition(ClusterLifecycleState::Restarting, "restart");
}
pub fn to_bootstrapping(&self) {
self.transition(
ClusterLifecycleState::Bootstrapping,
"bootstrapping new cluster",
);
}
pub fn to_joining(&self, attempt: u32) {
let detail = format!("joining cluster (attempt {attempt})");
self.transition(ClusterLifecycleState::Joining { attempt }, &detail);
}
pub fn to_ready(&self, nodes: usize) {
let detail = format!("ready ({nodes} nodes)");
self.transition(ClusterLifecycleState::Ready { nodes }, &detail);
}
pub fn to_failed(&self, reason: impl Into<String>) {
let reason = reason.into();
let detail = format!("failed: {reason}");
self.transition(ClusterLifecycleState::Failed { reason }, &detail);
}
fn transition(&self, new: ClusterLifecycleState, human: &str) {
let prev = {
let mut guard = self.inner.write().unwrap_or_else(|p| p.into_inner());
std::mem::replace(&mut *guard, new.clone())
};
info!(
prev = prev.label(),
new = new.label(),
detail = human,
"cluster lifecycle transition"
);
readiness::notify_status(human);
}
}
impl Default for ClusterLifecycleTracker {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn initial_state_is_starting() {
let t = ClusterLifecycleTracker::new();
assert_eq!(t.current(), ClusterLifecycleState::Starting);
assert!(!t.is_ready());
}
#[test]
fn transition_sequence_logs_and_updates() {
let t = ClusterLifecycleTracker::new();
t.to_joining(0);
assert_eq!(t.current(), ClusterLifecycleState::Joining { attempt: 0 });
t.to_joining(1);
assert_eq!(t.current(), ClusterLifecycleState::Joining { attempt: 1 });
t.to_ready(3);
assert_eq!(t.current(), ClusterLifecycleState::Ready { nodes: 3 });
assert!(t.is_ready());
}
#[test]
fn bootstrapping_then_ready() {
let t = ClusterLifecycleTracker::new();
t.to_bootstrapping();
assert_eq!(t.current(), ClusterLifecycleState::Bootstrapping);
t.to_ready(1);
assert!(t.is_ready());
}
#[test]
fn restarting_path() {
let t = ClusterLifecycleTracker::new();
t.to_restarting();
assert_eq!(t.current(), ClusterLifecycleState::Restarting);
t.to_ready(3);
assert!(t.is_ready());
}
#[test]
fn failed_is_not_terminal_by_contract() {
let t = ClusterLifecycleTracker::new();
t.to_joining(5);
t.to_failed("timeout");
assert!(matches!(t.current(), ClusterLifecycleState::Failed { .. }));
t.to_ready(3);
assert_eq!(t.current(), ClusterLifecycleState::Ready { nodes: 3 });
}
#[test]
fn labels_are_stable() {
assert_eq!(ClusterLifecycleState::Starting.label(), "starting");
assert_eq!(ClusterLifecycleState::Restarting.label(), "restarting");
assert_eq!(
ClusterLifecycleState::Bootstrapping.label(),
"bootstrapping"
);
assert_eq!(
ClusterLifecycleState::Joining { attempt: 0 }.label(),
"joining"
);
assert_eq!(ClusterLifecycleState::Ready { nodes: 3 }.label(), "ready");
assert_eq!(
ClusterLifecycleState::Failed { reason: "x".into() }.label(),
"failed"
);
}
#[test]
fn all_labels_matches_variants() {
for variant in [
ClusterLifecycleState::Starting,
ClusterLifecycleState::Restarting,
ClusterLifecycleState::Bootstrapping,
ClusterLifecycleState::Joining { attempt: 0 },
ClusterLifecycleState::Ready { nodes: 0 },
ClusterLifecycleState::Failed { reason: "x".into() },
] {
assert!(
ClusterLifecycleState::all_labels().contains(&variant.label()),
"label {} missing from all_labels()",
variant.label()
);
}
}
#[test]
fn tracker_is_cheap_to_clone() {
let a = ClusterLifecycleTracker::new();
let b = a.clone();
a.to_bootstrapping();
assert_eq!(b.current(), ClusterLifecycleState::Bootstrapping);
}
}