// rs-zero 0.2.8
//
// Rust-first microservice framework inspired by go-zero engineering practices.
// Documentation
#![cfg(feature = "discovery-etcd")]

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use rs_zero::discovery::{Discovery, InstanceEndpoint, Registry, ServiceInstance};
use rs_zero::discovery_etcd::{
    BackoffConfig, EtcdDiscoveryConfig, EtcdRegistry, WatchEvent, WatchEventKind,
};

/// Walks one instance through its whole lifecycle against a real etcd:
/// register, mark unhealthy, mark healthy again, deregister.
///
/// After each step the test checks what `discover` reports and, where an event
/// is awaited, what the watch stream delivered. Ignored by default because it
/// needs an external etcd (see the `ignore` reason).
#[tokio::test]
#[ignore = "requires external etcd from examples/production-adapters/docker-compose.yml"]
async fn etcd_registry_external_lifecycle_and_watch() {
    let service = "api";
    let instance_id = "api-1";

    let registry = EtcdRegistry::connect(test_config())
        .await
        .expect("connect etcd");
    // The watch is opened before anything is registered; every event awaited
    // below is read from this stream.
    let mut events = registry.watch_service(service).await.expect("watch api");

    let endpoint = InstanceEndpoint::new("127.0.0.1", 18080).expect("endpoint");
    let instance = ServiceInstance::new(service, instance_id, endpoint)
        .with_metadata("zone", "local")
        .with_weight(3);

    // Register: the instance becomes discoverable, a Put event carries it, and
    // a lease is reported for it.
    registry.register(instance.clone()).await.expect("register");
    let listed = discover_until_ready(&registry, service).await;
    assert_eq!(listed, vec![instance.clone()]);
    let put_event = next_event(&mut events, WatchEventKind::Put, instance_id).await;
    assert_eq!(put_event.instance.as_ref(), Some(&instance));
    assert!(registry.lease_status(service, instance_id).await.is_some());

    // Mark unhealthy: another Put event carries the updated instance and
    // `discover` stops succeeding for the service.
    let unhealthy = registry
        .update_health(service, instance_id, false)
        .await
        .expect("mark unhealthy");
    assert!(!unhealthy.healthy);
    let health_event = next_event(&mut events, WatchEventKind::Put, instance_id).await;
    assert_eq!(health_event.instance.as_ref(), Some(&unhealthy));
    assert_discover_empty(&registry, service).await;

    // Mark healthy again: the instance is discoverable once more.
    let healthy = registry
        .update_health(service, instance_id, true)
        .await
        .expect("mark healthy");
    assert!(healthy.healthy);
    let relisted = discover_until_ready(&registry, service).await;
    assert_eq!(relisted, vec![healthy.clone()]);

    // Deregister: a Delete event names the instance and `discover` fails again.
    let removed = registry
        .deregister(service, instance_id)
        .await
        .expect("deregister");
    assert_eq!(removed.id, instance_id);
    let delete_event = next_event(&mut events, WatchEventKind::Delete, instance_id).await;
    assert_eq!(delete_event.service, service);
    assert_eq!(delete_event.id, instance_id);
    assert_discover_empty(&registry, service).await;
}

/// Builds the etcd configuration used by this test.
///
/// The single endpoint comes from the `RS_ZERO_ETCD_ENDPOINT` environment
/// variable and falls back to `http://127.0.0.1:2379`. Keys live under
/// `/rs-zero-test/` followed by the value of `unique_suffix()`. No
/// authentication is configured.
fn test_config() -> EtcdDiscoveryConfig {
    let endpoint = std::env::var("RS_ZERO_ETCD_ENDPOINT")
        .unwrap_or_else(|_| String::from("http://127.0.0.1:2379"));
    let prefix = format!("/rs-zero-test/{}", unique_suffix());
    let watch_backoff = BackoffConfig {
        initial: Duration::from_millis(50),
        max: Duration::from_millis(200),
    };

    EtcdDiscoveryConfig {
        endpoints: vec![endpoint],
        prefix,
        lease_ttl: 5,
        reconnect_interval: Duration::from_millis(100),
        connect_timeout: Duration::from_secs(3),
        operation_timeout: Duration::from_secs(3),
        keep_alive_interval: Duration::from_millis(500),
        keep_alive_timeout: Duration::from_secs(2),
        watch_backoff,
        auth: None,
    }
}

/// Returns a string that differs on every call within this process.
///
/// The format is `{millis}-{pid}-{seq}`: milliseconds since the Unix epoch,
/// the current process id, and a process-wide call counter. The timestamp and
/// pid alone repeat when two calls land in the same millisecond, which happens
/// when tests in one binary run on parallel threads; the counter removes that
/// collision.
fn unique_suffix() -> String {
    // Shared by every caller in the process; only distinctness of the values
    // matters, so relaxed ordering is sufficient.
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // A clock set before the epoch yields zero instead of a panic.
        .unwrap_or_default()
        .as_millis();
    let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{millis}-{}-{seq}", std::process::id())
}

/// Polls `discover` for `service` until it succeeds and returns the result.
///
/// Up to 20 attempts are made, pausing 50 ms after each failure. If all of
/// them fail, one last attempt is made and its error panics with
/// "discover eventually".
async fn discover_until_ready(registry: &EtcdRegistry, service: &str) -> Vec<ServiceInstance> {
    let mut attempts_left = 20;
    while attempts_left > 0 {
        if let Ok(found) = registry.discover(service).await {
            return found;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
        attempts_left -= 1;
    }

    // Final attempt: surface the failure instead of retrying further.
    let last_try = registry.discover(service).await;
    last_try.expect("discover eventually")
}

/// Waits for `discover` to start failing for `service`.
///
/// Checks up to 20 times, pausing 50 ms after each check that still succeeds.
/// Returns as soon as `discover` yields an error; panics if it never does.
async fn assert_discover_empty(registry: &EtcdRegistry, service: &str) {
    let mut checks_left = 20;
    while checks_left > 0 {
        let still_listed = registry.discover(service).await.is_ok();
        if !still_listed {
            return;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
        checks_left -= 1;
    }
    panic!("service {service} still discoverable");
}

/// Reads from `watch` until an event with the given `kind` and `id` arrives,
/// and returns that event. Events that do not match are discarded.
///
/// A single 5-second timer covers the whole wait, not each individual receive.
///
/// # Panics
///
/// Panics if the timer fires first, or if `recv` yields a value on which
/// either `expect` fails ("watch open" / "watch event").
async fn next_event(
    watch: &mut rs_zero::discovery_etcd::EtcdWatchStream,
    kind: WatchEventKind,
    id: &str,
) -> WatchEvent {
    // Created once outside the loop so skipped events do not restart the clock.
    let timeout = tokio::time::sleep(Duration::from_secs(5));
    tokio::pin!(timeout);
    loop {
        let event = tokio::select! {
            _ = &mut timeout => panic!("timed out waiting for {kind:?} event for {id}"),
            received = watch.recv() => received.expect("watch open").expect("watch event"),
        };
        if event.kind == kind && event.id == id {
            return event;
        }
    }
}