kubernix 0.3.1

Kubernetes development cluster bootstrapping with Nix packages
Documentation
//! Kubernetes kubelet node agent component.
//!
//! Runs `kubelet` on each node, responsible for managing pods and
//! containers via the CRI-O container runtime.

use crate::{
    component::{ClusterContext, Component, Phase},
    config::Config,
    container::Container,
    crio::{Crio, MAX_SOCKET_PATH_LEN},
    kubeconfig::KubeConfig,
    network::Network,
    node::Node,
    pki::Pki,
    process::{Process, ProcessState, Stoppable},
    write_if_changed,
};
use anyhow::{Context, Result, bail};
use std::fs::create_dir_all;

/// Component wrapper for registry-based startup (per-node).
///
/// Stores which node the kubelet belongs to together with a display name;
/// the kubelet itself is launched through the [`Component`] implementation.
pub struct KubeletComponent {
    /// Index of the node whose kubelet this component starts.
    node: u8,
    /// Display name of the component, built once in [`KubeletComponent::new`].
    name: String,
}

impl KubeletComponent {
    /// Build the kubelet component for the node with index `node`.
    ///
    /// The display name is derived from the index, e.g. `Kubelet (node 0)`.
    pub fn new(node: u8) -> Self {
        let name = format!("Kubelet (node {node})");
        Self { node, name }
    }
}

impl Component for KubeletComponent {
    /// Display name of this component.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Kubelets are started in the node agent phase.
    fn phase(&self) -> Phase {
        Phase::NodeAgent
    }

    /// Start the kubelet of this component's node, taking configuration,
    /// network, PKI and kubeconfig from the shared cluster context.
    fn start(&self, ctx: &ClusterContext<'_>) -> ProcessState {
        let node = self.node;
        Kubelet::start(ctx.config, node, ctx.network, ctx.pki, ctx.kubeconfig)
    }
}

/// Manages the kubelet process lifecycle for a single node.
pub struct Kubelet {
    /// Handle to the running kubelet, obtained from either `Container::exec`
    /// or `Process::start`; stopping the kubelet delegates to it.
    process: Process,
}

impl Kubelet {
    /// Start the kubelet for the given node index.
    ///
    /// Prepares the per-node working directory below `config.root()`, renders
    /// the kubelet configuration from the embedded `assets/kubelet.yml`
    /// template and launches `kubelet` either inside the node container
    /// (multi-node setups) or as a plain process. On success the boxed
    /// [`Kubelet`] is returned, which only happens after `wait_ready` has
    /// completed for the `Successfully registered node` marker.
    ///
    /// The serving port and the healthz port are offset by the node index
    /// (`11250 + node` and `12250 + node`).
    ///
    /// # Errors
    ///
    /// Fails if the run directory would lead to unix socket paths longer than
    /// [`MAX_SOCKET_PATH_LEN`], if the working directory or the configuration
    /// file cannot be written, if no identity, pod CIDR or kubeconfig is
    /// available for `node`, or if starting the process or waiting for its
    /// readiness fails.
    pub fn start(
        config: &Config,
        node: u8,
        network: &Network,
        pki: &Pki,
        kubeconfig: &KubeConfig,
    ) -> ProcessState {
        const KUBELET: &str = "kubelet";
        // pod-resources/<pid> is the longest socket path kubelet creates
        const LONGEST_SOCKET: &str = "pod-resources/1234567890";

        let node_name = Node::name(config, network, node);
        // A `u8` always fits into `usize`: convert losslessly once instead of
        // casting with `as` at every lookup.
        let index = usize::from(node);

        let dir = config.root().join(KUBELET).join(&node_name);
        let root_dir = dir.join("run");
        // Measure the fully joined path so that the separator between the run
        // directory and the socket name counts towards the limit as well.
        if root_dir.join(LONGEST_SOCKET).as_os_str().len() > MAX_SOCKET_PATH_LEN {
            bail!(
                "Kubelet run path '{}' is too long for unix sockets",
                root_dir.display()
            );
        }

        create_dir_all(&dir)?;

        let identity = pki
            .kubelets()
            .get(index)
            .with_context(|| format!("Unable to retrieve kubelet identity for {}", node_name))?;

        // Render the configuration from the embedded template; every named
        // argument fills the placeholder of the same name.
        let yml = format!(
            include_str!("assets/kubelet.yml"),
            ca = pki.ca().cert().display(),
            dns = network.dns()?,
            cidr = network
                .crio_cidrs()
                .get(index)
                .context("Unable to retrieve kubelet CIDR")?,
            cert = identity.cert().display(),
            key = identity.key().display(),
            port = 11250 + u16::from(node),
            healthzPort = 12250 + u16::from(node),
        );
        let cfg = dir.join("config.yml");
        write_if_changed(&cfg, &yml)?;

        let args = &[
            &format!("--config={}", cfg.display()),
            &format!("--root-dir={}", root_dir.display()),
            &format!(
                "--container-runtime-endpoint={}",
                Crio::socket(config, network, node)?.to_socket_string(),
            ),
            &format!(
                "--kubeconfig={}",
                kubeconfig
                    .kubelets()
                    .get(index)
                    .with_context(|| format!(
                        "Unable to retrieve kubelet config for {}",
                        node_name
                    ))?
                    .display()
            ),
            "--v=2",
        ];

        let mut process = if config.multi_node() {
            // Run inside a container; the hostname override is only passed in
            // this mode and goes in front of the shared arguments.
            let arg_hostname = &format!("--hostname-override={}", node_name);
            let mut modargs: Vec<&str> = vec![arg_hostname];
            modargs.extend(args);
            Container::exec(
                config,
                &dir,
                &format!("Kubelet {}", node_name),
                KUBELET,
                &node_name,
                &modargs,
            )?
        } else {
            // Run as usual process
            Process::start(&dir, "Kubelet", KUBELET, args)?
        };
        process.wait_ready("Successfully registered node")?;
        Ok(Box::new(Self { process }))
    }
}

impl Stoppable for Kubelet {
    /// Stop the kubelet by delegating to the wrapped process handle.
    fn stop(&mut self) -> Result<()> {
        let Self { process } = self;
        process.stop()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::crio::MAX_SOCKET_PATH_LEN;

    /// Socket name used for the path length arithmetic below.
    const SOCKET_SUFFIX: &str = "pod-resources/1234567890";

    /// The embedded configuration template accepts every placeholder and the
    /// substituted values show up in the rendered YAML.
    #[test]
    fn config_template_renders() {
        let rendered = format!(
            include_str!("assets/kubelet.yml"),
            ca = "/tmp/ca.pem",
            dns = "10.10.64.2",
            cidr = "10.10.128.0/18",
            cert = "/tmp/kubelet.pem",
            key = "/tmp/kubelet-key.pem",
            port = 11250,
            healthzPort = 12250,
        );

        let expected_fragments = [
            "kind: KubeletConfiguration",
            "clientCAFile: \"/tmp/ca.pem\"",
            "clusterDNS:",
            "- \"10.10.64.2\"",
            "podCIDR: \"10.10.128.0/18\"",
            "port: 11250",
            "healthzPort: 12250",
        ];
        for fragment in expected_fragments {
            assert!(
                rendered.contains(fragment),
                "rendered config is missing: {fragment}"
            );
        }
    }

    /// A root directory that already uses up the whole limit cannot fit the
    /// socket name on top. This exercises the length arithmetic only;
    /// `Kubelet::start` itself is not invoked.
    #[test]
    fn socket_path_too_long() {
        let root_len = "a".repeat(MAX_SOCKET_PATH_LEN).len();
        assert!(root_len + SOCKET_SUFFIX.len() > MAX_SOCKET_PATH_LEN);
    }

    /// Name and phase reported through the `Component` trait.
    #[test]
    fn component_metadata() {
        let component = KubeletComponent::new(0);
        assert_eq!(component.name(), "Kubelet (node 0)");
        assert_eq!(component.phase(), Phase::NodeAgent);
    }

    /// The display name carries the node index.
    #[test]
    fn component_name_per_node() {
        let cases = [(0u8, "Kubelet (node 0)"), (3u8, "Kubelet (node 3)")];
        for (node, expected) in cases {
            assert_eq!(KubeletComponent::new(node).name(), expected);
        }
    }
}