#![cfg(all(feature = "template-toxiproxy", feature = "template-redis"))]
use docker_wrapper::template::toxiproxy::{Toxic, ToxicStream};
use docker_wrapper::{
DockerCommand, NetworkCreateCommand, NetworkRmCommand, RedisTemplate, Template,
ToxiproxyTemplate,
};
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
fn unique(prefix: &str) -> String {
format!("{}-{}", prefix, uuid::Uuid::new_v4())
}
fn random_port() -> u16 {
30000 + (uuid::Uuid::new_v4().as_u128() % 10000) as u16
}
async fn ping_through(addr: &str) -> Result<Duration, Box<dyn std::error::Error>> {
let start = Instant::now();
let mut stream = TcpStream::connect(addr).await?;
stream.write_all(b"PING\r\n").await?;
stream.flush().await?;
let mut buf = [0u8; 64];
let n = tokio::time::timeout(Duration::from_secs(5), stream.read(&mut buf)).await??;
let elapsed = start.elapsed();
let reply = String::from_utf8_lossy(&buf[..n]);
if !reply.to_uppercase().contains("PONG") {
return Err(format!("unexpected reply from {addr}: {reply:?}").into());
}
Ok(elapsed)
}
#[tokio::test]
async fn test_toxiproxy_proxy_and_latency_toxic() -> Result<(), Box<dyn std::error::Error>> {
if docker_wrapper::VersionCommand::new()
.execute()
.await
.is_err()
{
eprintln!("Docker not available, skipping test");
return Ok(());
}
let network = unique("toxi-net");
let redis_name = unique("toxi-redis");
let proxy_name = unique("toxi-proxy");
let proxy_port = random_port();
let _ = NetworkCreateCommand::new(&network)
.driver("bridge")
.execute()
.await;
let result = async {
let mut redis = RedisTemplate::new(&redis_name);
redis.config_mut().network = Some(network.clone());
redis.start_and_wait().await?;
redis.wait_for_ready().await?;
let mut toxiproxy = ToxiproxyTemplate::new(&proxy_name)
.proxy_port(proxy_port)
.api_ready_timeout(Duration::from_secs(30));
toxiproxy.config_mut().network = Some(network.clone());
toxiproxy.start_and_wait().await?;
toxiproxy.wait_for_control_api().await?;
let proxy = toxiproxy
.create_proxy(
"redis",
format!("0.0.0.0:{proxy_port}"),
format!("{redis_name}:6379"),
)
.await?;
assert_eq!(proxy.name, "redis");
assert!(proxy.enabled);
let proxies = toxiproxy.list_proxies().await?;
assert!(
proxies.iter().any(|p| p.name == "redis"),
"expected 'redis' proxy in listing, got {proxies:?}"
);
let addr = format!("127.0.0.1:{proxy_port}");
let baseline = ping_through(&addr).await?;
toxiproxy
.add_toxic(
"redis",
"slow",
ToxicStream::Downstream,
Toxic::latency(600),
)
.await?;
let slowed = ping_through(&addr).await?;
assert!(
slowed >= Duration::from_millis(500),
"expected latency toxic to add delay (baseline {baseline:?}, slowed {slowed:?})"
);
toxiproxy.remove_toxic("redis", "slow").await?;
let recovered = ping_through(&addr).await?;
assert!(
recovered < Duration::from_millis(500),
"expected latency to recover after removing toxic, got {recovered:?}"
);
toxiproxy.reset().await?;
toxiproxy.stop().await?;
toxiproxy.remove().await?;
redis.stop().await?;
redis.remove().await?;
Ok::<(), Box<dyn std::error::Error>>(())
}
.await;
let _ = RedisTemplate::new(&redis_name).remove().await;
let _ = ToxiproxyTemplate::new(&proxy_name).remove().await;
let _ = NetworkRmCommand::new(&network).run().await;
result
}