use super::KubernetesManager;
use super::error::KubernetesError;
use k8s_openapi::api::core::v1::{
Container, EmptyDirVolumeSource, Pod, PodSpec, ResourceRequirements, SecurityContext, Volume,
VolumeMount,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use kube::api::{Api, DeleteParams, PostParams};
use std::collections::BTreeMap;
use std::time::Duration;
/// Errors produced while provisioning, waiting for, or tearing down a build machine pod.
#[derive(Debug)]
pub enum KubeBuilderError {
/// The pod could not be created, or it was observed in the `Failed` phase while
/// waiting for readiness.
ProvisionFailed(String),
/// The build machine is not ready. NOTE(review): not constructed anywhere in this
/// file — presumably used by callers elsewhere; confirm.
NotReady(String),
/// An operation against an existing pod failed (used here when deleting the pod fails).
ExecutionFailed(String),
/// An error bubbled up from the surrounding `KubernetesManager` layer.
Kubernetes(KubernetesError),
/// The pod did not become ready within the allotted time.
Timeout(String),
}
impl std::fmt::Display for KubeBuilderError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
KubeBuilderError::ProvisionFailed(msg) => write!(f, "Provision failed: {}", msg),
KubeBuilderError::NotReady(msg) => write!(f, "Build machine not ready: {}", msg),
KubeBuilderError::ExecutionFailed(msg) => write!(f, "Execution failed: {}", msg),
KubeBuilderError::Kubernetes(e) => write!(f, "Kubernetes error: {}", e),
KubeBuilderError::Timeout(msg) => write!(f, "Timeout: {}", msg),
}
}
}
// Marker impl: every `std::error::Error` method keeps its default, so `source()` returns
// `None` for all variants, including `Kubernetes`.
impl std::error::Error for KubeBuilderError {}
impl From<KubernetesError> for KubeBuilderError {
fn from(e: KubernetesError) -> Self {
KubeBuilderError::Kubernetes(e)
}
}
/// Result alias used throughout this module; the error type is always [`KubeBuilderError`].
pub type KubeBuilderResult<T> = Result<T, KubeBuilderError>;
/// Description of a builder pod: where it lives, which image it runs, and how it is sized.
///
/// Values are set through the chained builder methods or the `set_*` mutators and are
/// turned into a Kubernetes `Pod` by `provision`.
#[derive(Debug, Clone)]
pub struct BuildMachine {
/// Namespace the pod is created in (the namespace itself is created during provisioning).
namespace: String,
/// Pod name; also used as the value of the pod's `builder` label.
name: String,
/// Container image for the single `builder` container.
image: String,
/// CPU quantity applied as both the resource limit and the resource request, if set.
cpu: Option<String>,
/// Memory quantity applied as both the resource limit and the resource request, if set.
memory: Option<String>,
/// Name of an image pull secret attached to the pod, if set.
registry_secret: Option<String>,
/// When true, the container's security context requests privileged mode.
privileged: bool,
/// Debug flag. NOTE(review): stored and settable, but not read anywhere in this file.
debug: bool,
}
impl BuildMachine {
pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
Self {
namespace: namespace.into(),
name: name.into(),
image: "ubuntu:24.04".to_string(),
cpu: Some("2".to_string()),
memory: Some("4Gi".to_string()),
registry_secret: None,
privileged: true,
debug: false,
}
}
pub fn image(mut self, image: impl Into<String>) -> Self {
self.image = image.into();
self
}
pub fn cpu(mut self, cpu: impl Into<String>) -> Self {
self.cpu = Some(cpu.into());
self
}
pub fn memory(mut self, memory: impl Into<String>) -> Self {
self.memory = Some(memory.into());
self
}
pub fn registry_secret(mut self, secret: impl Into<String>) -> Self {
self.registry_secret = Some(secret.into());
self
}
pub fn privileged(mut self, privileged: bool) -> Self {
self.privileged = privileged;
self
}
pub fn debug(mut self, debug: bool) -> Self {
self.debug = debug;
self
}
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn name(&self) -> &str {
&self.name
}
pub fn set_image(&mut self, image: impl Into<String>) {
self.image = image.into();
}
pub fn set_cpu(&mut self, cpu: impl Into<String>) {
self.cpu = Some(cpu.into());
}
pub fn set_memory(&mut self, memory: impl Into<String>) {
self.memory = Some(memory.into());
}
pub fn set_registry_secret(&mut self, secret: impl Into<String>) {
self.registry_secret = Some(secret.into());
}
pub fn set_debug(&mut self, debug: bool) {
self.debug = debug;
}
pub async fn provision(&self) -> KubeBuilderResult<BuildMachineHandle> {
let km = KubernetesManager::new(&self.namespace).await?;
km.namespace_create(&self.namespace).await?;
let client = km.client().clone();
let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
if let Ok(existing) = pods.get(&self.name).await {
if let Some(status) = &existing.status {
if status.phase.as_deref() == Some("Running") {
log::info!("Reusing existing running pod '{}'", self.name);
return Ok(BuildMachineHandle {
namespace: self.namespace.clone(),
pod_name: self.name.clone(),
client,
});
}
}
let _ = pods.delete(&self.name, &DeleteParams::default()).await;
tokio::time::sleep(Duration::from_secs(2)).await;
}
let mut limits = BTreeMap::new();
if let Some(cpu) = &self.cpu {
limits.insert("cpu".to_string(), Quantity(cpu.clone()));
}
if let Some(memory) = &self.memory {
limits.insert("memory".to_string(), Quantity(memory.clone()));
}
let pod = Pod {
metadata: ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(self.namespace.clone()),
labels: Some(BTreeMap::from([
("app".to_string(), "kubebuilder".to_string()),
("builder".to_string(), self.name.clone()),
])),
..Default::default()
},
spec: Some(PodSpec {
containers: vec![Container {
name: "builder".to_string(),
image: Some(self.image.clone()),
command: Some(vec!["/bin/bash".to_string()]),
args: Some(vec![
"-c".to_string(),
"apt-get update && apt-get install -y ca-certificates buildah fuse-overlayfs uidmap && sleep infinity".to_string(),
]),
security_context: if self.privileged {
Some(SecurityContext {
privileged: Some(true),
..Default::default()
})
} else {
None
},
volume_mounts: Some(vec![VolumeMount {
name: "containers".to_string(),
mount_path: "/var/lib/containers".to_string(),
..Default::default()
}]),
resources: if !limits.is_empty() {
Some(ResourceRequirements {
limits: Some(limits.clone()),
requests: Some(limits),
..Default::default()
})
} else {
None
},
..Default::default()
}],
volumes: Some(vec![Volume {
name: "containers".to_string(),
empty_dir: Some(EmptyDirVolumeSource::default()),
..Default::default()
}]),
image_pull_secrets: self.registry_secret.as_ref().map(|s| {
vec![k8s_openapi::api::core::v1::LocalObjectReference {
name: s.clone(),
}]
}),
..Default::default()
}),
..Default::default()
};
pods.create(&PostParams::default(), &pod)
.await
.map_err(|e| {
KubeBuilderError::ProvisionFailed(format!("Failed to create pod: {}", e))
})?;
let handle = BuildMachineHandle {
namespace: self.namespace.clone(),
pod_name: self.name.clone(),
client,
};
handle.wait_ready(Duration::from_secs(300)).await?;
tokio::time::sleep(Duration::from_secs(15)).await;
Ok(handle)
}
}
/// Handle to a provisioned builder pod, identified by namespace and pod name.
///
/// Carries its own Kubernetes `Client`, which it uses to query and delete the pod.
#[derive(Clone)]
pub struct BuildMachineHandle {
/// Namespace the pod lives in.
namespace: String,
/// Name of the pod this handle refers to.
pod_name: String,
/// Client used for all API calls made through this handle.
client: Client,
}
impl BuildMachineHandle {
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn pod_name(&self) -> &str {
&self.pod_name
}
async fn wait_ready(&self, timeout: Duration) -> KubeBuilderResult<()> {
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
let start = std::time::Instant::now();
loop {
if start.elapsed() > timeout {
return Err(KubeBuilderError::Timeout(format!(
"Pod '{}' not ready after {:?}",
self.pod_name, timeout
)));
}
match pods.get(&self.pod_name).await {
Ok(pod) => {
if let Some(status) = &pod.status {
let phase = status.phase.as_deref().unwrap_or("Unknown");
if phase == "Running" {
if let Some(container_statuses) = &status.container_statuses {
let ready_count =
container_statuses.iter().filter(|cs| cs.ready).count();
if ready_count == container_statuses.len()
&& !container_statuses.is_empty()
{
return Ok(());
}
}
} else if phase == "Failed" {
return Err(KubeBuilderError::ProvisionFailed(format!(
"Pod '{}' failed: {:?}",
self.pod_name, status.message
)));
}
}
}
Err(_) => {}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
pub async fn destroy(&self) -> KubeBuilderResult<()> {
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
pods.delete(&self.pod_name, &DeleteParams::default())
.await
.map_err(|e| {
KubeBuilderError::ExecutionFailed(format!("Failed to delete pod: {}", e))
})?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Chained builder calls override every field they target and leave the rest alone.
    #[test]
    fn test_build_machine_builder() {
        let machine = BuildMachine::new("test-ns", "test-builder")
            .image("ubuntu:22.04")
            .cpu("4")
            .memory("8Gi")
            .registry_secret("my-secret")
            .debug(true);
        assert_eq!(machine.namespace(), "test-ns");
        assert_eq!(machine.name(), "test-builder");
        assert_eq!(machine.image, "ubuntu:22.04");
        assert_eq!(machine.cpu, Some("4".to_string()));
        assert_eq!(machine.memory, Some("8Gi".to_string()));
        assert_eq!(machine.registry_secret, Some("my-secret".to_string()));
        assert!(machine.debug);
        // Not touched by the chain above, so the default must survive.
        assert!(machine.privileged);
    }

    /// `new` applies the documented defaults.
    #[test]
    fn test_build_machine_defaults() {
        let machine = BuildMachine::new("ns", "builder");
        assert_eq!(machine.namespace(), "ns");
        assert_eq!(machine.name(), "builder");
        assert_eq!(machine.image, "ubuntu:24.04");
        assert_eq!(machine.cpu, Some("2".to_string()));
        assert_eq!(machine.memory, Some("4Gi".to_string()));
        assert_eq!(machine.registry_secret, None);
        assert!(machine.privileged);
        assert!(!machine.debug);
    }

    /// Privileged mode defaults to on and can be switched off through the builder.
    #[test]
    fn test_build_machine_privileged_toggle() {
        let machine = BuildMachine::new("ns", "builder").privileged(false);
        assert!(!machine.privileged);
    }

    /// The in-place `set_*` mutators behave like their builder counterparts.
    #[test]
    fn test_build_machine_setters() {
        let mut machine = BuildMachine::new("ns", "builder");
        machine.set_image("debian:12");
        machine.set_cpu("1");
        machine.set_memory("512Mi");
        machine.set_registry_secret("pull-secret");
        machine.set_debug(true);
        assert_eq!(machine.image, "debian:12");
        assert_eq!(machine.cpu, Some("1".to_string()));
        assert_eq!(machine.memory, Some("512Mi".to_string()));
        assert_eq!(machine.registry_secret, Some("pull-secret".to_string()));
        assert!(machine.debug);
    }

    /// String-carrying error variants render as "<label>: <message>".
    #[test]
    fn test_error_display() {
        assert_eq!(
            KubeBuilderError::ProvisionFailed("boom".to_string()).to_string(),
            "Provision failed: boom"
        );
        assert_eq!(
            KubeBuilderError::NotReady("pending".to_string()).to_string(),
            "Build machine not ready: pending"
        );
        assert_eq!(
            KubeBuilderError::ExecutionFailed("exit 1".to_string()).to_string(),
            "Execution failed: exit 1"
        );
        assert_eq!(
            KubeBuilderError::Timeout("5s".to_string()).to_string(),
            "Timeout: 5s"
        );
    }
}