warpdrive_proxy/process/
supervisor.rs

1//! Process supervisor for managing upstream application servers
2//!
3//! This module provides process supervision capabilities to spawn and monitor
4//! application servers (e.g., Rails, Node.js). It handles:
5//! - Process spawning with environment setup
6//! - Signal forwarding (SIGTERM/SIGINT)
7//! - Graceful shutdown with timeout
8//! - Automatic restart on crash (optional)
9//!
10//! # Example
11//!
12//! ```no_run
13//! use warpdrive::process::ProcessSupervisor;
14//! use anyhow::Result;
15//!
16//! #[tokio::main]
17//! async fn main() -> Result<()> {
18//!     let mut supervisor = ProcessSupervisor::new(
19//!         "bundle".to_string(),
20//!         vec!["exec".to_string(), "puma".to_string()],
21//!     );
22//!
23//!     // Start the process with PORT env var
24//!     supervisor.start(3000).await?;
25//!
26//!     // Handle signals in background
27//!     let signal_handle = tokio::spawn({
28//!         let supervisor = supervisor.clone();
29//!         async move {
30//!             supervisor.handle_signals().await;
31//!         }
32//!     });
33//!
34//!     // Wait for process to exit
35//!     let exit_code = supervisor.wait().await?;
36//!
37//!     // Clean up signal handler
38//!     signal_handle.abort();
39//!
40//!     std::process::exit(exit_code);
41//! }
42//! ```
43
44use anyhow::{Context, Result, anyhow};
45use std::sync::Arc;
46use std::time::Duration;
47use tokio::process::{Child, Command};
48use tokio::signal::unix::{SignalKind, signal};
49use tokio::sync::{Mutex, Notify};
50use tracing::{debug, error, info, warn};
51
/// Grace period granted to the child after SIGTERM before it is force-killed.
///
/// `ProcessSupervisor::stop` races the child's exit against a sleep of this
/// length; if the sleep finishes first, the child is killed instead.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
54
55/// Process supervisor for managing upstream application servers
56///
57/// The ProcessSupervisor spawns and monitors a child process, forwarding signals
58/// and handling graceful shutdown. It's designed to work with application servers
59/// that listen on a specific port.
60#[derive(Clone)]
61pub struct ProcessSupervisor {
62    /// Command to execute
63    command: String,
64
65    /// Arguments for the command
66    args: Vec<String>,
67
68    /// Child process handle (protected by mutex for async access)
69    child: Arc<Mutex<Option<Child>>>,
70
71    /// Notification channel for process startup
72    started: Arc<Notify>,
73
74    /// Notification channel for shutdown signal
75    shutdown: Arc<Notify>,
76}
77
78impl ProcessSupervisor {
79    /// Create a new process supervisor
80    ///
81    /// # Arguments
82    ///
83    /// * `command` - The command to execute (e.g., "bundle", "node")
84    /// * `args` - Command-line arguments
85    ///
86    /// # Example
87    ///
88    /// ```
89    /// use warpdrive::process::ProcessSupervisor;
90    ///
91    /// let supervisor = ProcessSupervisor::new(
92    ///     "bundle".to_string(),
93    ///     vec!["exec".to_string(), "puma".to_string()],
94    /// );
95    /// ```
96    pub fn new(command: String, args: Vec<String>) -> Self {
97        Self {
98            command,
99            args,
100            child: Arc::new(Mutex::new(None)),
101            started: Arc::new(Notify::new()),
102            shutdown: Arc::new(Notify::new()),
103        }
104    }
105
106    /// Start the child process with the specified port
107    ///
108    /// Spawns the child process with the PORT environment variable set.
109    /// The process inherits stdin, stdout, and stderr from the parent.
110    ///
111    /// # Arguments
112    ///
113    /// * `port` - Port number to pass via PORT environment variable
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if:
118    /// - The command cannot be found or executed
119    /// - The process is already running
120    ///
121    /// # Example
122    ///
123    /// ```no_run
124    /// # use warpdrive::process::ProcessSupervisor;
125    /// # use anyhow::Result;
126    /// # async fn example() -> Result<()> {
127    /// let mut supervisor = ProcessSupervisor::new("rails".to_string(), vec!["server".to_string()]);
128    /// supervisor.start(3000).await?;
129    /// # Ok(())
130    /// # }
131    /// ```
132    pub async fn start(&self, port: u16) -> Result<()> {
133        let mut child_guard = self.child.lock().await;
134
135        // Check if process is already running
136        if child_guard.is_some() {
137            return Err(anyhow!("Process is already running"));
138        }
139
140        info!(
141            "Starting upstream process: {} {}",
142            self.command,
143            self.args.join(" ")
144        );
145
146        // Spawn the child process with PORT environment variable
147        let child = Command::new(&self.command)
148            .args(&self.args)
149            .env("PORT", port.to_string())
150            .stdin(std::process::Stdio::inherit())
151            .stdout(std::process::Stdio::inherit())
152            .stderr(std::process::Stdio::inherit())
153            .spawn()
154            .with_context(|| format!("Failed to spawn command: {}", self.command))?;
155
156        let pid = child
157            .id()
158            .ok_or_else(|| anyhow!("Failed to get child process ID"))?;
159
160        info!("Upstream process started with PID: {}", pid);
161
162        *child_guard = Some(child);
163        drop(child_guard);
164
165        // Notify that process has started
166        self.started.notify_waiters();
167
168        Ok(())
169    }
170
171    /// Wait for the child process to exit
172    ///
173    /// This method blocks until the child process terminates and returns
174    /// the exit code. A non-zero exit code indicates the process was
175    /// terminated by a signal or crashed.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if:
180    /// - No process is running
181    /// - Failed to wait for the process
182    ///
183    /// # Returns
184    ///
185    /// The exit code of the process (0 for success, non-zero for error/signal)
186    ///
187    /// # Example
188    ///
189    /// ```no_run
190    /// # use warpdrive::process::ProcessSupervisor;
191    /// # use anyhow::Result;
192    /// # async fn example() -> Result<()> {
193    /// # let mut supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["1".to_string()]);
194    /// # supervisor.start(3000).await?;
195    /// let exit_code = supervisor.wait().await?;
196    /// println!("Process exited with code: {}", exit_code);
197    /// # Ok(())
198    /// # }
199    /// ```
200    pub async fn wait(&self) -> Result<i32> {
201        let mut child_guard = self.child.lock().await;
202
203        let child = child_guard
204            .as_mut()
205            .ok_or_else(|| anyhow!("No process is running"))?;
206
207        let status = child
208            .wait()
209            .await
210            .context("Failed to wait for child process")?;
211
212        // Clear the child handle
213        *child_guard = None;
214        drop(child_guard);
215
216        let exit_code = status.code().unwrap_or(-1);
217
218        if status.success() {
219            info!("Upstream process exited successfully");
220        } else {
221            warn!("Upstream process exited with code: {}", exit_code);
222        }
223
224        Ok(exit_code)
225    }
226
227    /// Stop the child process gracefully
228    ///
229    /// Sends SIGTERM to the process and waits for it to exit within the
230    /// timeout period. If the process doesn't exit within the timeout,
231    /// sends SIGKILL to force termination.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if:
236    /// - No process is running
237    /// - Failed to send signals to the process
238    ///
239    /// # Example
240    ///
241    /// ```no_run
242    /// # use warpdrive::process::ProcessSupervisor;
243    /// # use anyhow::Result;
244    /// # async fn example() -> Result<()> {
245    /// # let mut supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["100".to_string()]);
246    /// # supervisor.start(3000).await?;
247    /// supervisor.stop().await?;
248    /// # Ok(())
249    /// # }
250    /// ```
251    pub async fn stop(&self) -> Result<()> {
252        let mut child_guard = self.child.lock().await;
253
254        let child = match child_guard.as_mut() {
255            Some(c) => c,
256            None => return Ok(()), // No process running, nothing to stop
257        };
258
259        let pid = child
260            .id()
261            .ok_or_else(|| anyhow!("Failed to get child process ID"))?;
262
263        info!("Stopping upstream process (PID: {})", pid);
264
265        // Send SIGTERM for graceful shutdown
266        debug!("Sending SIGTERM to PID {}", pid);
267        send_signal(pid, SignalKind::terminate())?;
268
269        // Wait for process to exit with timeout
270        let wait_result = tokio::select! {
271            status = child.wait() => {
272                match status {
273                    Ok(s) => {
274                        info!("Process exited gracefully with code: {}", s.code().unwrap_or(-1));
275                        Ok(())
276                    }
277                    Err(e) => Err(anyhow!("Failed to wait for process: {}", e))
278                }
279            }
280            _ = tokio::time::sleep(SHUTDOWN_TIMEOUT) => {
281                warn!("Process did not exit within timeout, sending SIGKILL");
282                Err(anyhow!("Shutdown timeout"))
283            }
284        };
285
286        // If graceful shutdown failed, force kill
287        if wait_result.is_err() {
288            debug!("Sending SIGKILL to PID {}", pid);
289            if let Err(e) = child.kill().await {
290                error!("Failed to kill process: {}", e);
291            }
292
293            // Wait for process to be reaped
294            if let Err(e) = child.wait().await {
295                error!("Failed to wait for killed process: {}", e);
296            }
297        }
298
299        // Clear the child handle
300        *child_guard = None;
301
302        info!("Upstream process stopped");
303        Ok(())
304    }
305
306    /// Handle signals and forward them to the child process
307    ///
308    /// This method listens for SIGTERM and SIGINT signals and forwards them
309    /// to the child process. It should be spawned in a background task.
310    ///
311    /// The method will block until a signal is received or the shutdown
312    /// notification is triggered.
313    ///
314    /// # Example
315    ///
316    /// ```no_run
317    /// # use warpdrive::process::ProcessSupervisor;
318    /// # use anyhow::Result;
319    /// # async fn example() -> Result<()> {
320    /// # let supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["100".to_string()]);
321    /// # supervisor.start(3000).await?;
322    /// // Spawn signal handler in background
323    /// let signal_handle = tokio::spawn({
324    ///     let supervisor = supervisor.clone();
325    ///     async move {
326    ///         supervisor.handle_signals().await;
327    ///     }
328    /// });
329    ///
330    /// // Later, clean up
331    /// signal_handle.abort();
332    /// # Ok(())
333    /// # }
334    /// ```
335    pub async fn handle_signals(&self) {
336        let mut sigterm = match signal(SignalKind::terminate()) {
337            Ok(s) => s,
338            Err(e) => {
339                error!("Failed to setup SIGTERM handler: {}", e);
340                return;
341            }
342        };
343
344        let mut sigint = match signal(SignalKind::interrupt()) {
345            Ok(s) => s,
346            Err(e) => {
347                error!("Failed to setup SIGINT handler: {}", e);
348                return;
349            }
350        };
351
352        tokio::select! {
353            _ = sigterm.recv() => {
354                info!("Received SIGTERM, forwarding to child process");
355                self.forward_signal(SignalKind::terminate()).await;
356            }
357            _ = sigint.recv() => {
358                info!("Received SIGINT, forwarding to child process");
359                self.forward_signal(SignalKind::interrupt()).await;
360            }
361            _ = self.shutdown.notified() => {
362                debug!("Shutdown notification received");
363            }
364        }
365    }
366
367    /// Wait for the process to start
368    ///
369    /// This method blocks until the `start()` method has been called and
370    /// the process has been spawned.
371    ///
372    /// # Example
373    ///
374    /// ```no_run
375    /// # use warpdrive::process::ProcessSupervisor;
376    /// # use anyhow::Result;
377    /// # async fn example() -> Result<()> {
378    /// # let supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["1".to_string()]);
379    /// // Start in background
380    /// let supervisor_clone = supervisor.clone();
381    /// tokio::spawn(async move {
382    ///     supervisor_clone.start(3000).await.unwrap();
383    /// });
384    ///
385    /// // Wait for startup
386    /// supervisor.wait_for_start().await;
387    /// println!("Process has started");
388    /// # Ok(())
389    /// # }
390    /// ```
391    pub async fn wait_for_start(&self) {
392        self.started.notified().await;
393    }
394
395    /// Forward a signal to the child process
396    async fn forward_signal(&self, kind: SignalKind) {
397        let child_guard = self.child.lock().await;
398
399        if let Some(child) = child_guard.as_ref() {
400            if let Some(pid) = child.id() {
401                if let Err(e) = send_signal(pid, kind) {
402                    error!("Failed to forward signal to child: {}", e);
403                }
404            }
405        }
406    }
407
408    /// Trigger shutdown notification
409    ///
410    /// This notifies the signal handler to stop listening for signals.
411    pub fn trigger_shutdown(&self) {
412        self.shutdown.notify_waiters();
413    }
414
415    /// Check if the process is currently running
416    ///
417    /// Returns `true` if a child process is running, `false` otherwise.
418    ///
419    /// # Example
420    ///
421    /// ```no_run
422    /// # use warpdrive::process::ProcessSupervisor;
423    /// # use anyhow::Result;
424    /// # async fn example() -> Result<()> {
425    /// # let supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["1".to_string()]);
426    /// # supervisor.start(3000).await?;
427    /// if supervisor.is_running().await {
428    ///     println!("Process is running");
429    /// }
430    /// # Ok(())
431    /// # }
432    /// ```
433    pub async fn is_running(&self) -> bool {
434        self.child.lock().await.is_some()
435    }
436}
437
438/// Send a Unix signal to a process
439///
440/// # Arguments
441///
442/// * `pid` - Process ID to send signal to
443/// * `kind` - Type of signal to send
444///
445/// # Errors
446///
447/// Returns an error if the signal cannot be sent or the process doesn't exist
448fn send_signal(pid: u32, kind: SignalKind) -> Result<()> {
449    use nix::sys::signal::{Signal, kill};
450    use nix::unistd::Pid;
451
452    let signal = match kind {
453        k if k == SignalKind::terminate() => Signal::SIGTERM,
454        k if k == SignalKind::interrupt() => Signal::SIGINT,
455        k if k == SignalKind::quit() => Signal::SIGQUIT,
456        _ => return Err(anyhow!("Unsupported signal type")),
457    };
458
459    kill(Pid::from_raw(pid as i32), signal).context("Failed to send signal to process")?;
460
461    Ok(())
462}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a supervisor from string slices to keep the tests terse.
    fn supervise(program: &str, args: &[&str]) -> ProcessSupervisor {
        let owned_args = args.iter().map(|arg| arg.to_string()).collect();
        ProcessSupervisor::new(program.to_string(), owned_args)
    }

    /// A fresh supervisor holds no child handle.
    #[tokio::test]
    async fn test_process_supervisor_new() {
        let sup = supervise("echo", &["test"]);

        let slot = sup.child.lock().await;
        assert!(slot.is_none());
    }

    /// A short-lived process can be started and waited on; it exits with 0.
    #[tokio::test]
    async fn test_spawn_and_wait() {
        let sup = supervise("sleep", &["0.1"]);

        sup.start(3000).await.unwrap();
        let code = sup.wait().await.unwrap();

        assert_eq!(code, 0);
    }

    /// Starting twice without stopping in between is rejected.
    #[tokio::test]
    async fn test_double_start_error() {
        let sup = supervise("sleep", &["10"]);

        sup.start(3000).await.unwrap();

        let second_attempt = sup.start(3000).await;
        assert!(second_attempt.is_err());

        // Do not leave the sleeper behind.
        sup.stop().await.unwrap();
    }

    /// `stop()` terminates a long-running child and clears the handle.
    #[tokio::test]
    async fn test_stop_running_process() {
        let sup = supervise("sleep", &["100"]);

        sup.start(3000).await.unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        sup.stop().await.unwrap();

        assert!(sup.child.lock().await.is_none());
    }

    /// `wait_for_start()` resolves once another task has started the child.
    #[tokio::test]
    async fn test_wait_for_start() {
        let sup = supervise("sleep", &["1"]);

        let starter = sup.clone();
        tokio::spawn(async move {
            // Delay the start so the waiter below is already waiting.
            tokio::time::sleep(Duration::from_millis(100)).await;
            starter.start(3000).await.unwrap();
        });

        sup.wait_for_start().await;

        assert!(sup.child.lock().await.is_some());

        // Let the child run to completion so it is reaped.
        sup.wait().await.unwrap();
    }

    /// Spawning a program that does not exist yields an error.
    #[tokio::test]
    async fn test_invalid_command() {
        let sup = supervise("nonexistent_command_xyz", &[]);

        let outcome = sup.start(3000).await;
        assert!(outcome.is_err());
    }
}