//! Library crate behind the `kubernix` tool.
//!
//! The public entry points are [`Kubernix::start`], which prepares the
//! environment and bootstraps the cluster processes, and
//! [`Kubernix::new_shell`], which opens an additional shell inside an
//! already prepared environment. Both take a [`Config`] by value.
#![deny(missing_docs)]
// Internal modules; only the items re-exported below are part of the public API.
mod apiserver;
mod component;
mod config;
mod container;
mod controllermanager;
mod coredns;
mod crio;
mod encryptionconfig;
mod etcd;
mod kubeconfig;
mod kubectl;
mod kubelet;
mod logger;
mod network;
mod nix;
mod node;
mod pki;
mod podman;
mod process;
mod progress;
mod proxy;
mod scheduler;
mod system;
// Public re-exports needed to configure and drive `Kubernix` from the outside.
pub use config::{Config, LogFormat};
pub use logger::Logger;
/// Write `content` to `path` unless the file already holds exactly that text.
///
/// A file that is missing or cannot be read as UTF-8 text is treated as
/// "changed" and is (re)written.
///
/// # Errors
///
/// Returns an error if writing the file fails.
pub(crate) fn write_if_changed(path: &std::path::Path, content: &str) -> anyhow::Result<()> {
    // A failed read (missing file, permission problem, invalid UTF-8) can
    // never compare equal, so it falls through to the write below.
    let up_to_date = matches!(
        std::fs::read_to_string(path),
        Ok(current) if current == content
    );
    if up_to_date {
        return Ok(());
    }

    std::fs::write(path, content)?;
    Ok(())
}
use crate::nix::Nix;
use component::{ClusterContext, ComponentRegistry};
use container::Container;
use coredns::CoreDns;
use crio::Crio;
use encryptionconfig::EncryptionConfig;
use kubeconfig::KubeConfig;
use kubectl::Kubectl;
use network::Network;
use pki::Pki;
use process::Stoppables;
use progress::Progress;
use system::System;
use ::nix::{
mount::{MntFlags, umount2},
unistd::getuid,
};
use anyhow::{Context, Result, bail};
use log::{debug, error, info, set_boxed_logger};
use signal_hook::{
consts::signal::{SIGHUP, SIGINT, SIGTERM},
flag,
};
use std::{
fs,
io::{BufRead, BufReader},
path::{Path, PathBuf},
process::{Command, id},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread::{self, sleep},
time::{Duration, Instant},
};
/// Name of the environment variable written to the environment file; its
/// value is the socket string returned by `Crio::socket` for index `0`.
const RUNTIME_ENV: &str = "CONTAINER_RUNTIME_ENDPOINT";
/// Owner of a running cluster.
///
/// Holds everything that has to be torn down again: dropping the value stops
/// the processes, removes mounts below the configured root and cleans up the
/// system state.
pub struct Kubernix {
/// Set to `true` on drop so the addon thread can bail out early.
addon_shutdown: Arc<AtomicBool>,
/// Handle of the background thread deploying addons, if one was spawned.
addon_thread: Option<thread::JoinHandle<()>>,
/// Configuration the cluster was bootstrapped with.
config: Config,
/// Network setup created during bootstrap.
network: Network,
/// `kubectl` wrapper created from the admin kubeconfig.
kubectl: Kubectl,
/// Started processes; each one is stopped during cleanup.
processes: Stoppables,
/// System preparation state; its `cleanup` is called on drop.
system: System,
}
impl Kubernix {
/// Start kubernix by consuming the provided configuration.
///
/// The environment is prepared first. When `Nix::is_active()` reports true
/// the cluster is bootstrapped directly, otherwise the configuration is
/// handed to `Nix::bootstrap`.
///
/// # Errors
///
/// Returns an error if preparing the environment fails or if the selected
/// bootstrap path fails.
pub fn start(mut config: Config) -> Result<()> {
    Self::prepare_env(&mut config)?;

    if !Nix::is_active() {
        return Nix::bootstrap(config);
    }
    Self::bootstrap_cluster(config)
}
/// Spawn a new shell into the provided configuration environment.
///
/// The environment file below the configured root must already exist; it is
/// sourced before the configured shell is exec'ed via `Nix::run`.
///
/// # Errors
///
/// Returns an error if preparing the environment fails, if the environment
/// file does not exist, if no shell can be determined, or if `Nix::run`
/// fails.
pub fn new_shell(mut config: Config) -> Result<()> {
    Self::prepare_env(&mut config)?;

    info!(
        "Spawning new kubernix shell in: '{}'",
        config.root().display()
    );

    let env_path = Self::env_file(&config);
    if !env_path.exists() {
        bail!(
            "Necessary environment file '{}' does not exist",
            env_path.display()
        )
    }

    // Source the environment file, then replace bash with the target shell.
    let script = format!(". {} && exec {}", env_path.display(), config.shell_ok()?);
    let quoted_script = format!("'{}'", script);
    Nix::run(&config, &["bash", "-c", quoted_script.as_str()])?;

    info!("Bye, leaving the Kubernix environment");
    Ok(())
}
/// Prepare the process environment shared by all entry points.
///
/// Verifies that the caller is root, calls `try_load_file` on the
/// configuration when the root directory exists (otherwise `to_file`),
/// canonicalizes the root path and installs the global logger.
///
/// # Errors
///
/// Returns an error if the process is not running as root, if any of the
/// configuration calls fail, or if the logger cannot be installed.
fn prepare_env(config: &mut Config) -> Result<()> {
    if !getuid().is_root() {
        bail!("Please run kubernix as root")
    }

    let root_exists = config.root().exists();
    if root_exists {
        config.try_load_file()?;
    } else {
        config.to_file()?;
    }
    config.canonicalize_root()?;

    let logger = Logger::new(config.log_level(), config.log_format());
    set_boxed_logger(logger).context("Unable to set logger")
}
/// Stop every tracked process.
///
/// Failures are only logged at debug level so that one failing process does
/// not prevent the remaining ones from being stopped.
fn stop(&mut self) {
    for process in &mut self.processes {
        if let Some(err) = process.stop().err() {
            debug!("{}", err)
        }
    }
}
/// Number of processes used for progress accounting.
///
/// The constants match the registrations in `bootstrap_cluster`: five
/// components are registered once and two more per configured node.
fn processes(config: &Config) -> u64 {
    const FIXED: u64 = 5;
    const PER_NODE: u64 = 2;

    let nodes = u64::from(config.nodes());
    FIXED + PER_NODE * nodes
}
/// Bootstrap the whole cluster and block until the user is done with it.
///
/// Sets up the system, container, network, PKI, kubeconfigs and encryption
/// config, runs all registered components, writes the environment file,
/// starts the addon deployment in a background thread and finally either
/// spawns an interactive shell or waits for a termination signal.
///
/// # Errors
///
/// Returns an error if any setup step fails, if not all processes could be
/// started, if the environment file cannot be written, or if the shell or
/// the signal wait fails.
fn bootstrap_cluster(config: Config) -> Result<()> {
// Progress steps: the base steps, two more per node in multi-node mode,
// plus one per process.
const BASE_STEPS: u64 = 15;
let steps = if config.multi_node() {
u64::from(config.nodes()) * 2 + BASE_STEPS
} else {
BASE_STEPS
} + Self::processes(&config);
let p = Progress::new(steps, config.log_level());
info!("Bootstrapping cluster");
// Each step consumes results of the previous ones, so the order matters.
let system = System::setup(&config).context("Unable to setup system")?;
Container::build(&config)?;
let network = Network::new(&config)?;
let pki = Pki::new(&config, &network)?;
let kubeconfig = KubeConfig::new(&config, &pki)?;
let kubectl = Kubectl::new(kubeconfig.admin());
let encryptionconfig = EncryptionConfig::new(&config)?;
// Shared, borrowed view of the bootstrap state handed to the registry.
let ctx = ClusterContext {
config: &config,
network: &network,
pki: &pki,
kubeconfig: &kubeconfig,
encryptionconfig: &encryptionconfig,
kubectl: &kubectl,
};
// Register the components: the four control plane ones, then CRI-O and
// kubelet once per node, and the proxy last.
let mut registry = ComponentRegistry::new();
registry.register(Box::new(etcd::EtcdComponent));
registry.register(Box::new(apiserver::ApiServerComponent));
registry.register(Box::new(controllermanager::ControllerManagerComponent));
registry.register(Box::new(scheduler::SchedulerComponent));
for node in 0..config.nodes() {
registry.register(Box::new(crio::CrioComponent::new(node)));
registry.register(Box::new(kubelet::KubeletComponent::new(node)));
}
registry.register(Box::new(proxy::ProxyComponent));
let (processes, all_ok) = registry.run(&ctx);
// Read the flag now, `config` is moved into the struct below.
let spawn_shell = !config.no_shell();
let addon_shutdown = Arc::new(AtomicBool::new(false));
// Build the instance before looking at `all_ok`: its `Drop` implementation
// performs the cleanup on every exit path from here on.
let mut kubernix = Kubernix {
addon_shutdown: Arc::clone(&addon_shutdown),
addon_thread: None,
config,
network,
kubectl,
processes,
system,
};
if all_ok {
if let Err(e) = kubernix.write_env_file() {
p.reset();
error!("Unable to write environment file: {}", e);
return Err(e);
}
// The addon thread gets its own copies so it does not borrow `kubernix`.
let addon_config = kubernix.config.clone();
let addon_network = kubernix.network.clone();
let addon_kubeconfig = kubernix.kubectl.kubeconfig().to_path_buf();
kubernix.addon_thread = Some(thread::spawn(move || {
let kubectl = Kubectl::new(&addon_kubeconfig);
// Skip the deployment if shutdown was already requested.
if addon_shutdown.load(Ordering::Relaxed) {
return;
}
if addon_config.addons().iter().any(|a| a == "coredns") {
if let Err(e) = CoreDns::apply(&addon_config, &addon_network, &kubectl) {
// Errors after a shutdown request are not reported.
if !addon_shutdown.load(Ordering::Relaxed) {
error!("Failed to deploy CoreDNS addon: {}", e);
}
}
}
}));
info!("Everything is up and running");
p.reset();
// Block until the shell exits or a termination signal arrives.
if spawn_shell {
kubernix.spawn_shell()?;
} else {
kubernix.wait()?;
}
} else {
p.reset();
bail!("Unable to start all processes")
}
Ok(())
}
/// Block until SIGTERM, SIGINT or SIGHUP is received.
///
/// The current process id is written to `kubernix.pid` below the configured
/// root before waiting; the signal flag is then polled every 100ms.
///
/// # Errors
///
/// Returns an error if a signal handler cannot be registered or if the pid
/// file cannot be written.
fn wait(&self) -> Result<()> {
    let interrupted = Arc::new(AtomicBool::new(false));
    for signal in [SIGTERM, SIGINT, SIGHUP] {
        flag::register(signal, Arc::clone(&interrupted))?;
    }
    info!("Waiting for interrupt…");

    let pid_path = self.config.root().join("kubernix.pid");
    debug!("Writing pid file to: {}", pid_path.display());
    fs::write(pid_path, id().to_string())?;

    loop {
        if interrupted.load(Ordering::Relaxed) {
            return Ok(());
        }
        sleep(Duration::from_millis(100));
    }
}
/// Run the configured shell interactively and block until it exits.
///
/// The shell starts in the configured root directory with the variables of
/// the environment file applied. Its exit status is not inspected.
///
/// # Errors
///
/// Returns an error if no shell can be determined, if the environment file
/// cannot be read, or if the shell cannot be executed.
fn spawn_shell(&self) -> Result<()> {
    info!("Spawning interactive shell");
    info!("Please be aware that the cluster stops if you exit the shell");

    let shell = self.config.shell_ok()?;
    let mut shell_cmd = Command::new(shell);
    shell_cmd.current_dir(self.config.root());

    let env_path = Self::env_file(&self.config);
    Self::apply_env_file(&env_path, &mut shell_cmd)?;

    shell_cmd.status()?;
    Ok(())
}
/// Apply the variables of a shell-style environment file to `cmd`.
///
/// Every `KEY=VALUE` line, optionally prefixed with `export `, is set on the
/// command. One pair of matching surrounding double or single quotes is
/// removed from the value. To stay in line with what sourcing the file in a
/// shell would do, surrounding whitespace is ignored and blank lines, `#`
/// comment lines, lines without `=` and lines with an empty key are skipped.
///
/// # Errors
///
/// Returns an error if `env_file` cannot be read.
fn apply_env_file(env_file: &Path, cmd: &mut Command) -> Result<()> {
    let content = fs::read_to_string(env_file)
        .with_context(|| format!("Unable to read env file '{}'", env_file.display()))?;

    for line in content.lines() {
        let line = line.trim();
        // A comment may well contain a `=`; never export it as a variable.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }

        // Tolerate more than one space between `export` and the key.
        let assignment = line.strip_prefix("export ").map_or(line, str::trim_start);
        if let Some((key, value)) = assignment.split_once('=') {
            if key.is_empty() {
                continue;
            }
            let value = value
                .strip_prefix('"')
                .and_then(|v| v.strip_suffix('"'))
                .or_else(|| value.strip_prefix('\'').and_then(|v| v.strip_suffix('\'')))
                .unwrap_or(value);
            cmd.env(key, value);
        }
    }
    Ok(())
}
/// Write the environment file below the configured root.
///
/// The file exports the container runtime endpoint (the socket string
/// returned by `Crio::socket` for index `0`) and `KUBECONFIG` (the path of
/// the kubeconfig used by `kubectl`).
///
/// # Errors
///
/// Returns an error if the socket cannot be determined or if the file cannot
/// be written.
fn write_env_file(&self) -> Result<()> {
    info!("Writing environment file");

    let socket = Crio::socket(&self.config, &self.network, 0)?;
    let kubeconfig = self.kubectl.kubeconfig();
    let content = format!(
        "export {}={}\nexport {}={}",
        RUNTIME_ENV,
        socket.to_socket_string(),
        "KUBECONFIG",
        kubeconfig.display(),
    );

    fs::write(Self::env_file(&self.config), content)?;
    Ok(())
}
/// Path of the environment file, located directly below the configured root.
fn env_file(config: &Config) -> PathBuf {
    const FILE_NAME: &str = "kubernix.env";

    let root = config.root();
    root.join(FILE_NAME)
}
/// Remove all mounts below the configured root, best effort.
///
/// The mount table is re-read and every remaining mount is force-unmounted
/// until none are left or 15 seconds have passed. Failures are only logged
/// at debug level.
fn umount(&self) {
    const TIMEOUT: Duration = Duration::from_secs(15);

    debug!("Removing active mounts");
    let started = Instant::now();

    while started.elapsed() < TIMEOUT {
        let mount_points = match Self::read_mount_points(self.config.root()) {
            Ok(points) => points,
            Err(e) => {
                debug!("Unable to retrieve mounts: {}", e);
                sleep(Duration::from_secs(1));
                continue;
            }
        };

        if mount_points.is_empty() {
            break;
        }

        mount_points.iter().for_each(|dest| {
            debug!("Removing mount: {}", dest.display());
            if let Err(e) = umount2(dest, MntFlags::MNT_FORCE) {
                debug!("Unable to umount '{}': {}", dest.display(), e);
            }
        });

        // Give the kernel a moment before checking the mount table again.
        sleep(Duration::from_millis(500));
    }
}
fn read_mount_points(root: &Path) -> Result<Vec<PathBuf>> {
let file = fs::File::open("/proc/mounts").context("Unable to open /proc/mounts")?;
let reader = BufReader::new(file);
let mut points: Vec<PathBuf> = reader
.lines()
.map_while(Result::ok)
.filter_map(|line| line.split_whitespace().nth(1).map(PathBuf::from))
.filter(|p| p.starts_with(root) && p != root)
.collect();
points.sort_by_key(|p| std::cmp::Reverse(p.components().count()));
Ok(points)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::tempdir;

    /// Look up the value explicitly set for `key` on `cmd`, if any.
    fn env_value<'a>(cmd: &'a Command, key: &str) -> Option<&'a str> {
        cmd.get_envs()
            .find(|(name, _)| *name == std::ffi::OsStr::new(key))
            .and_then(|(_, value)| value)
            .and_then(|value| value.to_str())
    }

    #[test]
    fn write_if_changed_creates_new_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.txt");
        write_if_changed(&path, "hello").unwrap();
        assert_eq!(fs::read_to_string(&path).unwrap(), "hello");
    }

    #[test]
    fn write_if_changed_skips_identical() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.txt");
        fs::write(&path, "hello").unwrap();
        let before = fs::metadata(&path).unwrap().modified().unwrap();
        // Make sure a rewrite would result in a different modification time.
        std::thread::sleep(std::time::Duration::from_millis(10));
        write_if_changed(&path, "hello").unwrap();
        let after = fs::metadata(&path).unwrap().modified().unwrap();
        assert_eq!(before, after);
    }

    #[test]
    fn write_if_changed_updates_different() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.txt");
        fs::write(&path, "old").unwrap();
        write_if_changed(&path, "new").unwrap();
        assert_eq!(fs::read_to_string(&path).unwrap(), "new");
    }

    #[test]
    fn apply_env_file_parses_exports() {
        let dir = tempdir().unwrap();
        let env_file = dir.path().join("test.env");
        let mut f = fs::File::create(&env_file).unwrap();
        writeln!(f, "export FOO=bar").unwrap();
        writeln!(f, "BAZ=qux").unwrap();
        writeln!(f, "QUOTED=\"hello world\"").unwrap();
        writeln!(f, "SINGLE='single'").unwrap();

        let mut cmd = Command::new("echo");
        Kubernix::apply_env_file(&env_file, &mut cmd).unwrap();

        // Verify what actually ended up on the command instead of only
        // checking that parsing did not fail.
        assert_eq!(env_value(&cmd, "FOO"), Some("bar"));
        assert_eq!(env_value(&cmd, "BAZ"), Some("qux"));
        assert_eq!(env_value(&cmd, "QUOTED"), Some("hello world"));
        assert_eq!(env_value(&cmd, "SINGLE"), Some("single"));
        assert_eq!(cmd.get_envs().count(), 4);
    }

    #[test]
    fn apply_env_file_fails_for_missing_file() {
        let dir = tempdir().unwrap();
        let env_file = dir.path().join("missing.env");
        let mut cmd = Command::new("echo");
        assert!(Kubernix::apply_env_file(&env_file, &mut cmd).is_err());
        assert_eq!(cmd.get_envs().count(), 0);
    }

    #[test]
    fn read_mount_points_empty_for_nonexistent_root() {
        let points = Kubernix::read_mount_points(Path::new("/nonexistent")).unwrap();
        assert!(points.is_empty());
    }

    #[test]
    fn read_mount_points_excludes_root_itself() {
        let dir = tempdir().unwrap();
        let points = Kubernix::read_mount_points(dir.path()).unwrap();
        assert!(points.is_empty());
    }
}
/// Tears the cluster down again when the instance goes out of scope.
impl Drop for Kubernix {
    /// Signal the addon thread to stop and give it up to five seconds to
    /// finish, then stop all processes, remove the mounts and clean up the
    /// system state.
    fn drop(&mut self) {
        let progress = Progress::new(Self::processes(&self.config), self.config.log_level());
        info!("Cleaning up");

        self.addon_shutdown.store(true, Ordering::Relaxed);
        if let Some(addon) = self.addon_thread.take() {
            debug!("Waiting for addon deployment to finish");

            let deadline = Instant::now() + Duration::from_secs(5);
            loop {
                if addon.is_finished() || Instant::now() >= deadline {
                    break;
                }
                sleep(Duration::from_millis(100));
            }

            // Only join a finished thread, so the shutdown can never block
            // on a hanging addon deployment.
            if !addon.is_finished() {
                debug!("Addon thread did not finish in time, proceeding with shutdown");
            } else {
                let _ = addon.join();
            }
        }

        self.stop();
        self.umount();
        self.system.cleanup();

        info!("Cleanup done");
        progress.reset();
        debug!("All done");
    }
}