//! Kubernix library entry point.
//!
//! Starts CRI-O, etcd, the Kubernetes API server, controller manager, scheduler, kubelet and
//! proxy as local processes, re-executing itself inside a `nix-shell` when it is not already
//! running in one.
#![deny(missing_docs)]
mod apiserver;
mod config;
mod controllermanager;
mod coredns;
mod crio;
mod encryptionconfig;
mod etcd;
mod kubeconfig;
mod kubelet;
mod pki;
mod process;
mod proxy;
mod scheduler;
pub use config::{Config, ConfigBuilder};
use apiserver::APIServer;
use controllermanager::ControllerManager;
use coredns::CoreDNS;
use crio::Crio;
use encryptionconfig::EncryptionConfig;
use etcd::Etcd;
use kubeconfig::KubeConfig;
use kubelet::Kubelet;
use pki::Pki;
use process::{Process, Startable};
use proxy::Proxy;
use scheduler::Scheduler;
use env_logger::Builder;
use failure::{bail, format_err, Fallible};
use ipnetwork::IpNetwork;
use log::{debug, error, info};
use nix::unistd::getuid;
use rayon::scope;
use std::{
env::{current_exe, split_paths, var, var_os},
fmt::Display,
fs::{self, create_dir_all},
net::{IpAddr, Ipv4Addr},
path::{Path, PathBuf},
process::Command,
};
/// Name of the environment variable exported in the shell init file that points at the
/// CRI-O socket (`unix://…`).
const RUNTIME_ENV: &str = "CONTAINER_RUNTIME_ENDPOINT";
/// Name of the environment variable exported in the shell init file that points at the
/// admin kubeconfig.
const KUBECONFIG_ENV: &str = "KUBECONFIG";
/// Environment variable whose presence is taken as "already running inside nix-shell".
const NIX_SHELL_ENV: &str = "IN_NIX_SHELL";
/// Sub-directory of the root that holds the CRI-O socket.
const CRIO_DIR: &str = "crio";
/// Sub-directory of the root used as working directory of the interactive shell.
const LOG_DIR: &str = "log";
/// Sub-directory of the root into which the embedded nix expressions are written.
const NIX_DIR: &str = "nix";
/// File name (inside the root) of the bash init file written for the interactive shell.
const KUBERNIX_ENV: &str = "kubernix.env";
/// Collection of started processes which get stopped again on cleanup.
type Stoppables = Vec<Startable>;
/// The main entry point of the library.
///
/// An instance owns the started processes of a bootstrapped cluster; dropping it stops
/// all of them again.
pub struct Kubernix {
/// Configuration the cluster was bootstrapped with.
config: Config,
/// Path of the CRI-O socket below the root directory.
crio_socket: PathBuf,
/// Generated kubeconfig files; the admin one is exported to the interactive shell.
kubeconfig: KubeConfig,
/// Successfully started processes, stopped in vector order on drop.
processes: Stoppables,
}
impl Kubernix {
/// Start kubernix with the provided configuration.
///
/// Prepares the environment first and then decides where to continue: if the
/// `IN_NIX_SHELL` variable cannot be read, a nix environment is bootstrapped, otherwise
/// the cluster itself is bootstrapped.
///
/// # Errors
///
/// Returns any error raised while preparing the environment or during the selected
/// bootstrap step.
pub fn start(mut config: Config) -> Fallible<()> {
    Self::prepare_env(&mut config)?;

    match var(NIX_SHELL_ENV) {
        Ok(_) => {
            info!("Bootstrapping cluster inside nix environment");
            Self::bootstrap_cluster(config)
        }
        Err(_) => {
            info!("Nix environment not found, bootstrapping one");
            Self::bootstrap_nix(config)
        }
    }
}
/// Spawn a new interactive shell for an existing kubernix root directory.
///
/// Runs `nix-shell --pure` on the nix directory inside the root and lets it execute a
/// `bash` that sources the `kubernix.env` init file from the root.
///
/// # Errors
///
/// Fails if the environment cannot be prepared, `nix-shell` is not found in `$PATH`, or
/// the command cannot be spawned. The exit status of the shell itself is not inspected.
pub fn new_shell(mut config: Config) -> Fallible<()> {
    Self::prepare_env(&mut config)?;
    info!(
        "Spawning new kubernix shell in '{}'",
        config.root().display()
    );

    let mut nix_shell = Command::new(Self::find_executable("nix-shell")?);

    let nix_dir = config.root().join(NIX_DIR);
    let parallel_jobs = format!("-j{}", num_cpus::get());
    let env_file = config.root().join(KUBERNIX_ENV);
    let run_command = format!("bash --init-file {}", env_file.display());

    nix_shell
        .arg(&nix_dir)
        .arg("--pure")
        .arg("-Q")
        .arg(parallel_jobs)
        .arg("--run")
        .arg(run_command);
    nix_shell.status()?;

    Ok(())
}
/// Validates the runtime preconditions and finalizes the configuration.
///
/// Steps, in order: require root privileges, synchronize the configuration with the
/// root directory (written via `to_file` when the root does not exist yet, otherwise
/// refreshed via `update_from_file`), canonicalize the root path and initialize the
/// logger without timestamps at the configured log level.
///
/// # Errors
///
/// Fails when not running as root or when any of the steps above reports an error.
fn prepare_env(config: &mut Config) -> Fallible<()> {
    if !getuid().is_root() {
        bail!("Please run kubernix as root")
    }

    // A missing root means a first run: persist the configuration. Otherwise reuse
    // what is already stored there.
    if !config.root().exists() {
        config.to_file()?;
    } else {
        config.update_from_file()?;
    }
    config.canonicalize_root()?;

    // The logger is set up last because the log level is only final at this point.
    Builder::new()
        .format_timestamp(None)
        .filter(None, config.log_level())
        .try_init()?;

    Ok(())
}
/// Stops every managed process in vector order.
///
/// Failures are best-effort: they are only logged at debug level and never abort the
/// remaining shutdown.
fn stop(&mut self) {
    self.processes
        .iter_mut()
        .filter_map(|process| process.stop().err())
        .for_each(|err| debug!("{}", err));
}
fn bootstrap_cluster(config: Config) -> Fallible<()> {
let ip = Self::local_ip()?;
let hostname =
hostname::get_hostname().ok_or_else(|| format_err!("Unable to retrieve hostname"))?;
info!("Using local IP {}", ip);
let pki = Pki::new(&config, &ip, &hostname)?;
let kubeconfig = KubeConfig::new(&config, &pki, &ip, &hostname)?;
let encryptionconfig = EncryptionConfig::new(&config)?;
let crio_socket = config.root().join(CRIO_DIR).join("crio.sock");
let mut crio = Process::stopped();
let mut etcd = Process::stopped();
let mut apis = Process::stopped();
let mut cont = Process::stopped();
let mut sche = Process::stopped();
let mut kube = Process::stopped();
let mut prox = Process::stopped();
info!("Starting processes");
scope(|s| {
s.spawn(|_| crio = Crio::start(&config, &crio_socket));
s.spawn(|_| {
etcd = Etcd::start(&config, &pki);
apis = APIServer::start(&config, &ip, &pki, &encryptionconfig, &kubeconfig)
});
s.spawn(|_| cont = ControllerManager::start(&config, &pki, &kubeconfig));
s.spawn(|_| sche = Scheduler::start(&config, &kubeconfig));
s.spawn(|_| kube = Kubelet::start(&config, &pki, &kubeconfig, &crio_socket));
s.spawn(|_| prox = Proxy::start(&config, &kubeconfig));
});
let mut processes = vec![];
let results = vec![kube, sche, prox, cont, apis, etcd, crio];
let all_ok = results.iter().all(|x| x.is_ok());
for process in results {
match process {
Ok(p) => processes.push(p),
Err(e) => error!("{}", e),
}
}
let mut kubernix = Kubernix {
config,
crio_socket,
kubeconfig,
processes,
};
if all_ok {
kubernix.apply_addons()?;
info!("Everything is up and running");
kubernix.spawn_shell()?;
} else {
error!("Unable to start all processes")
}
Ok(())
}
fn apply_addons(&mut self) -> Fallible<()> {
if let Err(e) = CoreDNS::apply(&self.config, &self.kubeconfig) {
bail!("Unable to apply CoreDNS: {}", e);
}
Ok(())
}
fn bootstrap_nix(config: Config) -> Fallible<()> {
let nix_dir = config.root().join(NIX_DIR);
create_dir_all(&nix_dir)?;
fs::write(
nix_dir.join("nixpkgs.json"),
include_str!("../nix/nixpkgs.json"),
)?;
fs::write(
nix_dir.join("nixpkgs.nix"),
include_str!("../nix/nixpkgs.nix"),
)?;
fs::write(
nix_dir.join("default.nix"),
include_str!("../nix/default.nix"),
)?;
fs::write(nix_dir.join("deps.nix"), include_str!("../nix/deps.nix"))?;
let status = Command::new(Self::find_executable("nix-shell")?)
.arg(nix_dir)
.arg("--pure")
.arg("-Q")
.arg(format!("-j{}", num_cpus::get()))
.arg("--run")
.arg(
&[
¤t_exe()?.display().to_string(),
"--root",
&config.root().display().to_string(),
"--log-level",
&config.log_level().to_string().to_lowercase(),
"--crio-cidr",
&config.crio_cidr().to_string(),
"--cluster-cidr",
&config.cluster_cidr().to_string(),
"--service-cidr",
&config.service_cidr().to_string(),
]
.join(" "),
)
.status()?;
if !status.success() {
bail!("nix-shell command failed");
}
Ok(())
}
/// Spawns an interactive `bash` for working with the running cluster.
///
/// Writes the `kubernix.env` init file into the root, which sets the prompt and exports
/// the container runtime endpoint and the admin kubeconfig. The shell is started with
/// that init file and the log directory as working directory, and this call blocks
/// until the shell exits.
///
/// # Errors
///
/// Fails if the init file cannot be written or `bash` cannot be spawned. The exit
/// status of the shell is not inspected.
fn spawn_shell(&self) -> Fallible<()> {
    info!("Spawning interactive shell");
    info!("Please be aware that the cluster gets destroyed if you exit the shell");

    let env_file = self.config.root().join(KUBERNIX_ENV);
    let runtime_endpoint = format!("unix://{}", self.crio_socket.display());
    let env_content = format!(
        "PS1='> '\nexport {}={}\nexport {}={}",
        RUNTIME_ENV,
        runtime_endpoint,
        KUBECONFIG_ENV,
        self.kubeconfig.admin.display(),
    );
    fs::write(&env_file, env_content)?;

    let log_dir = self.config.root().join(LOG_DIR);
    let mut shell = Command::new("bash");
    shell.current_dir(&log_dir).arg("--init-file").arg(env_file);
    shell.status()?;

    Ok(())
}
/// Determines the local IP address used to reach external networks.
///
/// Runs `ip route get 1.2.3.4` and extracts the preferred source address from its
/// output, which is the token following the `src` keyword.
///
/// # Errors
///
/// Fails if `ip` cannot be executed or exits unsuccessfully, if its output is not valid
/// UTF-8, if no address can be located in the output or if the located token is not a
/// valid IP address.
fn local_ip() -> Fallible<String> {
    let cmd = Command::new("ip")
        .arg("route")
        .arg("get")
        .arg("1.2.3.4")
        .output()?;
    if !cmd.status.success() {
        bail!("Unable to obtain `ip` output")
    }
    let output = String::from_utf8(cmd.stdout)?;

    // The position of the source address is not fixed: a route without gateway has no
    // `via <gateway>` tokens, for example. Look for the `src` keyword first and only
    // fall back to the former fixed position (7th token) if the keyword is missing.
    let ip = output
        .split_whitespace()
        .skip_while(|token| *token != "src")
        .nth(1)
        .or_else(|| output.split_whitespace().nth(6))
        .ok_or_else(|| format_err!("Different `ip` command output expected"))?;

    // Only validate the token, the textual form is what gets returned.
    if let Err(e) = ip.parse::<IpAddr>() {
        bail!("Unable to parse IP '{}': {}", ip, e);
    }
    Ok(ip.to_owned())
}
/// Searches the directories of `$PATH` for a file called `name`.
///
/// Returns the full path of the first match in `$PATH` order. Only the existence of a
/// regular file is checked, not whether it is executable.
///
/// # Errors
///
/// Fails if `$PATH` is not set or none of its directories contains such a file.
fn find_executable<P>(name: P) -> Fallible<PathBuf>
where
    P: AsRef<Path> + Display,
{
    let first_match = match var_os("PATH") {
        Some(paths) => split_paths(&paths)
            .map(|dir| dir.join(&name))
            .find(|candidate| candidate.is_file()),
        None => None,
    };
    first_match.ok_or_else(|| format_err!("Unable to find {} in $PATH", name))
}
/// Derives the cluster DNS address from the service CIDR.
///
/// The address is the one at offset 2 within the IPv4 service network.
///
/// # Errors
///
/// Fails if the service CIDR is not an IPv4 network or if it is too small to contain
/// the address at that offset.
fn dns(config: &Config) -> Fallible<Ipv4Addr> {
    let network = match config.service_cidr() {
        IpNetwork::V4(n) => n,
        _ => bail!("Service CIDR is not for IPv4"),
    };
    network.nth(2).ok_or_else(|| {
        format_err!(
            "Unable to retrieve second IP from service CIDR: {}",
            config.service_cidr()
        )
    })
}
}
/// Tears the cluster down as soon as the instance goes out of scope.
impl Drop for Kubernix {
    /// Logs the cleanup and stops all processes owned by the instance.
    fn drop(&mut self) {
        info!("Cleaning up");
        Self::stop(self);
    }
}