use crate::{
config::Config,
resilience::{CircuitBreakerConfig, HealthStatus, ResilienceManager, RetryConfig},
};
use anyhow::Result;
use clap::{Args, Subcommand};
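/// Arguments for the `resilience` CLI subcommand.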
#[derive(Args)]
pub struct ResilienceArgs {
#[command(subcommand)]
pub command: ResilienceCommand,
}
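/// Top-level resilience actions: status overview, circuit breaker and
/// bulkhead management, pattern testing, metrics export, and configuration.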
#[derive(Subcommand)]
pub enum ResilienceCommand {
#[command(about = "Show resilience system status")]
Status,
#[command(about = "Manage circuit breakers")]
CircuitBreaker {
#[command(subcommand)]
action: CircuitBreakerAction,
},
#[command(about = "Manage bulkheads")]
Bulkhead {
#[command(subcommand)]
action: BulkheadAction,
},
#[command(about = "Test resilience patterns")]
Test {
#[arg(long, help = "Pattern to test")]
pattern: String,
#[arg(long, help = "Number of test requests")]
requests: Option<u32>,
#[arg(long, help = "Failure rate (0.0-1.0)")]
failure_rate: Option<f64>,
},
#[command(about = "Export resilience metrics")]
Metrics {
#[arg(long, help = "Output format", value_enum)]
format: Option<MetricsFormat>,
#[arg(long, help = "Output file")]
output: Option<String>,
},
#[command(about = "Configure resilience settings")]
Configure {
#[command(subcommand)]
action: ConfigureAction,
},
}
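/// Operations on individual circuit breakers.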
#[derive(Subcommand)]
pub enum CircuitBreakerAction {
#[command(about = "List all circuit breakers")]
List,
#[command(about = "Show circuit breaker details")]
Show {
#[arg(help = "Circuit breaker name")]
name: String,
},
#[command(about = "Reset circuit breaker")]
Reset {
#[arg(help = "Circuit breaker name")]
name: String,
},
}
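/// Operations on individual bulkheads.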
#[derive(Subcommand)]
pub enum BulkheadAction {
#[command(about = "List all bulkheads")]
List,
#[command(about = "Show bulkhead details")]
Show {
#[arg(help = "Bulkhead name")]
name: String,
},
}
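/// Targets that can be reconfigured: circuit breakers, retry policies, and bulkheads.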
#[derive(Subcommand)]
pub enum ConfigureAction {
#[command(about = "Configure circuit breaker")]
CircuitBreaker {
#[arg(help = "Circuit breaker name")]
name: String,
#[arg(long, help = "Failure threshold")]
failure_threshold: Option<u32>,
#[arg(long, help = "Recovery timeout (ms)")]
recovery_timeout: Option<u64>,
#[arg(long, help = "Success threshold")]
success_threshold: Option<u32>,
},
#[command(about = "Configure retry policy")]
Retry {
#[arg(help = "Retry policy name")]
name: String,
#[arg(long, help = "Max attempts")]
max_attempts: Option<usize>,
#[arg(long, help = "Initial delay (ms)")]
initial_delay: Option<u64>,
#[arg(long, help = "Backoff multiplier")]
backoff_multiplier: Option<f64>,
},
#[command(about = "Configure bulkhead")]
Bulkhead {
#[arg(help = "Bulkhead name")]
name: String,
#[arg(long, help = "Max concurrent requests")]
max_concurrent: Option<usize>,
},
}
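/// Output formats supported by `resilience metrics`.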
#[derive(clap::ValueEnum, Clone)]
pub enum MetricsFormat {
Json,
Prometheus,
Table,
}
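/// Entry point for the resilience command. `Test` arguments are validated
/// here (non-empty pattern, 1..=10000 requests, failure rate within
/// 0.0..=1.0) before dispatch; other subcommands go straight to their handlers.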
pub async fn execute(args: ResilienceArgs, _config: &Config) -> Result<()> {
match args.command {
ResilienceCommand::Status => show_resilience_status().await,
ResilienceCommand::CircuitBreaker { action } => handle_circuit_breaker_action(action).await,
ResilienceCommand::Bulkhead { action } => handle_bulkhead_action(action).await,
ResilienceCommand::Test {
pattern,
requests,
failure_rate,
} => {
let requests = requests.unwrap_or(10);
let failure_rate = failure_rate.unwrap_or(0.2);
if pattern.is_empty() {
anyhow::bail!("Pattern name cannot be empty");
}
if requests == 0 {
anyhow::bail!("Request count must be at least 1");
}
if requests > 10000 {
anyhow::bail!("Request count cannot exceed 10000");
}
if !(0.0..=1.0).contains(&failure_rate) {
anyhow::bail!("Failure rate must be between 0.0 and 1.0");
}
test_resilience_pattern(pattern, requests, failure_rate).await
}
ResilienceCommand::Metrics { format, output } => {
export_resilience_metrics(format.unwrap_or(MetricsFormat::Json), output).await
}
ResilienceCommand::Configure { action } => handle_configure_action(action).await,
}
}
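/// Prints an overview of circuit breakers, bulkheads, retry policies, and
/// health monitors. Note that this builds a fresh `ResilienceManager` seeded
/// with example entries, so the numbers reflect that demo state rather than
/// a running system.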
async fn show_resilience_status() -> Result<()> {
println!("Resilience System Status");
println!("========================");
let manager = ResilienceManager::new();
manager.add_circuit_breaker(
"inference-service".to_string(),
CircuitBreakerConfig::default(),
)?;
manager.add_bulkhead("batch-processing".to_string(), 10)?;
manager.add_retry_policy("model-loading".to_string(), RetryConfig::default())?;
println!("\n🔄 Circuit Breakers:");
if let Some(cb) = manager.get_circuit_breaker("inference-service") {
let state = cb.get_state();
let metrics = cb.get_metrics();
println!(" • inference-service: {:?}", state);
        let total = metrics
            .total_requests
            .load(std::sync::atomic::Ordering::Relaxed);
        let successful = metrics
            .successful_requests
            .load(std::sync::atomic::Ordering::Relaxed);
        let success_rate = if total > 0 {
            (successful as f64 / total as f64) * 100.0
        } else {
            0.0
        };
        println!(" - Total requests: {}", total);
        println!(" - Success rate: {:.2}%", success_rate);
}
println!("\n🛡️ Bulkheads:");
if let Some(bh) = manager.get_bulkhead("batch-processing") {
println!(" • batch-processing:");
println!(" - Active requests: {}", bh.get_active_requests());
println!(" - Total requests: {}", bh.get_total_requests());
println!(" - Rejected requests: {}", bh.get_rejected_requests());
}
println!("\n🔁 Retry Policies:");
    if manager.get_retry_policy("model-loading").is_some() {
        // Hardcoded summary; assumes RetryConfig::default() still means
        // 3 attempts with exponential backoff.
        println!(" • model-loading: Configured (max 3 attempts, exponential backoff)");
    }
println!("\n🏥 Health Status:");
let health_status = manager.get_system_health();
if health_status.is_empty() {
println!(" No health monitors configured");
} else {
for (service, status) in health_status {
let status_icon = match status {
HealthStatus::Healthy => "✅",
HealthStatus::Unhealthy => "❌",
HealthStatus::Unknown => "❓",
};
println!(" {} {}: {:?}", status_icon, service, status);
}
}
println!("\n📊 System Health: All resilience patterns operational");
Ok(())
}
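/// Handles circuit breaker subcommands. `List` and `Show` currently print
/// illustrative placeholder data; `Reset` validates the name and reports
/// the reset.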
async fn handle_circuit_breaker_action(action: CircuitBreakerAction) -> Result<()> {
match action {
CircuitBreakerAction::List => {
println!("Circuit Breakers:");
println!("• inference-service: CLOSED (healthy)");
println!("• batch-processing: CLOSED (healthy)");
println!("• model-cache: HALF_OPEN (testing)");
}
CircuitBreakerAction::Show { name } => {
println!("Circuit Breaker: {}", name);
println!("================");
println!("State: CLOSED");
println!("Failure Threshold: 5");
println!("Recovery Timeout: 60s");
println!("Success Threshold: 3");
println!();
println!("Statistics:");
println!("• Total Requests: 1,234");
println!("• Successful: 1,220 (98.9%)");
println!("• Failed: 14 (1.1%)");
println!("• Rejected: 0");
println!("• State Changes: 2");
}
CircuitBreakerAction::Reset { name } => {
if name.is_empty() {
anyhow::bail!("Circuit breaker name cannot be empty");
}
println!(
"✅ Circuit breaker '{}' has been reset to CLOSED state",
name
);
println!("All failure counters have been cleared");
}
}
Ok(())
}
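/// Handles bulkhead subcommands. Output is currently illustrative
/// placeholder data rather than live state.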
async fn handle_bulkhead_action(action: BulkheadAction) -> Result<()> {
match action {
BulkheadAction::List => {
println!("Bulkheads:");
println!("• inference-requests: 5/100 active");
println!("• batch-processing: 0/50 active");
println!("• model-operations: 2/25 active");
}
BulkheadAction::Show { name } => {
println!("Bulkhead: {}", name);
println!("==========");
println!("Max Concurrent: 100");
println!("Active Requests: 5");
println!("Total Requests: 8,456");
println!("Rejected Requests: 23");
println!("Utilization: 5.0%");
}
}
Ok(())
}
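/// Runs a self-contained load test of the named pattern ("circuit-breaker",
/// "retry", or "bulkhead"), simulating failures at `failure_rate` and
/// printing per-request progress markers plus a summary.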
async fn test_resilience_pattern(pattern: String, requests: u32, failure_rate: f64) -> Result<()> {
println!("Testing resilience pattern: {}", pattern);
println!(
"Requests: {}, Failure rate: {:.1}%",
requests,
failure_rate * 100.0
);
println!();
let manager = ResilienceManager::new();
match pattern.as_str() {
"circuit-breaker" => {
println!("🔄 Testing Circuit Breaker...");
manager.add_circuit_breaker(
"test-service".to_string(),
CircuitBreakerConfig {
failure_threshold: 3,
recovery_timeout_ms: 5000,
success_threshold: 2,
timeout_ms: 1000,
max_concurrent_requests: 10,
},
)?;
if let Some(cb) = manager.get_circuit_breaker("test-service") {
let mut success_count = 0;
let mut failure_count = 0;
let mut rejected_count = 0;
for i in 1..=requests {
let should_fail = rand::random::<f64>() < failure_rate;
let result = cb
.call(|| async {
if should_fail {
Err(anyhow::anyhow!("Simulated failure"))
} else {
Ok("Success")
}
})
.await;
match result {
Ok(_) => {
success_count += 1;
print!("✅");
}
                        // Fragile: rejections are classified by message text; a
                        // typed rejection error from the circuit breaker would be
                        // more robust.
                        Err(e) if e.to_string().contains("Circuit breaker") => {
rejected_count += 1;
print!("🚫");
}
Err(_) => {
failure_count += 1;
print!("❌");
}
}
if i % 10 == 0 {
println!(" [{}]", i);
println!("State: {:?}", cb.get_state());
}
                    // Flush stdout so the progress markers printed with print!
                    // appear immediately instead of waiting for a newline.
                    std::io::Write::flush(&mut std::io::stdout())?;
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
println!("\n\nTest Results:");
println!("• Successful: {}", success_count);
println!("• Failed: {}", failure_count);
println!("• Rejected: {}", rejected_count);
println!("• Final State: {:?}", cb.get_state());
}
}
"retry" => {
println!("🔁 Testing Retry Policy...");
manager.add_retry_policy(
"test-retry".to_string(),
RetryConfig {
max_attempts: 3,
initial_delay_ms: 100,
max_delay_ms: 1000,
backoff_multiplier: 2.0,
jitter_enabled: true,
retry_on_timeout: true,
},
)?;
if let Some(retry) = manager.get_retry_policy("test-retry") {
let mut successes = 0;
for i in 1..=requests {
let should_fail = rand::random::<f64>() < failure_rate;
let result = retry
.execute(|| async {
if should_fail {
Err(anyhow::anyhow!("Simulated failure"))
} else {
Ok("Success")
}
})
.await;
match result {
Ok(_) => {
successes += 1;
print!("✅");
}
Err(_) => {
print!("❌");
}
}
                        // Flush so progress markers show up as they are printed.
                        std::io::Write::flush(&mut std::io::stdout())?;
                        if i % 10 == 0 {
                            println!(" [{}]", i);
                        }
}
println!("\n\nTest Results:");
println!("• Successful requests: {}", successes);
println!("• Failed requests: {}", requests - successes);
println!(
"• Success rate: {:.2}%",
(successes as f64 / requests as f64) * 100.0
);
}
}
"bulkhead" => {
println!("🛡️ Testing Bulkhead...");
manager.add_bulkhead("test-bulkhead".to_string(), 5)?;
if let Some(bulkhead) = manager.get_bulkhead("test-bulkhead") {
let mut handles = vec![];
for i in 1..=requests {
let bh = bulkhead.clone();
let handle = tokio::spawn(async move {
                        bh.execute(|| async {
                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                            Ok(format!("Request {}", i))
                        })
                        .await
});
handles.push(handle);
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
let mut successes = 0;
let mut rejections = 0;
for handle in handles {
                    // Propagate panics from spawned tasks as errors instead of
                    // unwrapping, so one panicked task cannot crash the CLI.
                    match handle.await? {
Ok(_) => {
successes += 1;
print!("✅");
}
Err(_) => {
rejections += 1;
print!("🚫");
}
                    }
                    std::io::Write::flush(&mut std::io::stdout())?;
}
println!("\n\nTest Results:");
println!("• Successful: {}", successes);
println!("• Rejected: {}", rejections);
println!("• Total handled: {}", bulkhead.get_total_requests());
println!("• Total rejected: {}", bulkhead.get_rejected_requests());
}
}
_ => {
println!("❌ Unknown pattern: {}", pattern);
println!("Available patterns: circuit-breaker, retry, bulkhead");
}
}
Ok(())
}
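/// Exports resilience metrics in the requested format, writing to the given
/// file when `output` is set and to stdout otherwise. The manager here is
/// freshly constructed with example components, so exported values reflect
/// that demo state.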
async fn export_resilience_metrics(format: MetricsFormat, output: Option<String>) -> Result<()> {
let manager = ResilienceManager::new();
manager.add_circuit_breaker("inference".to_string(), CircuitBreakerConfig::default())?;
manager.add_bulkhead("batch".to_string(), 10)?;
let metrics = manager.get_resilience_metrics();
let output_data = match format {
MetricsFormat::Json => serde_json::to_string_pretty(&metrics)?,
        MetricsFormat::Prometheus => {
            let mut prometheus_output = String::new();
            for (name, value) in metrics {
                // Prometheus metric names may not contain '-', so normalize
                // names like "inference-service" to underscores.
                let metric_name = name.replace('-', "_");
                prometheus_output.push_str(&format!(
                    "inferno_{} {}\n",
                    metric_name,
                    serde_json::to_string(&value)?
                ));
            }
            prometheus_output
        }
        MetricsFormat::Table => {
            let mut table_output = String::new();
            table_output.push_str("Metric               | Value\n");
            table_output.push_str("---------------------|--------\n");
            for (name, value) in metrics {
                // Print the actual metric value rather than a hardcoded status.
                table_output.push_str(&format!(
                    "{:<20} | {}\n",
                    name,
                    serde_json::to_string(&value)?
                ));
            }
            table_output
        }
};
match output {
Some(file_path) => {
tokio::fs::write(&file_path, output_data).await?;
println!("✅ Metrics exported to: {}", file_path);
}
None => {
println!("{}", output_data);
}
}
Ok(())
}
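/// Prints the requested configuration changes. Persisting them to a live
/// `ResilienceManager` or configuration store is not wired up here yet.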
async fn handle_configure_action(action: ConfigureAction) -> Result<()> {
match action {
ConfigureAction::CircuitBreaker {
name,
failure_threshold,
recovery_timeout,
success_threshold,
} => {
println!("Configuring circuit breaker: {}", name);
if let Some(threshold) = failure_threshold {
println!("• Failure threshold: {}", threshold);
}
if let Some(timeout) = recovery_timeout {
println!("• Recovery timeout: {}ms", timeout);
}
if let Some(threshold) = success_threshold {
println!("• Success threshold: {}", threshold);
}
println!("✅ Circuit breaker configuration updated");
}
ConfigureAction::Retry {
name,
max_attempts,
initial_delay,
backoff_multiplier,
} => {
println!("Configuring retry policy: {}", name);
if let Some(attempts) = max_attempts {
println!("• Max attempts: {}", attempts);
}
if let Some(delay) = initial_delay {
println!("• Initial delay: {}ms", delay);
}
if let Some(multiplier) = backoff_multiplier {
println!("• Backoff multiplier: {}", multiplier);
}
println!("✅ Retry policy configuration updated");
}
ConfigureAction::Bulkhead {
name,
max_concurrent,
} => {
println!("Configuring bulkhead: {}", name);
if let Some(concurrent) = max_concurrent {
println!("• Max concurrent: {}", concurrent);
}
println!("✅ Bulkhead configuration updated");
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_circuit_breaker_reset_validation_empty() {
let config = Config::default();
let args = ResilienceArgs {
command: ResilienceCommand::CircuitBreaker {
action: CircuitBreakerAction::Reset {
name: String::new(),
},
},
};
let result = execute(args, &config).await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Circuit breaker name cannot be empty")
);
}
#[tokio::test]
async fn test_resilience_test_validation_empty_pattern() {
let config = Config::default();
let args = ResilienceArgs {
command: ResilienceCommand::Test {
pattern: String::new(),
requests: Some(10),
failure_rate: Some(0.1),
},
};
let result = execute(args, &config).await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Pattern name cannot be empty")
);
}
#[tokio::test]
async fn test_resilience_test_validation_zero_requests() {
let config = Config::default();
let args = ResilienceArgs {
command: ResilienceCommand::Test {
pattern: "retry".to_string(),
requests: Some(0),
failure_rate: Some(0.1),
},
};
let result = execute(args, &config).await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Request count must be at least 1")
);
}
#[tokio::test]
async fn test_resilience_test_validation_excessive_requests() {
let config = Config::default();
let args = ResilienceArgs {
command: ResilienceCommand::Test {
pattern: "retry".to_string(),
requests: Some(20000),
failure_rate: Some(0.1),
},
};
let result = execute(args, &config).await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Request count cannot exceed 10000")
);
}
#[tokio::test]
async fn test_resilience_test_validation_invalid_failure_rate_high() {
let config = Config::default();
let args = ResilienceArgs {
command: ResilienceCommand::Test {
pattern: "retry".to_string(),
requests: Some(100),
failure_rate: Some(1.5),
},
};
let result = execute(args, &config).await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Failure rate must be between 0.0 and 1.0")
);
}
#[tokio::test]
async fn test_resilience_test_validation_invalid_failure_rate_negative() {
let config = Config::default();
let args = ResilienceArgs {
command: ResilienceCommand::Test {
pattern: "retry".to_string(),
requests: Some(100),
failure_rate: Some(-0.1),
},
};
let result = execute(args, &config).await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("Failure rate must be between 0.0 and 1.0")
);
}
}