mitoxide 0.1.0

Rust library for remote execution and automation inspired by Mitogen
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
//! Comprehensive integration tests for jump host and routing functionality
//! 
//! These tests verify multi-hop SSH connections, connection routing, multiplexing,
//! failure recovery, and load balancing across different network topologies.

use super::*;
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::{timeout, sleep};
// Note: These imports would be used in actual implementation
// For now, we'll simulate the behavior with mock implementations
// use mitoxide::{Session, Context as MitoxideContext, MitoxideError};
// use mitoxide_ssh::{ConnectionPool, PoolConfig, SshConfig as MitoxideSshConfig};
use crate::integration::{
    DockerTestEnv, SshHelper, SshConfig, TestAssertions, PerformanceUtils, EnvUtils
};

/// Test jump host and routing functionality
pub struct RoutingTests {
    docker_env: DockerTestEnv,
    ssh_helper: SshHelper,
}

impl RoutingTests {
    /// Create new routing tests instance
    pub fn new() -> Self {
        Self {
            docker_env: DockerTestEnv::new(),
            ssh_helper: SshHelper::new(),
        }
    }
    
    /// Setup test environment
    pub async fn setup(&self) -> Result<()> {
        println!("Setting up routing test environment...");
        
        // Check prerequisites
        EnvUtils::setup_test_environment()?;
        
        // Start Docker containers
        self.docker_env.start().await
            .context("Failed to start Docker test environment")?;
        
        // Wait for SSH services to be ready
        sleep(Duration::from_secs(5)).await;
        
        // Test basic connectivity
        let test_results = self.ssh_helper.test_all_containers().await?;
        if !test_results.all_successful() {
            anyhow::bail!("Not all containers are accessible: {:?}", test_results.failed_tests());
        }
        
        println!("โœ… Routing test environment ready");
        Ok(())
    }
    
    /// Cleanup test environment
    pub async fn cleanup(&self) -> Result<()> {
        println!("Cleaning up routing test environment...");
        self.docker_env.cleanup().await
            .context("Failed to cleanup Docker test environment")?;
        Ok(())
    }
    
    /// Test multi-hop SSH connections through bastion
    pub async fn test_multi_hop_connections(&self) -> Result<()> {
        println!("๐Ÿงช Testing multi-hop SSH connections through bastion...");
        
        // Test direct connection to bastion
        let bastion_config = self.ssh_helper.config_for_container("bastion", 2224);
        let bastion_connected = self.ssh_helper.test_connectivity(&bastion_config).await?;
        TestAssertions::assert_ssh_success(
            &crate::integration::SshCommandOutput {
                exit_code: if bastion_connected { 0 } else { 1 },
                stdout: "".to_string(),
                stderr: "".to_string(),
            },
            "Bastion connection"
        )?;
        
        // Test connection to backend target through bastion
        let backend_config = self.ssh_helper.config_for_jump_host("mitoxide_backend_target", 2224);
        let backend_connected = self.ssh_helper.test_connectivity(&backend_config).await?;
        TestAssertions::assert_ssh_success(
            &crate::integration::SshCommandOutput {
                exit_code: if backend_connected { 0 } else { 1 },
                stdout: "".to_string(),
                stderr: "".to_string(),
            },
            "Backend target via jump host"
        )?;
        
        // Test command execution through jump host
        let output = self.ssh_helper.execute_command(&backend_config, &["uname", "-a"]).await?;
        TestAssertions::assert_ssh_success(&output, "Command execution through jump host")?;
        TestAssertions::assert_output_contains(&output.stdout, "Linux", "uname output")?;
        
        // Test file operations through jump host
        let test_content = "Hello from jump host test!";
        let remote_path = "/tmp/jump_host_test.txt";
        
        // Write file through jump host
        let write_output = self.ssh_helper.execute_command(
            &backend_config,
            &["sh", "-c", &format!("echo '{}' > {}", test_content, remote_path)]
        ).await?;
        TestAssertions::assert_ssh_success(&write_output, "File write through jump host")?;
        
        // Read file through jump host
        let read_output = self.ssh_helper.execute_command(
            &backend_config,
            &["cat", remote_path]
        ).await?;
        TestAssertions::assert_ssh_success(&read_output, "File read through jump host")?;
        TestAssertions::assert_output_contains(&read_output.stdout, test_content, "File content")?;
        
        println!("โœ… Multi-hop SSH connections test passed");
        Ok(())
    }
    
    /// Test connection routing and multiplexing
    pub async fn test_connection_routing_multiplexing(&self) -> Result<()> {
        println!("๐Ÿงช Testing connection routing and multiplexing...");
        
        // Test concurrent SSH operations to multiple targets
        let targets = vec![
            ("ubuntu_min", 2223),
            ("alpine_ro", 2222),
        ];
        
        let mut handles = Vec::new();
        
        for (container, port) in targets {
            let ssh_helper = self.ssh_helper.clone();
            let handle = tokio::spawn(async move {
                let config = ssh_helper.config_for_container(container, port);
                
                // Execute multiple concurrent commands
                let mut command_results = Vec::new();
                
                for i in 0..5 {
                    let output = ssh_helper.execute_command(
                        &config,
                        &["echo", &format!("test_{}", i)]
                    ).await?;
                    
                    if !output.success() {
                        anyhow::bail!("Command {} failed on {}: {}", i, container, output.stderr);
                    }
                    
                    command_results.push((i, output.stdout.trim().to_string()));
                }
                
                Ok::<_, anyhow::Error>((container, command_results))
            });
            
            handles.push(handle);
        }
        
        // Wait for all operations to complete
        let mut all_results = Vec::new();
        for handle in handles {
            let (container, results) = handle.await??;
            all_results.push((container, results));
        }
        
        // Verify all operations completed successfully
        for (container, results) in all_results {
            assert_eq!(results.len(), 5, "Expected 5 command results for {}", container);
            for (i, output) in results.iter() {
                assert_eq!(output, &format!("test_{}", i), 
                          "Unexpected output for command {} on {}: {}", i, container, output);
            }
            println!("โœ… Container {} completed all {} commands", container, results.len());
        }
        
        println!("โœ… Connection routing and multiplexing test passed");
        Ok(())
    }
    
    /// Test connection failure and recovery
    pub async fn test_connection_failure_recovery(&self) -> Result<()> {
        println!("๐Ÿงช Testing connection failure and recovery...");
        
        let config = self.ssh_helper.config_for_container("ubuntu_min", 2223);
        
        // Test normal operation
        let output = self.ssh_helper.execute_command(&config, &["echo", "before_failure"]).await?;
        TestAssertions::assert_ssh_success(&output, "Normal operation before failure")?;
        TestAssertions::assert_output_contains(&output.stdout, "before_failure", "Pre-failure output")?;
        
        // Simulate network failure by stopping SSH service
        println!("Simulating network failure...");
        let stop_output = self.docker_env.exec_command("ubuntu_min", &["pkill", "-f", "sshd"]).await;
        // Note: This might fail if sshd is not running as expected, which is okay for the test
        
        // Wait a moment for the failure to propagate
        sleep(Duration::from_secs(2)).await;
        
        // Try to execute command - should fail
        let failure_result = self.ssh_helper.test_connectivity(&config).await?;
        assert!(!failure_result, "Expected connection to fail during network failure");
        
        // Restart the container's SSH service
        println!("Recovering from network failure...");
        sleep(Duration::from_secs(3)).await;
        
        // The container should automatically restart SSH, but let's ensure it's running
        let restart_output = self.docker_env.exec_command("ubuntu_min", &["service", "ssh", "start"]).await;
        // This might fail if SSH is already running, which is fine
        
        // Wait for recovery
        sleep(Duration::from_secs(5)).await;
        
        // Test that operations work again
        let recovery_result = self.ssh_helper.test_connectivity(&config).await?;
        if !recovery_result {
            // If SSH didn't restart automatically, try to restart it
            let _ = self.docker_env.exec_command("ubuntu_min", &["service", "ssh", "restart"]).await;
            sleep(Duration::from_secs(3)).await;
        }
        
        let recovery_output = self.ssh_helper.execute_command(&config, &["echo", "after_recovery"]).await?;
        TestAssertions::assert_ssh_success(&recovery_output, "Operation after recovery")?;
        TestAssertions::assert_output_contains(&recovery_output.stdout, "after_recovery", "Post-recovery output")?;
        
        println!("โœ… Connection failure and recovery test passed");
        Ok(())
    }
    
    /// Test load balancing and connection pooling
    pub async fn test_load_balancing_connection_pooling(&self) -> Result<()> {
        println!("๐Ÿงช Testing load balancing and connection pooling...");
        
        // Simulate connection pooling by testing concurrent SSH connections
        let config = self.ssh_helper.config_for_container("ubuntu_min", 2223);
        
        // Test concurrent connection requests
        let num_concurrent = 10;
        let mut handles = Vec::new();
        
        for i in 0..num_concurrent {
            let ssh_helper = self.ssh_helper.clone();
            let config_clone = config.clone();
            let handle = tokio::spawn(async move {
                let start_time = Instant::now();
                
                // Test connection establishment time
                let connectivity_result = ssh_helper.test_connectivity(&config_clone).await?;
                let connection_time = start_time.elapsed();
                
                if !connectivity_result {
                    anyhow::bail!("Connection {} failed", i);
                }
                
                // Execute a command to simulate work
                let output = ssh_helper.execute_command(
                    &config_clone,
                    &["echo", &format!("pool_test_{}", i)]
                ).await?;
                
                if !output.success() {
                    anyhow::bail!("Command execution failed for connection {}: {}", i, output.stderr);
                }
                
                Ok::<_, anyhow::Error>((i, connection_time, output.stdout.trim().to_string()))
            });
            
            handles.push(handle);
        }
        
        // Wait for all connections to complete
        let mut results = Vec::new();
        for handle in handles {
            let (i, time, output) = handle.await??;
            println!("Connection {}: established in {:?}, output: {}", i, time, output);
            results.push((i, time, output));
        }
        
        // Verify all connections succeeded
        assert_eq!(results.len(), num_concurrent, "Not all connections completed");
        
        // Verify connection efficiency
        let avg_time = results.iter()
            .map(|(_, time, _)| time.as_millis())
            .sum::<u128>() as f64 / results.len() as f64;
        
        println!("Average connection time: {:.2}ms", avg_time);
        
        // Assert reasonable performance (connections should be under 5 seconds each)
        for (i, time, _) in &results {
            TestAssertions::assert_performance_threshold(
                *time,
                Duration::from_secs(5),
                &format!("Connection {} establishment", i)
            )?;
        }
        
        // Verify all outputs are correct
        for (i, _, output) in &results {
            assert_eq!(output, &format!("pool_test_{}", i), 
                      "Unexpected output for connection {}: {}", i, output);
        }
        
        println!("โœ… Load balancing and connection pooling test passed");
        Ok(())
    }
    
    /// Test routing performance under load
    pub async fn test_routing_performance(&self) -> Result<()> {
        println!("๐Ÿงช Testing routing performance under load...");
        
        let config = self.ssh_helper.config_for_container("ubuntu_min", 2223);
        
        // Measure latency for single operations
        let num_operations = 20; // Reduced for faster testing
        let mut latencies = Vec::new();
        
        for i in 0..num_operations {
            let (output_result, latency) = PerformanceUtils::measure_async(
                self.ssh_helper.execute_command(&config, &["echo", &format!("perf_test_{}", i)])
            ).await;
            
            let output = output_result?;
            TestAssertions::assert_ssh_success(&output, &format!("Performance test {}", i))?;
            latencies.push(latency);
        }
        
        // Calculate performance statistics
        latencies.sort();
        let p50 = latencies[latencies.len() / 2];
        let p95 = latencies[latencies.len() * 95 / 100];
        let p99 = latencies[latencies.len() * 99 / 100];
        
        println!("Performance results:");
        println!("  P50 latency: {}", PerformanceUtils::format_duration(p50));
        println!("  P95 latency: {}", PerformanceUtils::format_duration(p95));
        println!("  P99 latency: {}", PerformanceUtils::format_duration(p99));
        
        // Assert performance thresholds (more lenient for SSH overhead)
        TestAssertions::assert_performance_threshold(
            p50,
            Duration::from_secs(2),
            "P50 latency"
        )?;
        
        TestAssertions::assert_performance_threshold(
            p95,
            Duration::from_secs(5),
            "P95 latency"
        )?;
        
        // Test concurrent throughput
        let concurrent_ops = 10; // Reduced for faster testing
        let mut handles = Vec::new();
        
        let start_time = Instant::now();
        
        for i in 0..concurrent_ops {
            let ssh_helper = self.ssh_helper.clone();
            let config_clone = config.clone();
            let handle = tokio::spawn(async move {
                ssh_helper.execute_command(&config_clone, &["echo", &format!("concurrent_{}", i)]).await
            });
            handles.push(handle);
        }
        
        // Wait for all operations to complete
        let mut successful_ops = 0;
        for handle in handles {
            if let Ok(Ok(output)) = handle.await {
                if output.success() {
                    successful_ops += 1;
                }
            }
        }
        
        let total_time = start_time.elapsed();
        let ops_per_second = successful_ops as f64 / total_time.as_secs_f64();
        
        println!("Throughput: {:.2} ops/second ({}/{} successful)", 
                ops_per_second, successful_ops, concurrent_ops);
        
        // Assert minimum throughput (more lenient for SSH)
        if ops_per_second < 2.0 {
            anyhow::bail!("Throughput too low: {:.2} ops/second (minimum: 2.0)", ops_per_second);
        }
        
        // Assert that most operations succeeded
        if successful_ops < concurrent_ops * 8 / 10 {
            anyhow::bail!("Too many operations failed: {}/{}", 
                         concurrent_ops - successful_ops, concurrent_ops);
        }
        
        println!("โœ… Routing performance test passed");
        Ok(())
    }
    
    /// Run all routing tests
    pub async fn run_all_tests(&self) -> Result<()> {
        println!("๐Ÿš€ Running comprehensive routing tests...");
        
        self.setup().await?;
        
        let test_results = vec![
            ("Multi-hop connections", self.test_multi_hop_connections().await),
            ("Connection routing and multiplexing", self.test_connection_routing_multiplexing().await),
            ("Connection failure and recovery", self.test_connection_failure_recovery().await),
            ("Load balancing and connection pooling", self.test_load_balancing_connection_pooling().await),
            ("Routing performance", self.test_routing_performance().await),
        ];
        
        let mut failed_tests = Vec::new();
        let mut passed_tests = Vec::new();
        
        for (test_name, result) in test_results {
            match result {
                Ok(()) => {
                    passed_tests.push(test_name);
                    println!("โœ… {} - PASSED", test_name);
                }
                Err(e) => {
                    println!("โŒ {} - FAILED: {}", test_name, e);
                    failed_tests.push((test_name, e));
                }
            }
        }
        
        self.cleanup().await?;
        
        println!("\n๐Ÿ“Š Test Results Summary:");
        println!("  Passed: {}", passed_tests.len());
        println!("  Failed: {}", failed_tests.len());
        
        if !failed_tests.is_empty() {
            println!("\nโŒ Failed tests:");
            for (test_name, error) in &failed_tests {
                println!("  - {}: {}", test_name, error);
            }
            anyhow::bail!("Some routing tests failed");
        }
        
        println!("\n๐ŸŽ‰ All routing tests passed!");
        Ok(())
    }
}