use super::*;
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::{timeout, sleep};
use crate::integration::{
DockerTestEnv, SshHelper, SshConfig, TestAssertions, PerformanceUtils, EnvUtils
};
pub struct RoutingTests {
docker_env: DockerTestEnv,
ssh_helper: SshHelper,
}
impl RoutingTests {
pub fn new() -> Self {
Self {
docker_env: DockerTestEnv::new(),
ssh_helper: SshHelper::new(),
}
}
pub async fn setup(&self) -> Result<()> {
println!("Setting up routing test environment...");
EnvUtils::setup_test_environment()?;
self.docker_env.start().await
.context("Failed to start Docker test environment")?;
sleep(Duration::from_secs(5)).await;
let test_results = self.ssh_helper.test_all_containers().await?;
if !test_results.all_successful() {
anyhow::bail!("Not all containers are accessible: {:?}", test_results.failed_tests());
}
println!("โ
Routing test environment ready");
Ok(())
}
pub async fn cleanup(&self) -> Result<()> {
println!("Cleaning up routing test environment...");
self.docker_env.cleanup().await
.context("Failed to cleanup Docker test environment")?;
Ok(())
}
pub async fn test_multi_hop_connections(&self) -> Result<()> {
println!("๐งช Testing multi-hop SSH connections through bastion...");
let bastion_config = self.ssh_helper.config_for_container("bastion", 2224);
let bastion_connected = self.ssh_helper.test_connectivity(&bastion_config).await?;
TestAssertions::assert_ssh_success(
&crate::integration::SshCommandOutput {
exit_code: if bastion_connected { 0 } else { 1 },
stdout: "".to_string(),
stderr: "".to_string(),
},
"Bastion connection"
)?;
let backend_config = self.ssh_helper.config_for_jump_host("mitoxide_backend_target", 2224);
let backend_connected = self.ssh_helper.test_connectivity(&backend_config).await?;
TestAssertions::assert_ssh_success(
&crate::integration::SshCommandOutput {
exit_code: if backend_connected { 0 } else { 1 },
stdout: "".to_string(),
stderr: "".to_string(),
},
"Backend target via jump host"
)?;
let output = self.ssh_helper.execute_command(&backend_config, &["uname", "-a"]).await?;
TestAssertions::assert_ssh_success(&output, "Command execution through jump host")?;
TestAssertions::assert_output_contains(&output.stdout, "Linux", "uname output")?;
let test_content = "Hello from jump host test!";
let remote_path = "/tmp/jump_host_test.txt";
let write_output = self.ssh_helper.execute_command(
&backend_config,
&["sh", "-c", &format!("echo '{}' > {}", test_content, remote_path)]
).await?;
TestAssertions::assert_ssh_success(&write_output, "File write through jump host")?;
let read_output = self.ssh_helper.execute_command(
&backend_config,
&["cat", remote_path]
).await?;
TestAssertions::assert_ssh_success(&read_output, "File read through jump host")?;
TestAssertions::assert_output_contains(&read_output.stdout, test_content, "File content")?;
println!("โ
Multi-hop SSH connections test passed");
Ok(())
}
pub async fn test_connection_routing_multiplexing(&self) -> Result<()> {
println!("๐งช Testing connection routing and multiplexing...");
let targets = vec![
("ubuntu_min", 2223),
("alpine_ro", 2222),
];
let mut handles = Vec::new();
for (container, port) in targets {
let ssh_helper = self.ssh_helper.clone();
let handle = tokio::spawn(async move {
let config = ssh_helper.config_for_container(container, port);
let mut command_results = Vec::new();
for i in 0..5 {
let output = ssh_helper.execute_command(
&config,
&["echo", &format!("test_{}", i)]
).await?;
if !output.success() {
anyhow::bail!("Command {} failed on {}: {}", i, container, output.stderr);
}
command_results.push((i, output.stdout.trim().to_string()));
}
Ok::<_, anyhow::Error>((container, command_results))
});
handles.push(handle);
}
let mut all_results = Vec::new();
for handle in handles {
let (container, results) = handle.await??;
all_results.push((container, results));
}
for (container, results) in all_results {
assert_eq!(results.len(), 5, "Expected 5 command results for {}", container);
for (i, output) in results.iter() {
assert_eq!(output, &format!("test_{}", i),
"Unexpected output for command {} on {}: {}", i, container, output);
}
println!("โ
Container {} completed all {} commands", container, results.len());
}
println!("โ
Connection routing and multiplexing test passed");
Ok(())
}
pub async fn test_connection_failure_recovery(&self) -> Result<()> {
println!("๐งช Testing connection failure and recovery...");
let config = self.ssh_helper.config_for_container("ubuntu_min", 2223);
let output = self.ssh_helper.execute_command(&config, &["echo", "before_failure"]).await?;
TestAssertions::assert_ssh_success(&output, "Normal operation before failure")?;
TestAssertions::assert_output_contains(&output.stdout, "before_failure", "Pre-failure output")?;
println!("Simulating network failure...");
let stop_output = self.docker_env.exec_command("ubuntu_min", &["pkill", "-f", "sshd"]).await;
sleep(Duration::from_secs(2)).await;
let failure_result = self.ssh_helper.test_connectivity(&config).await?;
assert!(!failure_result, "Expected connection to fail during network failure");
println!("Recovering from network failure...");
sleep(Duration::from_secs(3)).await;
let restart_output = self.docker_env.exec_command("ubuntu_min", &["service", "ssh", "start"]).await;
sleep(Duration::from_secs(5)).await;
let recovery_result = self.ssh_helper.test_connectivity(&config).await?;
if !recovery_result {
let _ = self.docker_env.exec_command("ubuntu_min", &["service", "ssh", "restart"]).await;
sleep(Duration::from_secs(3)).await;
}
let recovery_output = self.ssh_helper.execute_command(&config, &["echo", "after_recovery"]).await?;
TestAssertions::assert_ssh_success(&recovery_output, "Operation after recovery")?;
TestAssertions::assert_output_contains(&recovery_output.stdout, "after_recovery", "Post-recovery output")?;
println!("โ
Connection failure and recovery test passed");
Ok(())
}
pub async fn test_load_balancing_connection_pooling(&self) -> Result<()> {
println!("๐งช Testing load balancing and connection pooling...");
let config = self.ssh_helper.config_for_container("ubuntu_min", 2223);
let num_concurrent = 10;
let mut handles = Vec::new();
for i in 0..num_concurrent {
let ssh_helper = self.ssh_helper.clone();
let config_clone = config.clone();
let handle = tokio::spawn(async move {
let start_time = Instant::now();
let connectivity_result = ssh_helper.test_connectivity(&config_clone).await?;
let connection_time = start_time.elapsed();
if !connectivity_result {
anyhow::bail!("Connection {} failed", i);
}
let output = ssh_helper.execute_command(
&config_clone,
&["echo", &format!("pool_test_{}", i)]
).await?;
if !output.success() {
anyhow::bail!("Command execution failed for connection {}: {}", i, output.stderr);
}
Ok::<_, anyhow::Error>((i, connection_time, output.stdout.trim().to_string()))
});
handles.push(handle);
}
let mut results = Vec::new();
for handle in handles {
let (i, time, output) = handle.await??;
println!("Connection {}: established in {:?}, output: {}", i, time, output);
results.push((i, time, output));
}
assert_eq!(results.len(), num_concurrent, "Not all connections completed");
let avg_time = results.iter()
.map(|(_, time, _)| time.as_millis())
.sum::<u128>() as f64 / results.len() as f64;
println!("Average connection time: {:.2}ms", avg_time);
for (i, time, _) in &results {
TestAssertions::assert_performance_threshold(
*time,
Duration::from_secs(5),
&format!("Connection {} establishment", i)
)?;
}
for (i, _, output) in &results {
assert_eq!(output, &format!("pool_test_{}", i),
"Unexpected output for connection {}: {}", i, output);
}
println!("โ
Load balancing and connection pooling test passed");
Ok(())
}
pub async fn test_routing_performance(&self) -> Result<()> {
println!("๐งช Testing routing performance under load...");
let config = self.ssh_helper.config_for_container("ubuntu_min", 2223);
let num_operations = 20; let mut latencies = Vec::new();
for i in 0..num_operations {
let (output_result, latency) = PerformanceUtils::measure_async(
self.ssh_helper.execute_command(&config, &["echo", &format!("perf_test_{}", i)])
).await;
let output = output_result?;
TestAssertions::assert_ssh_success(&output, &format!("Performance test {}", i))?;
latencies.push(latency);
}
latencies.sort();
let p50 = latencies[latencies.len() / 2];
let p95 = latencies[latencies.len() * 95 / 100];
let p99 = latencies[latencies.len() * 99 / 100];
println!("Performance results:");
println!(" P50 latency: {}", PerformanceUtils::format_duration(p50));
println!(" P95 latency: {}", PerformanceUtils::format_duration(p95));
println!(" P99 latency: {}", PerformanceUtils::format_duration(p99));
TestAssertions::assert_performance_threshold(
p50,
Duration::from_secs(2),
"P50 latency"
)?;
TestAssertions::assert_performance_threshold(
p95,
Duration::from_secs(5),
"P95 latency"
)?;
let concurrent_ops = 10; let mut handles = Vec::new();
let start_time = Instant::now();
for i in 0..concurrent_ops {
let ssh_helper = self.ssh_helper.clone();
let config_clone = config.clone();
let handle = tokio::spawn(async move {
ssh_helper.execute_command(&config_clone, &["echo", &format!("concurrent_{}", i)]).await
});
handles.push(handle);
}
let mut successful_ops = 0;
for handle in handles {
if let Ok(Ok(output)) = handle.await {
if output.success() {
successful_ops += 1;
}
}
}
let total_time = start_time.elapsed();
let ops_per_second = successful_ops as f64 / total_time.as_secs_f64();
println!("Throughput: {:.2} ops/second ({}/{} successful)",
ops_per_second, successful_ops, concurrent_ops);
if ops_per_second < 2.0 {
anyhow::bail!("Throughput too low: {:.2} ops/second (minimum: 2.0)", ops_per_second);
}
if successful_ops < concurrent_ops * 8 / 10 {
anyhow::bail!("Too many operations failed: {}/{}",
concurrent_ops - successful_ops, concurrent_ops);
}
println!("โ
Routing performance test passed");
Ok(())
}
pub async fn run_all_tests(&self) -> Result<()> {
println!("๐ Running comprehensive routing tests...");
self.setup().await?;
let test_results = vec![
("Multi-hop connections", self.test_multi_hop_connections().await),
("Connection routing and multiplexing", self.test_connection_routing_multiplexing().await),
("Connection failure and recovery", self.test_connection_failure_recovery().await),
("Load balancing and connection pooling", self.test_load_balancing_connection_pooling().await),
("Routing performance", self.test_routing_performance().await),
];
let mut failed_tests = Vec::new();
let mut passed_tests = Vec::new();
for (test_name, result) in test_results {
match result {
Ok(()) => {
passed_tests.push(test_name);
println!("โ
{} - PASSED", test_name);
}
Err(e) => {
println!("โ {} - FAILED: {}", test_name, e);
failed_tests.push((test_name, e));
}
}
}
self.cleanup().await?;
println!("\n๐ Test Results Summary:");
println!(" Passed: {}", passed_tests.len());
println!(" Failed: {}", failed_tests.len());
if !failed_tests.is_empty() {
println!("\nโ Failed tests:");
for (test_name, error) in &failed_tests {
println!(" - {}: {}", test_name, error);
}
anyhow::bail!("Some routing tests failed");
}
println!("\n๐ All routing tests passed!");
Ok(())
}
}