1use std::collections::HashMap;
23use std::sync::RwLock;
24
25use swarm_engine_core::agent::WorkResult;
26use swarm_engine_core::environment::Environment;
27use swarm_engine_core::types::{Action, WorkerId};
28
/// Coarse operational state of a simulated service.
///
/// The enum is fieldless, so it is `Copy` and `Eq`: values can be passed
/// around and compared without cloning. Its `Debug` output (`Running`,
/// `Degraded`, `Down`) is used as the text of the `status` metric, so the
/// variant names must not be renamed casually.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceStatus {
    /// Initial state of every freshly created service.
    Running,
    /// The service is still reported, but in a degraded state.
    Degraded,
    /// The service is reported as down.
    Down,
}
40
/// Root-cause category that can be attached to a simulated service.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProblemType {
    /// Memory usage that keeps growing over time.
    MemoryLeak,
    /// Sustained high CPU usage.
    CpuSpike,
    /// Storage capacity exceeded.
    DiskFull,
    /// Connections to an upstream dependency are failing.
    NetworkTimeout,
    /// The database connection pool is exhausted.
    DatabaseConnection,
}

impl ProblemType {
    // All accessors return string literals, so they are typed as
    // `&'static str`: the text stays usable after the `ProblemType` value
    // itself has been moved or dropped.

    /// Human-readable explanation shown in a diagnosis.
    fn description(&self) -> &'static str {
        match self {
            ProblemType::MemoryLeak => "Memory leak detected - gradual memory increase over time",
            ProblemType::CpuSpike => "CPU spike detected - sustained high CPU usage",
            ProblemType::DiskFull => "Disk full - storage capacity exceeded",
            ProblemType::NetworkTimeout => "Network timeout - connection to upstream failing",
            ProblemType::DatabaseConnection => "Database connection pool exhausted",
        }
    }

    /// Message fragment written into the error log lines of an affected service.
    fn log_pattern(&self) -> &'static str {
        match self {
            ProblemType::MemoryLeak => "OutOfMemoryError",
            ProblemType::CpuSpike => "High CPU utilization",
            ProblemType::DiskFull => "No space left on device",
            ProblemType::NetworkTimeout => "Connection timed out",
            ProblemType::DatabaseConnection => "Connection pool exhausted",
        }
    }

    /// Anomalous metric in the form `"<key>: <value>"`.
    ///
    /// Callers split the text on `": "`, so every arm must keep that shape.
    fn metric_anomaly(&self) -> &'static str {
        match self {
            ProblemType::MemoryLeak => "memory_usage: 95%",
            ProblemType::CpuSpike => "cpu_usage: 98%",
            ProblemType::DiskFull => "disk_usage: 100%",
            ProblemType::NetworkTimeout => "latency_p99: 30000ms",
            ProblemType::DatabaseConnection => "db_connections: 100/100",
        }
    }

    /// Short identifier of the recommended remediation.
    fn solution(&self) -> &'static str {
        match self {
            ProblemType::MemoryLeak => "restart",
            ProblemType::CpuSpike => "restart",
            ProblemType::DiskFull => "cleanup",
            ProblemType::NetworkTimeout => "restart",
            ProblemType::DatabaseConnection => "restart",
        }
    }
}
92
93#[derive(Debug, Clone)]
95pub struct Service {
96 name: String,
97 status: ServiceStatus,
98 problem: Option<ProblemType>,
99 logs: Vec<String>,
100 metrics: HashMap<String, String>,
101}
102
103impl Service {
104 fn new(name: impl Into<String>) -> Self {
105 Self {
106 name: name.into(),
107 status: ServiceStatus::Running,
108 problem: None,
109 logs: Vec::new(),
110 metrics: HashMap::new(),
111 }
112 }
113
114 fn with_problem(mut self, problem: ProblemType, status: ServiceStatus) -> Self {
115 self.logs.push(format!(
117 "[ERROR] {} - {}",
118 chrono_like_timestamp(),
119 problem.log_pattern()
120 ));
121 self.logs.push(format!(
122 "[WARN] {} - Service degradation detected",
123 chrono_like_timestamp()
124 ));
125 self.logs.push(format!(
126 "[ERROR] {} - {}",
127 chrono_like_timestamp(),
128 problem.log_pattern()
129 ));
130
131 self.metrics
133 .insert("status".into(), format!("{:?}", status));
134 let (key, value) = problem.metric_anomaly().split_once(": ").unwrap();
135 self.metrics.insert(key.into(), value.into());
136
137 self.status = status;
138 self.problem = Some(problem);
139 self
140 }
141}
142
/// Returns the fixed timestamp stamped on every generated log line.
///
/// The value is a constant rather than the current time, so generated logs
/// are identical from run to run.
fn chrono_like_timestamp() -> String {
    String::from("2024-01-15T10:30:45Z")
}
146
/// Simulated incident-response environment.
///
/// Workers inspect a fixed set of services through actions (status, logs,
/// metrics, diagnose, restart). The episode is resolved when the target
/// service is restarted after a diagnosis has been recorded.
pub struct TroubleshootingEnvironment {
    /// Services in the scenario, looked up by the name a worker passes in.
    services: HashMap<String, Service>,
    /// Name of the service whose restart resolves the incident.
    target_service: String,
    /// Investigation progress, mutated by the action handlers.
    state: RwLock<TroubleshootingState>,
}
160
/// Investigation progress tracked for the environment as a whole.
///
/// The flags are not kept per worker or per service: one successful action
/// sets the flag for everyone.
#[derive(Debug, Default)]
struct TroubleshootingState {
    /// Set once any status check has been performed.
    checked_status: bool,
    /// Set once logs of an existing service have been read.
    read_logs: bool,
    /// Set once metrics of an existing service have been analyzed.
    analyzed_metrics: bool,
    /// Outcome of the latest diagnosis: the recommended solution identifier,
    /// or `"no_issue"` when the diagnosed service had no problem.
    diagnosis: Option<String>,
    /// Workers that have restarted the target service.
    completed: Vec<WorkerId>,
}
174
175impl TroubleshootingEnvironment {
176 pub fn new(services: HashMap<String, Service>, target_service: impl Into<String>) -> Self {
178 Self {
179 services,
180 target_service: target_service.into(),
181 state: RwLock::new(TroubleshootingState::default()),
182 }
183 }
184
185 pub fn memory_leak_scenario() -> Self {
187 let mut services = HashMap::new();
188
189 services.insert("api-gateway".into(), Service::new("api-gateway"));
191 services.insert("database".into(), Service::new("database"));
192
193 services.insert(
195 "user-service".into(),
196 Service::new("user-service")
197 .with_problem(ProblemType::MemoryLeak, ServiceStatus::Degraded),
198 );
199
200 services.insert(
202 "notification-service".into(),
203 Service::new("notification-service"),
204 );
205
206 Self::new(services, "user-service")
207 }
208
209 pub fn cpu_spike_scenario() -> Self {
211 let mut services = HashMap::new();
212
213 services.insert("frontend".into(), Service::new("frontend"));
214 services.insert("cache".into(), Service::new("cache"));
215 services.insert(
216 "payment-service".into(),
217 Service::new("payment-service")
218 .with_problem(ProblemType::CpuSpike, ServiceStatus::Down),
219 );
220 services.insert("auth-service".into(), Service::new("auth-service"));
221
222 Self::new(services, "payment-service")
223 }
224
225 pub fn network_timeout_scenario() -> Self {
227 let mut services = HashMap::new();
228
229 services.insert("load-balancer".into(), Service::new("load-balancer"));
230 services.insert(
231 "order-service".into(),
232 Service::new("order-service")
233 .with_problem(ProblemType::NetworkTimeout, ServiceStatus::Degraded),
234 );
235 services.insert(
236 "inventory-service".into(),
237 Service::new("inventory-service"),
238 );
239
240 Self::new(services, "order-service")
241 }
242
243 pub fn complex_scenario(
259 total_services: usize,
260 noise_services: usize,
261 cascade_depth: usize,
262 seed: u64,
263 ) -> Self {
264 use std::collections::HashSet;
265
266 let total = total_services.clamp(2, 50);
268 let noise = noise_services.min(total.saturating_sub(cascade_depth + 1));
269 let depth = cascade_depth
270 .clamp(1, 5)
271 .min(total.saturating_sub(noise + 1));
272
273 let mut rng_state = seed;
275 let mut next_rand = || {
276 rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1);
277 rng_state
278 };
279
280 let service_names: Vec<&str> = vec![
282 "api-gateway",
283 "user-service",
284 "auth-service",
285 "payment-service",
286 "order-service",
287 "inventory-service",
288 "notification-service",
289 "search-service",
290 "recommendation-service",
291 "analytics-service",
292 "logging-service",
293 "monitoring-service",
294 "cache-service",
295 "database-primary",
296 "database-replica",
297 "message-queue",
298 "scheduler-service",
299 "worker-service",
300 "cdn-service",
301 "load-balancer",
302 "rate-limiter",
303 "circuit-breaker",
304 "config-service",
305 "discovery-service",
306 "gateway-internal",
307 "billing-service",
308 "subscription-service",
309 "webhook-service",
310 "export-service",
311 "import-service",
312 "backup-service",
313 "audit-service",
314 "compliance-service",
315 "security-service",
316 "identity-service",
317 "permission-service",
318 "session-service",
319 "storage-service",
320 "media-service",
321 "thumbnail-service",
322 "email-service",
323 "sms-service",
324 "push-service",
325 "report-service",
326 "dashboard-service",
327 "admin-service",
328 "support-service",
329 "feedback-service",
330 "survey-service",
331 "ml-service",
332 "prediction-service",
333 ];
334
335 let mut used_indices: HashSet<usize> = HashSet::new();
337 used_indices.insert(1);
339 let mut pick_service = |rng: &mut dyn FnMut() -> u64| -> String {
340 loop {
341 let idx = (rng() as usize) % service_names.len();
342 if !used_indices.contains(&idx) {
343 used_indices.insert(idx);
344 return service_names[idx].to_string();
345 }
346 if used_indices.len() >= service_names.len() {
347 let n = used_indices.len();
349 used_indices.insert(n + 1000);
350 return format!("service-{}", n);
351 }
352 }
353 };
354
355 let mut services = HashMap::new();
356
357 let problem_types = [
359 ProblemType::MemoryLeak,
360 ProblemType::CpuSpike,
361 ProblemType::NetworkTimeout,
362 ProblemType::DatabaseConnection,
363 ];
364 let root_problem = &problem_types[(next_rand() as usize) % problem_types.len()];
365
366 let root_service_name = "user-service".to_string();
369 services.insert(
370 root_service_name.clone(),
371 Service::new(&root_service_name)
372 .with_problem(root_problem.clone(), ServiceStatus::Down),
373 );
374
375 let mut cascade_services = Vec::new();
377 for i in 1..depth {
378 let name = pick_service(&mut next_rand);
379 let cascade_problem = &problem_types[(next_rand() as usize) % problem_types.len()];
381 let mut service = Service::new(&name);
382
383 service.logs.push(format!(
385 "[ERROR] {} - Connection to {} failed",
386 chrono_like_timestamp(),
387 root_service_name
388 ));
389 service.logs.push(format!(
390 "[WARN] {} - Degraded due to upstream dependency",
391 chrono_like_timestamp()
392 ));
393 service.logs.push(format!(
394 "[ERROR] {} - {}",
395 chrono_like_timestamp(),
396 cascade_problem.log_pattern()
397 ));
398
399 service.metrics.insert("status".into(), "Degraded".into());
401 service
402 .metrics
403 .insert("upstream_errors".into(), format!("{}", 50 + i * 10));
404 let (key, value) = cascade_problem.metric_anomaly().split_once(": ").unwrap();
405 service.metrics.insert(key.into(), value.into());
406
407 service.status = ServiceStatus::Degraded;
408 cascade_services.push(name.clone());
412 services.insert(name, service);
413 }
414
415 for _ in 0..noise {
417 let name = pick_service(&mut next_rand);
418 let mut service = Service::new(&name);
419
420 service.logs.push(format!(
422 "[WARN] {} - High latency detected (within threshold)",
423 chrono_like_timestamp()
424 ));
425 service.logs.push(format!(
426 "[INFO] {} - Garbage collection completed",
427 chrono_like_timestamp()
428 ));
429 service.logs.push(format!(
430 "[WARN] {} - Connection pool at 70% capacity",
431 chrono_like_timestamp()
432 ));
433
434 service
436 .metrics
437 .insert("cpu_usage".into(), format!("{}%", 40 + (next_rand() % 20)));
438 service.metrics.insert(
439 "memory_usage".into(),
440 format!("{}%", 50 + (next_rand() % 25)),
441 );
442
443 services.insert(name, service);
444 }
445
446 let remaining = total.saturating_sub(1 + depth.saturating_sub(1) + noise);
448 for _ in 0..remaining {
449 let name = pick_service(&mut next_rand);
450 services.insert(name.clone(), Service::new(&name));
451 }
452
453 Self::new(services, root_service_name)
454 }
455
456 pub fn medium_complexity_scenario() -> Self {
458 Self::complex_scenario(15, 3, 2, 12345)
459 }
460
461 pub fn high_complexity_scenario() -> Self {
463 Self::complex_scenario(30, 8, 3, 67890)
464 }
465
466 pub fn extreme_complexity_scenario() -> Self {
468 Self::complex_scenario(50, 15, 4, 11111)
469 }
470
471 fn handle_check_status(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
476 let service_name = action
477 .params
478 .args
479 .get("service")
480 .or(action.params.target.as_ref())
481 .cloned()
482 .filter(|s| !s.is_empty());
484
485 let mut state = self.state.write().unwrap();
486
487 match service_name {
488 Some(name) => {
489 if let Some(service) = self.services.get(&name) {
491 state.checked_status = true;
492 let status_str = match service.status {
493 ServiceStatus::Running => "RUNNING",
494 ServiceStatus::Degraded => "DEGRADED",
495 ServiceStatus::Down => "DOWN",
496 };
497 WorkResult::env_success(format!(
498 "Service '{}': {}\nHealth check: {}",
499 service.name,
500 status_str,
501 if service.problem.is_some() {
502 "UNHEALTHY"
503 } else {
504 "HEALTHY"
505 }
506 ))
507 } else {
508 state.checked_status = true;
510 let mut output = format!(
511 "Service '{}' not found. Showing all services:\n=== Service Status ===\n",
512 name
513 );
514 for (svc_name, service) in &self.services {
515 let status_str = match service.status {
516 ServiceStatus::Running => "RUNNING",
517 ServiceStatus::Degraded => "DEGRADED",
518 ServiceStatus::Down => "DOWN",
519 };
520 let health = if service.problem.is_some() {
521 "UNHEALTHY"
522 } else {
523 "HEALTHY"
524 };
525 output.push_str(&format!("{}: {} ({})\n", svc_name, status_str, health));
526 }
527 WorkResult::env_success(output)
528 }
529 }
530 None => {
531 state.checked_status = true;
533 let mut output = String::from("=== Service Status ===\n");
534 for (name, service) in &self.services {
535 let status_str = match service.status {
536 ServiceStatus::Running => "RUNNING",
537 ServiceStatus::Degraded => "DEGRADED",
538 ServiceStatus::Down => "DOWN",
539 };
540 let health = if service.problem.is_some() {
541 "UNHEALTHY"
542 } else {
543 "HEALTHY"
544 };
545 output.push_str(&format!("{}: {} ({})\n", name, status_str, health));
546 }
547 WorkResult::env_success(output)
548 }
549 }
550 }
551
552 fn handle_read_logs(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
553 let service_name = action
554 .params
555 .args
556 .get("service")
557 .or(action.params.target.as_ref())
558 .cloned()
559 .unwrap_or_default();
560
561 if service_name.is_empty() {
562 return WorkResult::env_failure("ReadLogs requires a service name");
563 }
564
565 let mut state = self.state.write().unwrap();
566
567 if let Some(service) = self.services.get(&service_name) {
568 state.read_logs = true;
569
570 if service.logs.is_empty() {
571 WorkResult::env_success(format!(
572 "=== Logs for '{}' ===\n(no recent logs)",
573 service_name
574 ))
575 } else {
576 let logs_str = service.logs.join("\n");
577 WorkResult::env_success(format!(
578 "=== Logs for '{}' ===\n{}",
579 service_name, logs_str
580 ))
581 }
582 } else {
583 WorkResult::env_failure(format!("Service '{}' not found", service_name))
584 }
585 }
586
587 fn handle_analyze_metrics(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
588 let service_name = action
589 .params
590 .args
591 .get("service")
592 .or(action.params.target.as_ref())
593 .cloned()
594 .unwrap_or_default();
595
596 if service_name.is_empty() {
597 return WorkResult::env_failure("AnalyzeMetrics requires a service name");
598 }
599
600 let mut state = self.state.write().unwrap();
601
602 if let Some(service) = self.services.get(&service_name) {
603 state.analyzed_metrics = true;
604
605 if service.metrics.is_empty() {
606 WorkResult::env_success(format!(
607 "=== Metrics for '{}' ===\ncpu_usage: 15%\nmemory_usage: 45%\nlatency_p99: 120ms\n(all normal)",
608 service_name
609 ))
610 } else {
611 let metrics_str: String = service
612 .metrics
613 .iter()
614 .map(|(k, v)| format!("{}: {}", k, v))
615 .collect::<Vec<_>>()
616 .join("\n");
617 WorkResult::env_success(format!(
618 "=== Metrics for '{}' ===\n{}\n(ANOMALY DETECTED)",
619 service_name, metrics_str
620 ))
621 }
622 } else {
623 WorkResult::env_failure(format!("Service '{}' not found", service_name))
624 }
625 }
626
627 fn handle_diagnose(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
628 let service_name = action
629 .params
630 .args
631 .get("service")
632 .or(action.params.target.as_ref())
633 .cloned()
634 .unwrap_or_default();
635
636 if service_name.is_empty() {
637 return WorkResult::env_failure("Diagnose requires a service name");
638 }
639
640 let mut state = self.state.write().unwrap();
641
642 if !state.checked_status {
644 return WorkResult::env_failure(
645 "Cannot diagnose without checking status first. Run CheckStatus first.",
646 );
647 }
648
649 if !state.read_logs && !state.analyzed_metrics {
651 return WorkResult::env_failure(
652 "Cannot diagnose without data. Run ReadLogs or AnalyzeMetrics first.",
653 );
654 }
655
656 if let Some(service) = self.services.get(&service_name) {
657 if let Some(ref problem) = service.problem {
658 let diagnosis = format!(
659 "=== Diagnosis for '{}' ===\nProblem identified: {}\nRecommended action: {}",
660 service_name,
661 problem.description(),
662 problem.solution()
663 );
664 state.diagnosis = Some(problem.solution().to_string());
665 WorkResult::env_success(diagnosis)
666 } else {
667 state.diagnosis = Some("no_issue".to_string());
668 WorkResult::env_success(format!(
669 "=== Diagnosis for '{}' ===\nNo issues found. Service is healthy.",
670 service_name
671 ))
672 }
673 } else {
674 WorkResult::env_failure(format!("Service '{}' not found", service_name))
675 }
676 }
677
678 fn handle_restart(&self, worker_id: WorkerId, action: &Action) -> WorkResult {
679 let service_name = action
680 .params
681 .args
682 .get("service")
683 .or(action.params.target.as_ref())
684 .cloned()
685 .unwrap_or_default();
686
687 if service_name.is_empty() {
688 return WorkResult::env_failure("Restart requires a service name");
689 }
690
691 let mut state = self.state.write().unwrap();
692
693 if state.diagnosis.is_none() {
695 return WorkResult::env_failure(
696 "Cannot restart without diagnosis. Run Diagnose first.",
697 );
698 }
699
700 if service_name != self.target_service {
702 return WorkResult::env_failure(format!(
703 "Restarted wrong service '{}'. The problematic service is different.",
704 service_name
705 ));
706 }
707
708 if !state.completed.contains(&worker_id) {
710 state.completed.push(worker_id);
711 }
712
713 WorkResult::done_success(format!(
714 "=== Service '{}' Restarted ===\nStatus: RUNNING\nHealth: HEALTHY\n\nIncident resolved successfully!",
715 service_name
716 ))
717 }
718}
719
720impl Environment for TroubleshootingEnvironment {
721 fn step(&self, worker_id: WorkerId, action: &Action) -> WorkResult {
722 match action.name.to_lowercase().as_str() {
723 "checkstatus" | "check_status" | "status" => {
724 self.handle_check_status(worker_id, action)
725 }
726 "readlogs" | "read_logs" | "logs" => self.handle_read_logs(worker_id, action),
727 "analyzemetrics" | "analyze_metrics" | "metrics" => {
728 self.handle_analyze_metrics(worker_id, action)
729 }
730 "diagnose" | "diagnosis" => self.handle_diagnose(worker_id, action),
731 "restart" | "reboot" => self.handle_restart(worker_id, action),
732 "continue" => WorkResult::env_success("Continuing..."),
733 _ => WorkResult::unsupported(&action.name),
734 }
735 }
736
737 fn reset(&self) {
738 let mut state = self.state.write().unwrap();
739 state.checked_status = false;
740 state.read_logs = false;
741 state.analyzed_metrics = false;
742 state.diagnosis = None;
743 state.completed.clear();
744 }
745
746 fn name(&self) -> &str {
747 "TroubleshootingEnvironment"
748 }
749}
750
#[cfg(test)]
mod tests {
    use super::*;

    /// True when the result reports success, whether or not the episode ended.
    fn is_success(result: &WorkResult) -> bool {
        if let WorkResult::Acted { action_result, .. } = result {
            action_result.success
        } else if let WorkResult::Done { success, .. } = result {
            *success
        } else {
            false
        }
    }

    /// True when the result ends the episode.
    fn is_done(result: &WorkResult) -> bool {
        matches!(result, WorkResult::Done { .. })
    }

    /// Builds an action with an optional target and no extra arguments.
    fn action(name: &str, target: Option<&str>) -> Action {
        let params = swarm_engine_core::types::ActionParams {
            target: target.map(|s| s.into()),
            args: HashMap::new(),
            data: vec![],
        };
        Action {
            name: name.into(),
            params,
        }
    }

    /// Runs one action as worker 0 against `env`.
    fn run(env: &TroubleshootingEnvironment, name: &str, target: Option<&str>) -> WorkResult {
        env.step(WorkerId(0), &action(name, target))
    }

    #[test]
    fn test_check_status_all() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        let result = run(&env, "CheckStatus", None);
        assert!(is_success(&result));
    }

    #[test]
    fn test_check_status_specific() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        let result = run(&env, "CheckStatus", Some("user-service"));
        assert!(is_success(&result));
    }

    #[test]
    fn test_read_logs() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        run(&env, "CheckStatus", None);

        let result = run(&env, "ReadLogs", Some("user-service"));
        assert!(is_success(&result));
    }

    #[test]
    fn test_diagnose_requires_prerequisites() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        // No status check and no evidence gathered yet.
        let result = run(&env, "Diagnose", Some("user-service"));
        assert!(!is_success(&result));
    }

    #[test]
    fn test_full_troubleshooting_flow() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        // Every investigation step succeeds without ending the episode.
        let investigation = [
            ("CheckStatus", None),
            ("ReadLogs", Some("user-service")),
            ("Diagnose", Some("user-service")),
        ];
        for (name, target) in investigation.iter() {
            let result = run(&env, name, *target);
            assert!(is_success(&result));
            assert!(!is_done(&result));
        }

        // Restarting the target service resolves the incident.
        let result = run(&env, "Restart", Some("user-service"));
        assert!(is_success(&result));
        assert!(is_done(&result));
    }

    #[test]
    fn test_restart_wrong_service_fails() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        run(&env, "CheckStatus", None);
        run(&env, "ReadLogs", Some("user-service"));
        run(&env, "Diagnose", Some("user-service"));

        let result = run(&env, "Restart", Some("api-gateway"));
        assert!(!is_success(&result));
        assert!(!is_done(&result));
    }
}