mod common;
#[cfg(feature = "etcd")]
mod etcd_tests {
use hexcfg::adapters::EtcdAdapter;
use hexcfg::domain::ConfigKey;
use hexcfg::ports::ConfigSource;
use testcontainers::{core::WaitFor, runners::AsyncRunner, GenericImage, ImageExt};
use crate::common as docker_helpers;
/// Starts a throwaway etcd container, seeds it with test data, and returns it
/// together with an [`EtcdAdapter`] scoped to the `test/` prefix.
///
/// Three keys are written before the adapter is created:
/// `test/test.key`, `test/database/host` and `test/database/port`.
///
/// Returns `None` when Docker is unavailable or when any setup step fails, so
/// callers can skip the test. Every failed step is reported on stderr so a
/// skipped test is not mistaken for a genuine pass.
///
/// Callers bind the returned container for the rest of the test
/// (NOTE(review): testcontainers is expected to tear the container down when
/// the handle is dropped — confirm against the testcontainers docs).
async fn setup_etcd_test() -> Option<(testcontainers::ContainerAsync<GenericImage>, EtcdAdapter)>
{
    /// Turns a setup result into an `Option`, logging the error on failure.
    fn ok_or_log<T, E: std::fmt::Debug>(step: &str, result: Result<T, E>) -> Option<T> {
        match result {
            Ok(value) => Some(value),
            Err(err) => {
                eprintln!(
                    "etcd integration test setup: failed to {}: {:?}",
                    step, err
                );
                None
            }
        }
    }

    if !docker_helpers::is_docker_available() {
        docker_helpers::print_docker_unavailable_warning("etcd integration test");
        return None;
    }

    let etcd_image = GenericImage::new("quay.io/coreos/etcd", "v3.5.0")
        .with_exposed_port(2379.into())
        .with_wait_for(WaitFor::message_on_stderr("ready to serve client requests"))
        .with_env_var("ETCD_ADVERTISE_CLIENT_URLS", "http://0.0.0.0:2379")
        .with_env_var("ETCD_LISTEN_CLIENT_URLS", "http://0.0.0.0:2379");

    let container = ok_or_log("start the etcd container", etcd_image.start().await)?;
    let port = ok_or_log(
        "resolve the mapped etcd port",
        container.get_host_port_ipv4(2379).await,
    )?;
    let endpoint = format!("127.0.0.1:{}", port);

    // Extra delay on top of the readiness log message awaited above, before
    // the first client connection is attempted.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    let mut client = ok_or_log(
        "connect to etcd",
        etcd_client::Client::connect([&endpoint], None).await,
    )?;

    // Seed data in the same order as before: one flat key and two nested ones.
    let seed_data = [
        ("test/test.key", "test_value"),
        ("test/database/host", "localhost"),
        ("test/database/port", "5432"),
    ];
    for (key, value) in seed_data {
        ok_or_log("seed etcd with test data", client.put(key, value, None).await)?;
    }

    let adapter = ok_or_log(
        "create the etcd adapter",
        EtcdAdapter::new(vec![endpoint], Some("test/")).await,
    )?;
    Some((container, adapter))
}
/// Reads `test.key` through the adapter and expects the value seeded by
/// `setup_etcd_test`. Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_get() {
    let (_container, adapter) = match setup_etcd_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let key = ConfigKey::from("test.key");
    let fetched = adapter.get(&key).unwrap();

    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().as_str(), "test_value");
}
/// Looks up `database.host` and expects the value that `setup_etcd_test`
/// stored under the etcd key `test/database/host`.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_get_nested() {
    let (_container, adapter) = match setup_etcd_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let key = ConfigKey::from("database.host");
    let fetched = adapter.get(&key).unwrap();

    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().as_str(), "localhost");
}
/// Expects `all_keys` to list exactly the three keys seeded by
/// `setup_etcd_test`. Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_all_keys() {
    let (_container, adapter) = match setup_etcd_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let keys = adapter.all_keys().unwrap();
    assert_eq!(keys.len(), 3);

    // Checked in the same order as the seed data was written.
    for expected in ["test.key", "database.host", "database.port"] {
        assert!(keys.contains(&ConfigKey::from(expected)));
    }
}
/// Verifies that the adapter keeps serving the value it already holds until
/// `reload` is called, and serves the new value afterwards.
///
/// The value is changed behind the adapter's back through a separate etcd
/// client connected to the same container.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_reload() {
    let Some((container, mut adapter)) = setup_etcd_test().await else {
        return;
    };

    let key = ConfigKey::from("test.key");
    let value = adapter.get(&key).unwrap();
    assert_eq!(value.unwrap().as_str(), "test_value");

    // Overwrite the key directly in etcd, bypassing the adapter.
    let port = container.get_host_port_ipv4(2379).await.unwrap();
    let endpoint = format!("127.0.0.1:{}", port);
    let mut client = etcd_client::Client::connect([&endpoint], None)
        .await
        .unwrap();
    client
        .put("test/test.key", "updated_value", None)
        .await
        .unwrap();

    // Without a reload the adapter is expected to still return the old value.
    let value = adapter.get(&key).unwrap();
    assert_eq!(value.unwrap().as_str(), "test_value");

    adapter.reload().unwrap();

    let value = adapter.get(&key).unwrap();
    assert_eq!(value.unwrap().as_str(), "updated_value");
}
/// Expects an adapter built with `EtcdAdapter::new` (as done in
/// `setup_etcd_test`) to report priority 1.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_priority() {
    let (_container, adapter) = match setup_etcd_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let reported = adapter.priority();
    assert_eq!(reported, 1);
}
/// Expects an adapter built with `EtcdAdapter::with_priority(.., 5)` to report
/// priority 5. Starts its own etcd container; skipped when Docker is
/// unavailable.
#[tokio::test]
async fn test_etcd_custom_priority() {
    if !docker_helpers::is_docker_available() {
        docker_helpers::print_docker_unavailable_warning("etcd custom priority test");
        return;
    }

    let container = GenericImage::new("quay.io/coreos/etcd", "v3.5.0")
        .with_exposed_port(2379.into())
        .with_wait_for(WaitFor::message_on_stderr("ready to serve client requests"))
        .with_env_var("ETCD_ADVERTISE_CLIENT_URLS", "http://0.0.0.0:2379")
        .with_env_var("ETCD_LISTEN_CLIENT_URLS", "http://0.0.0.0:2379")
        .start()
        .await
        .unwrap();

    let host_port = container.get_host_port_ipv4(2379).await.unwrap();
    let endpoint = format!("127.0.0.1:{}", host_port);

    // Additional delay after the readiness message, before connecting.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    // No key prefix is configured here; only the priority is under test.
    let adapter = EtcdAdapter::with_priority(vec![endpoint], None, 5)
        .await
        .unwrap();

    assert_eq!(adapter.priority(), 5);
}
/// Expects `get` to succeed with `None` for a key that was never stored.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_nonexistent_key() {
    let (_container, adapter) = match setup_etcd_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let missing = ConfigKey::from("nonexistent.key");
    let lookup = adapter.get(&missing).unwrap();

    assert!(lookup.is_none());
}
/// Verifies that an adapter created with the prefix `app1/` only exposes keys
/// stored under that prefix.
///
/// Two keys are written (`app1/key1`, `app2/key2`); the adapter is expected to
/// list exactly one key, `key1`. Starts its own etcd container; skipped when
/// Docker is unavailable.
#[tokio::test]
async fn test_etcd_prefix_filtering() {
    if !docker_helpers::is_docker_available() {
        docker_helpers::print_docker_unavailable_warning("etcd prefix filtering test");
        return;
    }

    let etcd_image = GenericImage::new("quay.io/coreos/etcd", "v3.5.0")
        .with_exposed_port(2379.into())
        .with_wait_for(WaitFor::message_on_stderr("ready to serve client requests"))
        .with_env_var("ETCD_ADVERTISE_CLIENT_URLS", "http://0.0.0.0:2379")
        .with_env_var("ETCD_LISTEN_CLIENT_URLS", "http://0.0.0.0:2379");
    let container = etcd_image.start().await.unwrap();
    let port = container.get_host_port_ipv4(2379).await.unwrap();
    let endpoint = format!("127.0.0.1:{}", port);

    // Additional delay after the readiness message, before connecting.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    // Seed one key inside and one key outside the adapter's prefix.
    let mut client = etcd_client::Client::connect([&endpoint], None)
        .await
        .unwrap();
    client.put("app1/key1", "value1", None).await.unwrap();
    client.put("app2/key2", "value2", None).await.unwrap();

    let adapter = EtcdAdapter::new(vec![endpoint], Some("app1/"))
        .await
        .unwrap();

    let keys = adapter.all_keys().unwrap();
    assert_eq!(keys.len(), 1);
    assert!(keys.contains(&ConfigKey::from("key1")));
    assert!(!keys.contains(&ConfigKey::from("key2")));
}
/// Builds a service from the etcd adapter and a command-line adapter that both
/// define `test.key`, and expects the command-line value to be returned.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_cli_overrides_etcd() {
    use hexcfg::{
        adapters::CommandLineAdapter, domain::ConfigurationService,
        service::ConfigurationServiceBuilder,
    };

    let (_container, etcd_adapter) = match setup_etcd_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let cli_adapter = CommandLineAdapter::from_args(vec!["--test.key=from_cli"]);

    // Sources are registered in the same order as before: etcd, then CLI.
    let builder = ConfigurationServiceBuilder::new()
        .with_source(Box::new(etcd_adapter))
        .with_source(Box::new(cli_adapter));
    let service = builder.build().unwrap();

    let resolved = service.get_str("test.key").unwrap();
    assert_eq!(resolved.as_str(), "from_cli");
}
/// Builds a service from the etcd adapter and an env-var adapter that both
/// define `test.key`, and expects the env-var value to be returned.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_env_overrides_etcd() {
    use hexcfg::{
        adapters::EnvVarAdapter, domain::ConfigurationService,
        service::ConfigurationServiceBuilder,
    };
    use std::collections::HashMap;

    let (_container, etcd_adapter) = match setup_etcd_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    // The env adapter is fed explicit values rather than the process environment.
    let env_vars = HashMap::from([("test.key".to_string(), "from_env".to_string())]);
    let env_adapter = EnvVarAdapter::with_values(env_vars);

    // Sources are registered in the same order as before: etcd, then env.
    let builder = ConfigurationServiceBuilder::new()
        .with_source(Box::new(etcd_adapter))
        .with_source(Box::new(env_adapter));
    let service = builder.build().unwrap();

    let resolved = service.get_str("test.key").unwrap();
    assert_eq!(resolved.as_str(), "from_env");
}
/// Combines etcd, env-var and command-line sources in one service and checks
/// which source each key resolves from:
///
/// * `test.key` is defined by all three and is expected to come from the CLI;
/// * `database.host` is defined by etcd and env and is expected to come from env;
/// * `database.port` is defined only by etcd.
///
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_full_precedence_chain_with_etcd() {
    use hexcfg::{
        adapters::{CommandLineAdapter, EnvVarAdapter},
        domain::ConfigurationService,
        service::ConfigurationServiceBuilder,
    };
    use std::collections::HashMap;

    let (_container, etcd_adapter) = match setup_etcd_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let env_vars = HashMap::from([
        ("test.key".to_string(), "from_env".to_string()),
        ("database.host".to_string(), "env.example.com".to_string()),
    ]);
    let env_adapter = EnvVarAdapter::with_values(env_vars);
    let cli_adapter = CommandLineAdapter::from_args(vec!["--test.key=from_cli"]);

    // Sources are registered in the same order as before: etcd, env, CLI.
    let builder = ConfigurationServiceBuilder::new()
        .with_source(Box::new(etcd_adapter))
        .with_source(Box::new(env_adapter))
        .with_source(Box::new(cli_adapter));
    let service = builder.build().unwrap();

    let from_cli = service.get_str("test.key").unwrap();
    assert_eq!(from_cli.as_str(), "from_cli");

    let from_env = service.get_str("database.host").unwrap();
    assert_eq!(from_env.as_str(), "env.example.com");

    let from_etcd = service.get_str("database.port").unwrap();
    assert_eq!(from_etcd.as_str(), "5432");
}
/// Verifies that an etcd adapter created with priority 3 wins over an env-var
/// adapter when both define `special.key`.
///
/// The key is written to etcd without any prefix and the adapter is created
/// without a prefix. Starts its own etcd container; skipped when Docker is
/// unavailable.
#[tokio::test]
async fn test_etcd_with_custom_priority_overrides_env() {
    use hexcfg::adapters::EnvVarAdapter;
    use hexcfg::domain::ConfigurationService;
    use hexcfg::service::ConfigurationServiceBuilder;
    use std::collections::HashMap;

    if !docker_helpers::is_docker_available() {
        docker_helpers::print_docker_unavailable_warning(
            "etcd custom priority precedence test",
        );
        return;
    }

    let etcd_image = GenericImage::new("quay.io/coreos/etcd", "v3.5.0")
        .with_exposed_port(2379.into())
        .with_wait_for(WaitFor::message_on_stderr("ready to serve client requests"))
        .with_env_var("ETCD_ADVERTISE_CLIENT_URLS", "http://0.0.0.0:2379")
        .with_env_var("ETCD_LISTEN_CLIENT_URLS", "http://0.0.0.0:2379");
    let container = etcd_image.start().await.unwrap();
    let port = container.get_host_port_ipv4(2379).await.unwrap();
    let endpoint = format!("127.0.0.1:{}", port);

    // Additional delay after the readiness message, before connecting.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    // Store the etcd-side value for the contested key.
    let mut client = etcd_client::Client::connect([&endpoint], None)
        .await
        .unwrap();
    client
        .put("special.key", "from_etcd", None)
        .await
        .unwrap();

    let etcd_adapter = EtcdAdapter::with_priority(vec![endpoint], None, 3)
        .await
        .unwrap();

    let mut env_vars = HashMap::new();
    env_vars.insert("special.key".to_string(), "from_env".to_string());
    let env_adapter = EnvVarAdapter::with_values(env_vars);

    // The env source is registered first, the etcd source second.
    let service = ConfigurationServiceBuilder::new()
        .with_source(Box::new(env_adapter))
        .with_source(Box::new(etcd_adapter))
        .build()
        .unwrap();

    let value = service.get_str("special.key").unwrap();
    assert_eq!(value.as_str(), "from_etcd");
}
/// Starts a throwaway etcd container for watcher tests and returns it together
/// with the `host:port` endpoint string to connect to.
///
/// Returns `None` when Docker is unavailable or when the container cannot be
/// started or inspected, so callers can skip the test. Failures are reported
/// on stderr so a skipped test is not mistaken for a genuine pass.
///
/// No data is seeded and no adapter or watcher is created here.
async fn setup_etcd_watcher_test(
) -> Option<(testcontainers::ContainerAsync<GenericImage>, String)> {
    /// Turns a setup result into an `Option`, logging the error on failure.
    fn ok_or_log<T, E: std::fmt::Debug>(step: &str, result: Result<T, E>) -> Option<T> {
        match result {
            Ok(value) => Some(value),
            Err(err) => {
                eprintln!("etcd watcher test setup: failed to {}: {:?}", step, err);
                None
            }
        }
    }

    if !docker_helpers::is_docker_available() {
        docker_helpers::print_docker_unavailable_warning("etcd watcher test");
        return None;
    }

    let etcd_image = GenericImage::new("quay.io/coreos/etcd", "v3.5.0")
        .with_exposed_port(2379.into())
        .with_wait_for(WaitFor::message_on_stderr("ready to serve client requests"))
        .with_env_var("ETCD_ADVERTISE_CLIENT_URLS", "http://0.0.0.0:2379")
        .with_env_var("ETCD_LISTEN_CLIENT_URLS", "http://0.0.0.0:2379");

    let container = ok_or_log("start the etcd container", etcd_image.start().await)?;
    let port = ok_or_log(
        "resolve the mapped etcd port",
        container.get_host_port_ipv4(2379).await,
    )?;
    let endpoint = format!("127.0.0.1:{}", port);

    // Extra delay on top of the readiness log message awaited above, before
    // callers attempt their first connection.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    Some((container, endpoint))
}
/// Expects `EtcdWatcher::new` to succeed against a running etcd container.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_watcher_creation() {
    use hexcfg::adapters::EtcdWatcher;

    let (_container, endpoint) = match setup_etcd_watcher_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let creation = EtcdWatcher::new(vec![&endpoint], Some("test/")).await;
    assert!(creation.is_ok());
}
/// Smoke test: constructs a watcher against `127.0.0.1:19999` (presumably a
/// port with no etcd server listening) and only checks that the call returns
/// without panicking. The result itself is deliberately not inspected.
#[tokio::test]
async fn test_etcd_watcher_invalid_endpoint() {
    use hexcfg::adapters::EtcdWatcher;

    let endpoints = vec!["127.0.0.1:19999"];
    let _outcome = EtcdWatcher::new(endpoints, Some("test/")).await;
}
/// Starts a watcher with a no-op callback, lets it run briefly, and expects
/// both `watch` and `stop` to succeed.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_watcher_start_stop() {
    use hexcfg::adapters::EtcdWatcher;
    use hexcfg::ports::ConfigWatcher;
    use std::sync::Arc;

    let (_container, endpoint) = match setup_etcd_watcher_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let mut watcher = EtcdWatcher::new(vec![&endpoint], Some("test/"))
        .await
        .unwrap();

    // The callback intentionally does nothing; only the lifecycle is tested.
    let noop_callback = Arc::new(|_key: ConfigKey| {});
    let started = watcher.watch(noop_callback);
    assert!(started.is_ok());

    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    let stopped = watcher.stop();
    assert!(stopped.is_ok());
}
/// Expects the watcher callback to fire at least once after a key is written
/// under the watched prefix `test/watcher/`.
///
/// The test waits a fixed time after starting the watcher and after the write,
/// then removes the key and stops the watcher before checking the flag.
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_watcher_callback_triggered() {
    use etcd_client::Client;
    use hexcfg::adapters::EtcdWatcher;
    use hexcfg::ports::ConfigWatcher;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let (_container, endpoint) = match setup_etcd_watcher_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let mut watcher = EtcdWatcher::new(vec![&endpoint], Some("test/watcher/"))
        .await
        .unwrap();

    // Flag shared between the test body and the watcher callback.
    let fired = Arc::new(AtomicBool::new(false));
    let callback = {
        let fired = Arc::clone(&fired);
        Arc::new(move |key: ConfigKey| {
            eprintln!("etcd watcher callback triggered for key: {}", key.as_str());
            fired.store(true, Ordering::SeqCst);
        })
    };
    watcher.watch(callback).unwrap();

    // Pause before writing so the watch has time to be established.
    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

    let mut etcd = Client::connect([&endpoint], None).await.unwrap();
    etcd.put("test/watcher/mykey", "test_value", None)
        .await
        .unwrap();

    // Pause so the change notification can reach the callback.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    etcd.delete("test/watcher/mykey", None).await.unwrap();
    watcher.stop().unwrap();

    assert!(
        fired.load(Ordering::SeqCst),
        "etcd watcher callback should have been triggered"
    );
}
/// Writes three keys under the watched prefix `test/multi/` and expects the
/// watcher callback to have been invoked at least three times.
///
/// The keys are deleted again before the watcher is stopped and the counter is
/// read. Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_watcher_multiple_changes() {
    use etcd_client::Client;
    use hexcfg::adapters::EtcdWatcher;
    use hexcfg::ports::ConfigWatcher;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let (_container, endpoint) = match setup_etcd_watcher_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let mut watcher = EtcdWatcher::new(vec![&endpoint], Some("test/multi/"))
        .await
        .unwrap();

    // Counter shared between the test body and the watcher callback.
    let invocations = Arc::new(AtomicUsize::new(0));
    let callback = {
        let invocations = Arc::clone(&invocations);
        Arc::new(move |key: ConfigKey| {
            eprintln!("etcd watcher detected change: {}", key.as_str());
            invocations.fetch_add(1, Ordering::SeqCst);
        })
    };
    watcher.watch(callback).unwrap();

    // Pause before writing so the watch has time to be established.
    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

    let mut etcd = Client::connect([&endpoint], None).await.unwrap();

    // Three separate writes, spaced out by a short pause each.
    for i in 0..3 {
        let key = format!("test/multi/key{}", i);
        let value = format!("value{}", i);
        etcd.put(key, value, None).await.unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }

    // Pause so the change notifications can reach the callback.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    for i in 0..3 {
        etcd.delete(format!("test/multi/key{}", i), None)
            .await
            .unwrap();
    }

    watcher.stop().unwrap();

    let count = invocations.load(Ordering::SeqCst);
    assert!(count >= 3, "Expected at least 3 triggers, got {}", count);
}
/// Watches the prefix `test/prefix/`, writes one key inside it and one key
/// under `other/prefix/`, and checks the number of callback invocations.
///
/// The count must be at least 1 and at most 3. Both keys are deleted before
/// the watcher is stopped, so the observed count may include more than the
/// single write (NOTE(review): presumably the delete of the watched key —
/// confirm against the watcher implementation).
/// Skipped when the etcd fixture is unavailable.
#[tokio::test]
async fn test_etcd_watcher_prefix_filtering() {
    use etcd_client::Client;
    use hexcfg::adapters::EtcdWatcher;
    use hexcfg::ports::ConfigWatcher;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let (_container, endpoint) = match setup_etcd_watcher_test().await {
        Some(fixture) => fixture,
        None => return,
    };

    let mut watcher = EtcdWatcher::new(vec![&endpoint], Some("test/prefix/"))
        .await
        .unwrap();

    // Counter shared between the test body and the watcher callback.
    let invocations = Arc::new(AtomicUsize::new(0));
    let callback = {
        let invocations = Arc::clone(&invocations);
        Arc::new(move |key: ConfigKey| {
            eprintln!("etcd watcher detected change: {}", key.as_str());
            invocations.fetch_add(1, Ordering::SeqCst);
        })
    };
    watcher.watch(callback).unwrap();

    // Pause before writing so the watch has time to be established.
    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

    let mut etcd = Client::connect([&endpoint], None).await.unwrap();

    // One write inside the watched prefix, one outside of it.
    etcd.put("test/prefix/key1", "value1", None).await.unwrap();
    etcd.put("other/prefix/key2", "value2", None).await.unwrap();

    // Pause so the change notifications can reach the callback.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    etcd.delete("test/prefix/key1", None).await.unwrap();
    etcd.delete("other/prefix/key2", None).await.unwrap();

    watcher.stop().unwrap();

    let count = invocations.load(Ordering::SeqCst);
    assert!(
        count >= 1,
        "Expected at least 1 trigger for key with correct prefix, got {}",
        count
    );
    assert!(
        count <= 3,
        "Expected at most 3 triggers, got {} (something may be wrong)",
        count
    );
}
}