warpdrive_proxy/process/supervisor.rs
1//! Process supervisor for managing upstream application servers
2//!
3//! This module provides process supervision capabilities to spawn and monitor
4//! application servers (e.g., Rails, Node.js). It handles:
5//! - Process spawning with environment setup
6//! - Signal forwarding (SIGTERM/SIGINT)
7//! - Graceful shutdown with timeout
8//! - Automatic restart on crash (optional)
9//!
10//! # Example
11//!
12//! ```no_run
13//! use warpdrive::process::ProcessSupervisor;
14//! use anyhow::Result;
15//!
16//! #[tokio::main]
17//! async fn main() -> Result<()> {
18//! let mut supervisor = ProcessSupervisor::new(
19//! "bundle".to_string(),
20//! vec!["exec".to_string(), "puma".to_string()],
21//! );
22//!
23//! // Start the process with PORT env var
24//! supervisor.start(3000).await?;
25//!
26//! // Handle signals in background
27//! let signal_handle = tokio::spawn({
28//! let supervisor = supervisor.clone();
29//! async move {
30//! supervisor.handle_signals().await;
31//! }
32//! });
33//!
34//! // Wait for process to exit
35//! let exit_code = supervisor.wait().await?;
36//!
37//! // Clean up signal handler
38//! signal_handle.abort();
39//!
40//! std::process::exit(exit_code);
41//! }
42//! ```
43
44use anyhow::{Context, Result, anyhow};
45use std::sync::Arc;
46use std::time::Duration;
47use tokio::process::{Child, Command};
48use tokio::signal::unix::{SignalKind, signal};
49use tokio::sync::{Mutex, Notify};
50use tracing::{debug, error, info, warn};
51
/// Grace period `stop` allows after SIGTERM before escalating to SIGKILL
/// (30 seconds, written out in milliseconds).
const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(30_000);
54
55/// Process supervisor for managing upstream application servers
56///
57/// The ProcessSupervisor spawns and monitors a child process, forwarding signals
58/// and handling graceful shutdown. It's designed to work with application servers
59/// that listen on a specific port.
60#[derive(Clone)]
61pub struct ProcessSupervisor {
62 /// Command to execute
63 command: String,
64
65 /// Arguments for the command
66 args: Vec<String>,
67
68 /// Child process handle (protected by mutex for async access)
69 child: Arc<Mutex<Option<Child>>>,
70
71 /// Notification channel for process startup
72 started: Arc<Notify>,
73
74 /// Notification channel for shutdown signal
75 shutdown: Arc<Notify>,
76}
77
78impl ProcessSupervisor {
79 /// Create a new process supervisor
80 ///
81 /// # Arguments
82 ///
83 /// * `command` - The command to execute (e.g., "bundle", "node")
84 /// * `args` - Command-line arguments
85 ///
86 /// # Example
87 ///
88 /// ```
89 /// use warpdrive::process::ProcessSupervisor;
90 ///
91 /// let supervisor = ProcessSupervisor::new(
92 /// "bundle".to_string(),
93 /// vec!["exec".to_string(), "puma".to_string()],
94 /// );
95 /// ```
96 pub fn new(command: String, args: Vec<String>) -> Self {
97 Self {
98 command,
99 args,
100 child: Arc::new(Mutex::new(None)),
101 started: Arc::new(Notify::new()),
102 shutdown: Arc::new(Notify::new()),
103 }
104 }
105
106 /// Start the child process with the specified port
107 ///
108 /// Spawns the child process with the PORT environment variable set.
109 /// The process inherits stdin, stdout, and stderr from the parent.
110 ///
111 /// # Arguments
112 ///
113 /// * `port` - Port number to pass via PORT environment variable
114 ///
115 /// # Errors
116 ///
117 /// Returns an error if:
118 /// - The command cannot be found or executed
119 /// - The process is already running
120 ///
121 /// # Example
122 ///
123 /// ```no_run
124 /// # use warpdrive::process::ProcessSupervisor;
125 /// # use anyhow::Result;
126 /// # async fn example() -> Result<()> {
127 /// let mut supervisor = ProcessSupervisor::new("rails".to_string(), vec!["server".to_string()]);
128 /// supervisor.start(3000).await?;
129 /// # Ok(())
130 /// # }
131 /// ```
132 pub async fn start(&self, port: u16) -> Result<()> {
133 let mut child_guard = self.child.lock().await;
134
135 // Check if process is already running
136 if child_guard.is_some() {
137 return Err(anyhow!("Process is already running"));
138 }
139
140 info!(
141 "Starting upstream process: {} {}",
142 self.command,
143 self.args.join(" ")
144 );
145
146 // Spawn the child process with PORT environment variable
147 let child = Command::new(&self.command)
148 .args(&self.args)
149 .env("PORT", port.to_string())
150 .stdin(std::process::Stdio::inherit())
151 .stdout(std::process::Stdio::inherit())
152 .stderr(std::process::Stdio::inherit())
153 .spawn()
154 .with_context(|| format!("Failed to spawn command: {}", self.command))?;
155
156 let pid = child
157 .id()
158 .ok_or_else(|| anyhow!("Failed to get child process ID"))?;
159
160 info!("Upstream process started with PID: {}", pid);
161
162 *child_guard = Some(child);
163 drop(child_guard);
164
165 // Notify that process has started
166 self.started.notify_waiters();
167
168 Ok(())
169 }
170
171 /// Wait for the child process to exit
172 ///
173 /// This method blocks until the child process terminates and returns
174 /// the exit code. A non-zero exit code indicates the process was
175 /// terminated by a signal or crashed.
176 ///
177 /// # Errors
178 ///
179 /// Returns an error if:
180 /// - No process is running
181 /// - Failed to wait for the process
182 ///
183 /// # Returns
184 ///
185 /// The exit code of the process (0 for success, non-zero for error/signal)
186 ///
187 /// # Example
188 ///
189 /// ```no_run
190 /// # use warpdrive::process::ProcessSupervisor;
191 /// # use anyhow::Result;
192 /// # async fn example() -> Result<()> {
193 /// # let mut supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["1".to_string()]);
194 /// # supervisor.start(3000).await?;
195 /// let exit_code = supervisor.wait().await?;
196 /// println!("Process exited with code: {}", exit_code);
197 /// # Ok(())
198 /// # }
199 /// ```
200 pub async fn wait(&self) -> Result<i32> {
201 let mut child_guard = self.child.lock().await;
202
203 let child = child_guard
204 .as_mut()
205 .ok_or_else(|| anyhow!("No process is running"))?;
206
207 let status = child
208 .wait()
209 .await
210 .context("Failed to wait for child process")?;
211
212 // Clear the child handle
213 *child_guard = None;
214 drop(child_guard);
215
216 let exit_code = status.code().unwrap_or(-1);
217
218 if status.success() {
219 info!("Upstream process exited successfully");
220 } else {
221 warn!("Upstream process exited with code: {}", exit_code);
222 }
223
224 Ok(exit_code)
225 }
226
227 /// Stop the child process gracefully
228 ///
229 /// Sends SIGTERM to the process and waits for it to exit within the
230 /// timeout period. If the process doesn't exit within the timeout,
231 /// sends SIGKILL to force termination.
232 ///
233 /// # Errors
234 ///
235 /// Returns an error if:
236 /// - No process is running
237 /// - Failed to send signals to the process
238 ///
239 /// # Example
240 ///
241 /// ```no_run
242 /// # use warpdrive::process::ProcessSupervisor;
243 /// # use anyhow::Result;
244 /// # async fn example() -> Result<()> {
245 /// # let mut supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["100".to_string()]);
246 /// # supervisor.start(3000).await?;
247 /// supervisor.stop().await?;
248 /// # Ok(())
249 /// # }
250 /// ```
251 pub async fn stop(&self) -> Result<()> {
252 let mut child_guard = self.child.lock().await;
253
254 let child = match child_guard.as_mut() {
255 Some(c) => c,
256 None => return Ok(()), // No process running, nothing to stop
257 };
258
259 let pid = child
260 .id()
261 .ok_or_else(|| anyhow!("Failed to get child process ID"))?;
262
263 info!("Stopping upstream process (PID: {})", pid);
264
265 // Send SIGTERM for graceful shutdown
266 debug!("Sending SIGTERM to PID {}", pid);
267 send_signal(pid, SignalKind::terminate())?;
268
269 // Wait for process to exit with timeout
270 let wait_result = tokio::select! {
271 status = child.wait() => {
272 match status {
273 Ok(s) => {
274 info!("Process exited gracefully with code: {}", s.code().unwrap_or(-1));
275 Ok(())
276 }
277 Err(e) => Err(anyhow!("Failed to wait for process: {}", e))
278 }
279 }
280 _ = tokio::time::sleep(SHUTDOWN_TIMEOUT) => {
281 warn!("Process did not exit within timeout, sending SIGKILL");
282 Err(anyhow!("Shutdown timeout"))
283 }
284 };
285
286 // If graceful shutdown failed, force kill
287 if wait_result.is_err() {
288 debug!("Sending SIGKILL to PID {}", pid);
289 if let Err(e) = child.kill().await {
290 error!("Failed to kill process: {}", e);
291 }
292
293 // Wait for process to be reaped
294 if let Err(e) = child.wait().await {
295 error!("Failed to wait for killed process: {}", e);
296 }
297 }
298
299 // Clear the child handle
300 *child_guard = None;
301
302 info!("Upstream process stopped");
303 Ok(())
304 }
305
306 /// Handle signals and forward them to the child process
307 ///
308 /// This method listens for SIGTERM and SIGINT signals and forwards them
309 /// to the child process. It should be spawned in a background task.
310 ///
311 /// The method will block until a signal is received or the shutdown
312 /// notification is triggered.
313 ///
314 /// # Example
315 ///
316 /// ```no_run
317 /// # use warpdrive::process::ProcessSupervisor;
318 /// # use anyhow::Result;
319 /// # async fn example() -> Result<()> {
320 /// # let supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["100".to_string()]);
321 /// # supervisor.start(3000).await?;
322 /// // Spawn signal handler in background
323 /// let signal_handle = tokio::spawn({
324 /// let supervisor = supervisor.clone();
325 /// async move {
326 /// supervisor.handle_signals().await;
327 /// }
328 /// });
329 ///
330 /// // Later, clean up
331 /// signal_handle.abort();
332 /// # Ok(())
333 /// # }
334 /// ```
335 pub async fn handle_signals(&self) {
336 let mut sigterm = match signal(SignalKind::terminate()) {
337 Ok(s) => s,
338 Err(e) => {
339 error!("Failed to setup SIGTERM handler: {}", e);
340 return;
341 }
342 };
343
344 let mut sigint = match signal(SignalKind::interrupt()) {
345 Ok(s) => s,
346 Err(e) => {
347 error!("Failed to setup SIGINT handler: {}", e);
348 return;
349 }
350 };
351
352 tokio::select! {
353 _ = sigterm.recv() => {
354 info!("Received SIGTERM, forwarding to child process");
355 self.forward_signal(SignalKind::terminate()).await;
356 }
357 _ = sigint.recv() => {
358 info!("Received SIGINT, forwarding to child process");
359 self.forward_signal(SignalKind::interrupt()).await;
360 }
361 _ = self.shutdown.notified() => {
362 debug!("Shutdown notification received");
363 }
364 }
365 }
366
367 /// Wait for the process to start
368 ///
369 /// This method blocks until the `start()` method has been called and
370 /// the process has been spawned.
371 ///
372 /// # Example
373 ///
374 /// ```no_run
375 /// # use warpdrive::process::ProcessSupervisor;
376 /// # use anyhow::Result;
377 /// # async fn example() -> Result<()> {
378 /// # let supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["1".to_string()]);
379 /// // Start in background
380 /// let supervisor_clone = supervisor.clone();
381 /// tokio::spawn(async move {
382 /// supervisor_clone.start(3000).await.unwrap();
383 /// });
384 ///
385 /// // Wait for startup
386 /// supervisor.wait_for_start().await;
387 /// println!("Process has started");
388 /// # Ok(())
389 /// # }
390 /// ```
391 pub async fn wait_for_start(&self) {
392 self.started.notified().await;
393 }
394
395 /// Forward a signal to the child process
396 async fn forward_signal(&self, kind: SignalKind) {
397 let child_guard = self.child.lock().await;
398
399 if let Some(child) = child_guard.as_ref() {
400 if let Some(pid) = child.id() {
401 if let Err(e) = send_signal(pid, kind) {
402 error!("Failed to forward signal to child: {}", e);
403 }
404 }
405 }
406 }
407
408 /// Trigger shutdown notification
409 ///
410 /// This notifies the signal handler to stop listening for signals.
411 pub fn trigger_shutdown(&self) {
412 self.shutdown.notify_waiters();
413 }
414
415 /// Check if the process is currently running
416 ///
417 /// Returns `true` if a child process is running, `false` otherwise.
418 ///
419 /// # Example
420 ///
421 /// ```no_run
422 /// # use warpdrive::process::ProcessSupervisor;
423 /// # use anyhow::Result;
424 /// # async fn example() -> Result<()> {
425 /// # let supervisor = ProcessSupervisor::new("sleep".to_string(), vec!["1".to_string()]);
426 /// # supervisor.start(3000).await?;
427 /// if supervisor.is_running().await {
428 /// println!("Process is running");
429 /// }
430 /// # Ok(())
431 /// # }
432 /// ```
433 pub async fn is_running(&self) -> bool {
434 self.child.lock().await.is_some()
435 }
436}
437
438/// Send a Unix signal to a process
439///
440/// # Arguments
441///
442/// * `pid` - Process ID to send signal to
443/// * `kind` - Type of signal to send
444///
445/// # Errors
446///
447/// Returns an error if the signal cannot be sent or the process doesn't exist
448fn send_signal(pid: u32, kind: SignalKind) -> Result<()> {
449 use nix::sys::signal::{Signal, kill};
450 use nix::unistd::Pid;
451
452 let signal = match kind {
453 k if k == SignalKind::terminate() => Signal::SIGTERM,
454 k if k == SignalKind::interrupt() => Signal::SIGINT,
455 k if k == SignalKind::quit() => Signal::SIGQUIT,
456 _ => return Err(anyhow!("Unsupported signal type")),
457 };
458
459 kill(Pid::from_raw(pid as i32), signal).context("Failed to send signal to process")?;
460
461 Ok(())
462}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a supervisor whose child runs `sleep <duration>`.
    fn sleeper(duration: &str) -> ProcessSupervisor {
        ProcessSupervisor::new("sleep".to_string(), vec![duration.to_string()])
    }

    #[tokio::test]
    async fn test_process_supervisor_new() {
        let sup = ProcessSupervisor::new("echo".to_string(), vec!["test".to_string()]);

        // A freshly constructed supervisor holds no child handle.
        let guard = sup.child.lock().await;
        assert!(guard.is_none());
    }

    #[tokio::test]
    async fn test_spawn_and_wait() {
        let sup = sleeper("0.1");

        sup.start(3000).await.unwrap();

        assert_eq!(sup.wait().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_double_start_error() {
        let sup = sleeper("10");
        sup.start(3000).await.unwrap();

        // A second start while the first child is still held must be rejected.
        let second = sup.start(3000).await;
        assert!(second.is_err());

        // Clean up
        sup.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_stop_running_process() {
        let sup = sleeper("100");
        sup.start(3000).await.unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;
        sup.stop().await.unwrap();

        // Stopping must release the child handle.
        assert!(sup.child.lock().await.is_none());
    }

    #[tokio::test]
    async fn test_wait_for_start() {
        let sup = sleeper("1");

        let starter = sup.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            starter.start(3000).await.unwrap();
        });

        sup.wait_for_start().await;

        // The background start has completed, so a child handle must be held.
        assert!(sup.child.lock().await.is_some());

        // Clean up
        sup.wait().await.unwrap();
    }

    #[tokio::test]
    async fn test_invalid_command() {
        let sup = ProcessSupervisor::new("nonexistent_command_xyz".to_string(), Vec::new());

        assert!(sup.start(3000).await.is_err());
    }
}