use crate::{
component::{ClusterContext, Component, Phase},
config::Config,
container::Container,
crio::{Crio, MAX_SOCKET_PATH_LEN},
kubeconfig::KubeConfig,
network::Network,
node::Node,
pki::Pki,
process::{Process, ProcessState, Stoppable},
write_if_changed,
};
use anyhow::{Context, Result, bail};
use std::fs::create_dir_all;
/// Cluster component that starts the kubelet belonging to a single node.
pub struct KubeletComponent {
    /// Index of the node this kubelet is started for.
    node: u8,
    /// Display name, built once in [`KubeletComponent::new`].
    name: String,
}

impl KubeletComponent {
    /// Creates the component for `node`.
    ///
    /// The display name has the form `Kubelet (node <node>)`.
    pub fn new(node: u8) -> Self {
        let name = format!("Kubelet (node {})", node);
        Self { node, name }
    }
}
impl Component for KubeletComponent {
    /// Returns the display name stored at construction time.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The kubelet is part of the `NodeAgent` phase.
    fn phase(&self) -> Phase {
        Phase::NodeAgent
    }

    /// Starts the kubelet for this component's node by delegating to
    /// [`Kubelet::start`], passing along the config, network, PKI and kubeconfig
    /// held by `ctx`.
    fn start(&self, ctx: &ClusterContext<'_>) -> ProcessState {
        Kubelet::start(
            ctx.config,
            self.node,
            ctx.network,
            ctx.pki,
            ctx.kubeconfig,
        )
    }
}
/// A started kubelet, as returned (boxed) by `Kubelet::start`.
///
/// It only wraps the spawned process so that it can be stopped again through
/// the `Stoppable` implementation.
pub struct Kubelet {
/// The kubelet process created by `Process::start` or `Container::exec`.
process: Process,
}
impl Kubelet {
    /// Prepares the on-disk configuration for the kubelet of `node` and launches it.
    ///
    /// Steps, in order:
    /// 1. derive the node name and the working directory `<root>/kubelet/<node name>`,
    /// 2. reject run directories whose path is too long for unix sockets,
    /// 3. render `assets/kubelet.yml` and store it as `config.yml` through
    ///    `write_if_changed`,
    /// 4. spawn the kubelet: via `Container::exec` (with an additional
    ///    `--hostname-override` argument) when `config.multi_node()` is true,
    ///    via `Process::start` otherwise,
    /// 5. call `Process::wait_ready` with "Successfully registered node"
    ///    (presumably a line of kubelet output; see `Process::wait_ready`).
    ///
    /// The kubelet listens on port `11250 + node`, its healthz endpoint on
    /// `12250 + node`.
    ///
    /// # Errors
    ///
    /// Fails if the run path is too long, the working directory cannot be created,
    /// no identity, CIDR or kubeconfig exists for `node`, the DNS address or CRI-O
    /// socket cannot be determined, the config file cannot be written, or the
    /// process cannot be started or does not report readiness.
    pub fn start(
        config: &Config,
        node: u8,
        network: &Network,
        pki: &Pki,
        kubeconfig: &KubeConfig,
    ) -> ProcessState {
        const KUBELET: &str = "kubelet";
        // Relative path used to estimate the longest socket path below the run directory.
        const SOCKET_SUFFIX: &str = "pod-resources/1234567890";

        let node_name = Node::name(config, network, node);
        let node_index = usize::from(node);
        let port_offset = u16::from(node);

        let dir = config.root().join(KUBELET).join(&node_name);
        let root_dir = dir.join("run");

        // NOTE(review): the estimate adds the suffix without a joining `/`; confirm
        // that MAX_SOCKET_PATH_LEN leaves headroom for the separator.
        let run_path_len = root_dir.display().to_string().len();
        if run_path_len + SOCKET_SUFFIX.len() > MAX_SOCKET_PATH_LEN {
            bail!(
                "Kubelet run path '{}' is too long for unix sockets",
                root_dir.display()
            );
        }

        create_dir_all(&dir)?;

        // Certificate and key this kubelet presents.
        let identity = pki
            .kubelets()
            .get(node_index)
            .with_context(|| format!("Unable to retrieve kubelet identity for {}", node_name))?;

        let rendered = format!(
            include_str!("assets/kubelet.yml"),
            ca = pki.ca().cert().display(),
            dns = network.dns()?,
            cidr = network
                .crio_cidrs()
                .get(node_index)
                .context("Unable to retrieve kubelet CIDR")?,
            cert = identity.cert().display(),
            key = identity.key().display(),
            port = 11250 + port_offset,
            healthzPort = 12250 + port_offset,
        );
        let config_file = dir.join("config.yml");
        write_if_changed(&config_file, &rendered)?;

        // Command line arguments, built in the order they are passed to the kubelet.
        let config_arg = format!("--config={}", config_file.display());
        let root_dir_arg = format!("--root-dir={}", root_dir.display());
        let endpoint_arg = format!(
            "--container-runtime-endpoint={}",
            Crio::socket(config, network, node)?.to_socket_string(),
        );
        let kubeconfig_arg = format!(
            "--kubeconfig={}",
            kubeconfig
                .kubelets()
                .get(node_index)
                .with_context(|| format!(
                    "Unable to retrieve kubelet config for {}",
                    node_name
                ))?
                .display()
        );
        let args = &[
            config_arg.as_str(),
            root_dir_arg.as_str(),
            endpoint_arg.as_str(),
            kubeconfig_arg.as_str(),
            "--v=2",
        ];

        let mut process = if !config.multi_node() {
            Process::start(&dir, "Kubelet", KUBELET, args)?
        } else {
            // Multi-node mode: execute via `Container::exec` and put the hostname
            // override in front of the regular arguments.
            let hostname_arg = format!("--hostname-override={}", node_name);
            let modargs: Vec<&str> = std::iter::once(hostname_arg.as_str())
                .chain(args.iter().copied())
                .collect();
            Container::exec(
                config,
                &dir,
                &format!("Kubelet {}", node_name),
                KUBELET,
                &node_name,
                &modargs,
            )?
        };

        process.wait_ready("Successfully registered node")?;
        Ok(Box::new(Self { process }))
    }
}
impl Stoppable for Kubelet {
    /// Stops the kubelet by forwarding to the wrapped process and returns its result.
    fn stop(&mut self) -> Result<()> {
        let Self { process } = self;
        process.stop()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crio::MAX_SOCKET_PATH_LEN;

    /// Renders the embedded kubelet template with fixed values and checks the
    /// resulting fragments.
    #[test]
    fn config_template_renders() {
        let rendered = format!(
            include_str!("assets/kubelet.yml"),
            ca = "/tmp/ca.pem",
            dns = "10.10.64.2",
            cidr = "10.10.128.0/18",
            cert = "/tmp/kubelet.pem",
            key = "/tmp/kubelet-key.pem",
            port = 11250,
            healthzPort = 12250,
        );

        assert!(rendered.contains("kind: KubeletConfiguration"));
        assert!(rendered.contains("clientCAFile: \"/tmp/ca.pem\""));
        assert!(rendered.contains("clusterDNS:"));
        assert!(rendered.contains("- \"10.10.64.2\""));
        assert!(rendered.contains("podCIDR: \"10.10.128.0/18\""));
        assert!(rendered.contains("port: 11250"));
        assert!(rendered.contains("healthzPort: 12250"));
    }

    /// A path that already has the maximum length exceeds the limit once the
    /// socket suffix is added. This only repeats the length arithmetic; it does
    /// not call `Kubelet::start`.
    #[test]
    fn socket_path_too_long() {
        let suffix = "pod-resources/1234567890";
        let candidate = "a".repeat(MAX_SOCKET_PATH_LEN);
        let combined = candidate.len() + suffix.len();
        assert!(combined > MAX_SOCKET_PATH_LEN);
    }

    /// The component reports its name and phase.
    #[test]
    fn component_metadata() {
        let component = KubeletComponent::new(0);
        assert_eq!(component.name(), "Kubelet (node 0)");
        assert_eq!(component.phase(), Phase::NodeAgent);
    }

    /// The node index is part of the component name.
    #[test]
    fn component_name_per_node() {
        for (node, expected) in [(0u8, "Kubelet (node 0)"), (3u8, "Kubelet (node 3)")] {
            assert_eq!(KubeletComponent::new(node).name(), expected);
        }
    }
}