#[cfg(test)]
mod test {
use chrono::{DateTime, SecondsFormat, Utc};
use controller::{
apis::coredb_types::CoreDB,
defaults::{default_resources, default_storage},
is_pod_ready,
};
use k8s_openapi::{
api::{
apps::v1::StatefulSet,
batch::v1::CronJob,
core::v1::{
Container, Namespace, PersistentVolumeClaim, Pod, PodSpec, ResourceRequirements, Secret,
ServiceAccount,
},
rbac::v1::{Role, RoleBinding},
},
apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::ObjectMeta},
};
use kube::{
api::{AttachParams, ListParams, Patch, PatchParams, PostParams},
runtime::wait::{await_condition, conditions, Condition},
Api, Client, Config,
};
use rand::Rng;
use std::{collections::BTreeMap, str, thread, time::Duration};
use tokio::io::AsyncReadExt;
// `apiVersion` placed in every CoreDB manifest the tests apply.
const API_VERSION: &str = "coredb.io/v1alpha1";
// Upper bounds, in seconds, for the `tokio::time::timeout` waits in the tests below.
// Each constant name states which condition it bounds.
const TIMEOUT_SECONDS_START_POD: u64 = 120;
const TIMEOUT_SECONDS_POD_READY: u64 = 30;
const TIMEOUT_SECONDS_SECRET_PRESENT: u64 = 30;
const TIMEOUT_SECONDS_NS_DELETED: u64 = 30;
const TIMEOUT_SECONDS_COREDB_DELETED: u64 = 45;
/// Creates a helper pod named `test-buddy-<name>` through `pods_api` and returns that name.
///
/// The pod runs a single `curlimages/curl` container that executes `sleep 360` and is never
/// restarted, so tests can exec commands in it while it is alive.
///
/// # Panics
/// Panics if the create request fails.
async fn create_test_buddy(pods_api: Api<Pod>, name: String) -> String {
    let test_pod_name = format!("test-buddy-{}", name);

    // Build the manifest from named parts: container, then spec, then pod.
    let sleeper = Container {
        name: "test-connection".to_string(),
        image: Some("curlimages/curl:latest".to_string()),
        command: Some(vec!["sleep".to_string()]),
        args: Some(vec!["360".to_string()]),
        ..Container::default()
    };
    let spec = PodSpec {
        containers: vec![sleeper],
        restart_policy: Some("Never".to_string()),
        ..PodSpec::default()
    };
    let metadata = ObjectMeta {
        name: Some(test_pod_name.clone()),
        ..ObjectMeta::default()
    };
    let manifest = Pod {
        metadata,
        spec: Some(spec),
        ..Pod::default()
    };

    pods_api
        .create(&PostParams::default(), &manifest)
        .await
        .unwrap();
    test_pod_name
}
async fn run_command_in_container(
pods_api: Api<Pod>,
pod_name: String,
command: Vec<String>,
container: Option<String>,
) -> String {
let attach_params = AttachParams {
container: container.clone(),
tty: false,
stdin: true,
stdout: true,
stderr: true,
max_stdin_buf_size: Some(1024),
max_stdout_buf_size: Some(1024),
max_stderr_buf_size: Some(1024),
};
let attach_res = pods_api.exec(pod_name.as_str(), &command, &attach_params).await;
let mut attached_process = match attach_res {
Ok(ap) => ap,
Err(e) => {
panic!(
"Error attaching to pod: {}, container: {:?}, error: {}",
pod_name, container, e
)
}
};
let mut stdout_reader = attached_process.stdout().unwrap();
let mut result_stdout = String::new();
stdout_reader.read_to_string(&mut result_stdout).await.unwrap();
result_stdout
}
#[tokio::test]
#[ignore]
async fn functional_test_basic_create() {
let client = kube_client().await;
let mut rng = rand::thread_rng();
let name = &format!("test-coredb-{}", rng.gen_range(0..100000));
let namespace = "default";
let kind = "CoreDB";
let replicas = 1;
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
let test_pod_name = create_test_buddy(pods.clone(), name.to_string()).await;
println!("Creating CoreDB resource {}", name);
let test_metric_decr = format!("coredb_integration_test_{}", rng.gen_range(0..100000));
let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), namespace);
let coredb_json = serde_json::json!({
"apiVersion": API_VERSION,
"kind": kind,
"metadata": {
"name": name
},
"spec": {
"replicas": replicas,
"extensions": [
{
"name": "postgis",
"description": "PostGIS extension",
"locations": [{
"enabled": true,
"version": "1.1.1",
"database": "postgres",
"schema": "public"}
]
}],
"metrics": {
"enabled": true,
"queries": {
"test_ns": {
"query": "SELECT 10 as my_metric, 'cat' as animal",
"master": true,
"metrics": [
{
"my_metric": {
"usage": "GAUGE",
"description": test_metric_decr
}
},
{
"animal": {
"usage": "LABEL",
"description": "Animal type"
}
}
]
},
}
}
}
});
let params = PatchParams::apply("coredb-integration-test");
let patch = Patch::Apply(&coredb_json);
let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap();
let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
let secret_name = format!("{}-connection", name);
println!("Waiting for secret to be created: {}", secret_name);
let establish = await_condition(secret_api.clone(), &secret_name, wait_for_secret());
let _ = tokio::time::timeout(Duration::from_secs(TIMEOUT_SECONDS_SECRET_PRESENT), establish)
.await
.unwrap_or_else(|_| {
panic!(
"Did not find the secret {} present after waiting {} seconds",
secret_name, TIMEOUT_SECONDS_SECRET_PRESENT
)
});
println!("Found secret: {}", secret_name);
let pod_name = format!("{}-0", name);
println!("Waiting for pod to be running: {}", pod_name);
let _check_for_pod = tokio::time::timeout(
Duration::from_secs(TIMEOUT_SECONDS_START_POD),
await_condition(pods.clone(), &pod_name, conditions::is_pod_running()),
)
.await
.unwrap_or_else(|_| {
panic!(
"Did not find the pod {} to be running after waiting {} seconds",
pod_name, TIMEOUT_SECONDS_START_POD
)
});
println!("Waiting for pod to be ready: {}", pod_name);
let _check_for_pod_ready = tokio::time::timeout(
Duration::from_secs(TIMEOUT_SECONDS_POD_READY),
await_condition(pods.clone(), &pod_name, is_pod_ready()),
)
.await
.unwrap_or_else(|_| {
panic!(
"Did not find the pod {} to be ready after waiting {} seconds",
pod_name, TIMEOUT_SECONDS_POD_READY
)
});
println!("Found pod ready: {}", pod_name);
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
let c = vec![
"wget".to_owned(),
"-qO-".to_owned(),
"http://localhost:9187/metrics".to_owned(),
];
thread::sleep(Duration::from_millis(10000));
let result_stdout = run_command_in_container(
pods.clone(),
pod_name.clone(),
c,
Some("postgres-exporter".to_string()),
)
.await;
assert!(result_stdout.contains(&test_metric_decr));
let pvc_api: Api<PersistentVolumeClaim> = Api::namespaced(client.clone(), namespace);
let default_storage: Quantity = default_storage();
let pvc = pvc_api.get(&format!("data-{}", pod_name)).await.unwrap();
let storage = pvc.spec.unwrap().resources.unwrap().requests.unwrap();
let s = storage.get("storage").unwrap().to_owned();
assert_eq!(default_storage, s);
let default_resources: ResourceRequirements = default_resources();
let pg_pod = pods.get(&pod_name).await.unwrap();
let resources = pg_pod.spec.unwrap().containers[0].clone().resources;
assert_eq!(default_resources, resources.unwrap());
let result = coredb_resource
.psql("\\dt".to_string(), "postgres".to_string(), client.clone())
.await
.unwrap();
println!("psql out: {}", result.stdout.clone().unwrap());
assert!(!result.stdout.clone().unwrap().contains("customers"));
let result = coredb_resource
.psql(
"
CREATE TABLE customers (
id serial PRIMARY KEY,
name VARCHAR(50) NOT NULL,
email VARCHAR(50) NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT NOW()
);
"
.to_string(),
"postgres".to_string(),
client.clone(),
)
.await
.unwrap();
println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("CREATE TABLE"));
let result = coredb_resource
.psql("\\dt".to_string(), "postgres".to_string(), client.clone())
.await
.unwrap();
println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("customers"));
thread::sleep(Duration::from_millis(10000));
let result = coredb_resource
.psql(
"select extname from pg_catalog.pg_extension;".to_string(),
"postgres".to_string(),
client.clone(),
)
.await
.unwrap();
println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("postgis"));
let result = coredb_resource
.psql(
"SELECT rolname FROM pg_roles;".to_string(),
"postgres".to_string(),
client.clone(),
)
.await
.unwrap();
assert!(
result.stdout.clone().unwrap().contains("postgres_exporter"),
"results must contain postgres_exporter: {}",
result.stdout.clone().unwrap()
);
let metrics_service_name = format!("{}-metrics", name);
let command = vec![
String::from("curl"),
format!("http://{metrics_service_name}/metrics"),
];
let result_stdout =
run_command_in_container(pods.clone(), test_pod_name.clone(), command, None).await;
assert!(result_stdout.contains("pg_up 1"));
println!("Found metrics when curling the metrics service");
let coredb_json = serde_json::json!({
"apiVersion": API_VERSION,
"kind": kind,
"metadata": {
"name": name
},
"spec": {
"replicas": replicas,
"extensions": [
{
"name": "postgis",
"description": "PostGIS extension",
"locations": [{
"enabled": false,
"version": "1.1.1",
"database": "postgres",
"schema": "public"}
]
}]
}
});
let params = PatchParams::apply("coredb-integration-test");
let patch = Patch::Apply(&coredb_json);
let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap();
thread::sleep(Duration::from_millis(5000));
let result = coredb_resource
.psql(
"select extname from pg_catalog.pg_extension;".to_string(),
"postgres".to_string(),
client.clone(),
)
.await
.unwrap();
assert!(
!result.stdout.clone().unwrap().contains("postgis"),
"results should not contain postgis: {}",
result.stdout.clone().unwrap()
);
let spec = coredbs.get(name).await.expect("spec not found");
let status = spec.status.expect("no status on coredb");
let extensions = status.extensions;
assert!(extensions.clone().expect("expected extensions").len() > 0);
assert!(extensions.expect("expected extensions")[0].description.len() > 0);
let coredb_json = serde_json::json!({
"apiVersion": API_VERSION,
"kind": kind,
"metadata": {
"name": name
},
"spec": {
"pkglibdirStorage": "2Gi",
"sharedirStorage" : "2Gi",
}
});
let params = PatchParams::apply("coredb-integration-test");
let patch = Patch::Apply(&coredb_json);
let _ = coredbs.patch(name, ¶ms, &patch).await.unwrap();
thread::sleep(Duration::from_millis(10000));
let pvc = pvc_api.get(&format!("pkglibdir-{}", pod_name)).await.unwrap();
let storage = pvc.spec.unwrap().resources.unwrap().requests.unwrap();
let s = storage.get("storage").unwrap().to_owned();
assert_eq!(Quantity("2Gi".to_owned()), s);
let coredb_json = serde_json::json!({
"apiVersion": API_VERSION,
"kind": kind,
"metadata": {
"name": name
},
"spec": {
"serviceAccountTemplate": {
"metadata": {
"annotations": {
"eks.amazonaws.com/role-arn": "arn:aws:iam::012345678901:role/cdb-test-iam"
}
}
}
}
});
let params = PatchParams::apply("coredb-integration-test");
let patch = Patch::Apply(&coredb_json);
let _coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap();
thread::sleep(Duration::from_millis(5000));
let sa_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
let sa_name = format!("{}-sa", name);
let sa = sa_api.get(&sa_name).await.unwrap();
assert_eq!(sa.metadata.name.unwrap(), sa_name);
assert_eq!(
sa.metadata.annotations.unwrap().get("eks.amazonaws.com/role-arn"),
Some(&"arn:aws:iam::012345678901:role/cdb-test-iam".to_string())
);
let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
let role_name = format!("{}-role", name);
let role = role_api.get(&role_name).await.unwrap();
assert_eq!(role.metadata.name.unwrap(), role_name);
let rb_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
let rb_name = format!("{}-role-binding", name);
let role_binding = rb_api.get(&rb_name).await.unwrap();
assert_eq!(role_binding.metadata.name.unwrap(), rb_name);
let stateful_sets_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
let stateful_set_name = format!("{}", name);
let stateful_set = stateful_sets_api.get(&stateful_set_name).await.unwrap();
let stateful_set_service_account_name = stateful_set
.spec
.as_ref()
.unwrap()
.template
.spec
.as_ref()
.unwrap()
.service_account_name
.as_ref();
assert_eq!(stateful_set_service_account_name, Some(&sa_name));
let mut stateful_set_updated = stateful_set.clone();
stateful_set_updated
.spec
.as_mut()
.unwrap()
.template
.metadata
.as_mut()
.unwrap()
.annotations
.get_or_insert_with(BTreeMap::new)
.insert(
"kubectl.kubernetes.io/restartedAt".to_string(),
Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
);
let params = PatchParams::default();
let patch = Patch::Merge(&stateful_set_updated);
let _stateful_set_patched = stateful_sets_api
.patch(&stateful_set_name, ¶ms, &patch)
.await
.unwrap();
let pods_api: Api<Pod> = Api::namespaced(client.clone(), namespace);
let lp = ListParams::default().labels(format!("statefulset={}", stateful_set_name).as_str());
let restart_time = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true).to_string();
loop {
let pods = pods_api.list(&lp).await.unwrap();
let all_pods_ready_and_restarted = pods.iter().all(|pod| {
let pod_restart_time = pod
.metadata
.annotations
.as_ref()
.and_then(|annotations| annotations.get("kubectl.kubernetes.io/restartedAt"))
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|| Utc::now());
let restart_time_as_datetime = DateTime::parse_from_rfc3339(&restart_time)
.unwrap()
.with_timezone(&Utc);
pod_restart_time > restart_time_as_datetime
&& pod
.status
.as_ref()
.and_then(|status| status.container_statuses.as_ref())
.map(|container_statuses| container_statuses.iter().all(|cs| cs.ready))
.unwrap_or(false)
});
if all_pods_ready_and_restarted {
break;
}
thread::sleep(Duration::from_secs(15));
}
let pods = pods_api.list(&lp).await.unwrap();
let pod = match pods.iter().next() {
Some(pod) => pod,
None => {
println!("Expected label: {}", format!("statefulset={}", stateful_set_name));
panic!("No matching pods found")
}
};
let pod_service_account_name = pod.spec.as_ref().unwrap().service_account_name.as_ref();
assert_eq!(pod_service_account_name, Some(&sa_name));
let coredb_json = serde_json::json!({
"apiVersion": API_VERSION,
"kind": kind,
"metadata": {
"name": name
},
"spec": {
"backup": {
"destinationPath": "s3://test-bucket/coredb/test-org/test-db",
"encryption": "AES256",
"retentionPolicy": "30",
"schedule": "0 0 * * *",
}
}
});
let params = PatchParams::apply("coredb-integration-test");
let patch = Patch::Apply(&coredb_json);
let _coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap();
thread::sleep(Duration::from_millis(5000));
let stateful_sets_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
let stateful_set_name = format!("{}", name);
let stateful_set = stateful_sets_api.get(&stateful_set_name).await.unwrap();
if let Some(container) = stateful_set
.spec
.as_ref()
.unwrap()
.template
.spec
.as_ref()
.and_then(|s| s.containers.get(0))
{
if let Some(env) = container.env.as_ref() {
let destination_path_env = env
.iter()
.find(|e| e.name == "WALG_S3_PREFIX")
.and_then(|e| e.value.clone());
let walg_s3_sse_env = env
.iter()
.find(|e| e.name == "WALG_S3_SSE")
.and_then(|e| e.value.clone());
assert_eq!(
destination_path_env,
Some(String::from("s3://test-bucket/coredb/test-org/test-db"))
);
assert_eq!(walg_s3_sse_env, Some(String::from("AES256")));
} else {
panic!("No environment variables found in the StatefulSet's container");
}
} else {
panic!("No container found in the StatefulSet's template spec");
}
let cron_jobs_api: Api<CronJob> = Api::namespaced(client.clone(), namespace);
let cron_job_name = format!("{}-daily", name);
let cron_job = cron_jobs_api.get(&cron_job_name).await.unwrap();
assert_eq!(
cron_job.spec.as_ref().unwrap().schedule,
String::from("0 0 * * *")
);
}
/// Functional test: a CoreDB annotated with `coredbs.coredb.io/watch: "false"` must keep
/// that annotation and must not get a `<name>-0` pod.
///
/// Ignored by default because it needs a live cluster.
#[tokio::test]
#[ignore]
async fn function_test_skip_reconciliation() {
    let client = kube_client().await;

    // Randomized name so repeated runs in the same namespace do not collide.
    let mut rng = rand::thread_rng();
    let name = &format!("test-coredb-{}", rng.gen_range(0..100000));
    let namespace = "default";
    let kind = "CoreDB";
    let replicas = 1;

    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);

    println!("Creating CoreDB resource {}", name);
    let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), namespace);
    let coredb_json = serde_json::json!({
        "apiVersion": API_VERSION,
        "kind": kind,
        "metadata": {
            "name": name,
            "annotations": {
                "coredbs.coredb.io/watch": "false"
            }
        },
        "spec": {
            "replicas": replicas,
        }
    });
    let params = PatchParams::apply("coredb-integration-test-skip-reconciliation");
    let patch = Patch::Apply(&coredb_json);
    let _coredb_resource = coredbs.patch(name, &params, &patch).await.unwrap();

    // Fixed wait before asserting that nothing was created for this resource.
    // Async sleep so the runtime is not blocked.
    tokio::time::sleep(Duration::from_millis(5000)).await;

    // The annotation must still be present on the stored object.
    let coredb = coredbs.get(name).await.unwrap();
    let annotations = coredb.metadata.annotations.as_ref().unwrap();
    assert_eq!(
        annotations.get("coredbs.coredb.io/watch"),
        Some(&String::from("false"))
    );

    // No pod may exist for the skipped resource, so the lookup is expected to fail.
    let expected_pod_name = format!("{}-{}", name, 0);
    let pod = pods.get(&expected_pod_name).await;
    assert!(pod.is_err());
}
/// Functional test: deleting a namespace that contains a running CoreDB must delete the
/// CoreDB and then the namespace itself, each within its timeout.
///
/// Ignored by default because it needs a live cluster.
#[tokio::test]
#[ignore]
async fn functional_test_delete_namespace() {
    let client = kube_client().await;

    // The CoreDB and its dedicated namespace share one randomized name.
    let mut rng = rand::thread_rng();
    let name = &format!("test-coredb-{}", rng.gen_range(0..100000));
    let namespace = name;

    // ---- Create the namespace ----
    let ns_api: Api<Namespace> = Api::all(client.clone());
    let params = PatchParams::apply("coredb-integration-test").force();
    let ns = serde_json::json!({
        "apiVersion": "v1",
        "kind": "Namespace",
        "metadata": {
            "name": namespace,
        }
    });
    ns_api
        .patch(namespace, &params, &Patch::Apply(&ns))
        .await
        .unwrap();

    // ---- Create the CoreDB inside it ----
    println!("Creating CoreDB resource {}", name);
    let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), namespace);
    let coredb_json = serde_json::json!({
        "apiVersion": API_VERSION,
        "kind": "CoreDB",
        "metadata": {
            "name": name
        },
        "spec": {
            "replicas": 1,
            "extensions": [
                {
                    "name": "postgis",
                    "description": "PostGIS extension",
                    "locations": [{
                        "enabled": false,
                        "version": "1.1.1",
                        "database": "postgres",
                        "schema": "public"}
                    ]
                }]
        }
    });
    let params = PatchParams::apply("coredb-integration-test");
    let patch = Patch::Apply(&coredb_json);
    let coredb_resource = coredbs.patch(name, &params, &patch).await.unwrap();
    // Remember the uid so the deletion wait below tracks this exact object.
    let coredb_uid = coredb_resource
        .metadata
        .uid
        .clone()
        .expect("CoreDB returned by the API server has no uid");

    // ---- Wait until the pod is running and ready ----
    let pod_name = format!("{}-0", name);
    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
    println!("Waiting for pod to be running: {}", pod_name);
    let _check_for_pod = tokio::time::timeout(
        Duration::from_secs(TIMEOUT_SECONDS_START_POD),
        await_condition(pods.clone(), &pod_name, conditions::is_pod_running()),
    )
    .await
    .unwrap_or_else(|_| {
        panic!(
            "Did not find the pod {} to be running after waiting {} seconds",
            pod_name, TIMEOUT_SECONDS_START_POD
        )
    });
    println!("Waiting for pod to be ready: {}", pod_name);
    let _check_for_pod_ready = tokio::time::timeout(
        Duration::from_secs(TIMEOUT_SECONDS_POD_READY),
        await_condition(pods.clone(), &pod_name, is_pod_ready()),
    )
    .await
    .unwrap_or_else(|_| {
        panic!(
            "Did not find the pod {} to be ready after waiting {} seconds",
            pod_name, TIMEOUT_SECONDS_POD_READY
        )
    });
    println!("Found pod ready: {}", pod_name);

    // ---- Delete the namespace and wait for the cascade ----
    ns_api.delete(namespace, &Default::default()).await.unwrap();

    println!("Waiting for CoreDB to be deleted: {}", &name);
    // `is_deleted` is satisfied when the object is gone or its uid differs from the one
    // given. Passing an empty uid made every existing object look deleted, so the wait
    // returned immediately; the real uid makes it wait for this object to disappear.
    let _assert_coredb_deleted = tokio::time::timeout(
        Duration::from_secs(TIMEOUT_SECONDS_COREDB_DELETED),
        await_condition(coredbs.clone(), name, conditions::is_deleted(&coredb_uid)),
    )
    .await
    .unwrap_or_else(|_| {
        panic!(
            "CoreDB {} was not deleted after waiting {} seconds",
            name, TIMEOUT_SECONDS_COREDB_DELETED
        )
    });

    println!("Waiting for namespace to be deleted: {}", &namespace);
    tokio::time::timeout(Duration::from_secs(TIMEOUT_SECONDS_NS_DELETED), async move {
        // Poll once per second instead of issuing GET requests back to back.
        while ns_api.get_opt(namespace).await.unwrap().is_some() {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    })
    .await
    .unwrap_or_else(|_| {
        panic!(
            "Namespace {} was not deleted after waiting {} seconds",
            namespace, TIMEOUT_SECONDS_NS_DELETED
        )
    });
}
#[tokio::test]
#[ignore]
async fn test_stop_instance() {
let client = kube_client().await;
let mut rng = rand::thread_rng();
let name = &format!("test-stop-coredb-{}", rng.gen_range(0..100000));
let namespace = "default";
let kind = "CoreDB";
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
println!("Creating CoreDB resource {}", name);
let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), namespace);
let coredb_json = serde_json::json!({
"apiVersion": API_VERSION,
"kind": kind,
"metadata": {
"name": name
},
"spec": {
"stop": false
}
});
let params = PatchParams::apply("coredb-integration-test");
let patch = Patch::Apply(&coredb_json);
let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap();
let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
let secret_name = format!("{}-connection", name);
println!("Waiting for secret to be created: {}", secret_name);
let establish = await_condition(secret_api.clone(), &secret_name, wait_for_secret());
let _ = tokio::time::timeout(Duration::from_secs(TIMEOUT_SECONDS_SECRET_PRESENT), establish)
.await
.unwrap_or_else(|_| {
panic!(
"Did not find the secret {} present after waiting {} seconds",
secret_name, TIMEOUT_SECONDS_SECRET_PRESENT
)
});
println!("Found secret: {}", secret_name);
let pod_name = format!("{}-0", name);
println!("Waiting for pod to be running: {}", pod_name);
let _check_for_pod = tokio::time::timeout(
Duration::from_secs(TIMEOUT_SECONDS_START_POD),
await_condition(pods.clone(), &pod_name, conditions::is_pod_running()),
)
.await
.unwrap_or_else(|_| {
panic!(
"Did not find the pod {} to be running after waiting {} seconds",
pod_name, TIMEOUT_SECONDS_START_POD
)
});
println!("Waiting for pod to be ready: {}", pod_name);
let _check_for_pod_ready = tokio::time::timeout(
Duration::from_secs(TIMEOUT_SECONDS_POD_READY),
await_condition(pods.clone(), &pod_name, is_pod_ready()),
)
.await
.unwrap_or_else(|_| {
panic!(
"Did not find the pod {} to be ready after waiting {} seconds",
pod_name, TIMEOUT_SECONDS_POD_READY
)
});
println!("Found pod ready: {}", pod_name);
let default_resources: ResourceRequirements = default_resources();
let pg_pod = pods.get(&pod_name).await.unwrap();
let resources = pg_pod.spec.unwrap().containers[0].clone().resources;
assert_eq!(default_resources, resources.unwrap());
let result = coredb_resource
.psql("\\dt".to_string(), "postgres".to_string(), client.clone())
.await
.unwrap();
println!("{}", result.stderr.clone().unwrap());
assert!(result
.stderr
.clone()
.unwrap()
.contains("Did not find any relations."));
let result = coredb_resource
.psql(
"
CREATE TABLE stop_test (
id serial PRIMARY KEY,
created_at TIMESTAMP DEFAULT NOW()
);
"
.to_string(),
"postgres".to_string(),
client.clone(),
)
.await
.unwrap();
println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("CREATE TABLE"));
let result = coredb_resource
.psql("\\dt".to_string(), "postgres".to_string(), client.clone())
.await
.unwrap();
println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("stop_test"));
thread::sleep(Duration::from_millis(5000));
let coredb_json = serde_json::json!({
"apiVersion": API_VERSION,
"kind": kind,
"metadata": {
"name": name
},
"spec": {
"stop": true,
}
});
let params = PatchParams::apply("coredb-integration-test");
let patch = Patch::Apply(&coredb_json);
let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap();
thread::sleep(Duration::from_millis(8000));
let res = coredb_resource.primary_pod(client.clone()).await;
assert!(res.is_err());
let coredb_json = serde_json::json!({
"apiVersion": API_VERSION,
"kind": kind,
"metadata": {
"name": name
},
"spec": {
"stop": false,
}
});
let params = PatchParams::apply("coredb-integration-test");
let patch = Patch::Apply(&coredb_json);
let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap();
println!("Waiting for pod to be running: {}", pod_name);
let check_for_pod = tokio::time::timeout(
Duration::from_secs(TIMEOUT_SECONDS_START_POD),
await_condition(pods.clone(), &pod_name, conditions::is_pod_running()),
);
assert!(check_for_pod.await.is_ok());
let result = coredb_resource
.psql("\\dt".to_string(), "postgres".to_string(), client.clone())
.await
.unwrap();
println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("stop_test"));
}
/// Builds a Kubernetes client from the inferred configuration and checks that the tests
/// are allowed to run against it.
///
/// Two preconditions are enforced before the client is returned:
/// 1. the configuration's default namespace carries the label
///    `safe-to-run-coredb-tests` with value `true`;
/// 2. waiting for the `coredbs.coredb.io` CRD to be established does not time out
///    (2 seconds).
///
/// # Panics
/// Panics if the configuration cannot be inferred, the client cannot be built, the
/// namespace cannot be fetched, or either precondition fails.
async fn kube_client() -> Client {
    let kube_config = Config::infer()
        .await
        .expect("Please configure your Kubernetes context.");
    // Copy out only the namespace name; the config itself is consumed by the client.
    let selected_namespace = kube_config.default_namespace.clone();
    let client = Client::try_from(kube_config).expect("Failed to initialize Kubernetes client");

    // Safety guard: refuse to run against a namespace that was not explicitly opted in.
    let namespaces: Api<Namespace> = Api::all(client.clone());
    let namespace = namespaces.get(&selected_namespace).await.unwrap();
    // A namespace without any labels is treated as an empty label set, so the assertion
    // below reports the missing label instead of a bare `unwrap` panic.
    let labels = namespace.metadata.labels.unwrap_or_default();
    assert!(
        labels.contains_key("safe-to-run-coredb-tests"),
        "expected to find label 'safe-to-run-coredb-tests'"
    );
    assert_eq!(
        labels["safe-to-run-coredb-tests"], "true",
        "expected to find label 'safe-to-run-coredb-tests' with value 'true'"
    );

    // The CoreDB CRD must already be installed and established.
    // NOTE(review): only the timeout is checked here; the inner result of
    // `await_condition` is bound and ignored, so a watch error would not fail this check.
    let custom_resource_definitions: Api<CustomResourceDefinition> = Api::all(client.clone());
    let _check_for_crd = tokio::time::timeout(
        Duration::from_secs(2),
        await_condition(
            custom_resource_definitions,
            "coredbs.coredb.io",
            conditions::is_crd_established(),
        ),
    )
    .await
    .expect("Custom Resource Definition for CoreDB was not found.");

    client
}
/// Returns a watch condition that holds once the secret exists and its type is `Opaque`.
///
/// A missing secret, or a secret without a type, does not satisfy the condition.
fn wait_for_secret() -> impl Condition<Secret> {
    |obj: Option<&Secret>| {
        obj.and_then(|secret| secret.type_.as_deref())
            .map_or(false, |secret_type| secret_type == "Opaque")
    }
}
}