use std::collections::HashMap;
use std::sync::RwLock;
use swarm_engine_core::actions::ParamResolver;
use swarm_engine_core::agent::WorkResult;
use swarm_engine_core::environment::Environment;
use swarm_engine_core::types::{Action, WorkerId};
/// Operational status reported for a simulated service.
///
/// Rendered as `RUNNING` / `DEGRADED` / `DOWN` by the status checks, and recorded
/// via its `Debug` form (e.g. `"Degraded"`) in a service's `status` metric.
#[derive(Clone, Debug, PartialEq)]
pub enum ServiceStatus {
    /// Service is up.
    Running,
    /// Service is up but impaired.
    Degraded,
    /// Service is not running.
    Down,
}
/// Kind of fault that can be injected into a simulated service.
///
/// Each variant carries a description, a log signature, an anomalous metric and a
/// recommended fix (see the `impl ProblemType` methods).
#[derive(Clone, Debug, PartialEq)]
pub enum ProblemType {
    /// Memory grows until the process runs out of memory.
    MemoryLeak,
    /// Sustained high CPU usage.
    CpuSpike,
    /// Storage capacity exhausted; the only variant whose fix is `"cleanup"`.
    DiskFull,
    /// Upstream connections time out.
    NetworkTimeout,
    /// Database connection pool exhausted.
    DatabaseConnection,
}
impl ProblemType {
fn description(&self) -> &str {
match self {
ProblemType::MemoryLeak => "Memory leak detected - gradual memory increase over time",
ProblemType::CpuSpike => "CPU spike detected - sustained high CPU usage",
ProblemType::DiskFull => "Disk full - storage capacity exceeded",
ProblemType::NetworkTimeout => "Network timeout - connection to upstream failing",
ProblemType::DatabaseConnection => "Database connection pool exhausted",
}
}
fn log_pattern(&self) -> &str {
match self {
ProblemType::MemoryLeak => "OutOfMemoryError",
ProblemType::CpuSpike => "High CPU utilization",
ProblemType::DiskFull => "No space left on device",
ProblemType::NetworkTimeout => "Connection timed out",
ProblemType::DatabaseConnection => "Connection pool exhausted",
}
}
fn metric_anomaly(&self) -> &str {
match self {
ProblemType::MemoryLeak => "memory_usage: 95%",
ProblemType::CpuSpike => "cpu_usage: 98%",
ProblemType::DiskFull => "disk_usage: 100%",
ProblemType::NetworkTimeout => "latency_p99: 30000ms",
ProblemType::DatabaseConnection => "db_connections: 100/100",
}
}
fn solution(&self) -> &str {
match self {
ProblemType::MemoryLeak => "restart",
ProblemType::CpuSpike => "restart",
ProblemType::DiskFull => "cleanup",
ProblemType::NetworkTimeout => "restart",
ProblemType::DatabaseConnection => "restart",
}
}
}
/// A simulated service, including its current status, any injected fault,
/// and the log lines and metrics an agent can inspect.
#[derive(Clone, Debug)]
pub struct Service {
    /// Service name, as echoed by single-service status checks.
    name: String,
    /// Current operational status.
    status: ServiceStatus,
    /// Injected fault, if any; `Some` makes health checks report `UNHEALTHY`.
    problem: Option<ProblemType>,
    /// Synthetic log lines returned by `ReadLogs`, in insertion order.
    logs: Vec<String>,
    /// Metric name -> value; an empty map means "no recorded metrics".
    metrics: HashMap<String, String>,
}
impl Service {
    /// Creates a healthy service: `Running`, no injected problem, no logs, no metrics.
    fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            status: ServiceStatus::Running,
            problem: None,
            logs: Vec::new(),
            metrics: HashMap::new(),
        }
    }

    /// Injects `problem` into this service and sets its status to `status`.
    ///
    /// Appends three synthetic log lines (the problem's log pattern, a degradation
    /// warning, then the pattern again), records `status` (in its `Debug` form) and the
    /// problem's anomalous metric, and stores the problem so health checks report the
    /// service as unhealthy.
    fn with_problem(mut self, problem: ProblemType, status: ServiceStatus) -> Self {
        // The timestamp helper is deterministic, so one value serves all three lines.
        let timestamp = chrono_like_timestamp();
        let pattern = problem.log_pattern();
        self.logs.extend([
            format!("[ERROR] {} - {}", timestamp, pattern),
            format!("[WARN] {} - Service degradation detected", timestamp),
            format!("[ERROR] {} - {}", timestamp, pattern),
        ]);

        self.metrics
            .insert("status".into(), format!("{:?}", status));
        let (key, value) = problem
            .metric_anomaly()
            .split_once(": ")
            .expect("ProblemType::metric_anomaly always returns a `key: value` string");
        self.metrics.insert(key.into(), value.into());

        self.status = status;
        self.problem = Some(problem);
        self
    }
}
/// Fixed stand-in timestamp for every synthetic log line.
///
/// Always returns the same UTC (`Z`-suffixed) string, so generated logs are reproducible.
fn chrono_like_timestamp() -> String {
    String::from("2024-01-15T10:30:45Z")
}
/// Simulated incident-response environment: a set of services, one of which is the
/// root cause that must be restarted.
///
/// Progress flags live in `state` and are shared by every worker that steps this
/// environment (only the `completed` list is per worker).
#[derive(Debug)]
pub struct TroubleshootingEnvironment {
    /// All services in the scenario, keyed by service name.
    services: HashMap<String, Service>,
    /// Name of the service whose restart resolves the incident.
    target_service: String,
    /// Mutable progress; behind a lock because `Environment::step` only gets `&self`.
    state: RwLock<TroubleshootingState>,
}
/// Progress through the troubleshooting workflow (status -> evidence -> diagnose -> restart).
///
/// The default value is "nothing done yet": all flags false, no diagnosis, no completions.
#[derive(Default, Debug)]
struct TroubleshootingState {
    /// Set by any `CheckStatus` call.
    checked_status: bool,
    /// Set by a successful `ReadLogs`.
    read_logs: bool,
    /// Set by a successful `AnalyzeMetrics`.
    analyzed_metrics: bool,
    /// Recommended action from the last `Diagnose` (`"no_issue"` for a healthy service).
    diagnosis: Option<String>,
    /// Workers that restarted the target service, without duplicates.
    completed: Vec<WorkerId>,
}
impl TroubleshootingEnvironment {
pub fn new(services: HashMap<String, Service>, target_service: impl Into<String>) -> Self {
Self {
services,
target_service: target_service.into(),
state: RwLock::new(TroubleshootingState::default()),
}
}
pub fn memory_leak_scenario() -> Self {
let mut services = HashMap::new();
services.insert("api-gateway".into(), Service::new("api-gateway"));
services.insert("database".into(), Service::new("database"));
services.insert(
"user-service".into(),
Service::new("user-service")
.with_problem(ProblemType::MemoryLeak, ServiceStatus::Degraded),
);
services.insert(
"notification-service".into(),
Service::new("notification-service"),
);
Self::new(services, "user-service")
}
pub fn cpu_spike_scenario() -> Self {
let mut services = HashMap::new();
services.insert("frontend".into(), Service::new("frontend"));
services.insert("cache".into(), Service::new("cache"));
services.insert(
"payment-service".into(),
Service::new("payment-service")
.with_problem(ProblemType::CpuSpike, ServiceStatus::Down),
);
services.insert("auth-service".into(), Service::new("auth-service"));
Self::new(services, "payment-service")
}
pub fn network_timeout_scenario() -> Self {
let mut services = HashMap::new();
services.insert("load-balancer".into(), Service::new("load-balancer"));
services.insert(
"order-service".into(),
Service::new("order-service")
.with_problem(ProblemType::NetworkTimeout, ServiceStatus::Degraded),
);
services.insert(
"inventory-service".into(),
Service::new("inventory-service"),
);
Self::new(services, "order-service")
}
pub fn complex_scenario(
total_services: usize,
noise_services: usize,
cascade_depth: usize,
seed: u64,
) -> Self {
use std::collections::HashSet;
let total = total_services.clamp(2, 50);
let noise = noise_services.min(total.saturating_sub(cascade_depth + 1));
let depth = cascade_depth
.clamp(1, 5)
.min(total.saturating_sub(noise + 1));
let mut rng_state = seed;
let mut next_rand = || {
rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1);
rng_state
};
let service_names: Vec<&str> = vec![
"api-gateway",
"user-service",
"auth-service",
"payment-service",
"order-service",
"inventory-service",
"notification-service",
"search-service",
"recommendation-service",
"analytics-service",
"logging-service",
"monitoring-service",
"cache-service",
"database-primary",
"database-replica",
"message-queue",
"scheduler-service",
"worker-service",
"cdn-service",
"load-balancer",
"rate-limiter",
"circuit-breaker",
"config-service",
"discovery-service",
"gateway-internal",
"billing-service",
"subscription-service",
"webhook-service",
"export-service",
"import-service",
"backup-service",
"audit-service",
"compliance-service",
"security-service",
"identity-service",
"permission-service",
"session-service",
"storage-service",
"media-service",
"thumbnail-service",
"email-service",
"sms-service",
"push-service",
"report-service",
"dashboard-service",
"admin-service",
"support-service",
"feedback-service",
"survey-service",
"ml-service",
"prediction-service",
];
let mut used_indices: HashSet<usize> = HashSet::new();
used_indices.insert(1);
let mut pick_service = |rng: &mut dyn FnMut() -> u64| -> String {
loop {
let idx = (rng() as usize) % service_names.len();
if !used_indices.contains(&idx) {
used_indices.insert(idx);
return service_names[idx].to_string();
}
if used_indices.len() >= service_names.len() {
let n = used_indices.len();
used_indices.insert(n + 1000);
return format!("service-{}", n);
}
}
};
let mut services = HashMap::new();
let problem_types = [
ProblemType::MemoryLeak,
ProblemType::CpuSpike,
ProblemType::NetworkTimeout,
ProblemType::DatabaseConnection,
];
let root_problem = &problem_types[(next_rand() as usize) % problem_types.len()];
let root_service_name = "user-service".to_string();
services.insert(
root_service_name.clone(),
Service::new(&root_service_name)
.with_problem(root_problem.clone(), ServiceStatus::Down),
);
let mut cascade_services = Vec::new();
for i in 1..depth {
let name = pick_service(&mut next_rand);
let cascade_problem = &problem_types[(next_rand() as usize) % problem_types.len()];
let mut service = Service::new(&name);
service.logs.push(format!(
"[ERROR] {} - Connection to {} failed",
chrono_like_timestamp(),
root_service_name
));
service.logs.push(format!(
"[WARN] {} - Degraded due to upstream dependency",
chrono_like_timestamp()
));
service.logs.push(format!(
"[ERROR] {} - {}",
chrono_like_timestamp(),
cascade_problem.log_pattern()
));
service.metrics.insert("status".into(), "Degraded".into());
service
.metrics
.insert("upstream_errors".into(), format!("{}", 50 + i * 10));
let (key, value) = cascade_problem.metric_anomaly().split_once(": ").unwrap();
service.metrics.insert(key.into(), value.into());
service.status = ServiceStatus::Degraded;
cascade_services.push(name.clone());
services.insert(name, service);
}
for _ in 0..noise {
let name = pick_service(&mut next_rand);
let mut service = Service::new(&name);
service.logs.push(format!(
"[WARN] {} - High latency detected (within threshold)",
chrono_like_timestamp()
));
service.logs.push(format!(
"[INFO] {} - Garbage collection completed",
chrono_like_timestamp()
));
service.logs.push(format!(
"[WARN] {} - Connection pool at 70% capacity",
chrono_like_timestamp()
));
service
.metrics
.insert("cpu_usage".into(), format!("{}%", 40 + (next_rand() % 20)));
service.metrics.insert(
"memory_usage".into(),
format!("{}%", 50 + (next_rand() % 25)),
);
services.insert(name, service);
}
let remaining = total.saturating_sub(1 + depth.saturating_sub(1) + noise);
for _ in 0..remaining {
let name = pick_service(&mut next_rand);
services.insert(name.clone(), Service::new(&name));
}
Self::new(services, root_service_name)
}
pub fn medium_complexity_scenario() -> Self {
Self::complex_scenario(15, 3, 2, 12345)
}
pub fn high_complexity_scenario() -> Self {
Self::complex_scenario(30, 8, 3, 67890)
}
pub fn extreme_complexity_scenario() -> Self {
Self::complex_scenario(50, 15, 4, 11111)
}
fn handle_check_status(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
let resolver = ParamResolver::new(action);
let service_name = resolver.get("service");
let mut state = self.state.write().unwrap();
match service_name {
Some(name) => {
if let Some(service) = self.services.get(name) {
state.checked_status = true;
let status_str = match service.status {
ServiceStatus::Running => "RUNNING",
ServiceStatus::Degraded => "DEGRADED",
ServiceStatus::Down => "DOWN",
};
WorkResult::env_success(format!(
"Service '{}': {}\nHealth check: {}",
service.name,
status_str,
if service.problem.is_some() {
"UNHEALTHY"
} else {
"HEALTHY"
}
))
} else {
state.checked_status = true;
let mut output = format!(
"Service '{}' not found. Showing all services:\n=== Service Status ===\n",
name
);
for (svc_name, service) in &self.services {
let status_str = match service.status {
ServiceStatus::Running => "RUNNING",
ServiceStatus::Degraded => "DEGRADED",
ServiceStatus::Down => "DOWN",
};
let health = if service.problem.is_some() {
"UNHEALTHY"
} else {
"HEALTHY"
};
output.push_str(&format!("{}: {} ({})\n", svc_name, status_str, health));
}
WorkResult::env_success(output)
}
}
None => {
state.checked_status = true;
let mut output = String::from("=== Service Status ===\n");
for (name, service) in &self.services {
let status_str = match service.status {
ServiceStatus::Running => "RUNNING",
ServiceStatus::Degraded => "DEGRADED",
ServiceStatus::Down => "DOWN",
};
let health = if service.problem.is_some() {
"UNHEALTHY"
} else {
"HEALTHY"
};
output.push_str(&format!("{}: {} ({})\n", name, status_str, health));
}
WorkResult::env_success(output)
}
}
}
fn handle_read_logs(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
let resolver = ParamResolver::new(action);
let service_name = match resolver.require("service") {
Ok(s) => s,
Err(e) => return WorkResult::env_failure(format!("ReadLogs: {}", e)),
};
let mut state = self.state.write().unwrap();
if let Some(service) = self.services.get(service_name) {
state.read_logs = true;
if service.logs.is_empty() {
WorkResult::env_success(format!(
"=== Logs for '{}' ===\n(no recent logs)",
service_name
))
} else {
let logs_str = service.logs.join("\n");
WorkResult::env_success(format!(
"=== Logs for '{}' ===\n{}",
service_name, logs_str
))
}
} else {
WorkResult::env_failure(format!("Service '{}' not found", service_name))
}
}
fn handle_analyze_metrics(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
let resolver = ParamResolver::new(action);
let service_name = match resolver.require("service") {
Ok(s) => s,
Err(e) => return WorkResult::env_failure(format!("AnalyzeMetrics: {}", e)),
};
let mut state = self.state.write().unwrap();
if let Some(service) = self.services.get(service_name) {
state.analyzed_metrics = true;
if service.metrics.is_empty() {
WorkResult::env_success(format!(
"=== Metrics for '{}' ===\ncpu_usage: 15%\nmemory_usage: 45%\nlatency_p99: 120ms\n(all normal)",
service_name
))
} else {
let metrics_str: String = service
.metrics
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join("\n");
WorkResult::env_success(format!(
"=== Metrics for '{}' ===\n{}\n(ANOMALY DETECTED)",
service_name, metrics_str
))
}
} else {
WorkResult::env_failure(format!("Service '{}' not found", service_name))
}
}
fn handle_diagnose(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
let resolver = ParamResolver::new(action);
let service_name = match resolver.require("service") {
Ok(s) => s,
Err(e) => return WorkResult::env_failure(format!("Diagnose: {}", e)),
};
let mut state = self.state.write().unwrap();
if !state.checked_status {
return WorkResult::env_failure(
"Cannot diagnose without checking status first. Run CheckStatus first.",
);
}
if !state.read_logs && !state.analyzed_metrics {
return WorkResult::env_failure(
"Cannot diagnose without data. Run ReadLogs or AnalyzeMetrics first.",
);
}
if let Some(service) = self.services.get(service_name) {
if let Some(ref problem) = service.problem {
let diagnosis = format!(
"=== Diagnosis for '{}' ===\nProblem identified: {}\nRecommended action: {}",
service_name,
problem.description(),
problem.solution()
);
state.diagnosis = Some(problem.solution().to_string());
WorkResult::env_success(diagnosis)
} else {
state.diagnosis = Some("no_issue".to_string());
WorkResult::env_success(format!(
"=== Diagnosis for '{}' ===\nNo issues found. Service is healthy.",
service_name
))
}
} else {
WorkResult::env_failure(format!("Service '{}' not found", service_name))
}
}
fn handle_restart(&self, worker_id: WorkerId, action: &Action) -> WorkResult {
let resolver = ParamResolver::new(action);
let service_name = match resolver.require("service") {
Ok(s) => s,
Err(e) => return WorkResult::env_failure(format!("Restart: {}", e)),
};
let mut state = self.state.write().unwrap();
if state.diagnosis.is_none() {
return WorkResult::env_failure(
"Cannot restart without diagnosis. Run Diagnose first.",
);
}
if service_name != self.target_service {
return WorkResult::env_failure(format!(
"Restarted wrong service '{}'. The problematic service is different.",
service_name
));
}
if !state.completed.contains(&worker_id) {
state.completed.push(worker_id);
}
WorkResult::done_success(format!(
"=== Service '{}' Restarted ===\nStatus: RUNNING\nHealth: HEALTHY\n\nIncident resolved successfully!",
service_name
))
}
}
impl Environment for TroubleshootingEnvironment {
fn step(&self, worker_id: WorkerId, action: &Action) -> WorkResult {
match action.name.to_lowercase().as_str() {
"checkstatus" | "check_status" | "status" => {
self.handle_check_status(worker_id, action)
}
"readlogs" | "read_logs" | "logs" => self.handle_read_logs(worker_id, action),
"analyzemetrics" | "analyze_metrics" | "metrics" => {
self.handle_analyze_metrics(worker_id, action)
}
"diagnose" | "diagnosis" => self.handle_diagnose(worker_id, action),
"restart" | "reboot" => self.handle_restart(worker_id, action),
"continue" => WorkResult::env_success("Continuing..."),
_ => WorkResult::unsupported(&action.name),
}
}
fn reset(&self) {
let mut state = self.state.write().unwrap();
state.checked_status = false;
state.read_logs = false;
state.analyzed_metrics = false;
state.diagnosis = None;
state.completed.clear();
}
fn name(&self) -> &str {
"TroubleshootingEnvironment"
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// True for an acted result whose action succeeded, or a successful `Done`.
    fn is_success(result: &WorkResult) -> bool {
        match result {
            WorkResult::Acted { action_result, .. } => action_result.success,
            WorkResult::Done { success, .. } => *success,
            _ => false,
        }
    }

    /// True when the result ends the episode.
    fn is_done(result: &WorkResult) -> bool {
        matches!(result, WorkResult::Done { .. })
    }

    /// Builds an action whose `target` carries the service name (if any).
    fn action(name: &str, target: Option<&str>) -> Action {
        Action {
            name: name.into(),
            params: swarm_engine_core::types::ActionParams {
                target: target.map(|s| s.into()),
                args: HashMap::new(),
                data: vec![],
            },
        }
    }

    #[test]
    fn test_check_status_all() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let worker = WorkerId(0);
        let result = env.step(worker, &action("CheckStatus", None));
        assert!(is_success(&result));
    }

    #[test]
    fn test_check_status_specific() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let worker = WorkerId(0);
        let result = env.step(worker, &action("CheckStatus", Some("user-service")));
        assert!(is_success(&result));
    }

    #[test]
    fn test_check_status_unknown_service_falls_back_to_listing() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let result = env.step(WorkerId(0), &action("CheckStatus", Some("no-such-service")));
        assert!(is_success(&result));
        assert!(!is_done(&result));
    }

    #[test]
    fn test_read_logs() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let worker = WorkerId(0);
        env.step(worker, &action("CheckStatus", None));
        let result = env.step(worker, &action("ReadLogs", Some("user-service")));
        assert!(is_success(&result));
    }

    #[test]
    fn test_diagnose_requires_prerequisites() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let worker = WorkerId(0);
        let result = env.step(worker, &action("Diagnose", Some("user-service")));
        assert!(!is_success(&result));
    }

    #[test]
    fn test_restart_requires_diagnosis() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let worker = WorkerId(0);
        env.step(worker, &action("CheckStatus", None));
        env.step(worker, &action("ReadLogs", Some("user-service")));
        let result = env.step(worker, &action("Restart", Some("user-service")));
        assert!(!is_success(&result));
        assert!(!is_done(&result));
    }

    #[test]
    fn test_full_troubleshooting_flow() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let worker = WorkerId(0);
        let result = env.step(worker, &action("CheckStatus", None));
        assert!(is_success(&result));
        assert!(!is_done(&result));
        let result = env.step(worker, &action("ReadLogs", Some("user-service")));
        assert!(is_success(&result));
        assert!(!is_done(&result));
        let result = env.step(worker, &action("Diagnose", Some("user-service")));
        assert!(is_success(&result));
        assert!(!is_done(&result));
        let result = env.step(worker, &action("Restart", Some("user-service")));
        assert!(is_success(&result));
        assert!(is_done(&result));
    }

    #[test]
    fn test_restart_wrong_service_fails() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let worker = WorkerId(0);
        env.step(worker, &action("CheckStatus", None));
        env.step(worker, &action("ReadLogs", Some("user-service")));
        env.step(worker, &action("Diagnose", Some("user-service")));
        let result = env.step(worker, &action("Restart", Some("api-gateway")));
        assert!(!is_success(&result));
        assert!(!is_done(&result));
    }

    #[test]
    fn test_reset_clears_progress() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();
        let worker = WorkerId(0);
        env.step(worker, &action("CheckStatus", None));
        env.step(worker, &action("ReadLogs", Some("user-service")));
        env.reset();
        let result = env.step(worker, &action("Diagnose", Some("user-service")));
        assert!(!is_success(&result));
    }

    #[test]
    fn test_complex_presets_have_requested_size() {
        for (env, total) in [
            (TroubleshootingEnvironment::medium_complexity_scenario(), 15),
            (TroubleshootingEnvironment::high_complexity_scenario(), 30),
            (TroubleshootingEnvironment::extreme_complexity_scenario(), 50),
        ] {
            assert_eq!(env.services.len(), total);
            assert_eq!(env.target_service, "user-service");
            assert!(env.services.contains_key("user-service"));
        }
    }

    #[test]
    fn test_complex_scenario_small_total() {
        let env = TroubleshootingEnvironment::complex_scenario(2, 10, 5, 7);
        assert_eq!(env.services.len(), 2);
        assert!(env.services.contains_key("user-service"));
    }

    #[test]
    fn test_complex_scenario_is_deterministic() {
        let sorted_names = |env: &TroubleshootingEnvironment| {
            let mut names: Vec<String> = env.services.keys().cloned().collect();
            names.sort();
            names
        };
        let a = TroubleshootingEnvironment::complex_scenario(20, 5, 3, 42);
        let b = TroubleshootingEnvironment::complex_scenario(20, 5, 3, 42);
        assert_eq!(sorted_names(&a), sorted_names(&b));
    }

    #[test]
    fn test_complex_scenario_full_flow() {
        let env = TroubleshootingEnvironment::high_complexity_scenario();
        let worker = WorkerId(1);
        assert!(is_success(&env.step(worker, &action("CheckStatus", None))));
        assert!(is_success(
            &env.step(worker, &action("AnalyzeMetrics", Some("user-service")))
        ));
        assert!(is_success(
            &env.step(worker, &action("Diagnose", Some("user-service")))
        ));
        let result = env.step(worker, &action("Restart", Some("user-service")));
        assert!(is_success(&result));
        assert!(is_done(&result));
    }
}