use std::{ collections::BTreeMap, fmt, process::Command, result::Result::Ok };
use kube::core::ErrorResponse;
use serde::Serialize;
use colored::Colorize;
use k8s_openapi::api::core::v1::ConfigMap;
use k8s_openapi::serde_json::json;
use kube::api::{ Api, ObjectMeta, Patch, PatchParams, PostParams };
use kube::client::Client;
pub static BASE_COMMAND: &str = "kubectl";
#[derive(Debug)]
pub enum CliError {
InstallerError {
reason: String,
},
ClientError(kube::Error),
UninstallError {
reason: String,
},
AgentError(tonic_reflection::server::Error),
MonitoringError {
reason: String,
},
}
impl From<kube::Error> for CliError {
fn from(e: kube::Error) -> Self {
CliError::ClientError(e)
}
}
impl From<anyhow::Error> for CliError {
fn from(e: anyhow::Error) -> Self {
CliError::MonitoringError { reason: format!("{}", e) }
}
}
impl From<()> for CliError {
fn from (v: ()) -> Self{
return ().into()
}
}
impl fmt::Display for CliError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CliError::InstallerError { reason } => {
write!(
f,
"An error occured while installing cortexflow components. Reason: {}",
reason
)
}
CliError::UninstallError { reason } => {
write!(
f,
"An error occured while installing cortexflow components. Reason: {}",
reason
)
}
CliError::MonitoringError { reason } => {
write!(
f,
"An error occured while installing cortexflow components. Reason: {}",
reason
)
}
CliError::ClientError(e) => write!(f, "Client Error: {}", e),
CliError::AgentError(e) => write!(f, "Agent Error: {}", e),
}
}
}
#[derive(Serialize)]
pub struct MetadataConfigFile {
blocklist: Vec<String>,
}
pub async fn connect_to_client() -> Result<Client, kube::Error> {
let client = Client::try_default().await;
client
}
pub fn update_cli() {
println!("{} {}", "=====>".blue().bold(), "Updating CortexFlow CLI");
println!("{} {}", "=====>".blue().bold(), "Looking for a newer version");
let output = Command::new("cargo").args(["update", "cortexflow-cli"]).output().expect("error");
if !output.status.success() {
eprintln!("Error updating CLI : {}", String::from_utf8_lossy(&output.stderr));
} else {
println!("✅ Updated CLI");
}
}
pub fn info() {
println!("{} {} {}", "=====>".blue().bold(), "Version:", env!("CARGO_PKG_VERSION"));
println!("{} {} {}", "=====>".blue().bold(), "Author:", env!("CARGO_PKG_AUTHORS"));
println!("{} {} {}", "=====>".blue().bold(), "Description:", env!("CARGO_PKG_DESCRIPTION"));
}
pub fn create_configs() -> MetadataConfigFile {
let mut blocklist: Vec<String> = Vec::new();
blocklist.push("".to_string());
let configs = MetadataConfigFile { blocklist };
configs
}
pub async fn read_configs() -> Result<Vec<String>, CliError> {
match connect_to_client().await {
Ok(client) => {
let namespace = "cortexflow";
let configmap = "cortexbrain-client-config";
let api: Api<ConfigMap> = Api::namespaced(client, namespace);
let cm = api.get(configmap).await?;
if let Some(data) = cm.data {
if let Some(blocklist_raw) = data.get("blocklist") {
let lines: Vec<String> = blocklist_raw
.lines()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty()) .collect();
return Ok(lines);
}
}
Ok(Vec::new()) }
Err(_) => {
Err(
CliError::ClientError(
kube::Error::Api(ErrorResponse {
status: "failed".to_string(),
message: "Failed to connect to kubernetes client".to_string(),
reason: "Your cluster is probably disconnected".to_string(),
code: 404,
})
)
)
}
}
}
pub async fn create_config_file(config_struct: MetadataConfigFile) -> Result<(), CliError> {
match connect_to_client().await {
Ok(client) => {
let namespace = "cortexflow";
let configmap = "cortexbrain-client-config";
let api: Api<ConfigMap> = Api::namespaced(client, namespace);
let mut data = BTreeMap::new();
for x in config_struct.blocklist {
data.insert("blocklist".to_string(), x);
}
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("cortexbrain-client-config".to_string()),
..Default::default()
}, data: Some(data), ..Default::default()
};
match api.create(&PostParams::default(), &cm).await {
Ok(_) => {
println!("Configmap created successfully");
}
Err(e) => {
eprintln!("An error occured: {}", e);
}
}
Ok(())
}
Err(_) => {
Err(
CliError::ClientError(
kube::Error::Api(ErrorResponse {
status: "failed".to_string(),
message: "Failed to connect to kubernetes client".to_string(),
reason: "Your cluster is probably disconnected".to_string(),
code: 404,
})
)
)
}
}
}
pub async fn update_config_metadata(input: &str, action: &str) -> Result<(), CliError> {
if action == "add" {
let mut ips = read_configs().await?;
println!("Readed current blocked ips: {:?}", ips);
ips.push(input.to_string());
let new_configs = MetadataConfigFile { blocklist: ips };
update_configmap(new_configs).await?;
} else if action == "delete" {
let mut ips = read_configs().await?;
if let Some(index) = ips.iter().position(|target| target == &input.to_string()) {
ips.remove(index);
} else {
eprintln!("Index of element not found");
}
let new_configs = MetadataConfigFile { blocklist: ips };
update_configmap(new_configs).await?;
}
Ok(())
}
pub async fn update_configmap(config_struct: MetadataConfigFile) -> Result<(), CliError> {
match connect_to_client().await {
Ok(client) => {
let namespace = "cortexflow";
let name = "cortexbrain-client-config";
let api: Api<ConfigMap> = Api::namespaced(client, namespace);
let blocklist_yaml = config_struct.blocklist
.iter()
.map(|x| format!("{}", x))
.collect::<Vec<String>>()
.join("\n");
let patch = Patch::Apply(
json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"data": {
"blocklist": blocklist_yaml
}
})
);
let patch_params = PatchParams::apply("cortexbrain").force();
match api.patch(name, &patch_params, &patch).await {
Ok(_) => {
println!("Map updated successfully");
}
Err(e) => {
eprintln!("An error occured during the patching process: {}", e);
return Err(e.into());
}
}
Ok(())
}
Err(_) => {
Err(
CliError::ClientError(
kube::Error::Api(ErrorResponse {
status: "failed".to_string(),
message: "Failed to connect to kubernetes client".to_string(),
reason: "Your cluster is probably disconnected".to_string(),
code: 404,
})
)
)
}
}
}