use super::*;
use crate::nodes::{NodeOperators, StreamOperators};
use crate::types::Burst;
use crate::{RunFor, RunMode, burst};
use etcd_client::Client;
use std::rc::Rc;
use testcontainers::{GenericImage, ImageExt, core::WaitFor, runners::SyncRunner};
const ETCD_PORT: u16 = 2379;
const ETCD_IMAGE: &str = "gcr.io/etcd-development/etcd";
const ETCD_TAG: &str = "v3.5.0";
fn start_etcd() -> anyhow::Result<(impl Drop, String)> {
let container = GenericImage::new(ETCD_IMAGE, ETCD_TAG)
.with_wait_for(WaitFor::message_on_stderr(
"now serving peer/client/metrics",
))
.with_env_var("ETCD_LISTEN_CLIENT_URLS", "http://0.0.0.0:2379")
.with_env_var("ETCD_ADVERTISE_CLIENT_URLS", "http://0.0.0.0:2379")
.start()?;
let port = container.get_host_port_ipv4(ETCD_PORT)?;
let endpoint = format!("http://127.0.0.1:{port}");
Ok((container, endpoint))
}
fn seed_keys(endpoint: &str, pairs: &[(&str, &str)]) -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let mut client = Client::connect(&[endpoint], None).await?;
for (k, v) in pairs {
client.put(*k, *v, None).await?;
}
Ok(())
})
}
fn delete_key(endpoint: &str, key: &str) -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let mut client = Client::connect(&[endpoint], None).await?;
client.delete(key, None).await?;
Ok(())
})
}
fn make_burst(key: &str, value: &[u8]) -> Rc<dyn crate::Stream<Burst<EtcdEntry>>> {
crate::nodes::constant(burst![EtcdEntry {
key: key.to_string(),
value: value.to_vec(),
}])
}
#[test]
fn test_connection_refused() {
let conn = EtcdConnection::new("http://127.0.0.1:59999");
let result = etcd_sub(conn, "/x/")
.collapse()
.collect()
.run(RunMode::RealTime, RunFor::Cycles(1));
assert!(result.is_err(), "expected connection error");
}
#[test]
fn test_sub_empty_snapshot_then_live_write() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
let conn = EtcdConnection::new(&endpoint);
let collected = etcd_sub(conn, "/empty/").collapse().collect();
let endpoint_clone = endpoint.clone();
let handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
seed_keys(&endpoint_clone, &[("/empty/probe", "1")]).unwrap();
});
collected
.clone()
.run(RunMode::RealTime, RunFor::Cycles(1))?;
handle.join().unwrap();
let events = collected.peek_value();
assert_eq!(events.len(), 1);
assert_eq!(events[0].value.kind, EtcdEventKind::Put);
assert_eq!(events[0].value.entry.key, "/empty/probe");
Ok(())
}
#[test]
fn test_sub_snapshot_with_existing_keys() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
seed_keys(&endpoint, &[("/snap/a", "1"), ("/snap/b", "2")])?;
let conn = EtcdConnection::new(&endpoint);
let collected = etcd_sub(conn, "/snap/").collect();
collected
.clone()
.run(RunMode::RealTime, RunFor::Cycles(1))?;
let events: Vec<EtcdEvent> = collected
.peek_value()
.into_iter()
.flat_map(|v| v.value.into_iter())
.collect();
assert_eq!(events.len(), 2);
assert!(events.iter().all(|e| e.kind == EtcdEventKind::Put));
let keys: std::collections::BTreeSet<String> =
events.iter().map(|e| e.entry.key.clone()).collect();
assert!(keys.contains("/snap/a"));
assert!(keys.contains("/snap/b"));
Ok(())
}
#[test]
fn test_sub_live_updates() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
let conn = EtcdConnection::new(&endpoint);
let endpoint_clone = endpoint.clone();
let handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(150));
seed_keys(&endpoint_clone, &[("/live/x", "hello")]).unwrap();
});
let collected = etcd_sub(conn, "/live/").collapse().collect();
collected
.clone()
.run(RunMode::RealTime, RunFor::Cycles(1))?;
handle.join().unwrap();
let events = collected.peek_value();
assert_eq!(events.len(), 1);
assert_eq!(events[0].value.kind, EtcdEventKind::Put);
assert_eq!(events[0].value.entry.key, "/live/x");
assert_eq!(events[0].value.entry.value, b"hello");
Ok(())
}
#[test]
fn test_pub_round_trip() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
let conn = EtcdConnection::new(&endpoint);
let source = make_burst("/rt/key1", b"value1");
etcd_pub(conn, &source, None, true).run(RunMode::RealTime, RunFor::Cycles(1))?;
let rt = tokio::runtime::Runtime::new()?;
let value = rt.block_on(async {
let mut client = Client::connect(&[&endpoint], None).await?;
let resp = client.get("/rt/key1", None).await?;
let kv = resp
.kvs()
.first()
.ok_or_else(|| anyhow::anyhow!("key not found"))?;
Ok::<Vec<u8>, anyhow::Error>(kv.value().to_vec())
})?;
assert_eq!(value, b"value1");
Ok(())
}
#[test]
fn test_sub_delete_events() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
seed_keys(&endpoint, &[("/del/k", "v")])?;
let conn = EtcdConnection::new(&endpoint);
let endpoint_clone = endpoint.clone();
let handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(150));
delete_key(&endpoint_clone, "/del/k").unwrap();
});
let collected = etcd_sub(conn, "/del/").collapse().collect();
collected
.clone()
.run(RunMode::RealTime, RunFor::Cycles(2))?;
handle.join().unwrap();
let events = collected.peek_value();
assert_eq!(events.len(), 2);
assert_eq!(events[0].value.kind, EtcdEventKind::Put);
assert_eq!(events[1].value.kind, EtcdEventKind::Delete);
assert_eq!(events[1].value.entry.key, "/del/k");
Ok(())
}
#[test]
fn test_sub_no_race_between_snapshot_and_watch() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
seed_keys(&endpoint, &[("/race/existing", "old")])?;
let conn = EtcdConnection::new(&endpoint);
let endpoint_clone = endpoint.clone();
let handle = std::thread::spawn(move || {
seed_keys(&endpoint_clone, &[("/race/new", "new")]).unwrap();
});
handle.join().unwrap();
let collected = etcd_sub(conn, "/race/").collect();
collected
.clone()
.run(RunMode::RealTime, RunFor::Cycles(1))?;
let events: Vec<EtcdEvent> = collected
.peek_value()
.into_iter()
.flat_map(|v| v.value.into_iter())
.collect();
let keys: std::collections::HashSet<String> =
events.iter().map(|e| e.entry.key.clone()).collect();
assert!(keys.contains("/race/existing"), "existing key missing");
assert!(keys.contains("/race/new"), "concurrent key missing");
assert_eq!(keys.len(), 2, "duplicate events detected");
Ok(())
}
fn get_key(endpoint: &str, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let mut client = Client::connect(&[endpoint], None).await?;
let resp = client.get(key, None).await?;
Ok(resp.kvs().first().map(|kv| kv.value().to_vec()))
})
}
#[test]
fn test_pub_lease_keys_expire_after_revoke() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
let conn = EtcdConnection::new(&endpoint);
let source = make_burst("/lease/k1", b"hello");
etcd_pub(
conn,
&source,
Some(std::time::Duration::from_secs(30)),
true,
)
.run(RunMode::RealTime, RunFor::Cycles(1))?;
let value = get_key(&endpoint, "/lease/k1")?;
assert!(value.is_none(), "key should be gone after lease revoke");
Ok(())
}
#[test]
fn test_pub_lease_keepalive_extends_ttl() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
let conn = EtcdConnection::new(&endpoint);
let endpoint_check = endpoint.clone();
let handle = std::thread::spawn(move || -> anyhow::Result<()> {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(15);
loop {
if get_key(&endpoint_check, "/lease/heartbeat")?.is_some() {
break;
}
anyhow::ensure!(
std::time::Instant::now() < deadline,
"key never appeared within 15 s"
);
std::thread::sleep(std::time::Duration::from_millis(200));
}
std::thread::sleep(std::time::Duration::from_secs(4));
let v = get_key(&endpoint_check, "/lease/heartbeat")?;
assert!(
v.is_some(),
"key should still exist; keepalive should have renewed lease"
);
Ok(())
});
let entry = make_burst("/lease/heartbeat", b"alive");
etcd_pub(conn, &entry, Some(std::time::Duration::from_secs(3)), true).run(
RunMode::RealTime,
RunFor::Duration(std::time::Duration::from_secs(10)),
)?;
handle.join().unwrap()?;
Ok(())
}
#[test]
fn test_pub_no_lease_keys_persist() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
let conn = EtcdConnection::new(&endpoint);
let source = make_burst("/nolease/k1", b"persist");
etcd_pub(conn, &source, None, true).run(RunMode::RealTime, RunFor::Cycles(1))?;
let value = get_key(&endpoint, "/nolease/k1")?;
assert_eq!(value.as_deref(), Some(b"persist".as_ref()));
Ok(())
}
#[test]
fn test_pub_force_true_overwrites() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
seed_keys(&endpoint, &[("/force/k", "original")])?;
let conn = EtcdConnection::new(&endpoint);
let source = make_burst("/force/k", b"updated");
etcd_pub(conn, &source, None, true).run(RunMode::RealTime, RunFor::Cycles(1))?;
let value = get_key(&endpoint, "/force/k")?;
assert_eq!(value.as_deref(), Some(b"updated".as_ref()));
Ok(())
}
#[test]
fn test_pub_force_false_fails_if_exists() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
seed_keys(&endpoint, &[("/noforce/k", "original")])?;
let conn = EtcdConnection::new(&endpoint);
let source = make_burst("/noforce/k", b"should-not-overwrite");
let result = etcd_pub(conn, &source, None, false).run(RunMode::RealTime, RunFor::Cycles(1));
assert!(result.is_err(), "expected error when key already exists");
let err = result.unwrap_err();
let named_in_chain = err.chain().any(|e| e.to_string().contains("/noforce/k"));
assert!(named_in_chain, "error chain should name the key: {err:#}");
let value = get_key(&endpoint, "/noforce/k")?;
assert_eq!(value.as_deref(), Some(b"original".as_ref()));
Ok(())
}
#[test]
fn test_pub_force_false_succeeds_if_absent() -> anyhow::Result<()> {
let (_container, endpoint) = start_etcd()?;
let conn = EtcdConnection::new(&endpoint);
let source = make_burst("/noforce/new", b"value");
etcd_pub(conn, &source, None, false).run(RunMode::RealTime, RunFor::Cycles(1))?;
let value = get_key(&endpoint, "/noforce/new")?;
assert_eq!(value.as_deref(), Some(b"value".as_ref()));
Ok(())
}
#[test]
fn test_etcd_kv_value_str() {
let kv = EtcdEntry {
key: "/foo".to_string(),
value: b"bar".to_vec(),
};
assert_eq!(kv.value_str().unwrap(), "bar");
}