1use std::collections::HashMap;
23use std::sync::RwLock;
24
25use swarm_engine_core::actions::ParamResolver;
26use swarm_engine_core::agent::WorkResult;
27use swarm_engine_core::environment::Environment;
28use swarm_engine_core::types::{Action, WorkerId};
29
/// Coarse operational state of a simulated service.
///
/// The status handler renders these as `RUNNING`, `DEGRADED` and `DOWN`.
#[derive(Debug, Clone, PartialEq)]
pub enum ServiceStatus {
    /// Initial state assigned by `Service::new`.
    Running,
    /// The service is up but impaired.
    Degraded,
    /// The service is not serving at all.
    Down,
}
41
/// Category of fault that can be attached to a simulated service.
///
/// Every variant maps to a fixed set of strings: a human-readable description,
/// a log signature, one anomalous metric reading and a recommended remediation.
#[derive(Debug, Clone, PartialEq)]
pub enum ProblemType {
    /// Memory usage grows until the process runs out of memory.
    MemoryLeak,
    /// Sustained high CPU usage.
    CpuSpike,
    /// Storage capacity exhausted.
    DiskFull,
    /// Calls to an upstream dependency time out.
    NetworkTimeout,
    /// The database connection pool is exhausted.
    DatabaseConnection,
}

impl ProblemType {
    // All lookups return `&'static str`: every arm is a string literal, so the
    // result does not need to borrow from `self` and may outlive the value.

    /// Human-readable explanation used in diagnosis reports.
    fn description(&self) -> &'static str {
        match self {
            Self::MemoryLeak => "Memory leak detected - gradual memory increase over time",
            Self::CpuSpike => "CPU spike detected - sustained high CPU usage",
            Self::DiskFull => "Disk full - storage capacity exceeded",
            Self::NetworkTimeout => "Network timeout - connection to upstream failing",
            Self::DatabaseConnection => "Database connection pool exhausted",
        }
    }

    /// Message that appears in the `[ERROR]` log lines of an affected service.
    fn log_pattern(&self) -> &'static str {
        match self {
            Self::MemoryLeak => "OutOfMemoryError",
            Self::CpuSpike => "High CPU utilization",
            Self::DiskFull => "No space left on device",
            Self::NetworkTimeout => "Connection timed out",
            Self::DatabaseConnection => "Connection pool exhausted",
        }
    }

    /// Anomalous metric in `"<key>: <value>"` form.
    ///
    /// Callers split on `": "`, so every arm must keep that separator.
    fn metric_anomaly(&self) -> &'static str {
        match self {
            Self::MemoryLeak => "memory_usage: 95%",
            Self::CpuSpike => "cpu_usage: 98%",
            Self::DiskFull => "disk_usage: 100%",
            Self::NetworkTimeout => "latency_p99: 30000ms",
            Self::DatabaseConnection => "db_connections: 100/100",
        }
    }

    /// Identifier of the remediation recommended for this problem.
    fn solution(&self) -> &'static str {
        match self {
            Self::DiskFull => "cleanup",
            Self::MemoryLeak
            | Self::CpuSpike
            | Self::NetworkTimeout
            | Self::DatabaseConnection => "restart",
        }
    }
}
93
/// One simulated service together with the evidence an agent can inspect.
#[derive(Debug, Clone)]
pub struct Service {
    /// Service name as shown in handler output.
    name: String,
    /// Current operational state.
    status: ServiceStatus,
    /// Injected fault, if any; `None` is reported as healthy by the handlers.
    problem: Option<ProblemType>,
    /// Pre-rendered log lines returned by the read-logs handler.
    logs: Vec<String>,
    /// Metric name -> rendered value returned by the analyze-metrics handler.
    metrics: HashMap<String, String>,
}
103
104impl Service {
105 fn new(name: impl Into<String>) -> Self {
106 Self {
107 name: name.into(),
108 status: ServiceStatus::Running,
109 problem: None,
110 logs: Vec::new(),
111 metrics: HashMap::new(),
112 }
113 }
114
115 fn with_problem(mut self, problem: ProblemType, status: ServiceStatus) -> Self {
116 self.logs.push(format!(
118 "[ERROR] {} - {}",
119 chrono_like_timestamp(),
120 problem.log_pattern()
121 ));
122 self.logs.push(format!(
123 "[WARN] {} - Service degradation detected",
124 chrono_like_timestamp()
125 ));
126 self.logs.push(format!(
127 "[ERROR] {} - {}",
128 chrono_like_timestamp(),
129 problem.log_pattern()
130 ));
131
132 self.metrics
134 .insert("status".into(), format!("{:?}", status));
135 let (key, value) = problem.metric_anomaly().split_once(": ").unwrap();
136 self.metrics.insert(key.into(), value.into());
137
138 self.status = status;
139 self.problem = Some(problem);
140 self
141 }
142}
143
/// Returns the fixed timestamp stamped on every generated log line.
///
/// It is a constant rather than the current time, so generated logs are
/// identical from run to run.
fn chrono_like_timestamp() -> String {
    String::from("2024-01-15T10:30:45Z")
}
147
/// Simulated incident in which an agent must find and restart a faulty service.
pub struct TroubleshootingEnvironment {
    /// All services in the scenario, keyed by service name.
    services: HashMap<String, Service>,
    /// Name of the service whose restart completes the scenario.
    target_service: String,
    /// Progress flags mutated by the action handlers through `&self`.
    state: RwLock<TroubleshootingState>,
}
161
/// Progress made so far; a single instance is shared by every worker.
#[derive(Debug, Default)]
struct TroubleshootingState {
    /// Set once the check-status handler has run.
    checked_status: bool,
    /// Set once logs of an existing service have been read.
    read_logs: bool,
    /// Set once metrics of an existing service have been analyzed.
    analyzed_metrics: bool,
    /// Outcome of the latest diagnosis: a solution identifier or `"no_issue"`.
    diagnosis: Option<String>,
    /// Workers that have restarted the target service.
    completed: Vec<WorkerId>,
}
175
176impl TroubleshootingEnvironment {
177 pub fn new(services: HashMap<String, Service>, target_service: impl Into<String>) -> Self {
179 Self {
180 services,
181 target_service: target_service.into(),
182 state: RwLock::new(TroubleshootingState::default()),
183 }
184 }
185
186 pub fn memory_leak_scenario() -> Self {
188 let mut services = HashMap::new();
189
190 services.insert("api-gateway".into(), Service::new("api-gateway"));
192 services.insert("database".into(), Service::new("database"));
193
194 services.insert(
196 "user-service".into(),
197 Service::new("user-service")
198 .with_problem(ProblemType::MemoryLeak, ServiceStatus::Degraded),
199 );
200
201 services.insert(
203 "notification-service".into(),
204 Service::new("notification-service"),
205 );
206
207 Self::new(services, "user-service")
208 }
209
210 pub fn cpu_spike_scenario() -> Self {
212 let mut services = HashMap::new();
213
214 services.insert("frontend".into(), Service::new("frontend"));
215 services.insert("cache".into(), Service::new("cache"));
216 services.insert(
217 "payment-service".into(),
218 Service::new("payment-service")
219 .with_problem(ProblemType::CpuSpike, ServiceStatus::Down),
220 );
221 services.insert("auth-service".into(), Service::new("auth-service"));
222
223 Self::new(services, "payment-service")
224 }
225
226 pub fn network_timeout_scenario() -> Self {
228 let mut services = HashMap::new();
229
230 services.insert("load-balancer".into(), Service::new("load-balancer"));
231 services.insert(
232 "order-service".into(),
233 Service::new("order-service")
234 .with_problem(ProblemType::NetworkTimeout, ServiceStatus::Degraded),
235 );
236 services.insert(
237 "inventory-service".into(),
238 Service::new("inventory-service"),
239 );
240
241 Self::new(services, "order-service")
242 }
243
244 pub fn complex_scenario(
260 total_services: usize,
261 noise_services: usize,
262 cascade_depth: usize,
263 seed: u64,
264 ) -> Self {
265 use std::collections::HashSet;
266
267 let total = total_services.clamp(2, 50);
269 let noise = noise_services.min(total.saturating_sub(cascade_depth + 1));
270 let depth = cascade_depth
271 .clamp(1, 5)
272 .min(total.saturating_sub(noise + 1));
273
274 let mut rng_state = seed;
276 let mut next_rand = || {
277 rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1);
278 rng_state
279 };
280
281 let service_names: Vec<&str> = vec![
283 "api-gateway",
284 "user-service",
285 "auth-service",
286 "payment-service",
287 "order-service",
288 "inventory-service",
289 "notification-service",
290 "search-service",
291 "recommendation-service",
292 "analytics-service",
293 "logging-service",
294 "monitoring-service",
295 "cache-service",
296 "database-primary",
297 "database-replica",
298 "message-queue",
299 "scheduler-service",
300 "worker-service",
301 "cdn-service",
302 "load-balancer",
303 "rate-limiter",
304 "circuit-breaker",
305 "config-service",
306 "discovery-service",
307 "gateway-internal",
308 "billing-service",
309 "subscription-service",
310 "webhook-service",
311 "export-service",
312 "import-service",
313 "backup-service",
314 "audit-service",
315 "compliance-service",
316 "security-service",
317 "identity-service",
318 "permission-service",
319 "session-service",
320 "storage-service",
321 "media-service",
322 "thumbnail-service",
323 "email-service",
324 "sms-service",
325 "push-service",
326 "report-service",
327 "dashboard-service",
328 "admin-service",
329 "support-service",
330 "feedback-service",
331 "survey-service",
332 "ml-service",
333 "prediction-service",
334 ];
335
336 let mut used_indices: HashSet<usize> = HashSet::new();
338 used_indices.insert(1);
340 let mut pick_service = |rng: &mut dyn FnMut() -> u64| -> String {
341 loop {
342 let idx = (rng() as usize) % service_names.len();
343 if !used_indices.contains(&idx) {
344 used_indices.insert(idx);
345 return service_names[idx].to_string();
346 }
347 if used_indices.len() >= service_names.len() {
348 let n = used_indices.len();
350 used_indices.insert(n + 1000);
351 return format!("service-{}", n);
352 }
353 }
354 };
355
356 let mut services = HashMap::new();
357
358 let problem_types = [
360 ProblemType::MemoryLeak,
361 ProblemType::CpuSpike,
362 ProblemType::NetworkTimeout,
363 ProblemType::DatabaseConnection,
364 ];
365 let root_problem = &problem_types[(next_rand() as usize) % problem_types.len()];
366
367 let root_service_name = "user-service".to_string();
370 services.insert(
371 root_service_name.clone(),
372 Service::new(&root_service_name)
373 .with_problem(root_problem.clone(), ServiceStatus::Down),
374 );
375
376 let mut cascade_services = Vec::new();
378 for i in 1..depth {
379 let name = pick_service(&mut next_rand);
380 let cascade_problem = &problem_types[(next_rand() as usize) % problem_types.len()];
382 let mut service = Service::new(&name);
383
384 service.logs.push(format!(
386 "[ERROR] {} - Connection to {} failed",
387 chrono_like_timestamp(),
388 root_service_name
389 ));
390 service.logs.push(format!(
391 "[WARN] {} - Degraded due to upstream dependency",
392 chrono_like_timestamp()
393 ));
394 service.logs.push(format!(
395 "[ERROR] {} - {}",
396 chrono_like_timestamp(),
397 cascade_problem.log_pattern()
398 ));
399
400 service.metrics.insert("status".into(), "Degraded".into());
402 service
403 .metrics
404 .insert("upstream_errors".into(), format!("{}", 50 + i * 10));
405 let (key, value) = cascade_problem.metric_anomaly().split_once(": ").unwrap();
406 service.metrics.insert(key.into(), value.into());
407
408 service.status = ServiceStatus::Degraded;
409 cascade_services.push(name.clone());
413 services.insert(name, service);
414 }
415
416 for _ in 0..noise {
418 let name = pick_service(&mut next_rand);
419 let mut service = Service::new(&name);
420
421 service.logs.push(format!(
423 "[WARN] {} - High latency detected (within threshold)",
424 chrono_like_timestamp()
425 ));
426 service.logs.push(format!(
427 "[INFO] {} - Garbage collection completed",
428 chrono_like_timestamp()
429 ));
430 service.logs.push(format!(
431 "[WARN] {} - Connection pool at 70% capacity",
432 chrono_like_timestamp()
433 ));
434
435 service
437 .metrics
438 .insert("cpu_usage".into(), format!("{}%", 40 + (next_rand() % 20)));
439 service.metrics.insert(
440 "memory_usage".into(),
441 format!("{}%", 50 + (next_rand() % 25)),
442 );
443
444 services.insert(name, service);
445 }
446
447 let remaining = total.saturating_sub(1 + depth.saturating_sub(1) + noise);
449 for _ in 0..remaining {
450 let name = pick_service(&mut next_rand);
451 services.insert(name.clone(), Service::new(&name));
452 }
453
454 Self::new(services, root_service_name)
455 }
456
457 pub fn medium_complexity_scenario() -> Self {
459 Self::complex_scenario(15, 3, 2, 12345)
460 }
461
462 pub fn high_complexity_scenario() -> Self {
464 Self::complex_scenario(30, 8, 3, 67890)
465 }
466
467 pub fn extreme_complexity_scenario() -> Self {
469 Self::complex_scenario(50, 15, 4, 11111)
470 }
471
472 fn handle_check_status(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
477 let resolver = ParamResolver::new(action);
478 let service_name = resolver.get("service");
479
480 let mut state = self.state.write().unwrap();
481
482 match service_name {
483 Some(name) => {
484 if let Some(service) = self.services.get(name) {
486 state.checked_status = true;
487 let status_str = match service.status {
488 ServiceStatus::Running => "RUNNING",
489 ServiceStatus::Degraded => "DEGRADED",
490 ServiceStatus::Down => "DOWN",
491 };
492 WorkResult::env_success(format!(
493 "Service '{}': {}\nHealth check: {}",
494 service.name,
495 status_str,
496 if service.problem.is_some() {
497 "UNHEALTHY"
498 } else {
499 "HEALTHY"
500 }
501 ))
502 } else {
503 state.checked_status = true;
505 let mut output = format!(
506 "Service '{}' not found. Showing all services:\n=== Service Status ===\n",
507 name
508 );
509 for (svc_name, service) in &self.services {
510 let status_str = match service.status {
511 ServiceStatus::Running => "RUNNING",
512 ServiceStatus::Degraded => "DEGRADED",
513 ServiceStatus::Down => "DOWN",
514 };
515 let health = if service.problem.is_some() {
516 "UNHEALTHY"
517 } else {
518 "HEALTHY"
519 };
520 output.push_str(&format!("{}: {} ({})\n", svc_name, status_str, health));
521 }
522 WorkResult::env_success(output)
523 }
524 }
525 None => {
526 state.checked_status = true;
528 let mut output = String::from("=== Service Status ===\n");
529 for (name, service) in &self.services {
530 let status_str = match service.status {
531 ServiceStatus::Running => "RUNNING",
532 ServiceStatus::Degraded => "DEGRADED",
533 ServiceStatus::Down => "DOWN",
534 };
535 let health = if service.problem.is_some() {
536 "UNHEALTHY"
537 } else {
538 "HEALTHY"
539 };
540 output.push_str(&format!("{}: {} ({})\n", name, status_str, health));
541 }
542 WorkResult::env_success(output)
543 }
544 }
545 }
546
547 fn handle_read_logs(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
548 let resolver = ParamResolver::new(action);
549 let service_name = match resolver.require("service") {
550 Ok(s) => s,
551 Err(e) => return WorkResult::env_failure(format!("ReadLogs: {}", e)),
552 };
553
554 let mut state = self.state.write().unwrap();
555
556 if let Some(service) = self.services.get(service_name) {
557 state.read_logs = true;
558
559 if service.logs.is_empty() {
560 WorkResult::env_success(format!(
561 "=== Logs for '{}' ===\n(no recent logs)",
562 service_name
563 ))
564 } else {
565 let logs_str = service.logs.join("\n");
566 WorkResult::env_success(format!(
567 "=== Logs for '{}' ===\n{}",
568 service_name, logs_str
569 ))
570 }
571 } else {
572 WorkResult::env_failure(format!("Service '{}' not found", service_name))
573 }
574 }
575
576 fn handle_analyze_metrics(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
577 let resolver = ParamResolver::new(action);
578 let service_name = match resolver.require("service") {
579 Ok(s) => s,
580 Err(e) => return WorkResult::env_failure(format!("AnalyzeMetrics: {}", e)),
581 };
582
583 let mut state = self.state.write().unwrap();
584
585 if let Some(service) = self.services.get(service_name) {
586 state.analyzed_metrics = true;
587
588 if service.metrics.is_empty() {
589 WorkResult::env_success(format!(
590 "=== Metrics for '{}' ===\ncpu_usage: 15%\nmemory_usage: 45%\nlatency_p99: 120ms\n(all normal)",
591 service_name
592 ))
593 } else {
594 let metrics_str: String = service
595 .metrics
596 .iter()
597 .map(|(k, v)| format!("{}: {}", k, v))
598 .collect::<Vec<_>>()
599 .join("\n");
600 WorkResult::env_success(format!(
601 "=== Metrics for '{}' ===\n{}\n(ANOMALY DETECTED)",
602 service_name, metrics_str
603 ))
604 }
605 } else {
606 WorkResult::env_failure(format!("Service '{}' not found", service_name))
607 }
608 }
609
610 fn handle_diagnose(&self, _worker_id: WorkerId, action: &Action) -> WorkResult {
611 let resolver = ParamResolver::new(action);
612 let service_name = match resolver.require("service") {
613 Ok(s) => s,
614 Err(e) => return WorkResult::env_failure(format!("Diagnose: {}", e)),
615 };
616
617 let mut state = self.state.write().unwrap();
618
619 if !state.checked_status {
621 return WorkResult::env_failure(
622 "Cannot diagnose without checking status first. Run CheckStatus first.",
623 );
624 }
625
626 if !state.read_logs && !state.analyzed_metrics {
628 return WorkResult::env_failure(
629 "Cannot diagnose without data. Run ReadLogs or AnalyzeMetrics first.",
630 );
631 }
632
633 if let Some(service) = self.services.get(service_name) {
634 if let Some(ref problem) = service.problem {
635 let diagnosis = format!(
636 "=== Diagnosis for '{}' ===\nProblem identified: {}\nRecommended action: {}",
637 service_name,
638 problem.description(),
639 problem.solution()
640 );
641 state.diagnosis = Some(problem.solution().to_string());
642 WorkResult::env_success(diagnosis)
643 } else {
644 state.diagnosis = Some("no_issue".to_string());
645 WorkResult::env_success(format!(
646 "=== Diagnosis for '{}' ===\nNo issues found. Service is healthy.",
647 service_name
648 ))
649 }
650 } else {
651 WorkResult::env_failure(format!("Service '{}' not found", service_name))
652 }
653 }
654
655 fn handle_restart(&self, worker_id: WorkerId, action: &Action) -> WorkResult {
656 let resolver = ParamResolver::new(action);
657 let service_name = match resolver.require("service") {
658 Ok(s) => s,
659 Err(e) => return WorkResult::env_failure(format!("Restart: {}", e)),
660 };
661
662 let mut state = self.state.write().unwrap();
663
664 if state.diagnosis.is_none() {
666 return WorkResult::env_failure(
667 "Cannot restart without diagnosis. Run Diagnose first.",
668 );
669 }
670
671 if service_name != self.target_service {
673 return WorkResult::env_failure(format!(
674 "Restarted wrong service '{}'. The problematic service is different.",
675 service_name
676 ));
677 }
678
679 if !state.completed.contains(&worker_id) {
681 state.completed.push(worker_id);
682 }
683
684 WorkResult::done_success(format!(
685 "=== Service '{}' Restarted ===\nStatus: RUNNING\nHealth: HEALTHY\n\nIncident resolved successfully!",
686 service_name
687 ))
688 }
689}
690
691impl Environment for TroubleshootingEnvironment {
692 fn step(&self, worker_id: WorkerId, action: &Action) -> WorkResult {
693 match action.name.to_lowercase().as_str() {
694 "checkstatus" | "check_status" | "status" => {
695 self.handle_check_status(worker_id, action)
696 }
697 "readlogs" | "read_logs" | "logs" => self.handle_read_logs(worker_id, action),
698 "analyzemetrics" | "analyze_metrics" | "metrics" => {
699 self.handle_analyze_metrics(worker_id, action)
700 }
701 "diagnose" | "diagnosis" => self.handle_diagnose(worker_id, action),
702 "restart" | "reboot" => self.handle_restart(worker_id, action),
703 "continue" => WorkResult::env_success("Continuing..."),
704 _ => WorkResult::unsupported(&action.name),
705 }
706 }
707
708 fn reset(&self) {
709 let mut state = self.state.write().unwrap();
710 state.checked_status = false;
711 state.read_logs = false;
712 state.analyzed_metrics = false;
713 state.diagnosis = None;
714 state.completed.clear();
715 }
716
717 fn name(&self) -> &str {
718 "TroubleshootingEnvironment"
719 }
720}
721
#[cfg(test)]
mod tests {
    use super::*;

    /// True when the step reported success, whether or not it ended the episode.
    fn is_success(result: &WorkResult) -> bool {
        match result {
            WorkResult::Done { success, .. } => *success,
            WorkResult::Acted { action_result, .. } => action_result.success,
            _ => false,
        }
    }

    /// True when the step ended the episode.
    fn is_done(result: &WorkResult) -> bool {
        matches!(result, WorkResult::Done { .. })
    }

    /// Builds an action with the given name and optional target, no args or data.
    fn action(name: &str, target: Option<&str>) -> Action {
        let params = swarm_engine_core::types::ActionParams {
            target: target.map(|t| t.into()),
            args: HashMap::new(),
            data: vec![],
        };
        Action {
            name: name.into(),
            params,
        }
    }

    /// Runs a single named action against `env` as worker 0.
    fn run(env: &TroubleshootingEnvironment, name: &str, target: Option<&str>) -> WorkResult {
        env.step(WorkerId(0), &action(name, target))
    }

    #[test]
    fn test_check_status_all() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        let outcome = run(&env, "CheckStatus", None);
        assert!(is_success(&outcome));
    }

    #[test]
    fn test_check_status_specific() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        let outcome = run(&env, "CheckStatus", Some("user-service"));
        assert!(is_success(&outcome));
    }

    #[test]
    fn test_read_logs() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        run(&env, "CheckStatus", None);

        let outcome = run(&env, "ReadLogs", Some("user-service"));
        assert!(is_success(&outcome));
    }

    #[test]
    fn test_diagnose_requires_prerequisites() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        // No status check and no evidence gathered yet.
        let outcome = run(&env, "Diagnose", Some("user-service"));
        assert!(!is_success(&outcome));
    }

    #[test]
    fn test_full_troubleshooting_flow() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        // Every step but the last succeeds without finishing the episode.
        let intermediate = [
            ("CheckStatus", None),
            ("ReadLogs", Some("user-service")),
            ("Diagnose", Some("user-service")),
        ];
        for (name, target) in intermediate {
            let outcome = run(&env, name, target);
            assert!(is_success(&outcome));
            assert!(!is_done(&outcome));
        }

        let outcome = run(&env, "Restart", Some("user-service"));
        assert!(is_success(&outcome));
        assert!(is_done(&outcome));
    }

    #[test]
    fn test_restart_wrong_service_fails() {
        let env = TroubleshootingEnvironment::memory_leak_scenario();

        run(&env, "CheckStatus", None);
        run(&env, "ReadLogs", Some("user-service"));
        run(&env, "Diagnose", Some("user-service"));

        let outcome = run(&env, "Restart", Some("api-gateway"));
        assert!(!is_success(&outcome));
        assert!(!is_done(&outcome));
    }
}
835}