#[cfg(test)]
mod watcher_tests {
use std::time::Duration;
use http::{Request, Response, StatusCode};
use k8s_openapi::api::core::v1::{ConfigMap, Node};
use kube::Client;
use kube::client::Body;
use serde_json::json;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tower_test::mock;
use crate::scope::{Cluster, Namespaced};
use crate::watcher::{WatchEvent, watch, watch_events, watch_objects};
type MockHandle = mock::Handle<Request<Body>, Response<Body>>;
fn mock_client() -> (Client, MockHandle) {
let (svc, handle) = mock::pair::<Request<Body>, Response<Body>>();
(Client::new(svc, "default"), handle)
}
fn json_response(body: serde_json::Value) -> Response<Body> {
let bytes = serde_json::to_vec(&body).unwrap();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(bytes))
.unwrap()
}
fn configmap_json(name: &str, namespace: &str) -> serde_json::Value {
json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": name,
"namespace": namespace,
"resourceVersion": "100"
}
})
}
fn node_json(name: &str) -> serde_json::Value {
json!({
"apiVersion": "v1",
"kind": "Node",
"metadata": {
"name": name,
"resourceVersion": "100"
}
})
}
fn list_response(kind: &str, items: Vec<serde_json::Value>) -> Response<Body> {
let body = json!({
"apiVersion": "v1",
"kind": kind,
"metadata": { "resourceVersion": "100" },
"items": items
});
json_response(body)
}
fn watch_events_response(events: Vec<serde_json::Value>) -> Response<Body> {
let ndjson = events
.into_iter()
.map(|e| serde_json::to_string(&e).unwrap())
.collect::<Vec<_>>()
.join("\n");
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(ndjson.into_bytes()))
.unwrap()
}
fn added_event(object: serde_json::Value) -> serde_json::Value {
json!({ "type": "ADDED", "object": object })
}
fn modified_event(object: serde_json::Value) -> serde_json::Value {
json!({ "type": "MODIFIED", "object": object })
}
async fn expect_signal(rx: &mut mpsc::Receiver<()>) {
timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timed out waiting for watcher signal")
.expect("channel closed before signal was received");
}
#[tokio::test]
async fn watch_issues_list_then_watch_requests_in_sequence() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
assert_eq!(req.method(), http::Method::GET);
let uri = req.uri().to_string();
assert!(
uri.contains("/namespaces/ns1/configmaps"),
"expected namespaced configmap list uri, got: {uri}"
);
assert!(
!uri.contains("watch=true"),
"list request must not have watch=true, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (req, send) = handle.next_request().await.unwrap();
assert_eq!(req.method(), http::Method::GET);
let uri = req.uri().to_string();
assert!(
uri.contains("watch=true"),
"second request must be a watch, got: {uri}"
);
send.send_response(watch_events_response(vec![]));
});
watch::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_sends_signal_when_resource_is_added() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![added_event(configmap_json(
"cm1", "ns1",
))]));
});
watch::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
expect_signal(&mut rx).await;
}
#[tokio::test]
async fn watch_sends_signal_when_resource_is_modified() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![modified_event(configmap_json(
"cm1", "ns1",
))]));
});
watch::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
expect_signal(&mut rx).await;
}
#[tokio::test]
async fn watch_sends_signal_for_resources_present_in_initial_list() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response(
"ConfigMapList",
vec![configmap_json("existing", "ns1")],
));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
expect_signal(&mut rx).await;
}
#[tokio::test]
async fn watch_sends_one_signal_per_applied_event() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![
added_event(configmap_json("cm1", "ns1")),
added_event(configmap_json("cm2", "ns1")),
]));
});
watch::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
expect_signal(&mut rx).await;
expect_signal(&mut rx).await;
}
#[tokio::test]
async fn watch_with_label_selector_forwards_selector_on_list_request() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("labelSelector=app%3Dmy-op")
|| uri.contains("labelSelector=app=my-op"),
"expected labelSelector in list uri, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("labelSelector=app%3Dmy-op")
|| uri.contains("labelSelector=app=my-op"),
"expected labelSelector in watch uri, got: {uri}"
);
send.send_response(watch_events_response(vec![]));
});
watch::<ConfigMap, _>(client, Namespaced("ns1"), Some("app=my-op"), tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_without_label_selector_omits_label_selector_param() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
!uri.contains("labelSelector"),
"expected no labelSelector in uri, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_cluster_scoped_list_uri_has_no_namespace_segment() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("/api/v1/nodes"),
"expected nodes list uri, got: {uri}"
);
assert!(
!uri.contains("namespaces"),
"cluster-scoped watch must not have namespace segment, got: {uri}"
);
send.send_response(list_response("NodeList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch::<Node, _>(client, Cluster, None, tx).await.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_cluster_scoped_sends_signal_on_added_event() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("NodeList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![added_event(node_json("n1"))]));
});
watch::<Node, _>(client, Cluster, None, tx).await.unwrap();
expect_signal(&mut rx).await;
}
#[tokio::test]
async fn watch_namespaced_scopes_list_to_correct_namespace() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("/namespaces/prod/configmaps"),
"expected prod namespace in uri, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch::<ConfigMap, _>(client, Namespaced("prod"), None, tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_namespaced_sends_signal_on_applied_event() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![added_event(configmap_json(
"cm1", "prod",
))]));
});
watch::<ConfigMap, _>(client, Namespaced("prod"), None, tx)
.await
.unwrap();
expect_signal(&mut rx).await;
}
#[tokio::test]
async fn watch_namespaced_by_label_forwards_label_selector() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("labelSelector"),
"expected labelSelector in uri, got: {uri}"
);
assert!(
uri.contains("/namespaces/ns1/configmaps"),
"expected ns1 in uri, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch::<ConfigMap, _>(client, Namespaced("ns1"), Some("app=my-op"), tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_cluster_uses_all_api_without_namespace_segment() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(!uri.contains("namespaces"), "uri={uri}");
assert!(uri.contains("/api/v1/nodes"), "uri={uri}");
send.send_response(list_response("NodeList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch::<Node, _>(client, Cluster, None, tx).await.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_cluster_sends_signal_on_applied_event() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("NodeList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![added_event(node_json("n1"))]));
});
watch::<Node, _>(client, Cluster, None, tx).await.unwrap();
expect_signal(&mut rx).await;
}
#[tokio::test]
async fn watch_cluster_by_label_forwards_selector_without_namespace() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(!uri.contains("namespaces"), "uri={uri}");
assert!(uri.contains("labelSelector"), "uri={uri}");
send.send_response(list_response("NodeList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch::<Node, _>(client, Cluster, Some("app=my-op"), tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_objects_sends_resource_on_applied_event() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![added_event(configmap_json(
"cm-data", "ns1",
))]));
});
watch_objects::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
let received = timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timed out waiting for resource")
.expect("channel closed");
assert_eq!(
received.metadata.name.as_deref(),
Some("cm-data"),
"received resource name should match the watch event"
);
}
#[tokio::test]
async fn watch_objects_scopes_to_correct_namespace() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("/namespaces/staging/configmaps"),
"expected staging namespace in list uri, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch_objects::<ConfigMap, _>(client, Namespaced("staging"), None, tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_objects_cluster_scope_uses_all_api() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
!uri.contains("namespaces"),
"cluster scope must not have namespace segment, got: {uri}"
);
assert!(uri.contains("/api/v1/nodes"), "uri={uri}");
send.send_response(list_response("NodeList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch_objects::<Node, _>(client, Cluster, None, tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_objects_forwards_label_selector() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("labelSelector=app%3Dmy-op")
|| uri.contains("labelSelector=app=my-op"),
"expected labelSelector in uri, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch_objects::<ConfigMap, _>(client, Namespaced("ns1"), Some("app=my-op"), tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_events_sends_applied_on_added_event() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![added_event(configmap_json(
"cm-added", "ns1",
))]));
});
watch_events::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
let event = timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timed out")
.expect("channel closed");
assert!(
matches!(event, WatchEvent::Applied(_)),
"expected WatchEvent::Applied for an ADDED watch event"
);
if let WatchEvent::Applied(r) = event {
assert_eq!(r.metadata.name.as_deref(), Some("cm-added"));
}
}
#[tokio::test]
async fn watch_events_sends_deleted_on_deleted_event() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
let deleted_event =
json!({ "type": "DELETED", "object": configmap_json("cm-gone", "ns1") });
send.send_response(watch_events_response(vec![deleted_event]));
});
watch_events::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
let event = timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timed out")
.expect("channel closed");
assert!(
matches!(event, WatchEvent::Deleted(_)),
"expected WatchEvent::Deleted for a DELETED watch event"
);
if let WatchEvent::Deleted(r) = event {
assert_eq!(r.metadata.name.as_deref(), Some("cm-gone"));
}
}
#[tokio::test]
async fn watch_events_sends_applied_for_items_in_initial_list() {
let (client, mut handle) = mock_client();
let (tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response(
"ConfigMapList",
vec![configmap_json("pre-existing", "ns1")],
));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch_events::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
let event = timeout(Duration::from_secs(2), rx.recv())
.await
.expect("timed out")
.expect("channel closed");
assert!(
matches!(event, WatchEvent::Applied(_)),
"InitApply events from the LIST phase must become WatchEvent::Applied"
);
if let WatchEvent::Applied(r) = event {
assert_eq!(r.metadata.name.as_deref(), Some("pre-existing"));
}
}
#[tokio::test]
async fn watch_events_scopes_to_correct_namespace() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("/namespaces/prod/configmaps"),
"expected prod namespace in uri, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch_events::<ConfigMap, _>(client, Namespaced("prod"), None, tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watch_events_forwards_label_selector() {
let (client, mut handle) = mock_client();
let (tx, _rx) = mpsc::channel(16);
let server = tokio::spawn(async move {
let (req, send) = handle.next_request().await.unwrap();
let uri = req.uri().to_string();
assert!(
uri.contains("labelSelector=app%3Dmy-op")
|| uri.contains("labelSelector=app=my-op"),
"expected labelSelector in uri, got: {uri}"
);
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![]));
});
watch_events::<ConfigMap, _>(client, Namespaced("ns1"), Some("app=my-op"), tx)
.await
.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn watcher_task_shuts_down_when_all_receivers_are_dropped() {
let (client, mut handle) = mock_client();
let (tx, rx) = mpsc::channel::<()>(16);
tokio::spawn(async move {
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(list_response("ConfigMapList", vec![]));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response(watch_events_response(vec![added_event(configmap_json(
"cm1", "ns1",
))]));
});
let handle = watch::<ConfigMap, _>(client, Namespaced("ns1"), None, tx)
.await
.unwrap();
drop(rx);
timeout(Duration::from_secs(2), handle)
.await
.expect("watcher task did not shut down within timeout")
.expect("watcher task panicked");
}
}