use std::sync::{
Arc,
Mutex,
};
use clockabilly::mock::MockUtcClock;
use futures::stream;
use futures::stream::StreamExt;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;
use kube::runtime::watcher::Event;
use kube::ResourceExt;
use sk_api::v1::ExportFilters;
use sk_core::macros::*;
use super::*;
use crate::watchers::{
DynObjHandler,
ObjStream,
ObjWatcher,
};
use crate::TraceStore;
/// Returns the object produced by `test_deployment` for the name `depl{idx}`.
fn d(idx: i64) -> DynamicObject {
    let name = format!("depl{idx}");
    test_deployment(&name)
}
/// Builds the scripted stream of watcher events consumed by the export test.
///
/// The stream is driven by a `(timestamp, deployment index)` state pair, where a
/// timestamp of `-1` marks the initial-listing phase.  Each step yields exactly one
/// `Ok(Event)` together with the next state; any state not listed below ends the stream.
fn test_stream(clock: MockUtcClock) -> ObjStream<DynamicObject> {
    stream::unfold((-1, -1), move |(ts, idx)| {
        // A fresh clone of the clock handle is taken on every step.
        let mut clk = clock.clone();
        async move {
            let (event, next_state) = match (ts, idx) {
                // Initial listing: Init, then InitApply for depl0..depl9, then InitDone.
                (-1, -1) => (Event::Init, (-1, 0)),
                (-1, i) if i < 10 => (Event::InitApply(d(i)), (-1, i + 1)),
                (-1, 10) => (Event::InitDone, (0, 0)),

                // Re-apply the current deployment and advance the clock by 5.
                (0, i) => {
                    let depl = d(i);
                    (Event::Apply(depl), (clk.advance(5), i))
                },

                // Delete the current deployment, advance the clock by 5 and move on to
                // the next index.
                //
                // NOTE(review): assuming `advance` returns the clock's new timestamp and
                // the clones share it, the timestamps run 5, 10, 15, 20; state 20 matches
                // no arm, so the stream would end before the 22 / 24 / 25..=55 cases are
                // ever reached -- confirm whether that is intended.
                (5..=19 | 25..=55, i) => {
                    let depl = d(i);
                    (Event::Delete(depl), (clk.advance(5), i + 1))
                },

                // Apply depl30 placed in the "kube-system" namespace; advance the clock by 1.
                (22, i) => {
                    let mut depl = d(30);
                    depl.metadata.namespace = Some("kube-system".into());
                    (Event::Apply(depl), (clk.advance(1), i))
                },

                // Apply depl31 carrying the label foo=bar; advance the clock by 1.
                (24, i) => {
                    let mut depl = d(31);
                    depl.labels_mut().insert("foo".into(), "bar".into());
                    (Event::Apply(depl), (clk.advance(1), i))
                },

                // Anything else terminates the stream.
                _ => return None,
            };
            Some((Ok(event), next_state))
        }
    })
    .boxed()
}
/// Round-trips the trace store through `export` and `import`.
///
/// A watcher is run over the scripted `test_stream`, the resulting store is exported
/// with `start_ts = 15` and `end_ts = 46` using filters that exclude the "kube-system"
/// namespace and the label foo=bar, and the exported data is imported into a new store.
/// The objects reported by the new store at `end_ts` must equal those reported by the
/// original store at `end_ts` (full trace) or at `start_ts + 10` (when a duration is
/// supplied to `import`).
#[rstest]
#[case::full_trace(None)]
#[case::partial_trace(Some("10s".into()))]
#[traced_test]
#[tokio::test]
async fn itest_export(#[case] duration: Option<String>) {
    // Populate a shared store by running the watcher over the scripted event stream.
    let clock = MockUtcClock::boxed(0);
    let shared_store = Arc::new(Mutex::new(TraceStore::new(Default::default())));
    let handler = DynObjHandler::new(DEPL_GVK.clone());
    let events = test_stream(*clock.clone());
    let watcher = ObjWatcher::new_from_parts(handler, events, shared_store.clone(), clock);
    watcher.start().await;

    let filter = ExportFilters {
        excluded_namespaces: vec!["kube-system".into()],
        excluded_labels: vec![metav1::LabelSelector {
            match_labels: klabel!("foo" => "bar"),
            ..Default::default()
        }],
    };

    // The guard is taken only after the watcher has finished, so it is never held
    // across an await point.
    let store = shared_store.lock().unwrap();
    let start_ts = 15;
    let end_ts = 46;

    let data = store
        .export(start_ts, end_ts, &filter)
        .unwrap_or_else(|e| panic!("failed with error: {}", e));
    let new_store = TraceStore::import(data, &duration).unwrap();

    // When a duration was given, the original store is queried at `start_ts + 10`
    // instead of `end_ts`.
    let import_end_ts = match duration {
        Some(_) => start_ts + 10,
        None => end_ts,
    };

    let expected_pods = store.sorted_objs_at(import_end_ts, &filter);
    let actual_pods = new_store.sorted_objs_at(end_ts, &filter);
    println!("Expected pods: {:?}", expected_pods);
    println!("Actual pods: {:?}", actual_pods);
    assert_eq!(actual_pods, expected_pods);
}