use super::{KubernetesError, KubernetesManager, KubernetesResult};
use base64::Engine;
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::Pod;
use kube::Api;
use kube::api::ListParams;
use std::collections::{BTreeMap, HashMap};
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct RegistryConfig {
pub image: String,
pub replicas: i32,
pub port: i32,
pub node_port: i32,
pub username: Option<String>,
pub password: Option<String>,
pub persistence_enabled: bool,
pub persistence_size: String,
pub storage_delete_enabled: bool,
pub extra_env: HashMap<String, String>,
pub timeout_secs: u64,
pub reset: bool,
}
impl Default for RegistryConfig {
fn default() -> Self {
Self {
image: "registry:2".to_string(),
replicas: 1,
port: 5000,
node_port: 30500,
username: None,
password: None,
persistence_enabled: true,
persistence_size: "10Gi".to_string(),
storage_delete_enabled: true,
extra_env: HashMap::new(),
timeout_secs: 120,
reset: false, }
}
}
#[derive(Debug, Clone)]
pub struct RegistryConfigBuilder {
name: String,
config: RegistryConfig,
}
impl RegistryConfigBuilder {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
config: RegistryConfig::default(),
}
}
pub fn namespace(mut self, namespace: &str) -> Self {
self.config
.extra_env
.insert("_namespace".to_string(), namespace.to_string());
self
}
pub fn name(&self) -> &str {
&self.name
}
pub fn image(mut self, image: impl Into<String>) -> Self {
self.config.image = image.into();
self
}
pub fn replicas(mut self, replicas: i32) -> Self {
self.config.replicas = replicas;
self
}
pub fn port(mut self, port: u16) -> Self {
self.config.port = port as i32;
self
}
pub fn node_port(mut self, port: u16) -> Self {
self.config.node_port = port as i32;
self
}
pub fn with_auth(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
self.config.username = Some(username.into());
self.config.password = Some(password.into());
self
}
pub fn persistence(mut self, enabled: bool, size: impl Into<String>) -> Self {
self.config.persistence_enabled = enabled;
self.config.persistence_size = size.into();
self
}
pub fn storage_delete_enabled(mut self, enabled: bool) -> Self {
self.config.storage_delete_enabled = enabled;
self
}
pub fn extra_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.config.extra_env.insert(key.into(), value.into());
self
}
pub fn timeout_secs(mut self, secs: u64) -> Self {
self.config.timeout_secs = secs;
self
}
pub fn reset(mut self, reset: bool) -> Self {
self.config.reset = reset;
self
}
pub fn build(self) -> RegistryConfig {
self.config
}
pub fn storage_size(mut self, size: &str) -> Self {
self.config.persistence_size = size.to_string();
self
}
}
#[derive(Debug, Clone)]
pub struct RegistryDeploymentResult {
pub name: String,
pub namespace: String,
pub endpoint: String,
pub port: u16,
pub node_port: Option<u16>,
pub external_endpoint: Option<String>,
pub username: Option<String>,
pub password: Option<String>,
pub auth_secret_name: Option<String>,
pub pull_secret_name: Option<String>,
pub auth_enabled: bool,
}
impl RegistryDeploymentResult {
pub fn connection_string(&self) -> String {
format!("{}:{}", self.endpoint, self.port)
}
pub fn external_connection_string(&self) -> Option<String> {
self.external_endpoint.clone()
}
pub fn image_prefix(&self) -> String {
format!("{}:{}", self.endpoint, self.port)
}
pub fn external_image_prefix(&self) -> Option<String> {
self.external_endpoint.clone()
}
pub fn display(&self) -> String {
let mut output = String::new();
output.push_str(&format!("Registry Deployment: {}\n", self.name));
output.push_str(&format!("Namespace: {}\n", self.namespace));
output.push_str(&format!(
"Internal Endpoint: {}:{}\n",
self.endpoint, self.port
));
if let Some(ref ext) = self.external_endpoint {
output.push_str(&format!("External Endpoint: {}\n", ext));
}
output.push_str(&format!(
"Authentication: {}\n",
if self.auth_enabled {
"Enabled"
} else {
"Disabled"
}
));
if let Some(ref username) = self.username {
output.push_str(&format!("Username: {}\n", username));
}
if let Some(ref secret) = self.auth_secret_name {
output.push_str(&format!("Auth Secret: {}\n", secret));
}
if let Some(ref secret) = self.pull_secret_name {
output.push_str(&format!("Pull Secret: {}\n", secret));
}
if let Some(ref ext) = self.external_endpoint {
output.push_str("\nUsage:\n");
if self.auth_enabled {
output.push_str(&format!(
" Login: buildah login -u {} -p <password> {}\n",
self.username.as_deref().unwrap_or("admin"),
ext
));
}
output.push_str(&format!(
" Push: buildah push --tls-verify=false <image> docker://{}/\n",
ext
));
output.push_str(&format!(
" Pull: buildah pull --tls-verify=false docker://{}/\n",
ext
));
}
output
}
}
impl KubernetesManager {
pub async fn deploy_registry(
&self,
name: &str,
config: RegistryConfig,
) -> KubernetesResult<RegistryDeploymentResult> {
log::info!(
"Deploying Docker Registry '{}' in namespace '{}' (timeout: {}s, reset: {}, auth: {})",
name,
self.namespace(),
config.timeout_secs,
config.reset,
config.username.is_some()
);
let timeout = Duration::from_secs(config.timeout_secs);
let auth_enabled = config.username.is_some() && config.password.is_some();
if config.reset {
log::info!(
"Reset flag set, removing existing Registry deployment '{}'...",
name
);
let _ = self.delete_registry(name).await;
tokio::time::sleep(Duration::from_secs(2)).await;
}
let deployments: Api<Deployment> = Api::namespaced(self.client().clone(), self.namespace());
let existing = deployments.get(name).await;
if existing.is_ok() {
log::info!("Registry deployment '{}' already exists, reusing...", name);
self.wait_for_registry_deployment_ready(name, config.replicas, timeout)
.await?;
let namespace = self.namespace();
let endpoint = format!("{}.{}.svc.cluster.local", name, namespace);
let external_endpoint = format!("localhost:{}", config.node_port);
let auth_secret_name = if auth_enabled {
Some(format!("{}-auth", name))
} else {
None
};
let pull_secret_name = if auth_enabled {
Some(format!("{}-pull-secret", name))
} else {
None
};
let result = RegistryDeploymentResult {
name: name.to_string(),
namespace: namespace.to_string(),
endpoint,
port: config.port as u16,
node_port: Some(config.node_port as u16),
external_endpoint: Some(external_endpoint),
username: config.username,
password: config.password,
auth_secret_name,
pull_secret_name,
auth_enabled,
};
log::info!(
"Reusing existing Registry '{}'. Endpoint: {}",
name,
result.connection_string()
);
return Ok(result);
}
let auth_secret_name = if auth_enabled {
let secret_name = format!("{}-auth", name);
self.create_registry_auth_secret(
&secret_name,
config.username.as_ref().unwrap(),
config.password.as_ref().unwrap(),
)
.await?;
Some(secret_name)
} else {
None
};
let pull_secret_name = if auth_enabled {
let secret_name = format!("{}-pull-secret", name);
Some(secret_name)
} else {
None
};
let mut labels = HashMap::new();
labels.insert("app".to_string(), name.to_string());
labels.insert("component".to_string(), "docker-registry".to_string());
let env_vars = self.build_registry_env_vars(&config, auth_secret_name.as_deref());
self.deployment_create(
name,
&config.image,
config.replicas,
Some(labels.clone()),
Some(env_vars),
)
.await?;
self.service_create_nodeport(name, labels.clone(), config.port, config.node_port)
.await?;
log::info!("Waiting for Registry deployment '{}' to be ready...", name);
self.wait_for_registry_deployment_ready(name, config.replicas, timeout)
.await?;
let namespace = self.namespace();
let endpoint = format!("{}.{}.svc.cluster.local", name, namespace);
let external_endpoint = format!("localhost:{}", config.node_port);
let result = RegistryDeploymentResult {
name: name.to_string(),
namespace: namespace.to_string(),
endpoint,
port: config.port as u16,
node_port: Some(config.node_port as u16),
external_endpoint: Some(external_endpoint),
username: config.username,
password: config.password,
auth_secret_name,
pull_secret_name,
auth_enabled,
};
log::info!(
"Registry '{}' deployed and ready. Endpoint: {}",
name,
result.connection_string()
);
Ok(result)
}
async fn create_registry_auth_secret(
&self,
name: &str,
username: &str,
password: &str,
) -> KubernetesResult<()> {
let htpasswd = format!("{}:{}", username, Self::generate_htpasswd_hash(password));
let mut data = HashMap::new();
data.insert("htpasswd".to_string(), htpasswd);
self.secret_create(name, data, Some("Opaque")).await?;
log::info!("Created registry auth secret: {}", name);
Ok(())
}
fn generate_htpasswd_hash(password: &str) -> String {
use base64::engine::general_purpose::STANDARD;
let hash = STANDARD.encode(password.as_bytes());
format!("$2y$05${}", hash)
}
#[allow(dead_code)]
async fn create_docker_registry_secret(
&self,
name: &str,
registry: &str,
username: &str,
password: &str,
) -> KubernetesResult<()> {
use base64::engine::general_purpose::STANDARD;
let auth = STANDARD.encode(format!("{}:{}", username, password));
let docker_config = serde_json::json!({
"auths": {
(registry): {
"username": username,
"password": password,
"auth": auth
}
}
});
let mut data = HashMap::new();
data.insert(".dockerconfigjson".to_string(), docker_config.to_string());
self.secret_create(name, data, Some("kubernetes.io/dockerconfigjson"))
.await?;
log::info!("Created docker-registry pull secret: {}", name);
Ok(())
}
fn build_registry_env_vars(
&self,
config: &RegistryConfig,
auth_secret_name: Option<&str>,
) -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert(
"REGISTRY_HTTP_ADDR".to_string(),
format!("0.0.0.0:{}", config.port),
);
if config.storage_delete_enabled {
env.insert(
"REGISTRY_STORAGE_DELETE_ENABLED".to_string(),
"true".to_string(),
);
}
if auth_secret_name.is_some() {
env.insert("REGISTRY_AUTH".to_string(), "htpasswd".to_string());
env.insert(
"REGISTRY_AUTH_HTPASSWD_REALM".to_string(),
"Registry Realm".to_string(),
);
env.insert(
"REGISTRY_AUTH_HTPASSWD_PATH".to_string(),
"/auth/htpasswd".to_string(),
);
}
for (k, v) in &config.extra_env {
env.insert(k.clone(), v.clone());
}
env
}
async fn service_create_nodeport(
&self,
name: &str,
selector: HashMap<String, String>,
port: i32,
node_port: i32,
) -> KubernetesResult<()> {
let selector_btree: BTreeMap<String, String> = selector.into_iter().collect();
use k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use kube::api::PostParams;
let service = Service {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(self.namespace().to_string()),
..Default::default()
},
spec: Some(ServiceSpec {
type_: Some("NodePort".to_string()),
selector: Some(selector_btree),
ports: Some(vec![ServicePort {
port,
target_port: Some(IntOrString::Int(port)),
node_port: Some(node_port),
name: Some("registry".to_string()),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
let services: Api<Service> = Api::namespaced(self.client().clone(), self.namespace());
services
.create(&PostParams::default(), &service)
.await
.map_err(|e| {
KubernetesError::operation_error(format!(
"Failed to create NodePort service: {}",
e
))
})?;
log::info!("Created NodePort service '{}' on port {}", name, node_port);
Ok(())
}
async fn wait_for_registry_deployment_ready(
&self,
name: &str,
expected_replicas: i32,
timeout: Duration,
) -> KubernetesResult<()> {
let deployments: Api<Deployment> = Api::namespaced(self.client().clone(), self.namespace());
let pods: Api<Pod> = Api::namespaced(self.client().clone(), self.namespace());
let poll_interval = Duration::from_secs(2);
let start = Instant::now();
loop {
let label_selector = format!("app={}", name);
let lp = ListParams::default().labels(&label_selector);
if let Ok(pod_list) = pods.list(&lp).await {
for pod in &pod_list.items {
let pod_name = pod.metadata.name.as_deref().unwrap_or("unknown");
if let Some(status) = &pod.status {
if let Some(container_statuses) = &status.container_statuses {
for cs in container_statuses {
if let Some(waiting) =
&cs.state.as_ref().and_then(|s| s.waiting.as_ref())
{
let reason = waiting.reason.as_deref().unwrap_or("Unknown");
let message = waiting.message.as_deref().unwrap_or("");
match reason {
"ImagePullBackOff" | "ErrImagePull" => {
return Err(KubernetesError::operation_error(format!(
"Pod '{}' failed to pull image: {} - {}",
pod_name, reason, message
)));
}
"CrashLoopBackOff" => {
return Err(KubernetesError::operation_error(format!(
"Pod '{}' is crash-looping: {}",
pod_name, message
)));
}
_ => {}
}
}
}
}
}
}
}
match deployments.get(name).await {
Ok(deployment) => {
if let Some(status) = &deployment.status {
let ready_replicas = status.ready_replicas.unwrap_or(0);
let available_replicas = status.available_replicas.unwrap_or(0);
log::info!(
"Registry deployment '{}': {}/{} ready, {}/{} available",
name,
ready_replicas,
expected_replicas,
available_replicas,
expected_replicas
);
if ready_replicas >= expected_replicas
&& available_replicas >= expected_replicas
{
log::info!("Registry deployment '{}' is ready!", name);
return Ok(());
}
}
}
Err(kube::Error::Api(api_err)) if api_err.code == 404 => {
log::warn!("Deployment '{}' not found yet, waiting...", name);
}
Err(e) => {
log::warn!("Error checking deployment status: {}", e);
}
}
if start.elapsed() > timeout {
return Err(KubernetesError::timeout(format!(
"Timeout waiting for Registry deployment '{}' to be ready after {:?}",
name, timeout
)));
}
tokio::time::sleep(poll_interval).await;
}
}
pub async fn delete_registry(&self, name: &str) -> KubernetesResult<()> {
log::info!("Deleting Registry deployment '{}'", name);
let _ = self.deployment_delete(name).await;
let _ = self.service_delete(name).await;
let _ = self.secret_delete(&format!("{}-auth", name)).await;
let _ = self.secret_delete(&format!("{}-pull-secret", name)).await;
log::info!("Registry deployment '{}' deleted successfully", name);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_registry_config_default() {
let config = RegistryConfig::default();
assert_eq!(config.image, "registry:2");
assert_eq!(config.replicas, 1);
assert_eq!(config.port, 5000);
assert_eq!(config.node_port, 30500);
assert!(config.username.is_none());
assert!(config.password.is_none());
assert!(config.persistence_enabled);
}
#[test]
fn test_registry_config_builder() {
let builder = RegistryConfigBuilder::new("my-registry")
.image("registry:2.8")
.replicas(2)
.port(5001)
.node_port(30501)
.with_auth("admin", "password123");
assert_eq!(builder.name(), "my-registry");
let config = builder.build();
assert_eq!(config.image, "registry:2.8");
assert_eq!(config.replicas, 2);
assert_eq!(config.port, 5001);
assert_eq!(config.node_port, 30501);
assert_eq!(config.username, Some("admin".to_string()));
assert_eq!(config.password, Some("password123".to_string()));
}
#[test]
fn test_registry_config_builder_no_auth() {
let config = RegistryConfigBuilder::new("simple-registry").build();
assert!(config.username.is_none());
assert!(config.password.is_none());
}
#[test]
fn test_registry_deployment_result() {
let result = RegistryDeploymentResult {
name: "test-registry".to_string(),
namespace: "default".to_string(),
endpoint: "test-registry.default.svc.cluster.local".to_string(),
port: 5000,
node_port: Some(30500),
external_endpoint: Some("localhost:30500".to_string()),
username: Some("admin".to_string()),
password: Some("secret".to_string()),
auth_secret_name: Some("test-registry-auth".to_string()),
pull_secret_name: Some("test-registry-pull-secret".to_string()),
auth_enabled: true,
};
assert_eq!(
result.connection_string(),
"test-registry.default.svc.cluster.local:5000"
);
assert_eq!(
result.external_connection_string(),
Some("localhost:30500".to_string())
);
assert!(result.auth_enabled);
}
}
#[derive(Debug, Clone)]
pub struct RegistryDeployer {
result: RegistryDeploymentResult,
}
impl RegistryDeployer {
pub fn connection_string(&self) -> String {
self.result.connection_string()
}
pub fn external_connection_string(&self) -> Option<String> {
self.result.external_connection_string()
}
pub fn image_prefix(&self) -> String {
self.result.image_prefix()
}
pub fn external_image_prefix(&self) -> Option<String> {
self.result.external_image_prefix()
}
pub fn name(&self) -> &str {
&self.result.name
}
pub fn namespace(&self) -> &str {
&self.result.namespace
}
pub fn port(&self) -> u16 {
self.result.port
}
pub fn node_port(&self) -> Option<u16> {
self.result.node_port
}
pub fn display(&self) -> String {
self.result.display()
}
}
#[derive(Clone)]
pub struct RegistryDeployerBuilder {
km: KubernetesManager,
name: String,
config_builder: RegistryConfigBuilder,
}
impl RegistryDeployerBuilder {
pub fn new(km: KubernetesManager, name: &str) -> Self {
Self {
km,
name: name.to_string(),
config_builder: RegistryConfigBuilder::new(name),
}
}
pub fn namespace(mut self, namespace: &str) -> Self {
self.config_builder = self.config_builder.namespace(namespace);
self
}
pub fn image(mut self, image: impl Into<String>) -> Self {
self.config_builder = self.config_builder.image(image);
self
}
pub fn port(mut self, port: u16) -> Self {
self.config_builder = self.config_builder.port(port);
self
}
pub fn node_port(mut self, port: u16) -> Self {
self.config_builder = self.config_builder.node_port(port);
self
}
pub fn with_auth(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
self.config_builder = self.config_builder.with_auth(username, password);
self
}
pub fn persistence(mut self, enabled: bool, size: impl Into<String>) -> Self {
self.config_builder = self.config_builder.persistence(enabled, size);
self
}
pub fn timeout_secs(mut self, secs: u64) -> Self {
self.config_builder = self.config_builder.timeout_secs(secs);
self
}
pub fn reset(mut self, reset: bool) -> Self {
self.config_builder = self.config_builder.reset(reset);
self
}
pub async fn deploy(self) -> KubernetesResult<RegistryDeployer> {
let config = self.config_builder.build();
let result = self.km.deploy_registry(&self.name, config).await?;
Ok(RegistryDeployer { result })
}
}