1#![allow(clippy::doc_markdown)]
54#![allow(clippy::must_use_candidate)]
55#![allow(clippy::return_self_not_must_use)]
56
57use crate::template::{Result, Template, TemplateConfig, TemplateError};
58use async_trait::async_trait;
59use reqwest::Client;
60use serde::{Deserialize, Serialize};
61use std::collections::HashMap;
62use std::time::Duration;
63
64const DEFAULT_IMAGE: &str = "ghcr.io/shopify/toxiproxy";
66const DEFAULT_TAG: &str = "2.12.0";
68const DEFAULT_CONTROL_PORT: u16 = 8474;
70
/// Direction of proxied traffic that a toxic is attached to.
///
/// The variant is converted to its lowercase wire name with `as_str` when a
/// toxic request body is built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToxicStream {
    /// Sent to the control API as `"downstream"`.
    Downstream,
    /// Sent to the control API as `"upstream"`.
    Upstream,
}

impl ToxicStream {
    /// Returns the lowercase name used for this direction in request bodies.
    fn as_str(self) -> &'static str {
        match self {
            Self::Upstream => "upstream",
            Self::Downstream => "downstream",
        }
    }
}
93
/// A toxic (fault to inject) together with its numeric parameters.
///
/// Each variant maps to one toxic `type` string (see `type_name`) and its
/// fields are forwarded verbatim as the request's `attributes` map (see
/// `attributes`). Units are whatever the Toxiproxy server expects for the
/// attribute of the same name; this type does not convert or validate them.
#[derive(Debug, Clone, PartialEq)]
pub enum Toxic {
    /// `latency` toxic.
    Latency {
        /// Value of the `latency` attribute.
        latency: u64,
        /// Value of the `jitter` attribute.
        jitter: u64,
    },
    /// `bandwidth` toxic.
    Bandwidth {
        /// Value of the `rate` attribute.
        rate: u64,
    },
    /// `timeout` toxic.
    Timeout {
        /// Value of the `timeout` attribute.
        timeout: u64,
    },
    /// `slicer` toxic.
    Slicer {
        /// Value of the `average_size` attribute.
        average_size: u64,
        /// Value of the `size_variation` attribute.
        size_variation: u64,
        /// Value of the `delay` attribute.
        delay: u64,
    },
    /// `limit_data` toxic.
    LimitData {
        /// Value of the `bytes` attribute.
        bytes: u64,
    },
}

impl Toxic {
    /// Builds a [`Toxic::Latency`] with the given latency and zero jitter.
    #[must_use]
    pub fn latency(latency: u64) -> Self {
        Self::Latency { latency, jitter: 0 }
    }

    /// Builds a [`Toxic::Latency`] with both latency and jitter set.
    #[must_use]
    pub fn jitter(latency: u64, jitter: u64) -> Self {
        Self::Latency { latency, jitter }
    }

    /// Builds a [`Toxic::Bandwidth`] with the given rate.
    #[must_use]
    pub fn bandwidth(rate: u64) -> Self {
        Self::Bandwidth { rate }
    }

    /// Builds a [`Toxic::Timeout`] with the given timeout value.
    #[must_use]
    pub fn timeout(timeout: u64) -> Self {
        Self::Timeout { timeout }
    }

    /// Builds a [`Toxic::Slicer`] from its three parameters.
    #[must_use]
    pub fn slicer(average_size: u64, size_variation: u64, delay: u64) -> Self {
        Self::Slicer {
            average_size,
            size_variation,
            delay,
        }
    }

    /// Builds a [`Toxic::LimitData`] with the given byte count.
    #[must_use]
    pub fn limit_data(bytes: u64) -> Self {
        Self::LimitData { bytes }
    }

    /// Returns the string sent as the toxic's `type` in a request body.
    fn type_name(&self) -> &'static str {
        match self {
            Self::LimitData { .. } => "limit_data",
            Self::Slicer { .. } => "slicer",
            Self::Timeout { .. } => "timeout",
            Self::Bandwidth { .. } => "bandwidth",
            Self::Latency { .. } => "latency",
        }
    }

    /// Returns the variant's fields as an attribute-name → value map.
    ///
    /// Keys are exactly the field names of the variant.
    fn attributes(&self) -> HashMap<String, u64> {
        // All fields are `u64`, so matching on `*self` copies them out.
        let pairs: Vec<(&str, u64)> = match *self {
            Self::Latency { latency, jitter } => {
                vec![("latency", latency), ("jitter", jitter)]
            }
            Self::Bandwidth { rate } => vec![("rate", rate)],
            Self::Timeout { timeout } => vec![("timeout", timeout)],
            Self::Slicer {
                average_size,
                size_variation,
                delay,
            } => vec![
                ("average_size", average_size),
                ("size_variation", size_variation),
                ("delay", delay),
            ],
            Self::LimitData { bytes } => vec![("bytes", bytes)],
        };

        pairs
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }
}
234
235#[derive(Debug, Serialize)]
237struct ProxyRequest {
238 name: String,
239 listen: String,
240 upstream: String,
241 enabled: bool,
242}
243
244#[derive(Debug, Clone, Deserialize)]
246pub struct ProxyInfo {
247 pub name: String,
249 pub listen: String,
251 pub upstream: String,
253 pub enabled: bool,
255}
256
257#[derive(Debug, Serialize)]
259struct ToxicRequest {
260 name: String,
261 #[serde(rename = "type")]
262 toxic_type: String,
263 stream: String,
264 toxicity: f64,
265 attributes: HashMap<String, u64>,
266}
267
268pub struct ToxiproxyTemplate {
277 config: TemplateConfig,
278 control_port: u16,
279 api_ready_timeout: Duration,
280}
281
282impl ToxiproxyTemplate {
283 pub fn new(name: impl Into<String>) -> Self {
289 let name = name.into();
290 let config = TemplateConfig {
291 name,
292 image: DEFAULT_IMAGE.to_string(),
293 tag: DEFAULT_TAG.to_string(),
294 ports: vec![(DEFAULT_CONTROL_PORT, DEFAULT_CONTROL_PORT)],
295 env: HashMap::new(),
296 volumes: Vec::new(),
297 network: None,
298 health_check: None,
299 auto_remove: false,
300 memory_limit: None,
301 cpu_limit: None,
302 platform: None,
303 };
304
305 Self {
306 config,
307 control_port: DEFAULT_CONTROL_PORT,
308 api_ready_timeout: Duration::from_secs(30),
309 }
310 }
311
312 pub fn control_port(mut self, port: u16) -> Self {
316 if let Some(pos) = self
318 .config
319 .ports
320 .iter()
321 .position(|(_, c)| *c == DEFAULT_CONTROL_PORT)
322 {
323 self.config.ports[pos] = (port, DEFAULT_CONTROL_PORT);
324 } else {
325 self.config.ports.push((port, DEFAULT_CONTROL_PORT));
326 }
327 self.control_port = port;
328 self
329 }
330
331 pub fn proxy_port(mut self, port: u16) -> Self {
349 if !self
350 .config
351 .ports
352 .iter()
353 .any(|(h, c)| *h == port && *c == port)
354 {
355 self.config.ports.push((port, port));
356 }
357 self
358 }
359
360 pub fn network(mut self, network: impl Into<String>) -> Self {
365 self.config.network = Some(network.into());
366 self
367 }
368
369 pub fn auto_remove(mut self) -> Self {
371 self.config.auto_remove = true;
372 self
373 }
374
375 pub fn custom_image(mut self, image: impl Into<String>, tag: impl Into<String>) -> Self {
377 self.config.image = image.into();
378 self.config.tag = tag.into();
379 self
380 }
381
382 pub fn platform(mut self, platform: impl Into<String>) -> Self {
384 self.config.platform = Some(platform.into());
385 self
386 }
387
388 pub fn api_ready_timeout(mut self, timeout: Duration) -> Self {
390 self.api_ready_timeout = timeout;
391 self
392 }
393
394 fn control_url(&self) -> String {
396 format!("http://localhost:{}", self.control_port)
397 }
398
399 fn http_client() -> Result<Client> {
401 Client::builder()
402 .timeout(Duration::from_secs(10))
403 .build()
404 .map_err(|e| {
405 TemplateError::DockerError(crate::Error::custom(format!(
406 "failed to build HTTP client: {e}"
407 )))
408 })
409 }
410
411 pub async fn wait_for_control_api(&self) -> Result<()> {
422 let client = Self::http_client()?;
423 let url = format!("{}/version", self.control_url());
424 let start = std::time::Instant::now();
425
426 while start.elapsed() < self.api_ready_timeout {
427 if let Ok(response) = client.get(&url).send().await {
428 if response.status().is_success() {
429 return Ok(());
430 }
431 }
432 tokio::time::sleep(Duration::from_millis(250)).await;
433 }
434
435 Err(TemplateError::Timeout(format!(
436 "Toxiproxy control API on port {} did not become ready within {}s",
437 self.control_port,
438 self.api_ready_timeout.as_secs()
439 )))
440 }
441
442 pub async fn create_proxy(
456 &self,
457 name: impl Into<String>,
458 listen: impl Into<String>,
459 upstream: impl Into<String>,
460 ) -> Result<ProxyInfo> {
461 let client = Self::http_client()?;
462 let body = ProxyRequest {
463 name: name.into(),
464 listen: listen.into(),
465 upstream: upstream.into(),
466 enabled: true,
467 };
468
469 let url = format!("{}/proxies", self.control_url());
470 let response = client
471 .post(&url)
472 .json(&body)
473 .send()
474 .await
475 .map_err(|e| map_request_err(&e))?;
476
477 let status = response.status();
478 if !status.is_success() {
479 let text = response.text().await.unwrap_or_default();
480 return Err(TemplateError::InvalidConfig(format!(
481 "failed to create proxy '{}': HTTP {status}: {text}",
482 body.name
483 )));
484 }
485
486 response
487 .json::<ProxyInfo>()
488 .await
489 .map_err(|e| map_request_err(&e))
490 }
491
492 pub async fn add_toxic(
507 &self,
508 proxy: &str,
509 name: impl Into<String>,
510 stream: ToxicStream,
511 toxic: Toxic,
512 ) -> Result<()> {
513 let client = Self::http_client()?;
514 let body = ToxicRequest {
515 name: name.into(),
516 toxic_type: toxic.type_name().to_string(),
517 stream: stream.as_str().to_string(),
518 toxicity: 1.0,
519 attributes: toxic.attributes(),
520 };
521
522 let url = format!("{}/proxies/{proxy}/toxics", self.control_url());
523 let response = client
524 .post(&url)
525 .json(&body)
526 .send()
527 .await
528 .map_err(|e| map_request_err(&e))?;
529
530 let status = response.status();
531 if !status.is_success() {
532 let text = response.text().await.unwrap_or_default();
533 return Err(TemplateError::InvalidConfig(format!(
534 "failed to add toxic '{}' to proxy '{proxy}': HTTP {status}: {text}",
535 body.name
536 )));
537 }
538
539 Ok(())
540 }
541
542 pub async fn remove_toxic(&self, proxy: &str, toxic: &str) -> Result<()> {
549 let client = Self::http_client()?;
550 let url = format!("{}/proxies/{proxy}/toxics/{toxic}", self.control_url());
551 let response = client
552 .delete(&url)
553 .send()
554 .await
555 .map_err(|e| map_request_err(&e))?;
556
557 let status = response.status();
558 if !status.is_success() {
559 let text = response.text().await.unwrap_or_default();
560 return Err(TemplateError::InvalidConfig(format!(
561 "failed to remove toxic '{toxic}' from proxy '{proxy}': HTTP {status}: {text}"
562 )));
563 }
564
565 Ok(())
566 }
567
568 pub async fn list_proxies(&self) -> Result<Vec<ProxyInfo>> {
575 let client = Self::http_client()?;
576 let url = format!("{}/proxies", self.control_url());
577 let response = client
578 .get(&url)
579 .send()
580 .await
581 .map_err(|e| map_request_err(&e))?;
582
583 let status = response.status();
584 if !status.is_success() {
585 let text = response.text().await.unwrap_or_default();
586 return Err(TemplateError::InvalidConfig(format!(
587 "failed to list proxies: HTTP {status}: {text}"
588 )));
589 }
590
591 let map = response
593 .json::<HashMap<String, ProxyInfo>>()
594 .await
595 .map_err(|e| map_request_err(&e))?;
596 Ok(map.into_values().collect())
597 }
598
599 pub async fn reset(&self) -> Result<()> {
609 let client = Self::http_client()?;
610 let url = format!("{}/reset", self.control_url());
611 let response = client
612 .post(&url)
613 .send()
614 .await
615 .map_err(|e| map_request_err(&e))?;
616
617 let status = response.status();
618 if !status.is_success() {
619 let text = response.text().await.unwrap_or_default();
620 return Err(TemplateError::InvalidConfig(format!(
621 "failed to reset Toxiproxy: HTTP {status}: {text}"
622 )));
623 }
624
625 Ok(())
626 }
627}
628
629fn map_request_err(e: &reqwest::Error) -> TemplateError {
631 TemplateError::DockerError(crate::Error::custom(format!(
632 "Toxiproxy control API request failed: {e}"
633 )))
634}
635
636#[async_trait]
637impl Template for ToxiproxyTemplate {
638 fn name(&self) -> &str {
639 &self.config.name
640 }
641
642 fn config(&self) -> &TemplateConfig {
643 &self.config
644 }
645
646 fn config_mut(&mut self) -> &mut TemplateConfig {
647 &mut self.config
648 }
649}
650
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh template carries the default image, tag and control-port mapping.
    #[test]
    fn test_toxiproxy_template_defaults() {
        let toxiproxy = ToxiproxyTemplate::new("test-toxiproxy");
        assert_eq!(toxiproxy.name(), "test-toxiproxy");
        assert_eq!(toxiproxy.config().image, DEFAULT_IMAGE);
        assert_eq!(toxiproxy.config().tag, DEFAULT_TAG);
        assert_eq!(toxiproxy.control_port, DEFAULT_CONTROL_PORT);
        assert_eq!(toxiproxy.config().ports, vec![(8474, 8474)]);
    }

    /// Overriding the control port rewrites the existing mapping and the URL.
    #[test]
    fn test_control_port_replaces_mapping() {
        let toxiproxy = ToxiproxyTemplate::new("test").control_port(18474);
        assert_eq!(toxiproxy.control_port, 18474);
        assert_eq!(toxiproxy.config().ports, vec![(18474, 8474)]);
        assert_eq!(toxiproxy.control_url(), "http://localhost:18474");
    }

    /// Publishing the same proxy port twice yields a single mapping.
    #[test]
    fn test_proxy_port_published() {
        let toxiproxy = ToxiproxyTemplate::new("test")
            .proxy_port(16379)
            .proxy_port(16379);
        let published = &toxiproxy.config().ports;
        assert!(published.contains(&(8474, 8474)));
        assert!(published.contains(&(16379, 16379)));
        assert_eq!(
            published
                .iter()
                .filter(|mapping| **mapping == (16379, 16379))
                .count(),
            1
        );
    }

    /// Network, image, tag and platform builders store what they are given.
    #[test]
    fn test_network_and_custom_image() {
        let toxiproxy = ToxiproxyTemplate::new("test")
            .network("chaos-net")
            .custom_image("ghcr.io/shopify/toxiproxy", "latest")
            .platform("linux/arm64");
        assert_eq!(toxiproxy.config().network.as_deref(), Some("chaos-net"));
        assert_eq!(toxiproxy.config().image, "ghcr.io/shopify/toxiproxy");
        assert_eq!(toxiproxy.config().tag, "latest");
        assert_eq!(toxiproxy.config().platform.as_deref(), Some("linux/arm64"));
    }

    /// `latency` defaults jitter to zero; `jitter` sets both attributes.
    #[test]
    fn test_toxic_latency_attributes() {
        let plain = Toxic::latency(500);
        assert_eq!(plain.type_name(), "latency");
        let plain_attrs = plain.attributes();
        assert_eq!(plain_attrs.get("latency"), Some(&500));
        assert_eq!(plain_attrs.get("jitter"), Some(&0));

        let jittered = Toxic::jitter(500, 100);
        let jittered_attrs = jittered.attributes();
        assert_eq!(jittered_attrs.get("latency"), Some(&500));
        assert_eq!(jittered_attrs.get("jitter"), Some(&100));
    }

    /// Bandwidth exposes its rate under the `rate` key.
    #[test]
    fn test_toxic_bandwidth_attributes() {
        let bandwidth = Toxic::bandwidth(64);
        assert_eq!(bandwidth.type_name(), "bandwidth");
        assert_eq!(bandwidth.attributes().get("rate"), Some(&64));
    }

    /// Timeout exposes its value under the `timeout` key, including zero.
    #[test]
    fn test_toxic_timeout_attributes() {
        let timeout = Toxic::timeout(0);
        assert_eq!(timeout.type_name(), "timeout");
        assert_eq!(timeout.attributes().get("timeout"), Some(&0));
    }

    /// Slicer exposes all three of its parameters.
    #[test]
    fn test_toxic_slicer_attributes() {
        let slicer = Toxic::slicer(64, 32, 10);
        assert_eq!(slicer.type_name(), "slicer");
        let slicer_attrs = slicer.attributes();
        assert_eq!(slicer_attrs.get("average_size"), Some(&64));
        assert_eq!(slicer_attrs.get("size_variation"), Some(&32));
        assert_eq!(slicer_attrs.get("delay"), Some(&10));
    }

    /// LimitData exposes its byte count under the `bytes` key.
    #[test]
    fn test_toxic_limit_data_attributes() {
        let limit = Toxic::limit_data(2048);
        assert_eq!(limit.type_name(), "limit_data");
        assert_eq!(limit.attributes().get("bytes"), Some(&2048));
    }

    /// Stream directions map to their lowercase wire names.
    #[test]
    fn test_toxic_stream_wire_values() {
        assert_eq!(ToxicStream::Downstream.as_str(), "downstream");
        assert_eq!(ToxicStream::Upstream.as_str(), "upstream");
    }
}