#![allow(clippy::doc_markdown)]
#![allow(clippy::must_use_candidate)]
#![allow(clippy::return_self_not_must_use)]
use crate::template::{Result, Template, TemplateConfig, TemplateError};
use async_trait::async_trait;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
/// Container image used by `ToxiproxyTemplate::new` unless `custom_image` replaces it.
const DEFAULT_IMAGE: &str = "ghcr.io/shopify/toxiproxy";
/// Image tag paired with `DEFAULT_IMAGE` by `ToxiproxyTemplate::new`.
const DEFAULT_TAG: &str = "2.12.0";
/// Default port of the control API: published as `(8474, 8474)` by `new`, and always used as
/// the container-side port of the control mapping by `control_port`.
const DEFAULT_CONTROL_PORT: u16 = 8474;
/// Selects which stream of a proxy a toxic is attached to.
///
/// The variant is converted to its wire string with `as_str` and sent in the
/// `stream` field of the toxic request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToxicStream {
    /// Sent to the control API as `"downstream"`.
    Downstream,
    /// Sent to the control API as `"upstream"`.
    Upstream,
}

impl ToxicStream {
    /// Returns the exact string the control API expects for this stream.
    fn as_str(self) -> &'static str {
        match self {
            Self::Downstream => "downstream",
            Self::Upstream => "upstream",
        }
    }
}
/// A fault to inject into a proxy, together with its tuning values.
///
/// Every variant corresponds to one toxic type name (see `type_name`), and its
/// fields are forwarded unchanged as that toxic's attributes (see `attributes`).
/// The units of each value are defined by Toxiproxy itself; the unit hints on
/// the fields below follow the upstream documentation and should be confirmed
/// against the Toxiproxy version in use.
///
/// All payloads are plain `u64`s, so the type is `Eq` as well as `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Toxic {
    /// Sent as toxic type `"latency"`.
    Latency {
        /// Value of the `latency` attribute (presumably milliseconds — confirm upstream).
        latency: u64,
        /// Value of the `jitter` attribute (presumably milliseconds — confirm upstream).
        jitter: u64,
    },
    /// Sent as toxic type `"bandwidth"`.
    Bandwidth {
        /// Value of the `rate` attribute (presumably KB/s — confirm upstream).
        rate: u64,
    },
    /// Sent as toxic type `"timeout"`.
    Timeout {
        /// Value of the `timeout` attribute (presumably milliseconds — confirm upstream).
        timeout: u64,
    },
    /// Sent as toxic type `"slicer"`.
    Slicer {
        /// Value of the `average_size` attribute.
        average_size: u64,
        /// Value of the `size_variation` attribute.
        size_variation: u64,
        /// Value of the `delay` attribute.
        delay: u64,
    },
    /// Sent as toxic type `"limit_data"`.
    LimitData {
        /// Value of the `bytes` attribute.
        bytes: u64,
    },
}

impl Toxic {
    /// Builds a [`Toxic::Latency`] with the given `latency` and a `jitter` of zero.
    #[must_use]
    pub fn latency(latency: u64) -> Self {
        Toxic::Latency { latency, jitter: 0 }
    }

    /// Builds a [`Toxic::Latency`] with both `latency` and `jitter` set explicitly.
    #[must_use]
    pub fn jitter(latency: u64, jitter: u64) -> Self {
        Toxic::Latency { latency, jitter }
    }

    /// Builds a [`Toxic::Bandwidth`] with the given `rate`.
    #[must_use]
    pub fn bandwidth(rate: u64) -> Self {
        Toxic::Bandwidth { rate }
    }

    /// Builds a [`Toxic::Timeout`] with the given `timeout`.
    #[must_use]
    pub fn timeout(timeout: u64) -> Self {
        Toxic::Timeout { timeout }
    }

    /// Builds a [`Toxic::Slicer`] from its three attributes.
    #[must_use]
    pub fn slicer(average_size: u64, size_variation: u64, delay: u64) -> Self {
        Toxic::Slicer {
            average_size,
            size_variation,
            delay,
        }
    }

    /// Builds a [`Toxic::LimitData`] with the given `bytes` value.
    #[must_use]
    pub fn limit_data(bytes: u64) -> Self {
        Toxic::LimitData { bytes }
    }

    /// Returns the string sent in the `type` field of a toxic request.
    fn type_name(&self) -> &'static str {
        match self {
            Toxic::Latency { .. } => "latency",
            Toxic::Bandwidth { .. } => "bandwidth",
            Toxic::Timeout { .. } => "timeout",
            Toxic::Slicer { .. } => "slicer",
            Toxic::LimitData { .. } => "limit_data",
        }
    }

    /// Returns the attribute map sent in the `attributes` field of a toxic request.
    ///
    /// Keys are the attribute names of the variant; values are the variant's
    /// fields, unchanged.
    fn attributes(&self) -> HashMap<String, u64> {
        // One (name, value) table per variant keeps the wire names next to the
        // fields they come from; the owned-key conversion happens once below.
        let pairs: Vec<(&'static str, u64)> = match *self {
            Toxic::Latency { latency, jitter } => {
                vec![("latency", latency), ("jitter", jitter)]
            }
            Toxic::Bandwidth { rate } => vec![("rate", rate)],
            Toxic::Timeout { timeout } => vec![("timeout", timeout)],
            Toxic::Slicer {
                average_size,
                size_variation,
                delay,
            } => vec![
                ("average_size", average_size),
                ("size_variation", size_variation),
                ("delay", delay),
            ],
            Toxic::LimitData { bytes } => vec![("bytes", bytes)],
        };
        pairs
            .into_iter()
            .map(|(key, value)| (key.to_owned(), value))
            .collect()
    }
}
/// JSON body that `create_proxy` posts to the control API's `/proxies` endpoint.
#[derive(Debug, Serialize)]
struct ProxyRequest {
/// Name the proxy is created under; also used in `create_proxy` error messages.
name: String,
/// The `listen` argument of `create_proxy`, forwarded unchanged
/// (presumably a `host:port` listen address — confirm against the Toxiproxy API).
listen: String,
/// The `upstream` argument of `create_proxy`, forwarded unchanged
/// (presumably the `host:port` of the real service — confirm against the Toxiproxy API).
upstream: String,
/// `create_proxy` always sets this to `true`.
enabled: bool,
}
/// Proxy description deserialized from control API responses.
///
/// Returned by `create_proxy` and, one per entry, by `list_proxies`.
#[derive(Debug, Clone, Deserialize)]
pub struct ProxyInfo {
/// Value of the `name` field in the response.
pub name: String,
/// Value of the `listen` field in the response.
pub listen: String,
/// Value of the `upstream` field in the response.
pub upstream: String,
/// Value of the `enabled` field in the response.
pub enabled: bool,
}
/// JSON body that `add_toxic` posts to `/proxies/{proxy}/toxics`.
#[derive(Debug, Serialize)]
struct ToxicRequest {
/// Name of the toxic; the same name is what `remove_toxic` takes to delete it.
name: String,
/// Toxic type name from `Toxic::type_name`; serialized under the JSON key `type`
/// because `type` is a Rust keyword and cannot be a field name.
#[serde(rename = "type")]
toxic_type: String,
/// Stream string from `ToxicStream::as_str`.
stream: String,
/// `add_toxic` always sends `1.0` here.
toxicity: f64,
/// Attribute map from `Toxic::attributes`.
attributes: HashMap<String, u64>,
}
/// Container template for Toxiproxy plus a small client for its HTTP control API.
pub struct ToxiproxyTemplate {
/// Container settings (image, tag, port mappings, network, ...) exposed through the `Template` impl.
config: TemplateConfig,
/// Host port the control API is reached on; used to build the `http://localhost:{port}` control URL.
control_port: u16,
/// Upper bound on how long `wait_for_control_api` keeps polling before returning a timeout error.
api_ready_timeout: Duration,
}
impl ToxiproxyTemplate {
pub fn new(name: impl Into<String>) -> Self {
let name = name.into();
let config = TemplateConfig {
name,
image: DEFAULT_IMAGE.to_string(),
tag: DEFAULT_TAG.to_string(),
ports: vec![(DEFAULT_CONTROL_PORT, DEFAULT_CONTROL_PORT)],
env: HashMap::new(),
volumes: Vec::new(),
network: None,
health_check: None,
auto_remove: false,
memory_limit: None,
cpu_limit: None,
platform: None,
};
Self {
config,
control_port: DEFAULT_CONTROL_PORT,
api_ready_timeout: Duration::from_secs(30),
}
}
pub fn control_port(mut self, port: u16) -> Self {
if let Some(pos) = self
.config
.ports
.iter()
.position(|(_, c)| *c == DEFAULT_CONTROL_PORT)
{
self.config.ports[pos] = (port, DEFAULT_CONTROL_PORT);
} else {
self.config.ports.push((port, DEFAULT_CONTROL_PORT));
}
self.control_port = port;
self
}
pub fn proxy_port(mut self, port: u16) -> Self {
if !self
.config
.ports
.iter()
.any(|(h, c)| *h == port && *c == port)
{
self.config.ports.push((port, port));
}
self
}
pub fn network(mut self, network: impl Into<String>) -> Self {
self.config.network = Some(network.into());
self
}
pub fn auto_remove(mut self) -> Self {
self.config.auto_remove = true;
self
}
pub fn custom_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
self.config.image = image.into();
self.config.tag = tag.into();
self
}
pub fn platform(mut self, platform: impl Into<String>) -> Self {
self.config.platform = Some(platform.into());
self
}
pub fn api_ready_timeout(mut self, timeout: Duration) -> Self {
self.api_ready_timeout = timeout;
self
}
fn control_url(&self) -> String {
format!("http://localhost:{}", self.control_port)
}
fn http_client() -> Result<Client> {
Client::builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|e| {
TemplateError::DockerError(crate::Error::custom(format!(
"failed to build HTTP client: {e}"
)))
})
}
pub async fn wait_for_control_api(&self) -> Result<()> {
let client = Self::http_client()?;
let url = format!("{}/version", self.control_url());
let start = std::time::Instant::now();
while start.elapsed() < self.api_ready_timeout {
if let Ok(response) = client.get(&url).send().await {
if response.status().is_success() {
return Ok(());
}
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
Err(TemplateError::Timeout(format!(
"Toxiproxy control API on port {} did not become ready within {}s",
self.control_port,
self.api_ready_timeout.as_secs()
)))
}
pub async fn create_proxy(
&self,
name: impl Into<String>,
listen: impl Into<String>,
upstream: impl Into<String>,
) -> Result<ProxyInfo> {
let client = Self::http_client()?;
let body = ProxyRequest {
name: name.into(),
listen: listen.into(),
upstream: upstream.into(),
enabled: true,
};
let url = format!("{}/proxies", self.control_url());
let response = client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| map_request_err(&e))?;
let status = response.status();
if !status.is_success() {
let text = response.text().await.unwrap_or_default();
return Err(TemplateError::InvalidConfig(format!(
"failed to create proxy '{}': HTTP {status}: {text}",
body.name
)));
}
response
.json::<ProxyInfo>()
.await
.map_err(|e| map_request_err(&e))
}
pub async fn add_toxic(
&self,
proxy: &str,
name: impl Into<String>,
stream: ToxicStream,
toxic: Toxic,
) -> Result<()> {
let client = Self::http_client()?;
let body = ToxicRequest {
name: name.into(),
toxic_type: toxic.type_name().to_string(),
stream: stream.as_str().to_string(),
toxicity: 1.0,
attributes: toxic.attributes(),
};
let url = format!("{}/proxies/{proxy}/toxics", self.control_url());
let response = client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| map_request_err(&e))?;
let status = response.status();
if !status.is_success() {
let text = response.text().await.unwrap_or_default();
return Err(TemplateError::InvalidConfig(format!(
"failed to add toxic '{}' to proxy '{proxy}': HTTP {status}: {text}",
body.name
)));
}
Ok(())
}
pub async fn remove_toxic(&self, proxy: &str, toxic: &str) -> Result<()> {
let client = Self::http_client()?;
let url = format!("{}/proxies/{proxy}/toxics/{toxic}", self.control_url());
let response = client
.delete(&url)
.send()
.await
.map_err(|e| map_request_err(&e))?;
let status = response.status();
if !status.is_success() {
let text = response.text().await.unwrap_or_default();
return Err(TemplateError::InvalidConfig(format!(
"failed to remove toxic '{toxic}' from proxy '{proxy}': HTTP {status}: {text}"
)));
}
Ok(())
}
pub async fn list_proxies(&self) -> Result<Vec<ProxyInfo>> {
let client = Self::http_client()?;
let url = format!("{}/proxies", self.control_url());
let response = client
.get(&url)
.send()
.await
.map_err(|e| map_request_err(&e))?;
let status = response.status();
if !status.is_success() {
let text = response.text().await.unwrap_or_default();
return Err(TemplateError::InvalidConfig(format!(
"failed to list proxies: HTTP {status}: {text}"
)));
}
let map = response
.json::<HashMap<String, ProxyInfo>>()
.await
.map_err(|e| map_request_err(&e))?;
Ok(map.into_values().collect())
}
pub async fn reset(&self) -> Result<()> {
let client = Self::http_client()?;
let url = format!("{}/reset", self.control_url());
let response = client
.post(&url)
.send()
.await
.map_err(|e| map_request_err(&e))?;
let status = response.status();
if !status.is_success() {
let text = response.text().await.unwrap_or_default();
return Err(TemplateError::InvalidConfig(format!(
"failed to reset Toxiproxy: HTTP {status}: {text}"
)));
}
Ok(())
}
}
fn map_request_err(e: &reqwest::Error) -> TemplateError {
TemplateError::DockerError(crate::Error::custom(format!(
"Toxiproxy control API request failed: {e}"
)))
}
// `Template` implementation: every accessor simply exposes the stored `TemplateConfig`.
#[async_trait]
impl Template for ToxiproxyTemplate {
/// Returns the `name` field of the stored configuration.
fn name(&self) -> &str {
&self.config.name
}
/// Returns a shared reference to the stored configuration.
fn config(&self) -> &TemplateConfig {
&self.config
}
/// Returns a mutable reference to the stored configuration.
fn config_mut(&mut self) -> &mut TemplateConfig {
&mut self.config
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh template carries the default image, tag and control-port mapping.
    #[test]
    fn test_toxiproxy_template_defaults() {
        let template = ToxiproxyTemplate::new("test-toxiproxy");
        let config = template.config();

        assert_eq!(template.name(), "test-toxiproxy");
        assert_eq!(config.image, DEFAULT_IMAGE);
        assert_eq!(config.tag, DEFAULT_TAG);
        assert_eq!(template.control_port, DEFAULT_CONTROL_PORT);
        assert_eq!(config.ports, vec![(8474, 8474)]);
    }

    /// Overriding the control port rewrites the existing mapping and the control URL.
    #[test]
    fn test_control_port_replaces_mapping() {
        let template = ToxiproxyTemplate::new("test").control_port(18474);

        assert_eq!(template.control_port, 18474);
        assert_eq!(template.config().ports, vec![(18474, 8474)]);
        assert_eq!(template.control_url(), "http://localhost:18474");
    }

    /// Publishing the same proxy port twice keeps a single mapping.
    #[test]
    fn test_proxy_port_published() {
        let template = ToxiproxyTemplate::new("test")
            .proxy_port(16379)
            .proxy_port(16379);
        let ports = &template.config().ports;
        let published = ports
            .iter()
            .filter(|mapping| **mapping == (16379, 16379))
            .count();

        assert!(ports.contains(&(8474, 8474)));
        assert!(ports.contains(&(16379, 16379)));
        assert_eq!(published, 1);
    }

    /// Network, image, tag and platform builders store their arguments.
    #[test]
    fn test_network_and_custom_image() {
        let template = ToxiproxyTemplate::new("test")
            .network("chaos-net")
            .custom_image("ghcr.io/shopify/toxiproxy", "latest")
            .platform("linux/arm64");
        let config = template.config();

        assert_eq!(config.network.as_deref(), Some("chaos-net"));
        assert_eq!(config.image, "ghcr.io/shopify/toxiproxy");
        assert_eq!(config.tag, "latest");
        assert_eq!(config.platform.as_deref(), Some("linux/arm64"));
    }

    /// `latency` defaults jitter to zero; `jitter` sets both attributes.
    #[test]
    fn test_toxic_latency_attributes() {
        let plain = Toxic::latency(500);
        assert_eq!(plain.type_name(), "latency");
        let plain_attrs = plain.attributes();
        assert_eq!(plain_attrs.get("latency"), Some(&500));
        assert_eq!(plain_attrs.get("jitter"), Some(&0));

        let jittered_attrs = Toxic::jitter(500, 100).attributes();
        assert_eq!(jittered_attrs.get("latency"), Some(&500));
        assert_eq!(jittered_attrs.get("jitter"), Some(&100));
    }

    /// Bandwidth toxics expose a single `rate` attribute.
    #[test]
    fn test_toxic_bandwidth_attributes() {
        let toxic = Toxic::bandwidth(64);

        assert_eq!(toxic.type_name(), "bandwidth");
        assert_eq!(toxic.attributes().get("rate"), Some(&64));
    }

    /// Timeout toxics expose a single `timeout` attribute, zero included.
    #[test]
    fn test_toxic_timeout_attributes() {
        let toxic = Toxic::timeout(0);

        assert_eq!(toxic.type_name(), "timeout");
        assert_eq!(toxic.attributes().get("timeout"), Some(&0));
    }

    /// Slicer toxics expose all three of their attributes.
    #[test]
    fn test_toxic_slicer_attributes() {
        let toxic = Toxic::slicer(64, 32, 10);
        assert_eq!(toxic.type_name(), "slicer");

        let attrs = toxic.attributes();
        assert_eq!(attrs.get("average_size"), Some(&64));
        assert_eq!(attrs.get("size_variation"), Some(&32));
        assert_eq!(attrs.get("delay"), Some(&10));
    }

    /// Limit-data toxics expose a single `bytes` attribute.
    #[test]
    fn test_toxic_limit_data_attributes() {
        let toxic = Toxic::limit_data(2048);

        assert_eq!(toxic.type_name(), "limit_data");
        assert_eq!(toxic.attributes().get("bytes"), Some(&2048));
    }

    /// Stream variants map to the exact strings sent to the control API.
    #[test]
    fn test_toxic_stream_wire_values() {
        assert_eq!(ToxicStream::Downstream.as_str(), "downstream");
        assert_eq!(ToxicStream::Upstream.as_str(), "upstream");
    }
}