Skip to main content

rush_sync_server/commands/recovery/
command.rs

1use crate::commands::command::Command;
2use crate::core::prelude::*;
3use crate::server::types::{ServerContext, ServerStatus};
4use crate::server::utils::port::is_port_available;
5
/// Command that detects and repairs server status inconsistencies.
///
/// Stateless: all data it works on comes from the shared server context.
#[derive(Debug, Default)]
pub struct RecoveryCommand;

impl RecoveryCommand {
    /// Creates the command; identical to `RecoveryCommand::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
14
15impl Command for RecoveryCommand {
16    fn name(&self) -> &'static str {
17        "recover"
18    }
19
20    fn description(&self) -> &'static str {
21        "Recover and fix server status inconsistencies"
22    }
23
24    fn matches(&self, command: &str) -> bool {
25        let cmd = command.trim().to_lowercase();
26        cmd.starts_with("recover") || cmd.starts_with("fix") || cmd == "status-fix"
27    }
28
29    fn execute_sync(&self, args: &[&str]) -> Result<String> {
30        let ctx = crate::server::shared::get_shared_context();
31
32        match args.first() {
33            Some(&"all") => Ok(self.recover_all_servers(ctx)),
34            Some(&server_id) => Ok(self.recover_single_server(ctx, server_id)),
35            None => Ok(self.auto_recover(ctx)),
36        }
37    }
38
39    fn priority(&self) -> u8 {
40        80
41    }
42}
43
44impl RecoveryCommand {
45    /// Analyzes and repairs all inconsistent servers
46    fn auto_recover(&self, ctx: &ServerContext) -> String {
47        let mut fixes = Vec::new();
48        let registry = crate::server::shared::get_persistent_registry();
49
50        // 1. Handle-status synchronization
51        let (orphaned_handles, missing_handles) = {
52            let servers = match ctx.servers.read() {
53                Ok(s) => s,
54                Err(e) => {
55                    log::error!("servers lock poisoned: {}", e);
56                    return "Error: lock poisoned".to_string();
57                }
58            };
59            let handles = match ctx.handles.read() {
60                Ok(h) => h,
61                Err(e) => {
62                    log::error!("handles lock poisoned: {}", e);
63                    return "Error: lock poisoned".to_string();
64                }
65            };
66
67            // Handles without corresponding servers (orphaned)
68            let orphaned: Vec<String> = handles
69                .keys()
70                .filter(|id| !servers.contains_key(*id))
71                .cloned()
72                .collect();
73
74            // Running servers without a handle (missing)
75            let missing: Vec<String> = servers
76                .iter()
77                .filter_map(|(id, server)| {
78                    if server.status == ServerStatus::Running && !handles.contains_key(id) {
79                        Some(id.clone())
80                    } else {
81                        None
82                    }
83                })
84                .collect();
85
86            (orphaned, missing)
87        };
88
89        // 2. Port status validation
90        let port_fixes = self.validate_and_fix_ports(ctx);
91        fixes.extend(port_fixes);
92
93        // 3. Clean up orphaned handles
94        if !orphaned_handles.is_empty() {
95            let count = orphaned_handles.len();
96            for handle_id in orphaned_handles {
97                let mut handles = match ctx.handles.write() {
98                    Ok(h) => h,
99                    Err(e) => {
100                        log::error!("handles lock poisoned: {}", e);
101                        continue;
102                    }
103                };
104                if let Some(handle) = handles.remove(&handle_id) {
105                    // Stop handle gracefully
106                    tokio::spawn(async move {
107                        let _ = handle.stop(true).await;
108                    });
109                }
110            }
111            fixes.push(format!("{} orphaned handles cleaned", count));
112        }
113
114        // 4. Repair missing handles
115        if !missing_handles.is_empty() {
116            for server_id in &missing_handles {
117                self.fix_missing_handle(ctx, server_id);
118            }
119            fixes.push(format!("{} missing handles fixed", missing_handles.len()));
120        }
121
122        // 5. Synchronize persistence
123        tokio::spawn(async move {
124            if let Ok(persistent_servers) = registry.load_servers().await {
125                let _ = registry.save_servers(&persistent_servers).await;
126            }
127        });
128
129        if fixes.is_empty() {
130            "All servers are in consistent state".to_string()
131        } else {
132            format!("Recovery completed:\n{}", fixes.join("\n"))
133        }
134    }
135
136    fn recover_single_server(&self, ctx: &ServerContext, identifier: &str) -> String {
137        let servers = match ctx.servers.read() {
138            Ok(s) => s,
139            Err(e) => {
140                log::error!("servers lock poisoned: {}", e);
141                return "Error: lock poisoned".to_string();
142            }
143        };
144
145        // Find server
146        let server_info = match servers
147            .values()
148            .find(|s| s.id.starts_with(identifier) || s.name == identifier)
149        {
150            Some(server) => server.clone(),
151            None => return format!("Server '{}' not found", identifier),
152        };
153
154        drop(servers); // Release lock
155
156        let fixes = self.diagnose_and_fix_server(ctx, &server_info);
157
158        if fixes.is_empty() {
159            format!(
160                "Server '{}' is already in consistent state",
161                server_info.name
162            )
163        } else {
164            format!("Fixed server '{}':\n{}", server_info.name, fixes.join("\n"))
165        }
166    }
167
168    fn recover_all_servers(&self, ctx: &ServerContext) -> String {
169        let mut total_fixes = Vec::new();
170        let servers: Vec<_> = {
171            let servers = match ctx.servers.read() {
172                Ok(s) => s,
173                Err(e) => {
174                    log::error!("servers lock poisoned: {}", e);
175                    return "Error: lock poisoned".to_string();
176                }
177            };
178            servers.values().cloned().collect()
179        };
180
181        for server_info in servers {
182            let fixes = self.diagnose_and_fix_server(ctx, &server_info);
183            if !fixes.is_empty() {
184                total_fixes.push(format!(
185                    "Server '{}': {}",
186                    server_info.name,
187                    fixes.join(", ")
188                ));
189            }
190        }
191
192        if total_fixes.is_empty() {
193            "All servers are in consistent state".to_string()
194        } else {
195            format!("Recovery results:\n{}", total_fixes.join("\n"))
196        }
197    }
198
199    fn diagnose_and_fix_server(
200        &self,
201        ctx: &ServerContext,
202        server_info: &crate::server::types::ServerInfo,
203    ) -> Vec<String> {
204        let mut fixes = Vec::new();
205
206        let has_handle = {
207            let handles = match ctx.handles.read() {
208                Ok(h) => h,
209                Err(_) => return fixes,
210            };
211            handles.contains_key(&server_info.id)
212        };
213
214        // Check port status (use 127.0.0.1 as conservative check)
215        let port_available = is_port_available(server_info.port, "127.0.0.1");
216
217        match (server_info.status, has_handle, port_available) {
218            // Inconsistency: server should be running but has no handle
219            (ServerStatus::Running, false, _) => {
220                if port_available {
221                    // Port free but status Running -> correct to Stopped
222                    self.update_server_status(ctx, &server_info.id, ServerStatus::Stopped);
223                    fixes.push("Status: Running → Stopped (no handle, port free)".to_string());
224                } else {
225                    // Port occupied but handle missing -> mark as Failed
226                    self.update_server_status(ctx, &server_info.id, ServerStatus::Failed);
227                    fixes.push("Status: Running → Failed (no handle, port occupied)".to_string());
228                }
229            }
230
231            // Inconsistency: server has handle but status is not Running
232            (status, true, _) if status != ServerStatus::Running => {
233                self.update_server_status(ctx, &server_info.id, ServerStatus::Running);
234                fixes.push(format!("Status: {} → Running (handle exists)", status));
235            }
236
237            // Inconsistency: server Failed but port is free
238            (ServerStatus::Failed, false, true) => {
239                self.update_server_status(ctx, &server_info.id, ServerStatus::Stopped);
240                fixes.push("Status: Failed → Stopped (port now free)".to_string());
241            }
242
243            _ => {
244                // Server is consistent
245            }
246        }
247
248        fixes
249    }
250
251    fn validate_and_fix_ports(&self, ctx: &ServerContext) -> Vec<String> {
252        let mut fixes = Vec::new();
253        let servers: Vec<_> = {
254            let servers = match ctx.servers.read() {
255                Ok(s) => s,
256                Err(_) => return fixes,
257            };
258            servers.values().cloned().collect()
259        };
260
261        for server_info in servers {
262            let port_available = is_port_available(server_info.port, "127.0.0.1");
263            let has_handle = {
264                let handles = match ctx.handles.read() {
265                    Ok(h) => h,
266                    Err(_) => continue,
267                };
268                handles.contains_key(&server_info.id)
269            };
270
271            // Port fix decision matrix
272            match (server_info.status, has_handle, port_available) {
273                (ServerStatus::Running, true, false) => {
274                    // OK: server running, handle present, port occupied
275                }
276                (ServerStatus::Running, false, false) => {
277                    // Server should be running but no handle and port is occupied
278                    self.update_server_status(ctx, &server_info.id, ServerStatus::Failed);
279                    fixes.push(format!(
280                        "Fixed '{}': Running → Failed (orphaned)",
281                        server_info.name
282                    ));
283                }
284                (ServerStatus::Running, false, true) => {
285                    // Server should be running but no handle and port is free
286                    self.update_server_status(ctx, &server_info.id, ServerStatus::Stopped);
287                    fixes.push(format!(
288                        "Fixed '{}': Running → Stopped (no handle)",
289                        server_info.name
290                    ));
291                }
292                _ => {}
293            }
294        }
295
296        fixes
297    }
298
299    fn fix_missing_handle(&self, ctx: &ServerContext, server_id: &str) {
300        // Set server to Stopped since we have no handle
301        self.update_server_status(ctx, server_id, ServerStatus::Stopped);
302
303        // Async persistence update
304        let server_id_clone = server_id.to_string();
305        tokio::spawn(async move {
306            crate::server::shared::persist_server_update(&server_id_clone, ServerStatus::Stopped)
307                .await;
308        });
309    }
310
311    fn update_server_status(&self, ctx: &ServerContext, server_id: &str, status: ServerStatus) {
312        if let Ok(mut servers) = ctx.servers.write() {
313            if let Some(server) = servers.get_mut(server_id) {
314                server.status = status;
315            }
316        }
317    }
318}