rush_sync_server/commands/recovery/
command.rs1use crate::commands::command::Command;
2use crate::core::prelude::*;
3use crate::server::types::{ServerContext, ServerStatus};
4use crate::server::utils::port::is_port_available;
5
/// Stateless command that detects and repairs inconsistencies between the
/// recorded server statuses and the server handles that are actually present.
#[derive(Default, Debug)]
pub struct RecoveryCommand;
8
9impl RecoveryCommand {
10 pub fn new() -> Self {
11 Self
12 }
13}
14
15impl Command for RecoveryCommand {
16 fn name(&self) -> &'static str {
17 "recover"
18 }
19
20 fn description(&self) -> &'static str {
21 "Recover and fix server status inconsistencies"
22 }
23
24 fn matches(&self, command: &str) -> bool {
25 let cmd = command.trim().to_lowercase();
26 cmd.starts_with("recover") || cmd.starts_with("fix") || cmd == "status-fix"
27 }
28
29 fn execute_sync(&self, args: &[&str]) -> Result<String> {
30 let ctx = crate::server::shared::get_shared_context();
31
32 match args.first() {
33 Some(&"all") => Ok(self.recover_all_servers(ctx)),
34 Some(&server_id) => Ok(self.recover_single_server(ctx, server_id)),
35 None => Ok(self.auto_recover(ctx)),
36 }
37 }
38
39 fn priority(&self) -> u8 {
40 80
41 }
42}
43
44impl RecoveryCommand {
45 fn auto_recover(&self, ctx: &ServerContext) -> String {
47 let mut fixes = Vec::new();
48 let registry = crate::server::shared::get_persistent_registry();
49
50 let (orphaned_handles, missing_handles) = {
52 let servers = match ctx.servers.read() {
53 Ok(s) => s,
54 Err(e) => {
55 log::error!("servers lock poisoned: {}", e);
56 return "Error: lock poisoned".to_string();
57 }
58 };
59 let handles = match ctx.handles.read() {
60 Ok(h) => h,
61 Err(e) => {
62 log::error!("handles lock poisoned: {}", e);
63 return "Error: lock poisoned".to_string();
64 }
65 };
66
67 let orphaned: Vec<String> = handles
69 .keys()
70 .filter(|id| !servers.contains_key(*id))
71 .cloned()
72 .collect();
73
74 let missing: Vec<String> = servers
76 .iter()
77 .filter_map(|(id, server)| {
78 if server.status == ServerStatus::Running && !handles.contains_key(id) {
79 Some(id.clone())
80 } else {
81 None
82 }
83 })
84 .collect();
85
86 (orphaned, missing)
87 };
88
89 let port_fixes = self.validate_and_fix_ports(ctx);
91 fixes.extend(port_fixes);
92
93 if !orphaned_handles.is_empty() {
95 let count = orphaned_handles.len();
96 for handle_id in orphaned_handles {
97 let mut handles = match ctx.handles.write() {
98 Ok(h) => h,
99 Err(e) => {
100 log::error!("handles lock poisoned: {}", e);
101 continue;
102 }
103 };
104 if let Some(handle) = handles.remove(&handle_id) {
105 tokio::spawn(async move {
107 let _ = handle.stop(true).await;
108 });
109 }
110 }
111 fixes.push(format!("{} orphaned handles cleaned", count));
112 }
113
114 if !missing_handles.is_empty() {
116 for server_id in &missing_handles {
117 self.fix_missing_handle(ctx, server_id);
118 }
119 fixes.push(format!("{} missing handles fixed", missing_handles.len()));
120 }
121
122 tokio::spawn(async move {
124 if let Ok(persistent_servers) = registry.load_servers().await {
125 let _ = registry.save_servers(&persistent_servers).await;
126 }
127 });
128
129 if fixes.is_empty() {
130 "All servers are in consistent state".to_string()
131 } else {
132 format!("Recovery completed:\n{}", fixes.join("\n"))
133 }
134 }
135
136 fn recover_single_server(&self, ctx: &ServerContext, identifier: &str) -> String {
137 let servers = match ctx.servers.read() {
138 Ok(s) => s,
139 Err(e) => {
140 log::error!("servers lock poisoned: {}", e);
141 return "Error: lock poisoned".to_string();
142 }
143 };
144
145 let server_info = match servers
147 .values()
148 .find(|s| s.id.starts_with(identifier) || s.name == identifier)
149 {
150 Some(server) => server.clone(),
151 None => return format!("Server '{}' not found", identifier),
152 };
153
154 drop(servers); let fixes = self.diagnose_and_fix_server(ctx, &server_info);
157
158 if fixes.is_empty() {
159 format!(
160 "Server '{}' is already in consistent state",
161 server_info.name
162 )
163 } else {
164 format!("Fixed server '{}':\n{}", server_info.name, fixes.join("\n"))
165 }
166 }
167
168 fn recover_all_servers(&self, ctx: &ServerContext) -> String {
169 let mut total_fixes = Vec::new();
170 let servers: Vec<_> = {
171 let servers = match ctx.servers.read() {
172 Ok(s) => s,
173 Err(e) => {
174 log::error!("servers lock poisoned: {}", e);
175 return "Error: lock poisoned".to_string();
176 }
177 };
178 servers.values().cloned().collect()
179 };
180
181 for server_info in servers {
182 let fixes = self.diagnose_and_fix_server(ctx, &server_info);
183 if !fixes.is_empty() {
184 total_fixes.push(format!(
185 "Server '{}': {}",
186 server_info.name,
187 fixes.join(", ")
188 ));
189 }
190 }
191
192 if total_fixes.is_empty() {
193 "All servers are in consistent state".to_string()
194 } else {
195 format!("Recovery results:\n{}", total_fixes.join("\n"))
196 }
197 }
198
199 fn diagnose_and_fix_server(
200 &self,
201 ctx: &ServerContext,
202 server_info: &crate::server::types::ServerInfo,
203 ) -> Vec<String> {
204 let mut fixes = Vec::new();
205
206 let has_handle = {
207 let handles = match ctx.handles.read() {
208 Ok(h) => h,
209 Err(_) => return fixes,
210 };
211 handles.contains_key(&server_info.id)
212 };
213
214 let port_available = is_port_available(server_info.port, "127.0.0.1");
216
217 match (server_info.status, has_handle, port_available) {
218 (ServerStatus::Running, false, _) => {
220 if port_available {
221 self.update_server_status(ctx, &server_info.id, ServerStatus::Stopped);
223 fixes.push("Status: Running → Stopped (no handle, port free)".to_string());
224 } else {
225 self.update_server_status(ctx, &server_info.id, ServerStatus::Failed);
227 fixes.push("Status: Running → Failed (no handle, port occupied)".to_string());
228 }
229 }
230
231 (status, true, _) if status != ServerStatus::Running => {
233 self.update_server_status(ctx, &server_info.id, ServerStatus::Running);
234 fixes.push(format!("Status: {} → Running (handle exists)", status));
235 }
236
237 (ServerStatus::Failed, false, true) => {
239 self.update_server_status(ctx, &server_info.id, ServerStatus::Stopped);
240 fixes.push("Status: Failed → Stopped (port now free)".to_string());
241 }
242
243 _ => {
244 }
246 }
247
248 fixes
249 }
250
251 fn validate_and_fix_ports(&self, ctx: &ServerContext) -> Vec<String> {
252 let mut fixes = Vec::new();
253 let servers: Vec<_> = {
254 let servers = match ctx.servers.read() {
255 Ok(s) => s,
256 Err(_) => return fixes,
257 };
258 servers.values().cloned().collect()
259 };
260
261 for server_info in servers {
262 let port_available = is_port_available(server_info.port, "127.0.0.1");
263 let has_handle = {
264 let handles = match ctx.handles.read() {
265 Ok(h) => h,
266 Err(_) => continue,
267 };
268 handles.contains_key(&server_info.id)
269 };
270
271 match (server_info.status, has_handle, port_available) {
273 (ServerStatus::Running, true, false) => {
274 }
276 (ServerStatus::Running, false, false) => {
277 self.update_server_status(ctx, &server_info.id, ServerStatus::Failed);
279 fixes.push(format!(
280 "Fixed '{}': Running → Failed (orphaned)",
281 server_info.name
282 ));
283 }
284 (ServerStatus::Running, false, true) => {
285 self.update_server_status(ctx, &server_info.id, ServerStatus::Stopped);
287 fixes.push(format!(
288 "Fixed '{}': Running → Stopped (no handle)",
289 server_info.name
290 ));
291 }
292 _ => {}
293 }
294 }
295
296 fixes
297 }
298
299 fn fix_missing_handle(&self, ctx: &ServerContext, server_id: &str) {
300 self.update_server_status(ctx, server_id, ServerStatus::Stopped);
302
303 let server_id_clone = server_id.to_string();
305 tokio::spawn(async move {
306 crate::server::shared::persist_server_update(&server_id_clone, ServerStatus::Stopped)
307 .await;
308 });
309 }
310
311 fn update_server_status(&self, ctx: &ServerContext, server_id: &str, status: ServerStatus) {
312 if let Ok(mut servers) = ctx.servers.write() {
313 if let Some(server) = servers.get_mut(server_id) {
314 server.status = status;
315 }
316 }
317 }
318}