Skip to main content

modkit/backends/
local.rs

1//! Local process backend implementation
2
3use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::process::{Child, Command};
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14use uuid::Uuid;
15
16use super::log_forwarder::{StreamKind, spawn_stream_forwarder};
17use super::{BackendKind, InstanceHandle, ModuleRuntimeBackend, OopModuleConfig};
18
/// How long each process may take to exit during backend-wide shutdown
/// before it is force killed.
const SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(5);

/// How long a single instance may take to exit when stopped through
/// `stop_instance` before it is force killed.
const INSTANCE_STOP_GRACE_PERIOD: Duration = Duration::from_secs(2);

/// Upper bound on how long shutdown waits for one log forwarder task to finish.
const FORWARDER_DRAIN_TIMEOUT: Duration = Duration::from_millis(100);
27
28/// Send graceful termination signal to a child process.
29///
30/// # Returns
31/// - `true` if signal was successfully sent
32/// - `false` if:
33///   - Process has no PID (already exited)
34///   - PID cannot be converted to i32 (extremely rare: PID > 2,147,483,647)
35///   - Signal delivery fails
36///
37/// On Unix: Sends SIGTERM which the process can handle for cleanup.
38/// On Windows: Returns false since there's no reliable graceful termination
39/// method for console applications.
40#[cfg(unix)]
41fn send_terminate_signal(child: &Child) -> bool {
42    use nix::sys::signal::{Signal, kill};
43    use nix::unistd::Pid;
44
45    let Some(pid) = child.id() else {
46        return false;
47    };
48
49    let Ok(pid_i32) = i32::try_from(pid) else {
50        tracing::warn!(
51            pid = pid,
52            "Failed to convert PID to i32, cannot send SIGTERM (PID exceeds i32::MAX: {})",
53            i32::MAX
54        );
55        return false;
56    };
57
58    kill(Pid::from_raw(pid_i32), Signal::SIGTERM).is_ok()
59}
60
/// Windows counterpart of the graceful termination request.
///
/// There is no reliable `SIGTERM` equivalent for console applications on
/// Windows, so no request is ever sent.
///
/// # Returns
/// Always `false`, telling the caller that graceful termination is not
/// available and that it should fall back to a force kill.
#[cfg(windows)]
fn send_terminate_signal(_child: &Child) -> bool {
    false
}
73
74/// Stop a child process with graceful termination and timeout.
75///
76/// 1. Sends SIGTERM (Unix) via `send_terminate_signal`
77/// 2. Waits for process exit within `grace` period
78/// 3. On timeout: force kills the process
79async fn stop_child_with_grace(
80    child: &mut Child,
81    handle: &InstanceHandle,
82    grace: Duration,
83    context: &str,
84) {
85    let pid = child.id();
86    let sent = send_terminate_signal(child);
87
88    // Log with module context if termination signal failed
89    if !sent && pid.is_some() {
90        tracing::debug!(
91            module = %handle.module,
92            instance_id = %handle.instance_id,
93            pid = ?pid,
94            "{context}: graceful termination not available, will force kill"
95        );
96    }
97
98    tracing::debug!(
99        module = %handle.module,
100        instance_id = %handle.instance_id,
101        pid = ?pid,
102        graceful = sent,
103        "{context}: sent termination signal"
104    );
105
106    match tokio::time::timeout(grace, child.wait()).await {
107        Ok(Ok(status)) => {
108            tracing::debug!(
109                module = %handle.module,
110                instance_id = %handle.instance_id,
111                status = ?status,
112                "{context}: process exited gracefully"
113            );
114        }
115        Ok(Err(e)) => {
116            tracing::warn!(
117                module = %handle.module,
118                instance_id = %handle.instance_id,
119                error = %e,
120                "{context}: failed to wait for process"
121            );
122        }
123        Err(_) => {
124            tracing::debug!(
125                module = %handle.module,
126                instance_id = %handle.instance_id,
127                "{context}: grace period expired, force killing"
128            );
129            if let Err(e) = child.kill().await {
130                tracing::warn!(
131                    module = %handle.module,
132                    instance_id = %handle.instance_id,
133                    error = %e,
134                    "{context}: failed to force kill"
135                );
136            }
137        }
138    }
139}
140
141/// Wait for a log forwarder task to finish with timeout.
142async fn wait_forwarder(
143    handle: Option<JoinHandle<()>>,
144    module: &str,
145    instance_id: uuid::Uuid,
146    stream: &str,
147) {
148    let Some(h) = handle else { return };
149    match tokio::time::timeout(FORWARDER_DRAIN_TIMEOUT, h).await {
150        Ok(Ok(())) => {}
151        Ok(Err(e)) => {
152            if e.is_panic() {
153                tracing::warn!(module, %instance_id, stream, error = %e, "log forwarder task panicked");
154            } else {
155                tracing::warn!(module, %instance_id, stream, error = %e, "log forwarder task cancelled");
156            }
157        }
158        Err(_) => {
159            tracing::warn!(
160                module,
161                %instance_id,
162                stream,
163                timeout_ms = FORWARDER_DRAIN_TIMEOUT.as_millis(),
164                "log forwarder did not finish within drain timeout",
165            );
166        }
167    }
168}
169
170/// Internal representation of a local process instance
171struct LocalInstance {
172    handle: InstanceHandle,
173    child: Child,
174    /// Task handle for stdout log forwarder
175    stdout_forwarder: Option<JoinHandle<()>>,
176    /// Task handle for stderr log forwarder
177    stderr_forwarder: Option<JoinHandle<()>>,
178}
179
180/// Map key type for instances - uses Uuid directly
181type InstanceMap = HashMap<Uuid, LocalInstance>;
182
183/// Backend that spawns modules as local child processes and manages their lifecycle.
184///
185/// When the cancellation token is triggered, the backend will:
186/// 1. Send termination signal to all processes (SIGTERM on Unix, `TerminateProcess` on Windows)
187/// 2. Wait up to 5 seconds for graceful shutdown
188/// 3. Force kill any remaining processes
189pub struct LocalProcessBackend {
190    instances: Arc<RwLock<InstanceMap>>,
191    cancel: CancellationToken,
192}
193
194impl LocalProcessBackend {
195    /// Create a new `LocalProcessBackend` with the given cancellation token.
196    ///
197    /// When the token is cancelled, all spawned processes will be gracefully stopped.
198    #[must_use]
199    pub fn new(cancel: CancellationToken) -> Self {
200        let backend = Self {
201            instances: Arc::new(RwLock::new(HashMap::new())),
202            cancel: cancel.clone(),
203        };
204
205        // Spawn background task to handle shutdown
206        let instances = Arc::clone(&backend.instances);
207        tokio::spawn(async move {
208            cancel.cancelled().await;
209            tracing::info!("LocalProcessBackend: shutdown signal received, stopping all processes");
210            Self::shutdown_all_instances(instances).await;
211        });
212
213        backend
214    }
215
216    /// Gracefully stop all tracked instances with timeout.
217    async fn shutdown_all_instances(instances: Arc<RwLock<InstanceMap>>) {
218        let mut all_instances: Vec<LocalInstance> = {
219            let mut guard = instances.write();
220            guard.drain().map(|(_, inst)| inst).collect()
221        };
222
223        if all_instances.is_empty() {
224            return;
225        }
226
227        tracing::info!(count = all_instances.len(), "Stopping OoP module processes");
228
229        // Stop all processes with grace period
230        for inst in &mut all_instances {
231            stop_child_with_grace(
232                &mut inst.child,
233                &inst.handle,
234                SHUTDOWN_GRACE_PERIOD,
235                "shutdown",
236            )
237            .await;
238        }
239
240        // Wait for forwarders to drain
241        for inst in all_instances {
242            wait_forwarder(
243                inst.stdout_forwarder,
244                &inst.handle.module,
245                inst.handle.instance_id,
246                "stdout",
247            )
248            .await;
249            wait_forwarder(
250                inst.stderr_forwarder,
251                &inst.handle.module,
252                inst.handle.instance_id,
253                "stderr",
254            )
255            .await;
256        }
257
258        tracing::info!("All OoP module processes stopped");
259    }
260}
261
262#[async_trait]
263impl ModuleRuntimeBackend for LocalProcessBackend {
264    async fn spawn_instance(&self, cfg: &OopModuleConfig) -> Result<InstanceHandle> {
265        // Verify backend kind
266        if cfg.backend != BackendKind::LocalProcess {
267            bail!(
268                "LocalProcessBackend can only spawn LocalProcess instances, got {:?}",
269                cfg.backend
270            );
271        }
272
273        // Ensure binary is set
274        let binary = cfg
275            .binary
276            .as_ref()
277            .context("executable_path must be set for LocalProcess backend")?;
278
279        // Generate unique instance ID using UUID v7
280        let instance_id = Uuid::now_v7();
281
282        // Build command
283        let mut cmd = Command::new(binary);
284        cmd.args(&cfg.args);
285        cmd.envs(&cfg.env);
286
287        // Pipe stdout/stderr for log forwarding
288        cmd.stdout(Stdio::piped());
289        cmd.stderr(Stdio::piped());
290
291        // Set working directory if specified
292        if let Some(ref working_dir) = cfg.working_directory {
293            let path = Path::new(working_dir);
294            if path.exists() && path.is_dir() {
295                cmd.current_dir(path);
296            } else {
297                tracing::warn!(
298                    module = %cfg.name,
299                    working_dir = %working_dir,
300                    "Working directory does not exist or is not a directory, using current dir"
301                );
302            }
303        }
304
305        // Spawn the process
306        let mut child = cmd
307            .spawn()
308            .with_context(|| format!("failed to spawn process: {}", binary.display()))?;
309
310        // Get PID
311        let pid = child.id();
312
313        // Spawn log forwarder tasks for stdout/stderr with cancellation support
314        let module_name = cfg.name.clone();
315        let cancel = self.cancel.clone();
316        let stdout_forwarder = child.stdout.take().map(|stdout| {
317            spawn_stream_forwarder(
318                stdout,
319                module_name.clone(),
320                instance_id,
321                cancel.clone(),
322                StreamKind::Stdout,
323            )
324        });
325        let stderr_forwarder = child.stderr.take().map(|stderr| {
326            spawn_stream_forwarder(
327                stderr,
328                module_name.clone(),
329                instance_id,
330                cancel.clone(),
331                StreamKind::Stderr,
332            )
333        });
334
335        tracing::info!(
336            module = %cfg.name,
337            instance_id = %instance_id,
338            pid = ?pid,
339            "Spawned OoP module with log forwarding"
340        );
341
342        // Create handle
343        let handle = InstanceHandle {
344            module: cfg.name.clone(),
345            instance_id,
346            backend: BackendKind::LocalProcess,
347            pid,
348            created_at: std::time::Instant::now(),
349        };
350
351        // Store in instances map
352        {
353            let mut instances = self.instances.write();
354            instances.insert(
355                instance_id,
356                LocalInstance {
357                    handle: handle.clone(),
358                    child,
359                    stdout_forwarder,
360                    stderr_forwarder,
361                },
362            );
363        }
364
365        Ok(handle)
366    }
367
368    async fn stop_instance(&self, handle: &InstanceHandle) -> Result<()> {
369        let local = {
370            let mut instances = self.instances.write();
371            instances.remove(&handle.instance_id)
372        };
373
374        if let Some(mut local) = local {
375            stop_child_with_grace(
376                &mut local.child,
377                &local.handle,
378                INSTANCE_STOP_GRACE_PERIOD,
379                "stop_instance",
380            )
381            .await;
382
383            // we do not await forwarders here, they'll stop on their own via CancellationToken and pipe close;
384            // shutdown_all_instances handles draining for global shutdown
385        } else {
386            tracing::debug!(
387                module = %handle.module,
388                instance_id = %handle.instance_id,
389                "stop_instance called for unknown instance, ignoring"
390            );
391        }
392
393        Ok(())
394    }
395
396    async fn list_instances(&self, module: &str) -> Result<Vec<InstanceHandle>> {
397        let instances = self.instances.read();
398
399        let result = instances
400            .values()
401            .filter(|inst| inst.handle.module == module)
402            .map(|inst| inst.handle.clone())
403            .collect();
404
405        Ok(result)
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::time::Instant;

    /// Backend wired to a fresh token that is never cancelled.
    fn test_backend() -> LocalProcessBackend {
        LocalProcessBackend::new(CancellationToken::new())
    }

    /// Config for a process that stays alive long enough to be listed and stopped.
    ///
    /// On Unix this runs `/bin/sleep 10`.
    /// NOTE(review): on Windows the same argument is handed to `cmd.exe`
    /// unchanged - confirm that this keeps the process alive as intended.
    fn long_running_config(module: &str) -> OopModuleConfig {
        #[cfg(windows)]
        let binary = PathBuf::from("C:\\Windows\\System32\\cmd.exe");
        #[cfg(not(windows))]
        let binary = PathBuf::from("/bin/sleep");

        let mut cfg = OopModuleConfig::new(module, BackendKind::LocalProcess);
        cfg.binary = Some(binary);
        cfg.args = vec!["10".to_owned()];
        cfg
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_binary() {
        let backend = test_backend();
        let cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("executable_path must be set")
        );
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_correct_backend() {
        let backend = test_backend();
        let mut cfg = OopModuleConfig::new("test_module", BackendKind::K8s);
        cfg.binary = Some(PathBuf::from("/bin/echo"));

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("can only spawn LocalProcess")
        );
    }

    #[tokio::test]
    async fn test_spawn_list_stop_lifecycle() {
        let backend = test_backend();
        let cfg = long_running_config("test_module");

        // Spawn instance
        let handle = backend
            .spawn_instance(&cfg)
            .await
            .expect("should spawn instance");

        assert_eq!(handle.module, "test_module");
        assert!(!handle.instance_id.is_nil());
        assert_eq!(handle.backend, BackendKind::LocalProcess);

        // List instances
        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 1);
        assert_eq!(instances[0].module, "test_module");
        assert_eq!(instances[0].instance_id, handle.instance_id);

        // Stop instance
        backend
            .stop_instance(&handle)
            .await
            .expect("should stop instance");

        // Verify it's removed
        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    #[tokio::test]
    async fn test_list_instances_filters_by_module() {
        let backend = test_backend();

        let handle_a = backend
            .spawn_instance(&long_running_config("module_a"))
            .await
            .expect("should spawn module_a");
        let handle_b = backend
            .spawn_instance(&long_running_config("module_b"))
            .await
            .expect("should spawn module_b");

        // Each module must only see its own instance
        let instances_a = backend
            .list_instances("module_a")
            .await
            .expect("should list module_a");
        assert_eq!(instances_a.len(), 1);
        assert_eq!(instances_a[0].module, "module_a");

        let instances_b = backend
            .list_instances("module_b")
            .await
            .expect("should list module_b");
        assert_eq!(instances_b.len(), 1);
        assert_eq!(instances_b[0].module, "module_b");

        // Clean up
        backend.stop_instance(&handle_a).await.ok();
        backend.stop_instance(&handle_b).await.ok();
    }

    #[tokio::test]
    async fn test_stop_nonexistent_instance() {
        let backend = test_backend();
        let handle = InstanceHandle {
            module: "test_module".to_owned(),
            instance_id: Uuid::new_v4(),
            backend: BackendKind::LocalProcess,
            pid: None,
            created_at: Instant::now(),
        };

        // Should not error even if instance doesn't exist
        let result = backend.stop_instance(&handle).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_list_instances_empty() {
        let backend = test_backend();
        let instances = backend
            .list_instances("nonexistent_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    /// Tests for the Unix implementation of `send_terminate_signal`.
    #[cfg(unix)]
    mod send_terminate_signal_tests {
        use super::send_terminate_signal;
        use std::time::Duration;

        /// Generous bound for a process to exit; it only guards against hangs,
        /// so it must not be tight enough to trip on a loaded machine.
        const EXIT_TIMEOUT: Duration = Duration::from_secs(5);

        #[tokio::test]
        async fn test_send_terminate_signal_to_valid_process() {
            // Spawn a long-running process
            let mut cmd = tokio::process::Command::new("sleep");
            cmd.args(["30"]);

            let mut child = cmd.spawn().expect("should spawn test process");

            // Send termination signal
            let result = send_terminate_signal(&child);

            // Should return true indicating signal was sent
            assert!(result, "Should successfully send SIGTERM to valid process");

            // The signal should end the process well before `sleep 30` would
            tokio::time::timeout(EXIT_TIMEOUT, child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");
        }

        #[tokio::test]
        async fn test_send_terminate_signal_to_exited_process() {
            // Spawn a process that exits immediately
            let mut cmd = tokio::process::Command::new("/bin/sh");
            cmd.args(["-c", "exit 0"]);
            let mut child = cmd.spawn().expect("should spawn test process");

            // Wait for it to exit
            tokio::time::timeout(EXIT_TIMEOUT, child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");

            // Try to send termination signal to exited process
            let result = send_terminate_signal(&child);

            // Should return false because PID is no longer available
            assert!(!result, "Should return false for already-exited process");
        }

        #[test]
        fn test_pid_conversion_edge_case_documentation() {
            // Documents the conversion that `send_terminate_signal` relies on:
            // a `u32` PID above `i32::MAX` cannot be represented as `i32`.
            let max_u32_pid: u32 = u32::MAX;

            let result = i32::try_from(max_u32_pid);
            assert!(result.is_err(), "u32::MAX should not fit in i32");

            // Our code handles this by logging a warning and returning false,
            // preventing the dangerous unwrap_or(0) that would signal PID 0
        }

        #[test]
        fn test_pid_conversion_normal_range() {
            // Test that normal PIDs convert successfully
            let normal_pid: u32 = 12345;
            let result = i32::try_from(normal_pid);
            assert!(result.is_ok(), "Normal PID should convert to i32");
            assert_eq!(result.unwrap(), 12345);
        }
    }
}