use anyhow::Result;
use clap::Args;
use log::info;
use scopeguard::defer;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Postgres, query};
use std::{io::BufRead, io::BufReader, process::Command, process::Stdio};
use tokio::sync::watch::{Receiver, channel};
use toml_config_trait::TomlConfig;
use toml_config_trait::TomlConfigTrait;
#[derive(TomlConfig, Serialize, Deserialize, Debug, Clone, Args)]
pub struct CockroachConfig {
pub namespace: String,
pub primary_port: i32,
pub secondary_port: i32,
pub database_names: Vec<String>,
pub replicas: i32,
pub cpus: String,
pub memory: String,
pub storage: String,
}
impl Default for CockroachConfig {
fn default() -> Self {
Self {
namespace: String::from("default"),
primary_port: 26257,
secondary_port: 8080,
database_names: vec![String::from("test_db")],
replicas: 3,
cpus: String::from("2"),
memory: String::from("8Gi"),
storage: String::from("50Gi"),
}
}
}
impl CockroachConfig {
pub fn new_from_path(config_path: &str) -> Result<Self> {
let config = CockroachConfig::read_from_path(config_path.into())?;
Ok(config)
}
pub fn write_default(config_path: &str) -> Result<Self> {
let cockroach_config = CockroachConfig::default();
cockroach_config.write_to_path(config_path.into())?;
Ok(cockroach_config)
}
pub(crate) fn create_db(&self) -> String {
self.database_names.iter().fold("".to_string(), |acc, x| {
acc + &format!("create database if not exists {}; ", x)
})
}
async fn cockroach_port_forward(&self, mut cockroach_rx: Receiver<bool>) -> Result<()> {
let mut cmd = Command::new("kubectl")
.arg("port-forward")
.arg("--address=0.0.0.0")
.arg("-n")
.arg(self.namespace.clone())
.arg("statefulset/cockroachdb")
.arg(format!("{}:{}", self.primary_port, self.primary_port))
.stdout(Stdio::piped())
.spawn()?;
let stdout = cmd.stdout.take().unwrap();
let stdout_reader = BufReader::new(stdout);
let stdout_lines = stdout_reader.lines();
if cockroach_rx.wait_for(|val| *val).await.is_ok() {
info!("terminating process");
cmd.kill().expect("error killing cockroach_child");
return Ok(());
}
for line in stdout_lines {
info!("{:?}", line?);
}
Ok(())
}
pub async fn clear_database(&self) -> Result<()> {
info!("Running: 'refresh_database'");
let (tx, rx) = channel(false);
let c = self.clone();
tokio::task::spawn(async move {
c.cockroach_port_forward(rx)
.await
.expect("cockroach_port_forward error");
});
defer!(tx.send(true).expect("send err"););
for db in &self.database_names {
info!("db: {}", &db);
let db_url = format!(
"postgresql://root@localhost:{}/{}?sslmode=disable",
&self.primary_port, db
);
info!("db_url: {}", &db_url);
let db_conn: Pool<Postgres> = Pool::connect_lazy(&db_url)?;
let create_db = format!("create database if not exists {};", &db);
let delete_db = format!("drop database {};", &db);
query(&delete_db).execute(&db_conn).await?;
query(&create_db).execute(&db_conn).await?;
}
info!("refresh_database -> Success!");
Ok(())
}
}