use std::collections::HashSet;
use std::sync::Arc;
use assertables::*;
use clockabilly::Clockable;
use clockabilly::mock::MockUtcClock;
use futures::stream;
use futures::stream::StreamExt;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;
use kube::ResourceExt;
use kube::discovery::ApiResource;
use kube::runtime::watcher::Event;
use serde_json::json;
use sk_api::v1::ExportFilters;
use sk_core::k8s::{
DynamicApiSet,
GVK,
format_gvk_name,
};
use sk_core::macros::*;
use tokio::sync::{
Mutex,
mpsc,
};
use super::*;
use crate::TraceStore;
use crate::manager::handle_messages;
use crate::watchers::{
ObjStream,
dyn_obj_watcher,
pod_watcher,
};
fn d(idx: i64) -> DynamicObject {
test_deployment(&format!("depl{idx}"))
}
fn rs(idx: i64) -> DynamicObject {
let mut obj = DynamicObject::new(&format!("repset{idx}"), &ApiResource::from_gvk(&REPLICASET_GVK))
.within(TEST_NAMESPACE)
.data(json!({"spec": {"replicas": 42}}));
obj.owner_references_mut().push(metav1::OwnerReference {
api_version: "apps/v1".into(),
kind: "Deployment".into(),
name: format!("depl{idx}"),
..Default::default()
});
obj
}
fn test_stream(clock: MockUtcClock) -> ObjStream<DynamicObject> {
stream::unfold((-1, -1), move |state| {
println!("current state machine state: {state:?}");
let mut c = clock.clone();
async move {
match state {
(-1, -1) => {
return Some((Ok(Event::Init), (-1, 0)));
},
(-1, id) if id < 10 => {
let obj = d(id);
return Some((Ok(Event::InitApply(obj)), (-1, id + 1)));
},
(-1, id) if id < 20 => {
let obj = rs(id - 10);
return Some((Ok(Event::InitApply(obj)), (-1, id + 1)));
},
(-1, 20) => {
return Some((Ok(Event::InitDone), (0, 0)));
},
(0, id) => {
let obj = d(id);
let new_ts = c.advance(5);
return Some((Ok(Event::Apply(obj)), (new_ts, id)));
},
(5..=19, id) if id < 10 => {
let obj = d(id);
let curr_ts = c.now_ts();
return Some((Ok(Event::Delete(obj)), (curr_ts, id + 10)));
},
(5..=19, id) if id >= 10 => {
let obj = rs(id - 10);
let new_ts = c.advance(5);
return Some((Ok(Event::Delete(obj)), (new_ts, id - 9)));
},
(20, id) => {
let mut obj = d(30);
obj.metadata.namespace = Some("kube-system".into());
let new_ts = c.advance(4);
return Some((Ok(Event::Apply(obj)), (new_ts, id)));
},
(24, id) => {
let mut obj = d(31);
obj.labels_mut().insert("foo".into(), "bar".into());
let new_ts = c.advance(1);
return Some((Ok(Event::Apply(obj)), (new_ts, id)));
},
(25..=55, id) if id < 10 => {
let obj = d(id);
let curr_ts = c.now_ts();
return Some((Ok(Event::Delete(obj)), (curr_ts, id + 10)));
},
(25..=55, id) if id >= 10 => {
let obj = rs(id - 10);
let new_ts = c.advance(5);
return Some((Ok(Event::Delete(obj)), (new_ts, id - 9)));
},
_ => None,
}
}
})
.boxed()
}
fn objs_in_trace(trace: &ExportedTrace) -> HashSet<String> {
let mut objs = HashSet::new();
for evt in &trace.events {
for obj in &evt.applied_objs {
objs.insert(format_gvk_name(&GVK::from_dynamic_obj(&obj).unwrap(), &obj.namespaced_name()));
}
for obj in &evt.deleted_objs {
objs.remove(&format_gvk_name(&GVK::from_dynamic_obj(&obj).unwrap(), &obj.namespaced_name()));
}
}
objs
}
mod itest {
use super::*;
#[rstest(tokio::test)]
#[case::full_trace(None)]
#[case::partial_trace(Some("10s".into()))]
async fn test_export(#[case] duration: Option<String>) {
let clock = MockUtcClock::boxed(0);
let config = TracerConfig::default();
let (mut fake_apiserver, client) = make_fake_apiserver();
let apiset = DynamicApiSet::new(client);
fake_apiserver.handle(|when, then| {
when.path("/apis/apps/v1");
then.json_body(apps_v1_discovery());
});
for i in 0..10 {
fake_apiserver.handle(move |when, then| {
when.path("/apis/apps/v1/deployments")
.query_param("fieldSelector", format!("metadata.namespace={TEST_NAMESPACE},metadata.name=depl{i}"));
then.json_body(json!({
"metadata": {},
"items": [
{
"metadata": {
"namespace": TEST_NAMESPACE,
"name": format!("depl{i}"),
}
},
],
}));
});
}
let s = Arc::new(Mutex::new(TraceStore::new(config.clone(), apiset)));
let (dyn_obj_tx, dyn_obj_rx): (dyn_obj_watcher::Sender, dyn_obj_watcher::Receiver) = mpsc::unbounded_channel();
let (_, pod_rx): (pod_watcher::Sender, pod_watcher::Receiver) = mpsc::unbounded_channel();
let (ready_tx, _): (mpsc::Sender<bool>, mpsc::Receiver<bool>) = mpsc::channel(1);
let w = dyn_obj_watcher::new_from_parts(
DEPLOYMENT_GVK.clone(),
dyn_obj_tx,
test_stream(*clock.clone()),
clock,
ready_tx,
);
w.start().await;
handle_messages(dyn_obj_rx, pod_rx, s.clone()).await;
let filter = ExportFilters {
excluded_namespaces: vec!["kube-system".into()],
excluded_labels: vec![metav1::LabelSelector {
match_labels: klabel!("foo" => "bar"),
..Default::default()
}],
};
let (start_ts, end_ts) = (15, 46);
let store = s.lock().await;
match store.export(start_ts, end_ts, &filter).await {
Ok(data) => {
let trace = ExportedTrace::import(data, duration.as_ref()).unwrap();
let import_end_ts = duration.map(|_| start_ts + 10).unwrap_or(end_ts);
let expected_objs = store.objs_at(import_end_ts, &filter).await;
let actual_objs = objs_in_trace(&trace);
println!("{actual_objs:?}");
assert_bag_eq!(actual_objs, expected_objs);
for obj in actual_objs {
assert_not_contains!(obj, "depl30"); assert_not_contains!(obj, "depl31"); assert_not_contains!(obj, "repset"); }
},
Err(e) => panic!("failed with error: {}", e),
};
}
}