#![cfg(feature = "integration")]
use std::sync::{Arc, Mutex};
use std::time::Duration;
use axum::{
Router,
extract::State,
http::{HeaderMap, HeaderValue, StatusCode},
response::IntoResponse,
routing::get,
};
use koprs_external::{
ExternalEvent,
http::HttpPoller,
watcher::{ExternalSource, watch_external},
};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio::time::timeout;
/// A string slot that can be shared between a test and the mock server and
/// rewritten while the server is running.
type SharedText = Arc<Mutex<Option<String>>>;

/// Mutable state served by the mock HTTP endpoint.
///
/// Cloning shares the underlying slots (they are `Arc`s), so a test can keep a
/// clone and change what the server returns between polls.
#[derive(Clone, Default)]
struct MockState {
    /// Response body; `None` makes the handler answer `404 Not Found`.
    body: SharedText,
    /// Optional `ETag` value advertised by the handler and compared against
    /// the request's `If-None-Match` header.
    etag: SharedText,
}
/// Axum handler backing the mock `/resource` endpoint.
///
/// Responses, in order of precedence:
/// * `404 Not Found` when no body is configured;
/// * `304 Not Modified` when an ETag is configured and the request's
///   `If-None-Match` header equals it exactly;
/// * `200 OK` with the configured body, plus an `etag` header when an ETag is
///   configured.
async fn mock_handler(State(state): State<MockState>, headers: HeaderMap) -> impl IntoResponse {
    // Snapshot both slots up front; each guard is released at the end of its statement.
    let payload = state.body.lock().unwrap().clone();
    let current_etag = state.etag.lock().unwrap().clone();

    let text = match payload {
        Some(text) => text,
        None => return StatusCode::NOT_FOUND.into_response(),
    };

    // A header value that is not valid visible ASCII is treated as an empty string.
    let client_is_current = match (current_etag.as_deref(), headers.get("if-none-match")) {
        (Some(etag), Some(inm)) => inm.to_str().unwrap_or("") == etag,
        _ => false,
    };
    if client_is_current {
        return StatusCode::NOT_MODIFIED.into_response();
    }

    let mut resp_headers = HeaderMap::new();
    if let Some(etag) = current_etag.as_deref() {
        resp_headers.insert("etag", HeaderValue::from_str(etag).unwrap());
    }
    (StatusCode::OK, resp_headers, text).into_response()
}
/// Starts the mock server on an OS-assigned loopback port and returns the
/// full URL of its `/resource` endpoint.
///
/// The server runs on a spawned Tokio task that is never joined by this
/// function. Panics if the listener cannot be bound or its address read.
async fn start_mock_server(state: MockState) -> String {
    let app = Router::new()
        .route("/resource", get(mock_handler))
        .with_state(state);

    // Port 0 lets the OS pick a free port.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound = listener.local_addr().unwrap();
    let url = format!("http://{bound}/resource");

    tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });

    url
}
/// Builds an identifier of the form `{prefix}-{nanos}-{seq}` that is unique
/// within this process and very unlikely to repeat across runs.
///
/// * `nanos` is the full number of nanoseconds since the Unix epoch (not just
///   the sub-second part, which would repeat every second).
/// * `seq` is a process-wide counter, so two calls that land on the same clock
///   tick still produce different ids.
///
/// The generated suffix contains only ASCII digits and `-`.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn uid(prefix: &str) -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQ: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_nanos();
    // Relaxed is sufficient: only the uniqueness of the returned value matters.
    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{prefix}-{nanos}-{seq}")
}
/// The first poll of a resource that answers `200` with body `hello` must
/// yield exactly one `Added` event carrying that status and body.
#[tokio::test]
async fn http_poller_emits_added_on_first_200() {
    const POLL_TIMEOUT: Duration = Duration::from_secs(5);

    let mock = MockState::default();
    *mock.body.lock().unwrap() = Some(String::from("hello"));
    let endpoint = start_mock_server(mock).await;

    let mut poller = HttpPoller::new(&endpoint).with_name(uid("added"));
    let outcome = timeout(POLL_TIMEOUT, poller.poll()).await;
    let events = outcome.expect("poll timed out").expect("poll error");

    assert_eq!(events.len(), 1);
    assert!(matches!(events[0], ExternalEvent::Added(_)));
    if let ExternalEvent::Added(r) = &events[0] {
        assert_eq!(r.status, 200);
        assert_eq!(r.body.as_ref(), b"hello");
    }
}
/// Polling a resource that answers `404` and was never seen before must not
/// produce any event.
#[tokio::test]
async fn http_poller_emits_nothing_when_404_never_seen() {
    // The default mock state has no body, so the handler answers 404.
    let endpoint = start_mock_server(MockState::default()).await;
    let mut poller = HttpPoller::new(endpoint);

    let outcome = timeout(Duration::from_secs(5), poller.poll()).await;
    let events = outcome.expect("poll timed out").expect("poll error");

    assert!(events.is_empty(), "expected no events on first 404");
}
/// After an initial `Added`, changing the served body from `v1` to `v2` must
/// make the next poll yield exactly one `Modified` event carrying the new body.
#[tokio::test]
async fn http_poller_emits_modified_after_content_change() {
    let state = MockState::default();
    *state.body.lock().unwrap() = Some("v1".to_string());
    let url = start_mock_server(state.clone()).await;
    let mut poller = HttpPoller::new(&url).with_name(uid("modified"));

    let events = timeout(Duration::from_secs(5), poller.poll())
        .await
        .expect("first poll timed out")
        .expect("first poll error");
    // Check the length before indexing so an empty result fails with a useful
    // message instead of an index-out-of-bounds panic.
    assert_eq!(events.len(), 1, "expected exactly one event on first poll");
    assert!(
        matches!(events[0], ExternalEvent::Added(_)),
        "expected Added on first poll, got {:?}",
        events[0]
    );

    // The test's clone shares the server's state, so this changes what is served.
    *state.body.lock().unwrap() = Some("v2".to_string());

    let events = timeout(Duration::from_secs(5), poller.poll())
        .await
        .expect("second poll timed out")
        .expect("second poll error");
    assert_eq!(events.len(), 1);
    let ExternalEvent::Modified(r) = &events[0] else {
        panic!("expected Modified after content change, got {:?}", events[0]);
    };
    assert_eq!(r.body.as_ref(), b"v2");
}
/// With a fixed body and ETag, the first poll yields `Added` and a second poll
/// yields no events.
///
/// NOTE(review): the mock does not record whether the second request carried
/// `If-None-Match`, so this test cannot tell a real `304` round-trip apart
/// from the poller suppressing an unchanged `200` by some other means.
#[tokio::test]
async fn http_poller_emits_nothing_on_304_when_etag_unchanged() {
    let state = MockState::default();
    *state.body.lock().unwrap() = Some("content".to_string());
    *state.etag.lock().unwrap() = Some("\"v1\"".to_string());
    let url = start_mock_server(state).await;
    let mut poller = HttpPoller::new(url);

    let events = timeout(Duration::from_secs(5), poller.poll())
        .await
        .expect("first poll timed out")
        .expect("first poll error");
    // Check the length before indexing so an empty result fails with a useful
    // message instead of an index-out-of-bounds panic.
    assert_eq!(events.len(), 1, "expected exactly one event on first poll");
    assert!(
        matches!(events[0], ExternalEvent::Added(_)),
        "expected Added on first poll, got {:?}",
        events[0]
    );

    let events = timeout(Duration::from_secs(5), poller.poll())
        .await
        .expect("second poll timed out")
        .expect("second poll error");
    assert!(events.is_empty(), "expected no events on 304");
}
/// Once a previously seen resource starts answering `404`, the next poll must
/// yield exactly one `Removed` event and the poll after that must yield none.
#[tokio::test]
async fn http_poller_emits_removed_then_nothing_after_resource_disappears() {
    let state = MockState::default();
    *state.body.lock().unwrap() = Some("here".to_string());
    let url = start_mock_server(state.clone()).await;
    let mut poller = HttpPoller::new(url);

    let events = timeout(Duration::from_secs(5), poller.poll())
        .await
        .expect("first poll timed out")
        .expect("first poll error");
    // Check the length before indexing so an empty result fails with a useful
    // message instead of an index-out-of-bounds panic.
    assert_eq!(events.len(), 1, "expected exactly one event on first poll");
    assert!(
        matches!(events[0], ExternalEvent::Added(_)),
        "expected Added on first poll, got {:?}",
        events[0]
    );

    // Clearing the body makes the mock answer 404 from now on.
    *state.body.lock().unwrap() = None;

    let events = timeout(Duration::from_secs(5), poller.poll())
        .await
        .expect("poll after removal timed out")
        .expect("poll after removal error");
    assert_eq!(events.len(), 1);
    assert!(
        matches!(events[0], ExternalEvent::Removed(_)),
        "expected Removed after resource disappeared, got {:?}",
        events[0]
    );

    let events = timeout(Duration::from_secs(5), poller.poll())
        .await
        .expect("final poll timed out")
        .expect("final poll error");
    assert!(events.is_empty(), "removal should be reported exactly once");
}
/// Runs the poller under `watch_external` with a 10 ms interval and checks
/// that an `Added` event, then (after the body changes) a `Modified` event,
/// arrive on the channel.
#[tokio::test]
async fn watch_external_delivers_events_through_channel() {
    const RECV_TIMEOUT: Duration = Duration::from_secs(5);

    let mock = MockState::default();
    *mock.body.lock().unwrap() = Some(String::from("initial"));
    let endpoint = start_mock_server(mock.clone()).await;

    let (tx, mut rx) = mpsc::channel(16);
    let poller = HttpPoller::new(endpoint).with_name(uid("watch-loop"));
    // Keep the handle bound for the rest of the test.
    let _handle = watch_external(poller, Duration::from_millis(10), tx);

    let received = timeout(RECV_TIMEOUT, rx.recv()).await;
    let ev = received
        .expect("timed out waiting for Added event")
        .expect("channel closed");
    assert!(
        matches!(ev, ExternalEvent::Added(_)),
        "expected Added, got {ev:?}"
    );

    // Change the served body while the watcher keeps polling.
    *mock.body.lock().unwrap() = Some(String::from("updated"));

    let received = timeout(RECV_TIMEOUT, rx.recv()).await;
    let ev = received
        .expect("timed out waiting for Modified event")
        .expect("channel closed");
    assert!(
        matches!(ev, ExternalEvent::Modified(_)),
        "expected Modified, got {ev:?}"
    );
}
/// Dropping the receiving half of the channel must make the watcher task
/// finish, without panicking, within two seconds.
#[tokio::test]
async fn watch_external_shuts_down_when_receiver_is_dropped() {
    let mock = MockState::default();
    *mock.body.lock().unwrap() = Some(String::from("data"));
    let endpoint = start_mock_server(mock).await;

    let (tx, rx) = mpsc::channel(16);
    let handle = watch_external(HttpPoller::new(endpoint), Duration::from_millis(10), tx);

    // With no receiver left, the watcher has nowhere to deliver events.
    drop(rx);

    let joined = timeout(Duration::from_secs(2), handle).await;
    joined
        .expect("watcher did not shut down after receiver drop")
        .expect("watcher task panicked");
}
/// End-to-end check against a real Kubernetes API server: polls a ConfigMap
/// URL before it exists (no events), after it is created (`Added`), and after
/// it is deleted (`Removed`).
///
/// Reads `KUBE_TOKEN` (required) and `KUBE_API_URL` (defaults to
/// `https://127.0.0.1:6443`) for the poller, and uses `Client::try_default()`
/// for creating and deleting the ConfigMap.
///
/// NOTE(review): the poller's URL and the kube client are configured
/// independently; nothing here checks that they point at the same cluster.
#[tokio::test]
#[ignore = "requires a running Kubernetes cluster; set KUBE_TOKEN and KUBE_API_URL, then run with -- --include-ignored"]
async fn kubernetes_configmap_lifecycle_via_http_poller() {
    use k8s_openapi::api::core::v1::ConfigMap;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use kube::api::{DeleteParams, PostParams};
    use kube::{Api, Client};

    const POLL_TIMEOUT: Duration = Duration::from_secs(5);

    let token = std::env::var("KUBE_TOKEN").expect("KUBE_TOKEN must be set");
    let api_url =
        std::env::var("KUBE_API_URL").unwrap_or_else(|_| "https://127.0.0.1:6443".to_string());
    // Tolerate a trailing slash so the resource URL never contains `//api/...`.
    let api_url = api_url.trim_end_matches('/');
    let name = uid("koprs-ext-test");
    let namespace = "default";
    let resource_url = format!("{api_url}/api/v1/namespaces/{namespace}/configmaps/{name}");

    // SECURITY: certificate verification is disabled, so the bearer token is
    // sent over an unauthenticated TLS connection. Only acceptable against a
    // throwaway local cluster; never point this at a shared one.
    let reqwest_client = reqwest::Client::builder()
        .danger_accept_invalid_certs(true)
        .build()
        .unwrap();
    let mut poller = HttpPoller::new(&resource_url)
        .with_client(reqwest_client)
        .with_bearer_token(&token);

    // Bounded like every other poll so an unreachable API server fails fast.
    let events = timeout(POLL_TIMEOUT, poller.poll())
        .await
        .expect("initial poll timed out")
        .expect("initial poll failed");
    assert!(
        events.is_empty(),
        "ConfigMap should not exist at test start"
    );

    let kube_client = Client::try_default()
        .await
        .expect("failed to build kube Client — is a cluster reachable?");
    let api: Api<ConfigMap> = Api::namespaced(kube_client, namespace);
    let cm = ConfigMap {
        metadata: ObjectMeta {
            name: Some(name.clone()),
            namespace: Some(namespace.to_string()),
            ..Default::default()
        },
        ..Default::default()
    };
    api.create(&PostParams::default(), &cm)
        .await
        .expect("failed to create ConfigMap");

    // Poll, then delete BEFORE asserting on the poll result, so a failed
    // assertion cannot leave the ConfigMap behind in the cluster.
    let poll_after_create = timeout(POLL_TIMEOUT, poller.poll()).await;
    let delete_result = api.delete(&name, &DeleteParams::default()).await;

    let events = poll_after_create
        .expect("poll timed out after create")
        .expect("poll error after create");
    assert_eq!(events.len(), 1);
    assert!(
        matches!(events[0], ExternalEvent::Added(_)),
        "expected Added after ConfigMap create, got {:?}",
        events[0]
    );

    delete_result.expect("failed to delete ConfigMap");

    let events = timeout(POLL_TIMEOUT, poller.poll())
        .await
        .expect("poll timed out after delete")
        .expect("poll error after delete");
    assert_eq!(events.len(), 1);
    assert!(
        matches!(events[0], ExternalEvent::Removed(_)),
        "expected Removed after ConfigMap delete, got {:?}",
        events[0]
    );
}