#![cfg(feature = "discovery-etcd")]
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use rs_zero::discovery::{Discovery, InstanceEndpoint, Registry, ServiceInstance};
use rs_zero::discovery_etcd::{
BackoffConfig, EtcdDiscoveryConfig, EtcdRegistry, WatchEvent, WatchEventKind,
};
#[tokio::test]
#[ignore = "requires external etcd from examples/production-adapters/docker-compose.yml"]
async fn etcd_registry_external_lifecycle_and_watch() {
let registry = EtcdRegistry::connect(test_config())
.await
.expect("connect etcd");
let mut watch = registry.watch_service("api").await.expect("watch api");
let instance = ServiceInstance::new(
"api",
"api-1",
InstanceEndpoint::new("127.0.0.1", 18080).expect("endpoint"),
)
.with_metadata("zone", "local")
.with_weight(3);
registry.register(instance.clone()).await.expect("register");
let discovered = discover_until_ready(®istry, "api").await;
assert_eq!(discovered, vec![instance.clone()]);
let put = next_event(&mut watch, WatchEventKind::Put, "api-1").await;
assert_eq!(put.instance.as_ref(), Some(&instance));
assert!(registry.lease_status("api", "api-1").await.is_some());
let unhealthy = registry
.update_health("api", "api-1", false)
.await
.expect("mark unhealthy");
assert!(!unhealthy.healthy);
let health_event = next_event(&mut watch, WatchEventKind::Put, "api-1").await;
assert_eq!(health_event.instance.as_ref(), Some(&unhealthy));
assert_discover_empty(®istry, "api").await;
let healthy = registry
.update_health("api", "api-1", true)
.await
.expect("mark healthy");
assert!(healthy.healthy);
let discovered = discover_until_ready(®istry, "api").await;
assert_eq!(discovered, vec![healthy.clone()]);
let removed = registry
.deregister("api", "api-1")
.await
.expect("deregister");
assert_eq!(removed.id, "api-1");
let delete = next_event(&mut watch, WatchEventKind::Delete, "api-1").await;
assert_eq!(delete.service, "api");
assert_eq!(delete.id, "api-1");
assert_discover_empty(®istry, "api").await;
}
fn test_config() -> EtcdDiscoveryConfig {
EtcdDiscoveryConfig {
endpoints: vec![
std::env::var("RS_ZERO_ETCD_ENDPOINT")
.unwrap_or_else(|_| "http://127.0.0.1:2379".to_string()),
],
prefix: format!("/rs-zero-test/{}", unique_suffix()),
lease_ttl: 5,
reconnect_interval: Duration::from_millis(100),
connect_timeout: Duration::from_secs(3),
operation_timeout: Duration::from_secs(3),
keep_alive_interval: Duration::from_millis(500),
keep_alive_timeout: Duration::from_secs(2),
watch_backoff: BackoffConfig {
initial: Duration::from_millis(50),
max: Duration::from_millis(200),
},
auth: None,
}
}
fn unique_suffix() -> String {
let millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
format!("{millis}-{}", std::process::id())
}
async fn discover_until_ready(registry: &EtcdRegistry, service: &str) -> Vec<ServiceInstance> {
for _ in 0..20 {
if let Ok(instances) = registry.discover(service).await {
return instances;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
registry
.discover(service)
.await
.expect("discover eventually")
}
async fn assert_discover_empty(registry: &EtcdRegistry, service: &str) {
for _ in 0..20 {
if registry.discover(service).await.is_err() {
return;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
panic!("service {service} still discoverable");
}
async fn next_event(
watch: &mut rs_zero::discovery_etcd::EtcdWatchStream,
kind: WatchEventKind,
id: &str,
) -> WatchEvent {
let deadline = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(deadline);
loop {
tokio::select! {
_ = &mut deadline => panic!("timed out waiting for {kind:?} event for {id}"),
item = watch.recv() => {
let event = item.expect("watch open").expect("watch event");
if event.kind == kind && event.id == id {
return event;
}
}
}
}
}