#![cfg(feature = "integration-tests")]
use std::time::Duration;
use camel_api::platform::{LeadershipEvent, LeadershipService, PlatformIdentity};
use camel_platform_kubernetes::{
KubernetesLeadershipService, KubernetesPlatformConfig, ensure_rustls_provider,
};
use testcontainers::{ContainerAsync, ImageExt, runners::AsyncRunner};
use testcontainers_modules::k3s::K3s;
async fn wait_for_leader(handle: &camel_api::platform::LeadershipHandle, timeout_secs: u64) {
let deadline = Duration::from_secs(timeout_secs);
tokio::time::timeout(deadline, async {
loop {
if handle.is_leader() {
return;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
})
.await
.expect("timed out waiting for leadership");
}
const KUBE_SECURE_PORT: u16 = 6443;
async fn start_k3s() -> (ContainerAsync<K3s>, kube::Client) {
ensure_rustls_provider();
let conf_dir = std::env::temp_dir().join(format!(
"camel-k3s-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock should be after unix epoch")
.as_nanos()
));
std::fs::create_dir_all(&conf_dir).expect("k3s config directory should be created");
let k3s = K3s::default()
.with_conf_mount(&conf_dir)
.with_privileged(true)
.with_userns_mode("host");
let container = k3s.start().await.expect("k3s container should start");
let kubeconfig_yaml = container
.image()
.read_kube_config()
.expect("should read kube config");
let mut kubeconfig =
kube::config::Kubeconfig::from_yaml(&kubeconfig_yaml).expect("should parse kubeconfig");
let port = container
.get_host_port_ipv4(KUBE_SECURE_PORT)
.await
.expect("should expose k3s secure port");
kubeconfig.clusters.iter_mut().for_each(|cluster| {
if let Some(server) = cluster.cluster.as_mut().and_then(|c| c.server.as_mut()) {
*server = format!("https://127.0.0.1:{port}");
}
});
let config = kube::Config::from_custom_kubeconfig(
kubeconfig,
&kube::config::KubeConfigOptions::default(),
)
.await
.expect("should build kube config");
let client = kube::Client::try_from(config).expect("should create client");
let timeout = Duration::from_secs(90);
let start = std::time::Instant::now();
let leases: kube::Api<k8s_openapi::api::coordination::v1::Lease> =
kube::Api::namespaced(client.clone(), "default");
loop {
match leases.list(&kube::api::ListParams::default()).await {
Ok(_) => break,
Err(_) if start.elapsed() < timeout => {
tokio::time::sleep(Duration::from_millis(500)).await;
}
Err(err) => panic!("k3s Lease API not ready after 90s: {err}"),
}
}
(container, client)
}
fn test_config() -> KubernetesPlatformConfig {
KubernetesPlatformConfig {
namespace: "default".to_string(),
lease_name_prefix: "camel-".to_string(),
lease_duration: Duration::from_secs(10),
renew_deadline: Duration::from_secs(8),
retry_period: Duration::from_secs(1),
jitter_factor: 0.2,
}
}
#[tokio::test]
async fn test_single_instance_becomes_leader() -> Result<(), Box<dyn std::error::Error>> {
let (_container, client) = start_k3s().await;
let leadership =
KubernetesLeadershipService::new(client, PlatformIdentity::local("pod-a"), test_config())?;
let handle = leadership
.start("single-leader")
.await
.expect("start should succeed");
wait_for_leader(&handle, 30).await;
assert!(handle.is_leader(), "single instance should become leader");
let event = handle.events.borrow().clone();
assert_eq!(event, Some(LeadershipEvent::StartedLeading));
Ok(())
}
#[tokio::test]
async fn test_duplicate_lock_reuses_cached_loop() -> Result<(), Box<dyn std::error::Error>> {
let (_container, client) = start_k3s().await;
let leadership =
KubernetesLeadershipService::new(client, PlatformIdentity::local("pod-a"), test_config())?;
let first = leadership
.start("dup-lock")
.await
.expect("first start should succeed");
let second = leadership
.start("dup-lock")
.await
.expect("duplicate lock should reuse existing loop");
wait_for_leader(&first, 30).await;
assert!(first.is_leader());
assert!(second.is_leader());
Ok(())
}
#[tokio::test]
async fn test_step_down_releases_leadership() -> Result<(), Box<dyn std::error::Error>> {
let (_container, client) = start_k3s().await;
let leadership =
KubernetesLeadershipService::new(client, PlatformIdentity::local("pod-a"), test_config())?;
let handle = leadership
.start("step-down")
.await
.expect("start should succeed");
wait_for_leader(&handle, 30).await;
assert!(handle.is_leader());
let result = tokio::time::timeout(Duration::from_secs(5), handle.step_down()).await;
assert!(result.is_ok(), "step_down should not hang");
Ok(())
}
#[tokio::test]
async fn test_two_instances_only_one_leads() -> Result<(), Box<dyn std::error::Error>> {
let (_container, client) = start_k3s().await;
let leadership_a = KubernetesLeadershipService::new(
client.clone(),
PlatformIdentity::local("pod-a"),
test_config(),
)?;
let leadership_b = KubernetesLeadershipService::new(
client.clone(),
PlatformIdentity::local("pod-b"),
test_config(),
)?;
let mut handle_a = Some(
leadership_a
.start("contention")
.await
.expect("start A should succeed"),
);
let mut handle_b = Some(
leadership_b
.start("contention")
.await
.expect("start B should succeed"),
);
tokio::time::timeout(Duration::from_secs(30), async {
loop {
let a_leads = handle_a.as_ref().is_some_and(|h| h.is_leader());
let b_leads = handle_b.as_ref().is_some_and(|h| h.is_leader());
if a_leads || b_leads {
return;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
})
.await
.expect("one instance should become leader within 30s");
let a_leads = handle_a.as_ref().is_some_and(|h| h.is_leader());
let b_leads = handle_b.as_ref().is_some_and(|h| h.is_leader());
assert_ne!(
a_leads, b_leads,
"exactly one instance must be leader at a time"
);
let stepped_down_a = if a_leads {
let handle = handle_a
.take()
.expect("handle A should exist when A is leader");
let result = tokio::time::timeout(Duration::from_secs(5), handle.step_down()).await;
assert!(
result.is_ok(),
"step_down for current leader A should not hang"
);
true
} else {
let handle = handle_b
.take()
.expect("handle B should exist when B is leader");
let result = tokio::time::timeout(Duration::from_secs(5), handle.step_down()).await;
assert!(
result.is_ok(),
"step_down for current leader B should not hang"
);
false
};
tokio::time::timeout(Duration::from_secs(20), async {
loop {
let a_now = handle_a.as_ref().is_some_and(|h| h.is_leader());
let b_now = handle_b.as_ref().is_some_and(|h| h.is_leader());
if a_now || b_now {
return;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
})
.await
.expect("remaining instance should take leadership after failover");
let leader_a_after = if stepped_down_a {
false
} else {
handle_a.as_ref().is_some_and(|h| h.is_leader())
};
let leader_b_after = if stepped_down_a {
handle_b.as_ref().is_some_and(|h| h.is_leader())
} else {
false
};
assert_ne!(
leader_a_after, leader_b_after,
"exactly one instance must be leader after failover"
);
Ok(())
}