use super::models::*;
use crate::error::AppError;
use lmrc_ssh::{SshClient, SshConfig};
use std::collections::HashMap;
pub struct KubernetesService {
ssh_client: Option<SshClient>,
}
impl KubernetesService {
pub fn new() -> Self {
Self { ssh_client: None }
}
pub async fn connect(&mut self, host: String, user: String, private_key_path: String) -> Result<(), AppError> {
let config = SshConfig {
host,
port: 22,
user,
private_key_path,
};
let client = SshClient::new(config)
.map_err(|e| AppError::External(format!("SSH connection error: {}", e)))?;
client.connect()
.map_err(|e| AppError::External(format!("SSH connection failed: {}", e)))?;
self.ssh_client = Some(client);
Ok(())
}
pub async fn list_nodes(&self) -> Result<NodesListResponse, AppError> {
let client = self.ssh_client.as_ref()
.ok_or_else(|| AppError::Internal("Not connected to cluster".to_string()))?;
let output = client.execute("kubectl get nodes -o json")
.map_err(|e| AppError::External(format!("kubectl error: {}", e)))?;
if output.exit_status != 0 {
return Err(AppError::External(format!("kubectl failed: {}", output.stderr)));
}
let nodes_json: serde_json::Value = serde_json::from_str(&output.stdout)
.map_err(|e| AppError::Internal(format!("JSON parse error: {}", e)))?;
let nodes = self.parse_nodes(&nodes_json)?;
Ok(NodesListResponse {
total: nodes.len(),
nodes,
})
}
pub async fn list_pods(&self, namespace: Option<String>) -> Result<PodsListResponse, AppError> {
let client = self.ssh_client.as_ref()
.ok_or_else(|| AppError::Internal("Not connected to cluster".to_string()))?;
let ns_flag = namespace.map(|ns| format!("-n {}", ns)).unwrap_or_else(|| "--all-namespaces".to_string());
let cmd = format!("kubectl get pods {} -o json", ns_flag);
let output = client.execute(&cmd)
.map_err(|e| AppError::External(format!("kubectl error: {}", e)))?;
if output.exit_status != 0 {
return Err(AppError::External(format!("kubectl failed: {}", output.stderr)));
}
let pods_json: serde_json::Value = serde_json::from_str(&output.stdout)
.map_err(|e| AppError::Internal(format!("JSON parse error: {}", e)))?;
let pods = self.parse_pods(&pods_json)?;
Ok(PodsListResponse {
total: pods.len(),
pods,
})
}
pub async fn list_services(&self, namespace: Option<String>) -> Result<ServicesListResponse, AppError> {
let client = self.ssh_client.as_ref()
.ok_or_else(|| AppError::Internal("Not connected to cluster".to_string()))?;
let ns_flag = namespace.map(|ns| format!("-n {}", ns)).unwrap_or_else(|| "--all-namespaces".to_string());
let cmd = format!("kubectl get services {} -o json", ns_flag);
let output = client.execute(&cmd)
.map_err(|e| AppError::External(format!("kubectl error: {}", e)))?;
if output.exit_status != 0 {
return Err(AppError::External(format!("kubectl failed: {}", output.stderr)));
}
let services_json: serde_json::Value = serde_json::from_str(&output.stdout)
.map_err(|e| AppError::Internal(format!("JSON parse error: {}", e)))?;
let services = self.parse_services(&services_json)?;
Ok(ServicesListResponse {
total: services.len(),
services,
})
}
pub async fn list_deployments(&self, namespace: Option<String>) -> Result<DeploymentsListResponse, AppError> {
let client = self.ssh_client.as_ref()
.ok_or_else(|| AppError::Internal("Not connected to cluster".to_string()))?;
let ns_flag = namespace.map(|ns| format!("-n {}", ns)).unwrap_or_else(|| "--all-namespaces".to_string());
let cmd = format!("kubectl get deployments {} -o json", ns_flag);
let output = client.execute(&cmd)
.map_err(|e| AppError::External(format!("kubectl error: {}", e)))?;
if output.exit_status != 0 {
return Err(AppError::External(format!("kubectl failed: {}", output.stderr)));
}
let deployments_json: serde_json::Value = serde_json::from_str(&output.stdout)
.map_err(|e| AppError::Internal(format!("JSON parse error: {}", e)))?;
let deployments = self.parse_deployments(&deployments_json)?;
Ok(DeploymentsListResponse {
total: deployments.len(),
deployments,
})
}
fn parse_nodes(&self, json: &serde_json::Value) -> Result<Vec<NodeResponse>, AppError> {
let items = json["items"].as_array()
.ok_or_else(|| AppError::Internal("Invalid nodes JSON".to_string()))?;
let nodes = items.iter().filter_map(|item| {
let metadata = &item["metadata"];
let status = &item["status"];
Some(NodeResponse {
name: metadata["name"].as_str()?.to_string(),
status: Self::extract_node_status(status),
roles: Self::extract_node_roles(metadata),
version: status["nodeInfo"]["kubeletVersion"].as_str()?.to_string(),
capacity: Self::extract_resource_capacity(&status["capacity"])?,
allocatable: Self::extract_resource_capacity(&status["allocatable"])?,
conditions: Self::extract_node_conditions(status),
})
}).collect();
Ok(nodes)
}
fn parse_pods(&self, json: &serde_json::Value) -> Result<Vec<PodResponse>, AppError> {
let items = json["items"].as_array()
.ok_or_else(|| AppError::Internal("Invalid pods JSON".to_string()))?;
let pods = items.iter().filter_map(|item| {
let metadata = &item["metadata"];
let spec = &item["spec"];
let status = &item["status"];
Some(PodResponse {
name: metadata["name"].as_str()?.to_string(),
namespace: metadata["namespace"].as_str()?.to_string(),
status: status["phase"].as_str()?.to_string(),
phase: status["phase"].as_str()?.to_string(),
node: spec["nodeName"].as_str().unwrap_or("").to_string(),
ip: status["podIP"].as_str().map(|s| s.to_string()),
containers: Self::extract_container_statuses(status),
labels: Self::extract_labels(metadata),
})
}).collect();
Ok(pods)
}
fn parse_services(&self, json: &serde_json::Value) -> Result<Vec<ServiceResponse>, AppError> {
let items = json["items"].as_array()
.ok_or_else(|| AppError::Internal("Invalid services JSON".to_string()))?;
let services = items.iter().filter_map(|item| {
let metadata = &item["metadata"];
let spec = &item["spec"];
Some(ServiceResponse {
name: metadata["name"].as_str()?.to_string(),
namespace: metadata["namespace"].as_str()?.to_string(),
type_: spec["type"].as_str()?.to_string(),
cluster_ip: spec["clusterIP"].as_str()?.to_string(),
ports: Self::extract_service_ports(spec),
labels: Self::extract_labels(metadata),
})
}).collect();
Ok(services)
}
fn parse_deployments(&self, json: &serde_json::Value) -> Result<Vec<DeploymentResponse>, AppError> {
let items = json["items"].as_array()
.ok_or_else(|| AppError::Internal("Invalid deployments JSON".to_string()))?;
let deployments = items.iter().filter_map(|item| {
let metadata = &item["metadata"];
let spec = &item["spec"];
let status = &item["status"];
Some(DeploymentResponse {
name: metadata["name"].as_str()?.to_string(),
namespace: metadata["namespace"].as_str()?.to_string(),
replicas: spec["replicas"].as_i64().unwrap_or(0) as i32,
ready_replicas: status["readyReplicas"].as_i64().unwrap_or(0) as i32,
available_replicas: status["availableReplicas"].as_i64().unwrap_or(0) as i32,
labels: Self::extract_labels(metadata),
})
}).collect();
Ok(deployments)
}
fn extract_node_status(status: &serde_json::Value) -> String {
if let Some(conditions) = status["conditions"].as_array() {
for condition in conditions {
if condition["type"] == "Ready" {
return condition["status"].as_str().unwrap_or("Unknown").to_string();
}
}
}
"Unknown".to_string()
}
fn extract_node_roles(metadata: &serde_json::Value) -> Vec<String> {
let mut roles = Vec::new();
if let Some(labels) = metadata["labels"].as_object() {
for (key, _) in labels {
if key.starts_with("node-role.kubernetes.io/") {
roles.push(key.replace("node-role.kubernetes.io/", ""));
}
}
}
if roles.is_empty() {
roles.push("worker".to_string());
}
roles
}
fn extract_resource_capacity(capacity: &serde_json::Value) -> Option<ResourceCapacity> {
Some(ResourceCapacity {
cpu: capacity["cpu"].as_str()?.to_string(),
memory: capacity["memory"].as_str()?.to_string(),
pods: capacity["pods"].as_str()?.to_string(),
})
}
fn extract_node_conditions(status: &serde_json::Value) -> Vec<NodeCondition> {
status["conditions"].as_array()
.map(|conditions| {
conditions.iter().filter_map(|c| {
Some(NodeCondition {
type_: c["type"].as_str()?.to_string(),
status: c["status"].as_str()?.to_string(),
reason: c["reason"].as_str().map(|s| s.to_string()),
message: c["message"].as_str().map(|s| s.to_string()),
})
}).collect()
})
.unwrap_or_default()
}
fn extract_container_statuses(status: &serde_json::Value) -> Vec<ContainerStatus> {
status["containerStatuses"].as_array()
.map(|statuses| {
statuses.iter().filter_map(|cs| {
Some(ContainerStatus {
name: cs["name"].as_str()?.to_string(),
ready: cs["ready"].as_bool().unwrap_or(false),
restart_count: cs["restartCount"].as_i64().unwrap_or(0) as i32,
image: cs["image"].as_str()?.to_string(),
})
}).collect()
})
.unwrap_or_default()
}
fn extract_service_ports(spec: &serde_json::Value) -> Vec<ServicePort> {
spec["ports"].as_array()
.map(|ports| {
ports.iter().filter_map(|p| {
Some(ServicePort {
name: p["name"].as_str().map(|s| s.to_string()),
port: p["port"].as_i64()? as i32,
target_port: p["targetPort"].to_string(),
node_port: p["nodePort"].as_i64().map(|n| n as i32),
protocol: p["protocol"].as_str().unwrap_or("TCP").to_string(),
})
}).collect()
})
.unwrap_or_default()
}
fn extract_labels(metadata: &serde_json::Value) -> HashMap<String, String> {
metadata["labels"].as_object()
.map(|labels| {
labels.iter()
.filter_map(|(k, v)| Some((k.clone(), v.as_str()?.to_string())))
.collect()
})
.unwrap_or_default()
}
}