Skip to main content

modkit/backends/
local.rs

1//! Local process backend implementation
2
3use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::process::{Child, Command};
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14use uuid::Uuid;
15
16use super::log_forwarder::{StreamKind, spawn_stream_forwarder};
17use super::{BackendKind, InstanceHandle, ModuleRuntimeBackend, OopModuleConfig};
18
/// Time a process is given to exit after the termination signal during a
/// backend-wide shutdown, before it is force-killed.
const SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_millis(5_000);

/// Time a single instance is given to exit in `stop_instance`, before it is
/// force-killed.
const INSTANCE_STOP_GRACE_PERIOD: Duration = Duration::from_millis(2_000);

/// Upper bound on how long shutdown waits for each log-forwarder task to finish.
const FORWARDER_DRAIN_TIMEOUT: Duration = Duration::from_millis(100);
27
28/// Send graceful termination signal to a child process.
29///
30/// # Returns
31/// - `true` if signal was successfully sent
32/// - `false` if:
33///   - Process has no PID (already exited)
34///   - PID cannot be converted to i32 (extremely rare: PID > 2,147,483,647)
35///   - Signal delivery fails
36///
37/// On Unix: Sends SIGTERM which the process can handle for cleanup.
38/// On Windows: Returns false since there's no reliable graceful termination
39/// method for console applications.
40#[cfg(unix)]
41fn send_terminate_signal(child: &Child) -> bool {
42    use nix::sys::signal::{Signal, kill};
43    use nix::unistd::Pid;
44
45    let Some(pid) = child.id() else {
46        return false;
47    };
48
49    let Ok(pid_i32) = i32::try_from(pid) else {
50        tracing::warn!(
51            pid = pid,
52            "Failed to convert PID to i32, cannot send SIGTERM (PID exceeds i32::MAX: {})",
53            i32::MAX
54        );
55        return false;
56    };
57
58    kill(Pid::from_raw(pid_i32), Signal::SIGTERM).is_ok()
59}
60
/// Windows counterpart of the Unix `SIGTERM` helper.
///
/// Console applications on Windows have no reliable `SIGTERM` equivalent, so this
/// never signals the child and always reports `false` ("no graceful signal was
/// delivered"). `stop_child_with_grace` still gives the child its grace period to
/// exit on its own and then force-kills it.
#[cfg(windows)]
fn send_terminate_signal(_child: &Child) -> bool {
    // Graceful termination is unavailable on this platform.
    false
}
73
74/// Stop a child process with graceful termination and timeout.
75///
76/// 1. Sends SIGTERM (Unix) via `send_terminate_signal`
77/// 2. Waits for process exit within `grace` period
78/// 3. On timeout: force kills the process
79async fn stop_child_with_grace(
80    child: &mut Child,
81    handle: &InstanceHandle,
82    grace: Duration,
83    context: &str,
84) {
85    let pid = child.id();
86    let sent = send_terminate_signal(child);
87
88    // Log with module context if termination signal failed
89    if !sent && pid.is_some() {
90        tracing::debug!(
91            module = %handle.module,
92            instance_id = %handle.instance_id,
93            pid = ?pid,
94            "{context}: graceful termination not available, will force kill"
95        );
96    }
97
98    tracing::debug!(
99        module = %handle.module,
100        instance_id = %handle.instance_id,
101        pid = ?pid,
102        graceful = sent,
103        "{context}: sent termination signal"
104    );
105
106    match tokio::time::timeout(grace, child.wait()).await {
107        Ok(Ok(status)) => {
108            tracing::debug!(
109                module = %handle.module,
110                instance_id = %handle.instance_id,
111                status = ?status,
112                "{context}: process exited gracefully"
113            );
114        }
115        Ok(Err(e)) => {
116            tracing::warn!(
117                module = %handle.module,
118                instance_id = %handle.instance_id,
119                error = %e,
120                "{context}: failed to wait for process"
121            );
122        }
123        Err(_) => {
124            tracing::debug!(
125                module = %handle.module,
126                instance_id = %handle.instance_id,
127                "{context}: grace period expired, force killing"
128            );
129            if let Err(e) = child.kill().await {
130                tracing::warn!(
131                    module = %handle.module,
132                    instance_id = %handle.instance_id,
133                    error = %e,
134                    "{context}: failed to force kill"
135                );
136            }
137        }
138    }
139}
140
141/// Wait for a log forwarder task to finish with timeout.
142async fn wait_forwarder(handle: Option<JoinHandle<()>>) {
143    if let Some(h) = handle {
144        _ = tokio::time::timeout(FORWARDER_DRAIN_TIMEOUT, h).await;
145    }
146}
147
148/// Internal representation of a local process instance
149struct LocalInstance {
150    handle: InstanceHandle,
151    child: Child,
152    /// Task handle for stdout log forwarder
153    stdout_forwarder: Option<JoinHandle<()>>,
154    /// Task handle for stderr log forwarder
155    stderr_forwarder: Option<JoinHandle<()>>,
156}
157
158/// Map key type for instances - uses Uuid directly
159type InstanceMap = HashMap<Uuid, LocalInstance>;
160
161/// Backend that spawns modules as local child processes and manages their lifecycle.
162///
163/// When the cancellation token is triggered, the backend will:
164/// 1. Send termination signal to all processes (SIGTERM on Unix, `TerminateProcess` on Windows)
165/// 2. Wait up to 5 seconds for graceful shutdown
166/// 3. Force kill any remaining processes
167pub struct LocalProcessBackend {
168    instances: Arc<RwLock<InstanceMap>>,
169    cancel: CancellationToken,
170}
171
172impl LocalProcessBackend {
173    /// Create a new `LocalProcessBackend` with the given cancellation token.
174    ///
175    /// When the token is cancelled, all spawned processes will be gracefully stopped.
176    #[must_use]
177    pub fn new(cancel: CancellationToken) -> Self {
178        let backend = Self {
179            instances: Arc::new(RwLock::new(HashMap::new())),
180            cancel: cancel.clone(),
181        };
182
183        // Spawn background task to handle shutdown
184        let instances = Arc::clone(&backend.instances);
185        tokio::spawn(async move {
186            cancel.cancelled().await;
187            tracing::info!("LocalProcessBackend: shutdown signal received, stopping all processes");
188            Self::shutdown_all_instances(instances).await;
189        });
190
191        backend
192    }
193
194    /// Gracefully stop all tracked instances with timeout.
195    async fn shutdown_all_instances(instances: Arc<RwLock<InstanceMap>>) {
196        let mut all_instances: Vec<LocalInstance> = {
197            let mut guard = instances.write();
198            guard.drain().map(|(_, inst)| inst).collect()
199        };
200
201        if all_instances.is_empty() {
202            return;
203        }
204
205        tracing::info!(count = all_instances.len(), "Stopping OoP module processes");
206
207        // Stop all processes with grace period
208        for inst in &mut all_instances {
209            stop_child_with_grace(
210                &mut inst.child,
211                &inst.handle,
212                SHUTDOWN_GRACE_PERIOD,
213                "shutdown",
214            )
215            .await;
216        }
217
218        // Wait for forwarders to drain
219        for inst in all_instances {
220            wait_forwarder(inst.stdout_forwarder).await;
221            wait_forwarder(inst.stderr_forwarder).await;
222        }
223
224        tracing::info!("All OoP module processes stopped");
225    }
226}
227
228#[async_trait]
229impl ModuleRuntimeBackend for LocalProcessBackend {
230    async fn spawn_instance(&self, cfg: &OopModuleConfig) -> Result<InstanceHandle> {
231        // Verify backend kind
232        if cfg.backend != BackendKind::LocalProcess {
233            bail!(
234                "LocalProcessBackend can only spawn LocalProcess instances, got {:?}",
235                cfg.backend
236            );
237        }
238
239        // Ensure binary is set
240        let binary = cfg
241            .binary
242            .as_ref()
243            .context("executable_path must be set for LocalProcess backend")?;
244
245        // Generate unique instance ID using UUID v7
246        let instance_id = Uuid::now_v7();
247
248        // Build command
249        let mut cmd = Command::new(binary);
250        cmd.args(&cfg.args);
251        cmd.envs(&cfg.env);
252
253        // Pipe stdout/stderr for log forwarding
254        cmd.stdout(Stdio::piped());
255        cmd.stderr(Stdio::piped());
256
257        // Set working directory if specified
258        if let Some(ref working_dir) = cfg.working_directory {
259            let path = Path::new(working_dir);
260            if path.exists() && path.is_dir() {
261                cmd.current_dir(path);
262            } else {
263                tracing::warn!(
264                    module = %cfg.name,
265                    working_dir = %working_dir,
266                    "Working directory does not exist or is not a directory, using current dir"
267                );
268            }
269        }
270
271        // Spawn the process
272        let mut child = cmd
273            .spawn()
274            .with_context(|| format!("failed to spawn process: {}", binary.display()))?;
275
276        // Get PID
277        let pid = child.id();
278
279        // Spawn log forwarder tasks for stdout/stderr with cancellation support
280        let module_name = cfg.name.clone();
281        let cancel = self.cancel.clone();
282        let stdout_forwarder = child.stdout.take().map(|stdout| {
283            spawn_stream_forwarder(
284                stdout,
285                module_name.clone(),
286                instance_id,
287                cancel.clone(),
288                StreamKind::Stdout,
289            )
290        });
291        let stderr_forwarder = child.stderr.take().map(|stderr| {
292            spawn_stream_forwarder(
293                stderr,
294                module_name.clone(),
295                instance_id,
296                cancel.clone(),
297                StreamKind::Stderr,
298            )
299        });
300
301        tracing::info!(
302            module = %cfg.name,
303            instance_id = %instance_id,
304            pid = ?pid,
305            "Spawned OoP module with log forwarding"
306        );
307
308        // Create handle
309        let handle = InstanceHandle {
310            module: cfg.name.clone(),
311            instance_id,
312            backend: BackendKind::LocalProcess,
313            pid,
314            created_at: std::time::Instant::now(),
315        };
316
317        // Store in instances map
318        {
319            let mut instances = self.instances.write();
320            instances.insert(
321                instance_id,
322                LocalInstance {
323                    handle: handle.clone(),
324                    child,
325                    stdout_forwarder,
326                    stderr_forwarder,
327                },
328            );
329        }
330
331        Ok(handle)
332    }
333
334    async fn stop_instance(&self, handle: &InstanceHandle) -> Result<()> {
335        let local = {
336            let mut instances = self.instances.write();
337            instances.remove(&handle.instance_id)
338        };
339
340        if let Some(mut local) = local {
341            stop_child_with_grace(
342                &mut local.child,
343                &local.handle,
344                INSTANCE_STOP_GRACE_PERIOD,
345                "stop_instance",
346            )
347            .await;
348
349            // we do not await forwarders here, they'll stop on their own via CancellationToken and pipe close;
350            // shutdown_all_instances handles draining for global shutdown
351        } else {
352            tracing::debug!(
353                module = %handle.module,
354                instance_id = %handle.instance_id,
355                "stop_instance called for unknown instance, ignoring"
356            );
357        }
358
359        Ok(())
360    }
361
362    async fn list_instances(&self, module: &str) -> Result<Vec<InstanceHandle>> {
363        let instances = self.instances.read();
364
365        let result = instances
366            .values()
367            .filter(|inst| inst.handle.module == module)
368            .map(|inst| inst.handle.clone())
369            .collect();
370
371        Ok(result)
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::time::Instant;

    fn test_backend() -> LocalProcessBackend {
        LocalProcessBackend::new(CancellationToken::new())
    }

    /// Build a `LocalProcess` config for `module` that runs a long-lived command.
    ///
    /// On Unix this is `/bin/sleep 10`. NOTE(review): on Windows the same `"10"` argument is
    /// passed to `cmd.exe`, which presumably does not sleep. The tests using this helper only
    /// inspect the backend's instance map, so they do not depend on how long the process
    /// lives. Confirm this if those tests change.
    fn long_running_config(module: &str) -> OopModuleConfig {
        #[cfg(windows)]
        let binary = PathBuf::from("C:\\Windows\\System32\\cmd.exe");
        #[cfg(not(windows))]
        let binary = PathBuf::from("/bin/sleep");

        let mut cfg = OopModuleConfig::new(module, BackendKind::LocalProcess);
        cfg.binary = Some(binary);
        cfg.args = vec!["10".to_owned()];
        cfg
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_binary() {
        let backend = test_backend();
        let cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("executable_path must be set")
        );
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_correct_backend() {
        let backend = test_backend();
        let mut cfg = OopModuleConfig::new("test_module", BackendKind::K8s);
        cfg.binary = Some(PathBuf::from("/bin/echo"));

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("can only spawn LocalProcess")
        );
    }

    #[tokio::test]
    async fn test_spawn_list_stop_lifecycle() {
        let backend = test_backend();
        let cfg = long_running_config("test_module");

        // Spawn instance
        let handle = backend
            .spawn_instance(&cfg)
            .await
            .expect("should spawn instance");

        assert_eq!(handle.module, "test_module");
        assert!(!handle.instance_id.is_nil());
        assert_eq!(handle.backend, BackendKind::LocalProcess);

        // List instances
        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 1);
        assert_eq!(instances[0].module, "test_module");
        assert_eq!(instances[0].instance_id, handle.instance_id);

        // Stop instance
        backend
            .stop_instance(&handle)
            .await
            .expect("should stop instance");

        // Verify it's removed
        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    #[tokio::test]
    async fn test_list_instances_filters_by_module() {
        let backend = test_backend();

        let handle_a = backend
            .spawn_instance(&long_running_config("module_a"))
            .await
            .expect("should spawn module_a");

        let handle_b = backend
            .spawn_instance(&long_running_config("module_b"))
            .await
            .expect("should spawn module_b");

        // List module_a instances
        let instances_a = backend
            .list_instances("module_a")
            .await
            .expect("should list module_a");
        assert_eq!(instances_a.len(), 1);
        assert_eq!(instances_a[0].module, "module_a");

        // List module_b instances
        let instances_b = backend
            .list_instances("module_b")
            .await
            .expect("should list module_b");
        assert_eq!(instances_b.len(), 1);
        assert_eq!(instances_b[0].module, "module_b");

        // Clean up
        backend.stop_instance(&handle_a).await.ok();
        backend.stop_instance(&handle_b).await.ok();
    }

    #[tokio::test]
    async fn test_stop_nonexistent_instance() {
        let backend = test_backend();
        let handle = InstanceHandle {
            module: "test_module".to_owned(),
            instance_id: Uuid::new_v4(),
            backend: BackendKind::LocalProcess,
            pid: None,
            created_at: Instant::now(),
        };

        // Should not error even if instance doesn't exist
        let result = backend.stop_instance(&handle).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_list_instances_empty() {
        let backend = test_backend();
        let instances = backend
            .list_instances("nonexistent_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    mod send_terminate_signal_tests {
        #[cfg(unix)]
        use {super::send_terminate_signal, std::time::Duration};

        #[cfg(unix)]
        #[tokio::test]
        async fn test_send_terminate_signal_to_valid_process() {
            // Spawn a long-running process
            let mut cmd = tokio::process::Command::new("sleep");
            cmd.args(["30"]);

            let mut child = cmd.spawn().expect("should spawn test process");

            // Send termination signal
            let result = send_terminate_signal(&child);

            // Should return true indicating signal was sent
            assert!(result, "Should successfully send SIGTERM to valid process");

            // Wait briefly for graceful shutdown
            tokio::time::timeout(Duration::from_secs(1), child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");
        }

        #[cfg(unix)]
        #[tokio::test]
        async fn test_send_terminate_signal_to_exited_process() {
            // Spawn a process that exits immediately using sh -c 'exit 0'
            // This works on all Unix systems (Linux, macOS, BSD)
            let mut cmd = tokio::process::Command::new("/bin/sh");
            cmd.args(["-c", "exit 0"]);
            let mut child = cmd.spawn().expect("should spawn test process");

            // Wait for it to exit. The timeout is generous (same as the test above)
            // so a slow, loaded CI machine does not make this flaky; the process
            // exits immediately anyway.
            tokio::time::timeout(Duration::from_secs(1), child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");

            // Try to send termination signal to exited process
            let result = send_terminate_signal(&child);

            // Should return false because PID is no longer available
            assert!(!result, "Should return false for already-exited process");
        }

        #[cfg(unix)]
        #[test]
        fn test_pid_conversion_edge_case_documentation() {
            // Documents the guard in `send_terminate_signal`. The child PID is a `u32`,
            // but it is signalled through an `i32` PID, so values above `i32::MAX` cannot
            // be converted.
            // NOTE(review): common kernels cap PIDs far below `i32::MAX`, so that branch
            // is presumably unreachable in practice. Confirm this for supported platforms.

            // The maximum value a u32 PID can have
            let max_u32_pid: u32 = u32::MAX;

            // This would fail to convert to i32
            let result = i32::try_from(max_u32_pid);
            assert!(result.is_err(), "u32::MAX should not fit in i32");

            // Our code handles this by logging a warning and returning false, rather
            // than falling back to a placeholder such as unwrap_or(0), which would
            // signal PID 0.
        }

        #[cfg(unix)]
        #[test]
        fn test_pid_conversion_normal_range() {
            // Test that normal PIDs convert successfully
            let normal_pid: u32 = 12345;
            let result = i32::try_from(normal_pid);
            assert!(result.is_ok(), "Normal PID should convert to i32");
            assert_eq!(result.unwrap(), 12345);
        }
    }
}