use crate::{
runtime::{
watcher::{watcher, Config},
WatchStreamExt,
},
Api, Client,
};
use anyhow::Result;
use futures::{poll, StreamExt, TryStreamExt};
use http::{Request, Response};
use kube_client::client::Body;
use kube_derive::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
// Spec of the test-only `Hack` custom resource (group `kube.rs`, version `v1`).
// The `CustomResource` derive generates the `Hack` kind used by the tests in this file.
// Plain `//` comments are used on purpose: `///` docs would be picked up by the derives.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(group = "kube.rs", version = "v1", kind = "Hack")]
#[kube(crates(kube_core = "crate::core"))] struct HackSpec {
// Number carried by the fixture; the tests assert on it to check ordering.
num: u32,
}
impl Hack {
    /// Builds a `Hack` fixture named `h<num>` whose spec carries `num`.
    ///
    /// The name is produced with `format!`; a plain string literal would not
    /// interpolate `{num}` and every fixture would share the literal name `h{num}`.
    fn test(num: u32) -> Self {
        Hack::new(&format!("h{num}"), HackSpec { num })
    }
}
/// Drives a watcher configured with `page_size(1)` against the mock apiserver
/// running the `PaginatedList` scenario and checks that the objects come out
/// of the stream in order (spec numbers 1, 2, 3) and that nothing else is
/// ready afterwards.
#[tokio::test]
async fn watchers_respect_pagination_limits() {
    let (client, fakeserver) = testcontext();
    // The mock serves the paged list responses from a background task.
    let mocksrv = fakeserver.run(Scenario::PaginatedList);

    let hacks: Api<Hack> = Api::all(client);
    let paged = Config::default().page_size(1);
    let mut stream = watcher(hacks, paged).applied_objects().boxed();

    // One object per served page, in the order the pages were served.
    for expected in 1..=3u32 {
        let hack: Hack = stream.try_next().await.unwrap().unwrap();
        assert_eq!(hack.spec.num, expected);
    }

    // No fourth object may be immediately available.
    assert!(poll!(stream.next()).is_pending());
    // The scenario task must have finished (all three requests were seen).
    timeout_after_1s(mocksrv).await;
}
/// Handle side of a `tower_test` mock service exchanging `Request<Body>` / `Response<Body>`.
type ApiServerHandle = tower_test::mock::Handle<Request<Body>, Response<Body>>;
/// Wrapper around the mock handle; its methods receive the requests sent by the
/// client under test, assert on them, and send canned responses back.
struct ApiServerVerifier(ApiServerHandle);
/// Waits at most one second for the mock apiserver task to finish.
///
/// # Panics
///
/// Panics with "timeout on mock apiserver" if the task is still running after
/// one second, and with "scenario succeeded" if joining the task failed.
async fn timeout_after_1s(handle: tokio::task::JoinHandle<()>) {
    let limit = std::time::Duration::from_secs(1);
    let joined = tokio::time::timeout(limit, handle)
        .await
        .expect("timeout on mock apiserver");
    joined.expect("scenario succeeded")
}
/// Scripts the mock apiserver can play; selected via `ApiServerVerifier::run`.
enum Scenario {
/// Serve the sequence of paged list responses (`handle_paged_lists`).
PaginatedList,
/// Handle no requests at all: `run` completes immediately with `Ok`.
/// Not constructed by the code visible here, hence the `dead_code` allowance.
#[allow(dead_code)] RadioSilence,
}
impl ApiServerVerifier {
fn run(self, scenario: Scenario) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
match scenario {
Scenario::PaginatedList => self.handle_paged_lists().await,
Scenario::RadioSilence => Ok(self),
}
.expect("scenario completed without errors");
})
}
async fn handle_paged_lists(mut self) -> Result<Self> {
{
let (request, send) = self.0.next_request().await.expect("service not called 1");
assert_eq!(request.method(), http::Method::GET);
let req_uri = request.uri().to_string();
assert!(req_uri.contains("limit="));
assert!(!req_uri.contains("continue="));
let respdata = json!({
"kind": "HackList",
"apiVersion": "kube.rs/v1",
"metadata": {
"continue": "first",
},
"items": [Hack::test(1)]
});
let response = serde_json::to_vec(&respdata).unwrap(); send.send_response(Response::builder().body(Body::from(response)).unwrap());
}
{
let (request, send) = self.0.next_request().await.expect("service not called 2");
assert_eq!(request.method(), http::Method::GET);
let req_uri = request.uri().to_string();
assert!(req_uri.contains("&continue=first"));
let respdata = json!({
"kind": "HackList",
"apiVersion": "kube.rs/v1",
"metadata": {
"continue": "second",
"resourceVersion": "2"
},
"items": [Hack::test(2)]
});
let response = serde_json::to_vec(&respdata).unwrap(); send.send_response(Response::builder().body(Body::from(response)).unwrap());
}
{
let (request, send) = self.0.next_request().await.expect("service not called 3");
assert_eq!(request.method(), http::Method::GET);
let req_uri = request.uri().to_string();
assert!(req_uri.contains("&continue=second"));
let respdata = json!({
"kind": "HackList",
"apiVersion": "kube.rs/v1",
"metadata": {
"continue": "",
"resourceVersion": "2"
},
"items": [Hack::test(3)]
});
let response = serde_json::to_vec(&respdata).unwrap(); send.send_response(Response::builder().body(Body::from(response)).unwrap());
}
Ok(self)
}
}
/// Creates a connected pair: a `Client` backed by a `tower_test` mock service
/// (default namespace `"default"`) and the `ApiServerVerifier` holding the
/// matching handle.
fn testcontext() -> (Client, ApiServerVerifier) {
    let (service, handle) = tower_test::mock::pair::<Request<Body>, Response<Body>>();
    (Client::new(service, "default"), ApiServerVerifier(handle))
}