use crate::{
apis::coredb_types::{CoreDB, CoreDBSpec, CoreDBStatus},
Context, Result, COREDB_FINALIZER,
};
use assert_json_diff::assert_json_include;
use http::{Request, Response};
use k8s_openapi::api::core::v1::{Pod, Secret};
use kube::{
api::ObjectMeta, api::TypeMeta, client::Body, core::ObjectList, runtime::events::Recorder,
Client, Resource, ResourceExt,
};
use std::sync::Arc;
/// Fixture constructors and builder-style modifiers for [`CoreDB`] used by the
/// mock-apiserver scenarios in this file.
impl CoreDB {
    /// Builds a `CoreDB` named `illegal` in the `default` namespace with a
    /// default spec; nothing else is set here.
    pub fn illegal() -> Self {
        let mut db = CoreDB::new("illegal", CoreDBSpec::default());
        db.meta_mut().namespace = Some(String::from("default"));
        db
    }

    /// Builds the standard fixture: a `CoreDB` named `testdb` in namespace
    /// `testns` with a fixed UID, one replica and the postgres exporter
    /// switched off.
    pub fn test() -> Self {
        let mut db = CoreDB::new("testdb", CoreDBSpec::default());

        let meta = db.meta_mut();
        meta.namespace = Some(String::from("testns"));
        meta.uid = Some(String::from("752d59ef-2671-4890-9feb-0097459b18c8"));

        db.spec.replicas = 1;
        db.spec.postgresExporterEnabled = false;
        db
    }

    /// Returns the object with `spec.stop` set to `true`.
    pub fn needs_stop(mut self) -> Self {
        self.spec.stop = true;
        self
    }

    /// Returns the object with a fixed deletion timestamp
    /// (2017-04-02 12:50:32 UTC) set in its metadata.
    pub fn needs_delete(mut self) -> Self {
        use chrono::prelude::{DateTime, TimeZone, Utc};
        use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;

        let deleted_at: DateTime<Utc> = Utc.with_ymd_and_hms(2017, 4, 2, 12, 50, 32).unwrap();
        self.meta_mut().deletion_timestamp = Some(Time(deleted_at));
        self
    }

    /// Returns the object with `COREDB_FINALIZER` appended to its finalizers.
    pub fn finalized(mut self) -> Self {
        let finalizer = COREDB_FINALIZER.to_string();
        self.finalizers_mut().push(finalizer);
        self
    }

    /// Returns the object with its status replaced by `status`.
    pub fn with_status(mut self, status: CoreDBStatus) -> Self {
        self.status = Some(status);
        self
    }
}
/// Handle side of a `tower_test` mock service that carries `Request<Body>` in
/// and `Response<Body>` out.
type ApiServerHandle = tower_test::mock::Handle<Request<Body>, Response<Body>>;
/// Wrapper around an [`ApiServerHandle`]; its methods pull requests from the
/// handle, assert on them, and send back canned responses.
pub struct ApiServerVerifier(ApiServerHandle);
/// Request sequences that `ApiServerVerifier::run` can play back.
pub enum Scenario {
/// Expect the finalizer-adding PATCH for the given `CoreDB`.
FinalizerCreation(CoreDB),
/// Expect the request sequence handled by `handle_coredb_patch` for the given `CoreDB`.
StatusPatch(CoreDB),
/// Expect an event POST whose `reason` equals the string, then the same
/// sequence as `StatusPatch` for the given `CoreDB`.
EventPublishThenStatusPatch(String, CoreDB),
/// Handle no requests at all; `run` completes immediately for this variant.
RadioSilence,
/// Expect an event POST whose `reason` equals the string, then the
/// finalizer-removing PATCH for the given `CoreDB`.
Cleanup(String, CoreDB),
}
/// Awaits a spawned scenario task, allowing it at most one second.
///
/// # Panics
///
/// Panics with "timeout on mock apiserver" if the task has not finished within
/// one second, and with "scenario succeeded" if joining the task fails.
pub async fn timeout_after_1s(handle: tokio::task::JoinHandle<()>) {
    let limit = std::time::Duration::from_secs(1);
    let joined = tokio::time::timeout(limit, handle)
        .await
        .expect("timeout on mock apiserver");
    joined.expect("scenario succeeded")
}
impl ApiServerVerifier {
pub fn run(self, scenario: Scenario) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
match scenario {
Scenario::FinalizerCreation(cdb) => self.handle_finalizer_creation(&cdb).await,
Scenario::StatusPatch(cdb) => self.handle_coredb_patch(&cdb).await,
Scenario::EventPublishThenStatusPatch(reason, cdb) => {
self.handle_event_create(reason)
.await
.unwrap()
.handle_coredb_patch(&cdb)
.await
}
Scenario::RadioSilence => Ok(self),
Scenario::Cleanup(reason, cdb) => {
self.handle_event_create(reason)
.await
.unwrap()
.handle_finalizer_removal(cdb)
.await
}
}
.expect("scenario completed without errors");
})
}
async fn handle_finalizer_creation(mut self, cdb: &CoreDB) -> Result<Self> {
let (request, send) = self.0.next_request().await.expect("service not called");
let coredb = cdb.clone();
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().to_string(),
format!(
"/apis/coredb.io/v1alpha1/namespaces/testns/coredbs/{}?",
coredb.name_any()
)
);
let expected_patch = serde_json::json!([
{ "op": "test", "path": "/metadata/finalizers", "value": null },
{ "op": "add", "path": "/metadata/finalizers", "value": vec![COREDB_FINALIZER] }
]);
let req_body = request.into_body().collect_bytes().await.unwrap();
let runtime_patch: serde_json::Value =
serde_json::from_slice(&req_body).expect("valid document from runtime");
assert_json_include!(actual: runtime_patch, expected: expected_patch);
let response = serde_json::to_vec(&coredb.finalized()).unwrap(); send.send_response(Response::builder().body(Body::from(response)).unwrap());
Ok(self)
}
async fn handle_finalizer_removal(mut self, cdb: CoreDB) -> Result<Self> {
let (request, send) = self.0.next_request().await.expect("service not called");
let coredb = cdb.clone();
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().to_string(),
format!(
"/apis/coredb.io/v1alpha1/namespaces/testns/coredbs/{}?",
coredb.name_any()
)
);
let expected_patch = serde_json::json!([
{ "op": "test", "path": "/metadata/finalizers/0", "value": COREDB_FINALIZER },
{ "op": "remove", "path": "/metadata/finalizers/0" }
]);
let req_body = request.into_body().collect_bytes().await.unwrap();
let runtime_patch: serde_json::Value =
serde_json::from_slice(&req_body).expect("valid document from runtime");
assert_json_include!(actual: runtime_patch, expected: expected_patch);
let response = serde_json::to_vec(&coredb).unwrap(); send.send_response(Response::builder().body(Body::from(response)).unwrap());
Ok(self)
}
async fn handle_coredb_patch(mut self, coredb: &CoreDB) -> Result<Self> {
let (request, send) = self.0.next_request().await.expect("service not called");
let coredb = coredb.clone();
assert_eq!(request.method(), http::Method::GET);
assert_eq!(
request.uri().to_string(),
format!("/api/v1/namespaces/testns/secrets?&labelSelector=app%3Dcoredb")
);
let obj: ObjectList<Secret> = ObjectList {
metadata: Default::default(),
items: vec![],
types: TypeMeta::default(),
};
let response = serde_json::to_vec(&obj).unwrap();
send.send_response(Response::builder().body(Body::from(response)).unwrap());
let (request, send) = self.0.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().to_string(),
format!(
"/api/v1/namespaces/testns/secrets/testdb-connection?&force=true&fieldManager=cntrlr"
)
);
send.send_response(Response::builder().body(request.into_body()).unwrap());
let (request, send) = self.0.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().to_string(),
format!(
"/apis/apps/v1/namespaces/testns/statefulsets/testdb?&force=true&fieldManager=cntrlr"
)
);
send.send_response(Response::builder().body(request.into_body()).unwrap());
let (request, send) = self.0.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().to_string(),
format!("/api/v1/namespaces/testns/services/testdb?&force=true&fieldManager=cntrlr")
);
send.send_response(Response::builder().body(request.into_body()).unwrap());
let (request, send) = self.0.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::GET);
assert_eq!(
request.uri().to_string(),
format!("/api/v1/namespaces/testns/pods?&labelSelector=statefulset%3Dtestdb")
);
let pod: Pod = Pod {
metadata: ObjectMeta {
name: Some("testdb-0".to_string()),
namespace: Some("testns".to_string()),
..ObjectMeta::default()
},
..Pod::default()
};
let obj: ObjectList<Pod> = ObjectList {
metadata: Default::default(),
items: vec![pod],
types: TypeMeta::default(),
};
let response = serde_json::to_vec(&obj).unwrap();
send.send_response(Response::builder().body(Body::from(response)).unwrap());
let (request, send) = self.0.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().to_string(),
format!(
"/apis/coredb.io/v1alpha1/namespaces/testns/coredbs/{}/status?&force=true&fieldManager=cntrlr",
coredb.name_any()
)
);
let req_body = request.into_body().collect_bytes().await.unwrap();
let json: serde_json::Value =
serde_json::from_slice(&req_body).expect("patch_status object is json");
let status_json = json.get("status").expect("status object").clone();
let status: CoreDBStatus =
serde_json::from_value(status_json).expect("contains valid status");
assert!(
status.running,
"CoreDB::test says the status isn't running, but it was expected to be running."
);
let response = serde_json::to_vec(&coredb.with_status(status)).unwrap();
send.send_response(Response::builder().body(Body::from(response)).unwrap());
Ok(self)
}
async fn handle_event_create(mut self, reason: String) -> Result<Self> {
let (request, send) = self.0.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::POST);
assert_eq!(
request.uri().to_string(),
format!("/apis/events.k8s.io/v1/namespaces/default/events?")
);
let req_body = request.into_body().collect_bytes().await.unwrap();
let postdata: serde_json::Value =
serde_json::from_slice(&req_body).expect("valid event from runtime");
dbg!("postdata for event: {}", postdata.clone());
assert_eq!(
postdata.get("reason").unwrap().as_str().map(String::from),
Some(reason)
);
send.send_response(Response::builder().body(Body::from(req_body)).unwrap());
Ok(self)
}
}
impl Context {
    /// Creates a test `Context` wired to a mock API service, together with the
    /// [`ApiServerVerifier`] that owns the other end of that service.
    ///
    /// The client is built with `"default"` as its second constructor argument
    /// and the recorder with `"doc-ctrl-test"`; metrics and diagnostics take
    /// their `Default` values.
    pub fn test() -> (Arc<Self>, ApiServerVerifier) {
        let (mock_service, handle) = tower_test::mock::pair::<Request<Body>, Response<Body>>();

        let client = Client::new(mock_service, "default");
        let recorder = Recorder::new(client.clone(), "doc-ctrl-test".into());

        let context = Arc::new(Self {
            client,
            metrics: Arc::default(),
            diagnostics: Arc::default(),
            recorder,
        });
        (context, ApiServerVerifier(handle))
    }
}