//! rush_sync_server/commands/stop/command.rs

1use crate::commands::command::Command;
2use crate::commands::parsing::{parse_bulk_args, BulkMode};
3use crate::core::prelude::*;
4use crate::server::types::{ServerContext, ServerStatus};
5use crate::server::utils::validation::find_server;
6
/// Command that stops one or more running servers — a single server by
/// identifier, a numeric range, or all running servers at once.
#[derive(Debug, Default)]
pub struct StopCommand;
9
10impl StopCommand {
11    pub fn new() -> Self {
12        Self
13    }
14}
15
16impl Command for StopCommand {
17    fn name(&self) -> &'static str {
18        "stop"
19    }
20
21    fn description(&self) -> &'static str {
22        "Stop server(s) - supports ranges and bulk operations"
23    }
24
25    fn matches(&self, command: &str) -> bool {
26        command.trim().to_lowercase().starts_with("stop")
27    }
28
29    fn execute_sync(&self, args: &[&str]) -> Result<String> {
30        if args.is_empty() {
31            return Err(AppError::Validation(get_translation(
32                "server.error.id_missing",
33                &[],
34            )));
35        }
36
37        let config = get_config()?;
38        let ctx = crate::server::shared::get_shared_context();
39
40        match parse_bulk_args(args) {
41            BulkMode::Single(identifier) => self.stop_single_server(&config, ctx, &identifier, false),
42            BulkMode::Range(start, end) => self.stop_range_servers(&config, ctx, start, end),
43            BulkMode::All => self.stop_all_servers(&config, ctx),
44            BulkMode::Invalid(error) => Err(AppError::Validation(error)),
45        }
46    }
47
48    fn priority(&self) -> u8 {
49        67
50    }
51}
52
impl StopCommand {
    /// Parallel batch size for bulk stop operations
    const PARALLEL_BATCH_SIZE: usize = 4;

    // Stop single server
    // `bulk_mode`: when true, skip the blocking sleep (for parallel bulk ops)
    //
    // Flow: look up the server, atomically take its shutdown handle, mark the
    // server Stopped, then shut it down asynchronously and persist the status.
    // NOTE(review): the `handles` write lock is acquired while the `servers`
    // read lock is still held — keep this acquisition order consistent in any
    // other code path that takes both locks, or a lock-order deadlock is
    // possible.
    fn stop_single_server(
        &self,
        config: &Config,
        ctx: &ServerContext,
        identifier: &str,
        bulk_mode: bool,
    ) -> Result<String> {
        let (server_info, handle) = {
            let servers_guard = ctx
                .servers
                .read()
                .map_err(|_| AppError::Validation("Server-Context lock poisoned".to_string()))?;

            let server_info = find_server(&servers_guard, identifier)?.clone();

            // Non-running servers are reported, not treated as an error.
            if server_info.status != ServerStatus::Running {
                return Ok(format!(
                    "Server '{}' is not active (Status: {})",
                    server_info.name, server_info.status
                ));
            }

            // Atomically remove the handle
            let handle = {
                let mut handles_guard = ctx.handles.write().map_err(|_| {
                    AppError::Validation("Handle-Context lock poisoned".to_string())
                })?;
                handles_guard.remove(&server_info.id)
            };

            (server_info, handle)
        };

        log::info!(
            "Stopping server {} on port {}",
            server_info.id,
            server_info.port
        );

        // Set status to Stopped immediately
        self.update_server_status(ctx, &server_info.id, ServerStatus::Stopped);

        // Notify browser to close (skip in bulk mode for speed)
        if !bulk_mode {
            self.notify_browser_shutdown(&server_info);
        }

        if let Some(handle) = handle {
            // Graceful shutdown (async, non-blocking)
            self.shutdown_server_gracefully(handle, server_info.id.clone(), config);

            // Persist status update (non-blocking)
            let server_id = server_info.id.clone();
            tokio::spawn(async move {
                crate::server::shared::persist_server_update(&server_id, ServerStatus::Stopped)
                    .await;
            });

            // Only sleep for single-server stop (TUI feedback), skip for bulk
            // NOTE(review): this blocks the calling thread — acceptable only
            // because execute_sync runs on a synchronous command path, never
            // directly on a Tokio worker; confirm that invariant holds.
            if !bulk_mode {
                std::thread::sleep(std::time::Duration::from_millis(
                    config.server.startup_delay_ms.min(500),
                ));
            }

            // Count the servers still Running for the status line; a poisoned
            // lock is recovered (best effort) rather than failing the stop.
            let running_count = {
                let servers = ctx.servers.read().unwrap_or_else(|e| {
                    log::warn!("Server lock poisoned: {}", e);
                    e.into_inner()
                });
                servers
                    .values()
                    .filter(|s| s.status == ServerStatus::Running)
                    .count()
            };

            // NOTE(review): the "stopped [PERSISTENT]" substring in this
            // message is matched by stop_batch_parallel* to detect success —
            // keep the wording in sync if it is ever changed.
            Ok(format!(
                "Server '{}' stopped [PERSISTENT] ({}/{} running)",
                server_info.name, running_count, config.server.max_concurrent
            ))
        } else {
            // Handle was already removed - just update status
            let server_id = server_info.id.clone();
            tokio::spawn(async move {
                crate::server::shared::persist_server_update(&server_id, ServerStatus::Stopped)
                    .await;
            });

            Ok(format!(
                "Server '{}' was already stopped [PERSISTENT]",
                server_info.name
            ))
        }
    }

    // Stop servers by range — NON-BLOCKING, PARALLEL with progress
    //
    // Returns immediately with a "Stopping..." banner; the actual work runs on
    // a dedicated OS thread that enters the Tokio runtime context so the
    // per-server tokio::spawn calls succeed.
    // NOTE(review): `(end - start + 1)` underflows (panics in debug builds) if
    // end < start — presumably parse_bulk_args guarantees start <= end; TODO
    // confirm. `Handle::current()` likewise panics if called outside a Tokio
    // runtime.
    fn stop_range_servers(
        &self,
        config: &Config,
        _ctx: &ServerContext,
        start: u32,
        end: u32,
    ) -> Result<String> {
        let total = (end - start + 1) as usize;
        let config = config.clone();
        let rt_handle = tokio::runtime::Handle::current();

        std::thread::spawn(move || {
            // Make the runtime context current on this thread so nested
            // tokio::spawn calls have an executor.
            let _guard = rt_handle.enter();
            let ctx = crate::server::shared::get_shared_context();
            let timer = std::time::Instant::now();

            // Range numbers double as lookup identifiers and display labels.
            let identifiers: Vec<_> = (start..=end)
                .map(|i| (format!("{}", i), i))
                .collect();

            let (stopped, failed) = Self::stop_batch_parallel(
                &config, ctx, &identifiers, total, &rt_handle,
            );

            // Final summary with timing and process memory for benchmarking.
            let elapsed = timer.elapsed();
            let mem_info = Self::get_memory_info();
            crate::input::send_progress(format!(
                "\n  Range {}-{}: {} [Stopped], {} [Failed]\n  Time: {:.2}s{}\n",
                start, end, stopped, failed, elapsed.as_secs_f64(), mem_info,
            ));
        });

        Ok(format!("  Stopping {} servers (range {}-{}, {} parallel)...", total, start, end, Self::PARALLEL_BATCH_SIZE))
    }

    // Stop all running servers — NON-BLOCKING, PARALLEL with progress, sorted by port
    //
    // Snapshot of running servers is taken up front; the background thread then
    // stops them in parallel batches. Same runtime-context caveats as
    // stop_range_servers.
    fn stop_all_servers(&self, config: &Config, ctx: &ServerContext) -> Result<String> {
        let mut running_servers: Vec<_> = {
            let servers = read_lock(&ctx.servers, "servers")?;
            servers
                .values()
                .filter(|s| s.status == ServerStatus::Running)
                .map(|s| (s.id.clone(), s.name.clone(), s.port))
                .collect()
        };

        if running_servers.is_empty() {
            return Ok("No running servers to stop".to_string());
        }

        // Sort by port for consistent ordering
        running_servers.sort_by_key(|(_, _, port)| *port);

        let total = running_servers.len();
        let config = config.clone();
        let rt_handle = tokio::runtime::Handle::current();

        std::thread::spawn(move || {
            let _guard = rt_handle.enter();
            let ctx = crate::server::shared::get_shared_context();
            let timer = std::time::Instant::now();

            // Keep (id, name) — the id is the stop key, the name is for display.
            let identifiers: Vec<_> = running_servers
                .iter()
                .map(|(id, name, _port)| (id.clone(), name.clone()))
                .collect();

            let (stopped, failed) = Self::stop_batch_parallel_with_names(
                &config, ctx, &identifiers, total, &rt_handle,
            );

            let elapsed = timer.elapsed();
            let mem_info = Self::get_memory_info();
            crate::input::send_progress(format!(
                "\n  Done: {} [Stopped], {} [Failed] (of {})\n  Time: {:.2}s{}\n",
                stopped, failed, total, elapsed.as_secs_f64(), mem_info,
            ));
        });

        Ok(format!("  Stopping {} servers ({} parallel)...", total, Self::PARALLEL_BATCH_SIZE))
    }

    /// Stop servers in parallel batches (for range operations)
    ///
    /// Processes `identifiers` in chunks of `PARALLEL_BATCH_SIZE`; each chunk
    /// runs on scoped threads (so the borrowed `config`/`ctx` need no `'static`
    /// bound) and the scope joins them before the next chunk starts. Counters
    /// are atomics shared across the worker threads.
    /// NOTE(review): success/failure is detected by substring-matching the
    /// human-readable message from `stop_single_server` — fragile if that
    /// wording ever changes.
    fn stop_batch_parallel(
        config: &Config,
        ctx: &ServerContext,
        identifiers: &[(String, u32)],
        total: usize,
        rt_handle: &tokio::runtime::Handle,
    ) -> (usize, usize) {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let stopped = AtomicUsize::new(0);
        let failed = AtomicUsize::new(0);
        let progress_idx = AtomicUsize::new(0);

        for chunk in identifiers.chunks(Self::PARALLEL_BATCH_SIZE) {
            std::thread::scope(|s| {
                for (identifier, num) in chunk {
                    // Announce progress before spawning the worker.
                    let idx = progress_idx.fetch_add(1, Ordering::Relaxed) + 1;
                    crate::input::send_progress(format!(
                        "  [{}/{}] Stopping server {}...",
                        idx, total, num
                    ));

                    // Reborrow the atomics so the move closure captures
                    // references, not the counters themselves.
                    let stopped = &stopped;
                    let failed = &failed;
                    let rt = rt_handle.clone();

                    s.spawn(move || {
                        // Enter the runtime so tokio::spawn works in the worker.
                        let _g = rt.enter();
                        let cmd = StopCommand::new();
                        match cmd.stop_single_server(config, ctx, identifier, true) {
                            Ok(message) => {
                                if message.contains("stopped [PERSISTENT]") {
                                    stopped.fetch_add(1, Ordering::Relaxed);
                                    crate::input::send_progress(format!(
                                        "  Server {}: [Stopped]",
                                        num
                                    ));
                                } else {
                                    // Not counted as stopped or failed (e.g.
                                    // "already stopped" / "not active").
                                    crate::input::send_progress(format!(
                                        "  Server {}: {}",
                                        num, message
                                    ));
                                }
                            }
                            Err(e) => {
                                failed.fetch_add(1, Ordering::Relaxed);
                                crate::input::send_progress(format!(
                                    "  Server {}: [Failed] - {}",
                                    num, e
                                ));
                            }
                        }
                    });
                }
            });
        }

        (stopped.load(Ordering::Relaxed), failed.load(Ordering::Relaxed))
    }

    /// Stop servers in parallel batches (for "all" operations with name info)
    ///
    /// Identical structure to `stop_batch_parallel`, but keyed by server id and
    /// labeled with the server name in progress output.
    fn stop_batch_parallel_with_names(
        config: &Config,
        ctx: &ServerContext,
        identifiers: &[(String, String)], // (server_id, server_name)
        total: usize,
        rt_handle: &tokio::runtime::Handle,
    ) -> (usize, usize) {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let stopped = AtomicUsize::new(0);
        let failed = AtomicUsize::new(0);
        let progress_idx = AtomicUsize::new(0);

        for chunk in identifiers.chunks(Self::PARALLEL_BATCH_SIZE) {
            std::thread::scope(|s| {
                for (server_id, server_name) in chunk {
                    let idx = progress_idx.fetch_add(1, Ordering::Relaxed) + 1;
                    crate::input::send_progress(format!(
                        "  [{}/{}] Stopping {}...",
                        idx, total, server_name
                    ));

                    let stopped = &stopped;
                    let failed = &failed;
                    let rt = rt_handle.clone();

                    s.spawn(move || {
                        let _g = rt.enter();
                        let cmd = StopCommand::new();
                        match cmd.stop_single_server(config, ctx, server_id, true) {
                            Ok(message) => {
                                if message.contains("stopped [PERSISTENT]") {
                                    stopped.fetch_add(1, Ordering::Relaxed);
                                    crate::input::send_progress(format!(
                                        "  {}: [Stopped]",
                                        server_name
                                    ));
                                } else {
                                    crate::input::send_progress(format!(
                                        "  {}: {}",
                                        server_name, message
                                    ));
                                }
                            }
                            Err(e) => {
                                failed.fetch_add(1, Ordering::Relaxed);
                                crate::input::send_progress(format!(
                                    "  {}: [Failed] - {}",
                                    server_name, e
                                ));
                            }
                        }
                    });
                }
            });
        }

        (stopped.load(Ordering::Relaxed), failed.load(Ordering::Relaxed))
    }

    /// Get process memory usage for benchmarking
    ///
    /// Returns a "  |  Memory: X.X MB" suffix for progress summaries, or an
    /// empty string when unavailable. Uses the Mach task_info syscall on
    /// macOS, /proc/self/status on Linux, and nothing elsewhere.
    fn get_memory_info() -> String {
        #[cfg(target_os = "macos")]
        {
            use std::mem;
            extern "C" {
                fn mach_task_self() -> u32;
                fn task_info(
                    task: u32,
                    flavor: u32,
                    info: *mut libc::c_void,
                    count: *mut u32,
                ) -> i32;
            }

            // Mirrors the C mach_task_basic_info layout; field order must
            // match the kernel struct exactly.
            #[repr(C)]
            struct MachTaskBasicInfo {
                virtual_size: u64,
                resident_size: u64,
                resident_size_max: u64,
                user_time: [u32; 2],
                system_time: [u32; 2],
                policy: i32,
                suspend_count: i32,
            }

            // SAFETY-relevant: zeroed is valid for this plain-data struct;
            // count is in units of u32 words per the task_info contract.
            let mut info: MachTaskBasicInfo = unsafe { mem::zeroed() };
            let mut count = (mem::size_of::<MachTaskBasicInfo>() / mem::size_of::<u32>()) as u32;

            let result = unsafe {
                task_info(
                    mach_task_self(),
                    20, // MACH_TASK_BASIC_INFO
                    &mut info as *mut _ as *mut libc::c_void,
                    &mut count,
                )
            };

            // 0 == KERN_SUCCESS; any other code yields an empty suffix.
            if result == 0 {
                let rss_mb = info.resident_size as f64 / (1024.0 * 1024.0);
                format!("  |  Memory: {:.1} MB", rss_mb)
            } else {
                String::new()
            }
        }
        #[cfg(target_os = "linux")]
        {
            // VmRSS line format: "VmRSS:   12345 kB" — second field is kB.
            if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
                for line in status.lines() {
                    if line.starts_with("VmRSS:") {
                        let kb: f64 = line
                            .split_whitespace()
                            .nth(1)
                            .and_then(|s| s.parse().ok())
                            .unwrap_or(0.0);
                        return format!("  |  Memory: {:.1} MB", kb / 1024.0);
                    }
                }
            }
            String::new()
        }
        #[cfg(not(any(target_os = "macos", target_os = "linux")))]
        {
            String::new()
        }
    }

    // Browser notification
    //
    // Fire-and-forget: asks the server's /api/close-browser endpoint to close
    // any attached browser window, with a short timeout so a dead server
    // cannot stall shutdown. Failures are only logged.
    // NOTE(review): the trailing 500ms sleep at the end of the detached task
    // has no observable effect from here — presumably it delays task teardown
    // for the browser's benefit; confirm whether it is still needed.
    fn notify_browser_shutdown(&self, server_info: &crate::server::types::ServerInfo) {
        let server_port = server_info.port;
        let server_name = server_info.name.clone();

        tokio::spawn(async move {
            let server_url = format!("http://127.0.0.1:{}", server_port);
            log::info!(
                "Notifying browser to close for server {} (async)",
                server_name
            );

            let client = reqwest::Client::new();
            if let Err(e) = client
                .get(format!("{}/api/close-browser", server_url))
                .timeout(std::time::Duration::from_millis(300))
                .send()
                .await
            {
                log::warn!("Failed to notify browser: {}", e);
            }

            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        });
    }

    // Graceful shutdown
    //
    // Spawns a task that requests a graceful stop; if it does not finish
    // within `config.server.shutdown_timeout` seconds, forces an immediate
    // (non-graceful) stop.
    fn shutdown_server_gracefully(
        &self,
        handle: actix_web::dev::ServerHandle,
        server_id: String,
        config: &Config,
    ) {
        let shutdown_timeout = config.server.shutdown_timeout;

        tokio::spawn(async move {
            use tokio::time::{timeout, Duration};

            // stop(true) = graceful (drain connections); stop(false) = force.
            match timeout(Duration::from_secs(shutdown_timeout), handle.stop(true)).await {
                Ok(_) => log::info!("Server {} stopped gracefully", server_id),
                Err(_) => {
                    log::warn!(
                        "Server {} shutdown timeout ({}s), forcing stop",
                        server_id,
                        shutdown_timeout
                    );
                    handle.stop(false).await;
                }
            }
        });
    }

    // Status update helper
    //
    // Best effort: a poisoned lock or unknown id is silently ignored — the
    // persisted status update elsewhere is the source of truth.
    fn update_server_status(&self, ctx: &ServerContext, server_id: &str, status: ServerStatus) {
        if let Ok(mut servers) = ctx.servers.write() {
            if let Some(server) = servers.get_mut(server_id) {
                server.status = status;
            }
        }
    }
}
486}