use crate::{
config::Config,
encryptionconfig::EncryptionConfig,
kubeconfig::KubeConfig,
kubectl::Kubectl,
network::Network,
pki::Pki,
process::{ProcessState, Started, Stoppables},
};
use log::{debug, info};
use rayon::prelude::*;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Phase {
Infrastructure,
ControlPlane,
Controller,
NodeAgent,
}
pub struct ClusterContext<'a> {
pub config: &'a Config,
pub network: &'a Network,
pub pki: &'a Pki,
pub kubeconfig: &'a KubeConfig,
pub encryptionconfig: &'a EncryptionConfig,
pub kubectl: &'a Kubectl,
}
pub trait Component: Send + Sync {
fn name(&self) -> &str;
fn phase(&self) -> Phase;
fn start(&self, ctx: &ClusterContext<'_>) -> ProcessState;
}
#[must_use]
pub struct ComponentRegistry {
components: Vec<Box<dyn Component>>,
}
impl ComponentRegistry {
pub fn new() -> Self {
Self {
components: Vec::new(),
}
}
pub fn register(&mut self, component: Box<dyn Component>) {
self.components.push(component);
}
pub fn by_phase(&self) -> Vec<(Phase, Vec<&dyn Component>)> {
let mut phases: Vec<Phase> = self.components.iter().map(|c| c.phase()).collect();
phases.sort();
phases.dedup();
phases
.into_iter()
.map(|phase| {
let group: Vec<&dyn Component> = self
.components
.iter()
.filter(|c| c.phase() == phase)
.map(|c| c.as_ref())
.collect();
(phase, group)
})
.collect()
}
pub fn run(&self, ctx: &ClusterContext<'_>) -> (Stoppables, bool) {
info!("Starting processes");
let mut all_results: Vec<(Phase, ProcessState)> = Vec::new();
for (phase, components) in &self.by_phase() {
debug!(
"Starting {:?} phase ({} components)",
phase,
components.len()
);
let results: Vec<ProcessState> = components
.par_iter()
.map(|c| {
debug!("Starting {}", c.name());
c.start(ctx)
})
.collect();
let phase_ok = results.iter().all(|r| r.is_ok());
if !phase_ok {
let failures: Vec<_> = results
.iter()
.filter_map(|r| r.as_ref().err().map(|e| e.to_string()))
.collect();
info!(
"Phase {:?} failed ({} errors: {}), skipping remaining phases",
phase,
failures.len(),
failures.join(", ")
);
all_results.extend(results.into_iter().map(|r| (*phase, r)));
break;
}
all_results.extend(results.into_iter().map(|r| (*phase, r)));
}
let all_ok = all_results.iter().all(|(_, r)| r.is_ok());
let mut stoppables: Vec<(Phase, Started)> = all_results
.into_iter()
.filter_map(|(phase, r)| match r {
Ok(p) => Some((phase, p)),
Err(e) => {
debug!("{}", e);
None
}
})
.collect();
stoppables.reverse();
let processes: Stoppables = stoppables.into_iter().map(|(_, p)| p).collect();
(processes, all_ok)
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::bail;
struct FakeComponent {
name: String,
phase: Phase,
}
impl Component for FakeComponent {
fn name(&self) -> &str {
&self.name
}
fn phase(&self) -> Phase {
self.phase
}
fn start(&self, _ctx: &ClusterContext<'_>) -> ProcessState {
bail!("fake component")
}
}
#[test]
fn registry_groups_by_phase() {
let mut reg = ComponentRegistry::new();
reg.register(Box::new(FakeComponent {
name: "etcd".into(),
phase: Phase::Infrastructure,
}));
reg.register(Box::new(FakeComponent {
name: "apiserver".into(),
phase: Phase::ControlPlane,
}));
reg.register(Box::new(FakeComponent {
name: "scheduler".into(),
phase: Phase::Controller,
}));
reg.register(Box::new(FakeComponent {
name: "controller-manager".into(),
phase: Phase::Controller,
}));
let phases = reg.by_phase();
assert_eq!(phases.len(), 3);
assert_eq!(phases[0].0, Phase::Infrastructure);
assert_eq!(phases[0].1.len(), 1);
assert_eq!(phases[1].0, Phase::ControlPlane);
assert_eq!(phases[1].1.len(), 1);
assert_eq!(phases[2].0, Phase::Controller);
assert_eq!(phases[2].1.len(), 2);
}
#[test]
fn empty_registry() {
let reg = ComponentRegistry::new();
assert!(reg.by_phase().is_empty());
}
}