// rush_sync_server/commands/start/command.rs
1use crate::commands::command::Command;
2use crate::commands::parsing::{parse_bulk_args, BulkMode};
3use crate::core::prelude::*;
4use crate::server::types::{ServerContext, ServerStatus};
5use crate::server::utils::port::is_port_available;
6use crate::server::utils::validation::find_server;
7use opener;
8
/// The `start` command: brings a single server, a numeric range of servers,
/// or all stopped servers online.
#[derive(Debug, Default)]
pub struct StartCommand;

impl StartCommand {
    /// Construct the (stateless) command instance.
    pub fn new() -> Self {
        StartCommand
    }
}
17
18impl Command for StartCommand {
19    fn name(&self) -> &'static str {
20        "start"
21    }
22    fn description(&self) -> &'static str {
23        "Start server(s) - supports ranges and bulk operations"
24    }
25    fn matches(&self, command: &str) -> bool {
26        command.trim().to_lowercase().starts_with("start")
27    }
28
29    fn execute_sync(&self, args: &[&str]) -> Result<String> {
30        if args.is_empty() {
31            return Err(AppError::Validation(get_translation(
32                "server.error.id_missing",
33                &[],
34            )));
35        }
36
37        let config = get_config()?;
38        let ctx = crate::server::shared::get_shared_context();
39
40        // Extract --workers N from args
41        let (filtered_args, workers_override) = Self::extract_workers_flag(args);
42
43        if filtered_args.is_empty() {
44            return Err(AppError::Validation(get_translation(
45                "server.error.id_missing",
46                &[],
47            )));
48        }
49
50        let filtered_refs: Vec<&str> = filtered_args.iter().map(|s| s.as_str()).collect();
51
52        match parse_bulk_args(&filtered_refs) {
53            BulkMode::Single(identifier) => {
54                self.start_server_internal(&config, ctx, &identifier, false, workers_override)
55            }
56            BulkMode::Range(start, end) => {
57                self.start_range_servers(&config, ctx, start, end, workers_override)
58            }
59            BulkMode::All => self.start_all_servers(&config, ctx, workers_override),
60            BulkMode::Invalid(error) => Err(AppError::Validation(error)),
61        }
62    }
63
64    fn priority(&self) -> u8 {
65        66
66    }
67}
68
69impl StartCommand {
70    /// Extract --workers N flag from args, return remaining args + workers value
71    fn extract_workers_flag(args: &[&str]) -> (Vec<String>, Option<usize>) {
72        let mut filtered = Vec::new();
73        let mut workers = None;
74        let mut skip_next = false;
75
76        for (i, arg) in args.iter().enumerate() {
77            if skip_next {
78                skip_next = false;
79                continue;
80            }
81
82            if *arg == "--workers" || *arg == "-w" {
83                if let Some(next) = args.get(i + 1) {
84                    if let Ok(w) = next.parse::<usize>() {
85                        if w >= 1 && w <= 16 {
86                            workers = Some(w);
87                            skip_next = true;
88                            continue;
89                        }
90                    }
91                }
92                // Invalid --workers value, keep it as regular arg
93                filtered.push(arg.to_string());
94            } else {
95                filtered.push(arg.to_string());
96            }
97        }
98
99        (filtered, workers)
100    }
101
    // Internal start logic
    /// Start a single server identified by name or id.
    ///
    /// Steps: (1) look the server up and detect an already-running instance,
    /// (2) reuse a surviving handle and correct a stale status if needed,
    /// (3) validate the port, (4) enforce the `max_concurrent` limit,
    /// (5) actually spawn the server.
    ///
    /// NOTE(review): the read locks are released before the port check and
    /// spawn, so two concurrent starts of the same server could race past the
    /// handle check — confirm callers serialize per-server starts.
    fn start_server_internal(
        &self,
        config: &Config,
        ctx: &ServerContext,
        identifier: &str,
        skip_browser: bool,
        workers_override: Option<usize>,
    ) -> Result<String> {
        // Snapshot server info + any existing handle under short read locks.
        let (server_info, existing_handle) =
            {
                let servers_guard = ctx.servers.read().map_err(|_| {
                    AppError::Validation("Server-Context lock poisoned".to_string())
                })?;

                let server_info = find_server(&servers_guard, identifier)?.clone();

                // Status says Running AND we still hold a handle -> truly running.
                if server_info.status == ServerStatus::Running {
                    let handles_guard = ctx.handles.read().map_err(|_| {
                        AppError::Validation("Handle-Context lock poisoned".to_string())
                    })?;

                    if handles_guard.contains_key(&server_info.id) {
                        return Ok(format!(
                            "Server '{}' is already running on http://{}:{}",
                            server_info.name, config.server.bind_address, server_info.port
                        ));
                    }
                }

                let handles_guard = ctx.handles.read().map_err(|_| {
                    AppError::Validation("Handle-Context lock poisoned".to_string())
                })?;
                let existing_handle = handles_guard.get(&server_info.id).cloned();

                (server_info, existing_handle)
            };

        // A live handle with a non-Running status means the bookkeeping is
        // stale: correct the status instead of spawning a duplicate server.
        if let Some(_handle) = existing_handle {
            if server_info.status != ServerStatus::Running {
                self.update_server_status(ctx, &server_info.id, ServerStatus::Running);

                // Persist asynchronously; the in-memory state is already fixed.
                let server_id = server_info.id.clone();
                tokio::spawn(async move {
                    crate::server::shared::persist_server_update(&server_id, ServerStatus::Running)
                        .await;
                });
            }

            return Ok(format!(
                "Server '{}' is already running on http://{}:{} (status corrected)",
                server_info.name, config.server.bind_address, server_info.port
            ));
        }

        // Port validation
        match self.validate_port_safely(&server_info, &config.server.bind_address) {
            PortValidationResult::Available => {}
            PortValidationResult::OccupiedByUs => {
                return Ok(get_translation(
                    "server.error.port_used_by_us",
                    &[&server_info.port.to_string()],
                ));
            }
            PortValidationResult::OccupiedByOther => {
                return Ok(get_translation(
                    "server.error.port_used_by_other",
                    &[&server_info.port.to_string(), &server_info.name],
                ));
            }
        }

        // Server limit check
        let running_count = self.count_running_servers(ctx);
        if running_count >= config.server.max_concurrent {
            return Err(AppError::Validation(format!(
                "Cannot start server: Running servers limit reached ({}/{})",
                running_count, config.server.max_concurrent
            )));
        }

        self.actually_start_server(
            config,
            ctx,
            server_info,
            running_count,
            skip_browser,
            workers_override,
        )
    }
192
    /// Parallel batch size for bulk operations.
    /// Each batch starts N servers concurrently, overlapping the startup delay.
    /// Kept conservative to avoid FD exhaustion with many servers — raising
    /// this multiplies peak per-batch resource usage (each started server
    /// binds its own listener; presumably plus worker FDs — confirm before tuning).
    const PARALLEL_BATCH_SIZE: usize = 4;
197
198    // Start servers by range — NON-BLOCKING, PARALLEL with progress
199    fn start_range_servers(
200        &self,
201        config: &Config,
202        _ctx: &ServerContext,
203        start: u32,
204        end: u32,
205        workers_override: Option<usize>,
206    ) -> Result<String> {
207        let total = (end - start + 1) as usize;
208        let config = config.clone();
209        let rt_handle = tokio::runtime::Handle::current();
210
211        std::thread::spawn(move || {
212            let _guard = rt_handle.enter();
213            let ctx = crate::server::shared::get_shared_context();
214            let timer = std::time::Instant::now();
215
216            let identifiers: Vec<_> = (start..=end)
217                .map(|i| (format!("{}", i), i))
218                .collect();
219
220            let (started, failed) = Self::start_batch_parallel(
221                &config, ctx, &identifiers, total, workers_override, &rt_handle,
222            );
223
224            let elapsed = timer.elapsed();
225            let mem_info = Self::get_memory_info();
226            crate::input::send_progress(format!(
227                "\n  Range {}-{}: {} [Started], {} [Failed]\n  Time: {:.2}s{}\n",
228                start, end, started, failed, elapsed.as_secs_f64(), mem_info,
229            ));
230        });
231
232        Ok(format!("  Starting {} servers (range {}-{})...", total, start, end))
233    }
234
235    // Start all stopped servers — NON-BLOCKING, PARALLEL with progress
236    fn start_all_servers(
237        &self,
238        config: &Config,
239        ctx: &ServerContext,
240        workers_override: Option<usize>,
241    ) -> Result<String> {
242        let mut stopped_servers: Vec<_> = {
243            let servers = read_lock(&ctx.servers, "servers")?;
244            servers
245                .values()
246                .filter(|s| s.status == ServerStatus::Stopped)
247                .map(|s| (s.id.clone(), s.name.clone(), s.port))
248                .collect()
249        };
250
251        if stopped_servers.is_empty() {
252            return Ok("No stopped servers to start".to_string());
253        }
254
255        stopped_servers.sort_by_key(|(_, _, port)| *port);
256
257        let total = stopped_servers.len();
258        let config = config.clone();
259        let rt_handle = tokio::runtime::Handle::current();
260
261        // Prepare identifiers with index for progress
262        let servers_with_idx: Vec<_> = stopped_servers
263            .iter()
264            .enumerate()
265            .map(|(i, (id, name, port))| (id.clone(), name.clone(), *port, i as u32))
266            .collect();
267
268        std::thread::spawn(move || {
269            let _guard = rt_handle.enter();
270            let ctx = crate::server::shared::get_shared_context();
271            let timer = std::time::Instant::now();
272
273            let identifiers: Vec<_> = servers_with_idx
274                .iter()
275                .map(|(id, _name, _port, idx)| (id.clone(), *idx))
276                .collect();
277
278            // Map for port lookup
279            let port_map: std::collections::HashMap<String, (String, u16)> = servers_with_idx
280                .iter()
281                .map(|(id, name, port, _)| (id.clone(), (name.clone(), *port)))
282                .collect();
283
284            let (started, failed) = Self::start_batch_parallel_with_names(
285                &config, ctx, &identifiers, &port_map, total, workers_override, &rt_handle,
286            );
287
288            let elapsed = timer.elapsed();
289            let mem_info = Self::get_memory_info();
290            crate::input::send_progress(format!(
291                "\n  Done: {} [Started], {} [Failed] (of {})\n  Time: {:.2}s{}\n",
292                started, failed, total, elapsed.as_secs_f64(), mem_info,
293            ));
294        });
295
296        Ok(format!("  Starting {} servers ({} parallel)...", total, Self::PARALLEL_BATCH_SIZE))
297    }
298
    /// Start servers in parallel batches (for range operations)
    ///
    /// Walks `identifiers` in chunks of `PARALLEL_BATCH_SIZE`; each chunk's
    /// servers are started on scoped threads so their startup delays overlap.
    /// Returns `(started, failed)` totals.
    ///
    /// NOTE(review): success is detected by substring-matching the human
    /// readable message ("started successfully"); Ok-messages that report an
    /// already-running server increment neither counter, so started + failed
    /// need not equal `total`.
    fn start_batch_parallel(
        config: &Config,
        ctx: &ServerContext,
        identifiers: &[(String, u32)],
        total: usize,
        workers_override: Option<usize>,
        rt_handle: &tokio::runtime::Handle,
    ) -> (usize, usize) {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let started = AtomicUsize::new(0);
        let failed = AtomicUsize::new(0);
        let progress_idx = AtomicUsize::new(0);

        for chunk in identifiers.chunks(Self::PARALLEL_BATCH_SIZE) {
            // Scoped threads allow borrowing `config`/`ctx`/the counters
            // without Arc; the scope joins every thread before continuing.
            std::thread::scope(|s| {
                for (identifier, num) in chunk {
                    // Emit the "[i/total]" line from the spawning thread so
                    // the numbering stays monotonic.
                    let idx = progress_idx.fetch_add(1, Ordering::Relaxed) + 1;
                    crate::input::send_progress(format!(
                        "  [{}/{}] Starting server {}...",
                        idx, total, num
                    ));

                    let started = &started;
                    let failed = &failed;
                    // Each worker thread must enter the runtime before
                    // start_server_internal can tokio::spawn persistence tasks.
                    let rt = rt_handle.clone();

                    s.spawn(move || {
                        let _g = rt.enter();
                        let cmd = StartCommand::new();
                        match cmd.start_server_internal(config, ctx, identifier, true, workers_override) {
                            Ok(message) => {
                                if message.contains("started successfully") {
                                    started.fetch_add(1, Ordering::Relaxed);
                                    // Best-effort extraction of port/name from
                                    // the success banner for a compact line.
                                    let port = Self::extract_port_from_message(&message);
                                    let name = Self::extract_name_from_message(&message)
                                        .unwrap_or_else(|| format!("server-{}", num));
                                    let url_str = port
                                        .map(|p| format!("  http://127.0.0.1:{}", p))
                                        .unwrap_or_default();
                                    crate::input::send_progress(format!(
                                        "  {}: [Started]{}",
                                        name, url_str
                                    ));
                                } else {
                                    // Ok but not newly started (e.g. already
                                    // running / port conflict text).
                                    crate::input::send_progress(format!("  Server {}: {}", num, message));
                                }
                            }
                            Err(e) => {
                                failed.fetch_add(1, Ordering::Relaxed);
                                crate::input::send_progress(format!(
                                    "  Server {}: [Failed] - {}",
                                    num, e
                                ));
                            }
                        }
                    });
                }
            });
        }

        (started.load(Ordering::Relaxed), failed.load(Ordering::Relaxed))
    }
363
    /// Start servers in parallel batches (for "all" operations with name/port info)
    ///
    /// Same batching scheme as `start_batch_parallel`, but identifiers are
    /// server ids and `port_map` supplies `(name, port)` for nicer progress
    /// output. Returns `(started, failed)` totals; see the sibling function
    /// for the caveat that the counters need not sum to `total`.
    fn start_batch_parallel_with_names(
        config: &Config,
        ctx: &ServerContext,
        identifiers: &[(String, u32)],
        port_map: &std::collections::HashMap<String, (String, u16)>,
        total: usize,
        workers_override: Option<usize>,
        rt_handle: &tokio::runtime::Handle,
    ) -> (usize, usize) {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let started = AtomicUsize::new(0);
        let failed = AtomicUsize::new(0);
        let progress_idx = AtomicUsize::new(0);

        for chunk in identifiers.chunks(Self::PARALLEL_BATCH_SIZE) {
            // Scoped threads: borrow config/ctx/counters without Arc; the
            // scope joins every thread before the next chunk starts.
            std::thread::scope(|s| {
                for (server_id, _idx) in chunk {
                    let idx = progress_idx.fetch_add(1, Ordering::Relaxed) + 1;
                    // Fall back to the raw id / port 0 if the map misses an
                    // entry (should not happen; both come from one snapshot).
                    let (name, port) = port_map
                        .get(server_id)
                        .cloned()
                        .unwrap_or_else(|| (server_id.clone(), 0));

                    crate::input::send_progress(format!(
                        "  [{}/{}] Starting {}...",
                        idx, total, name
                    ));

                    let started = &started;
                    let failed = &failed;
                    // Worker threads must enter the runtime before
                    // start_server_internal can tokio::spawn tasks.
                    let rt = rt_handle.clone();

                    s.spawn(move || {
                        let _g = rt.enter();
                        let cmd = StartCommand::new();
                        match cmd.start_server_internal(config, ctx, server_id, true, workers_override) {
                            Ok(message) => {
                                if message.contains("started successfully") {
                                    started.fetch_add(1, Ordering::Relaxed);
                                    crate::input::send_progress(format!(
                                        "  {}: [Started]  http://127.0.0.1:{}",
                                        name, port
                                    ));
                                } else {
                                    // Ok but not newly started (already
                                    // running / conflict text).
                                    crate::input::send_progress(format!("  {}: {}", name, message));
                                }
                            }
                            Err(e) => {
                                failed.fetch_add(1, Ordering::Relaxed);
                                crate::input::send_progress(format!(
                                    "  {}: [Failed] - {}",
                                    name, e
                                ));
                            }
                        }
                    });
                }
            });
        }

        (started.load(Ordering::Relaxed), failed.load(Ordering::Relaxed))
    }
428
    // Actually start the server
    /// Spawn the server, record its handle, flip the status to Running and
    /// persist it, then build the multi-line success banner. On spawn
    /// failure the status is set (and persisted) as Failed and an error is
    /// returned.
    fn actually_start_server(
        &self,
        config: &Config,
        ctx: &ServerContext,
        server_info: crate::server::types::ServerInfo,
        current_running_count: usize,
        skip_browser: bool,
        workers_override: Option<usize>,
    ) -> Result<String> {
        match self.spawn_server(config, ctx, server_info.clone(), workers_override) {
            Ok(handle) => {
                // Register the handle first so concurrent starts can see it;
                // the extra scope drops the write lock immediately.
                {
                    let mut handles = write_lock(&ctx.handles, "handles")?;
                    handles.insert(server_info.id.clone(), handle);
                }

                self.update_server_status(ctx, &server_info.id, ServerStatus::Running);

                // Persist asynchronously; no need to block the caller.
                let server_id = server_info.id.clone();
                tokio::spawn(async move {
                    crate::server::shared::persist_server_update(&server_id, ServerStatus::Running)
                        .await;
                });

                let server_url =
                    format!("http://{}:{}", config.server.bind_address, server_info.port);
                let proxy_http_port = config.proxy.port;
                let proxy_https_port = config.proxy.port + config.proxy.https_port_offset;
                let actual_workers = workers_override.unwrap_or(config.server.workers);

                // Bulk operations pass skip_browser=true so dozens of tabs
                // don't open at once.
                let open_browser = !skip_browser && config.server.auto_open_browser;
                if open_browser {
                    self.spawn_browser_opener(server_url.clone(), server_info.name.clone(), config);
                }

                Ok(format!(
                    "\n  Server '{}' started successfully [PERSISTENT]\n\n  \
                     HTTP        {}\n  \
                     Proxy HTTP  http://{}.localhost:{}\n  \
                     Proxy HTTPS https://{}.localhost:{}\n  \
                     Dashboard   {}/.rss/\n  \
                     Directory   www/{}-[{}]/\n  \
                     Workers     {}\n\n  \
                     Running {}/{}{}\n",
                    server_info.name,
                    server_url,
                    server_info.name, proxy_http_port,
                    server_info.name, proxy_https_port,
                    server_url,
                    server_info.name, server_info.port,
                    actual_workers,
                    current_running_count + 1,
                    config.server.max_concurrent,
                    if open_browser {
                        " - Browser opening..."
                    } else {
                        ""
                    }
                ))
            }
            Err(e) => {
                // Spawn failed: record + persist Failed so status stays accurate.
                self.update_server_status(ctx, &server_info.id, ServerStatus::Failed);

                let server_id = server_info.id.clone();
                tokio::spawn(async move {
                    crate::server::shared::persist_server_update(&server_id, ServerStatus::Failed)
                        .await;
                });

                Err(AppError::Validation(format!("Server start failed: {}", e)))
            }
        }
    }
503
    /// Get process memory usage for benchmarking
    ///
    /// Returns a "  |  Memory: X.X MB" suffix for progress summaries, or an
    /// empty string when the platform is unsupported or the query fails.
    fn get_memory_info() -> String {
        #[cfg(target_os = "macos")]
        {
            use std::mem;
            // Minimal hand-rolled mach bindings (avoids an extra mach dep).
            extern "C" {
                fn mach_task_self() -> u32;
                fn task_info(
                    task: u32,
                    flavor: u32,
                    info: *mut libc::c_void,
                    count: *mut u32,
                ) -> i32;
            }

            // Layout mirrors mach's mach_task_basic_info struct.
            #[repr(C)]
            struct MachTaskBasicInfo {
                virtual_size: u64,
                resident_size: u64,
                resident_size_max: u64,
                user_time: [u32; 2],
                system_time: [u32; 2],
                policy: i32,
                suspend_count: i32,
            }

            // SAFETY: MachTaskBasicInfo is a plain #[repr(C)] struct of
            // integers, so the all-zero bit pattern is a valid value.
            let mut info: MachTaskBasicInfo = unsafe { mem::zeroed() };
            // Count is expressed in u32 words, per the mach task_info API.
            let mut count = (mem::size_of::<MachTaskBasicInfo>() / mem::size_of::<u32>()) as u32;

            // SAFETY: `info` is sized exactly for MachTaskBasicInfo and
            // `count` tells the kernel that capacity; mach_task_self()
            // returns the caller's own task port.
            let result = unsafe {
                task_info(
                    mach_task_self(),
                    20, // MACH_TASK_BASIC_INFO
                    &mut info as *mut _ as *mut libc::c_void,
                    &mut count,
                )
            };

            if result == 0 {
                // resident_size is reported in bytes.
                let rss_mb = info.resident_size as f64 / (1024.0 * 1024.0);
                format!("  |  Memory: {:.1} MB", rss_mb)
            } else {
                String::new()
            }
        }
        #[cfg(target_os = "linux")]
        {
            // Parse "VmRSS:   12345 kB" from /proc/self/status.
            if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
                for line in status.lines() {
                    if line.starts_with("VmRSS:") {
                        let kb: f64 = line
                            .split_whitespace()
                            .nth(1)
                            .and_then(|s| s.parse().ok())
                            .unwrap_or(0.0);
                        return format!("  |  Memory: {:.1} MB", kb / 1024.0);
                    }
                }
            }
            String::new()
        }
        #[cfg(not(any(target_os = "macos", target_os = "linux")))]
        {
            String::new()
        }
    }
570
571    // Helper methods
572    fn validate_port_safely(
573        &self,
574        server_info: &crate::server::types::ServerInfo,
575        bind_address: &str,
576    ) -> PortValidationResult {
577        if is_port_available(server_info.port, bind_address) {
578            PortValidationResult::Available
579        } else {
580            match crate::server::utils::port::check_port_status(server_info.port, bind_address) {
581                crate::server::utils::port::PortStatus::Available => {
582                    PortValidationResult::Available
583                }
584                crate::server::utils::port::PortStatus::OccupiedByUs => {
585                    PortValidationResult::OccupiedByUs
586                }
587                crate::server::utils::port::PortStatus::OccupiedByOther => {
588                    PortValidationResult::OccupiedByOther
589                }
590            }
591        }
592    }
593
594    fn count_running_servers(&self, ctx: &ServerContext) -> usize {
595        let servers = match ctx.servers.read() {
596            Ok(s) => s,
597            Err(e) => {
598                log::error!("servers lock poisoned: {}", e);
599                return 0;
600            }
601        };
602        servers
603            .values()
604            .filter(|s| s.status == ServerStatus::Running)
605            .count()
606    }
607
    /// Thin wrapper around the web handler factory: builds and launches the
    /// actix-web server for `server_info`, optionally overriding the
    /// configured worker count. Returns the `ServerHandle` used later for
    /// shutdown, or a String describing the launch failure.
    fn spawn_server(
        &self,
        config: &Config,
        ctx: &ServerContext,
        server_info: crate::server::types::ServerInfo,
        workers_override: Option<usize>,
    ) -> std::result::Result<actix_web::dev::ServerHandle, String> {
        crate::server::handlers::web::create_web_server_with_workers(
            ctx,
            server_info,
            config,
            workers_override,
        )
    }
622
623    fn spawn_browser_opener(&self, url: String, name: String, config: &Config) {
624        let delay = config.server.startup_delay_ms.min(2000);
625        tokio::spawn(async move {
626            tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
627            if let Err(e) = opener::open(&url) {
628                log::warn!("Failed to open browser for '{}': {}", name, e);
629            }
630        });
631    }
632
633    fn update_server_status(&self, ctx: &ServerContext, server_id: &str, status: ServerStatus) {
634        if let Ok(mut servers) = ctx.servers.write() {
635            if let Some(server) = servers.get_mut(server_id) {
636                server.status = status;
637            }
638        }
639    }
640
641    /// Extract port from a success message like "http://127.0.0.1:8001"
642    fn extract_port_from_message(message: &str) -> Option<u16> {
643        message
644            .find("127.0.0.1:")
645            .and_then(|pos| {
646                let after = &message[pos + 10..];
647                let port_str: String = after.chars().take_while(|c| c.is_ascii_digit()).collect();
648                port_str.parse().ok()
649            })
650    }
651
652    /// Extract server name from a success message like "Server 'rss-001' started"
653    fn extract_name_from_message(message: &str) -> Option<String> {
654        let start = message.find('\'')?;
655        let end = message[start + 1..].find('\'')?;
656        Some(message[start + 1..start + 1 + end].to_string())
657    }
658}
659
/// Outcome of the pre-start port probe (see `validate_port_safely`).
#[derive(Debug)]
enum PortValidationResult {
    /// Port is free and can be bound.
    Available,
    /// Port is held by another server managed by this process.
    OccupiedByUs,
    /// Port is held by an unrelated process.
    OccupiedByOther,
}