Skip to main content

modkit/backends/
local.rs

1//! Local process backend implementation
2
3use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::process::{Child, Command};
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14use uuid::Uuid;
15
16use super::log_forwarder::{StreamKind, spawn_stream_forwarder};
17use super::{BackendKind, InstanceHandle, ModuleRuntimeBackend, OopModuleConfig};
18
/// Time each process is given to exit during a global shutdown before it is
/// force-killed.
const SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_millis(5_000);

/// Time a process is given to exit when a single instance is stopped before it
/// is force-killed.
const INSTANCE_STOP_GRACE_PERIOD: Duration = Duration::from_millis(2_000);

/// Maximum time spent waiting on each log forwarder task during shutdown.
const FORWARDER_DRAIN_TIMEOUT: Duration = Duration::from_millis(100);
27
28/// Send graceful termination signal to a child process.
29///
30/// On Unix: Sends SIGTERM which the process can handle for cleanup.
31/// On Windows: Returns false since there's no reliable graceful termination
32/// method for console applications.
33#[cfg(unix)]
34fn send_terminate_signal(child: &Child) -> bool {
35    use nix::sys::signal::{Signal, kill};
36    use nix::unistd::Pid;
37
38    if let Some(pid) = child.id() {
39        let pid_i32 = i32::try_from(pid).unwrap_or(0);
40        kill(Pid::from_raw(pid_i32), Signal::SIGTERM).is_ok()
41    } else {
42        false
43    }
44}
45
/// Windows counterpart of the Unix `send_terminate_signal`.
///
/// Console applications on Windows have no reliable SIGTERM equivalent, so no
/// termination request is sent at all. Always returns `false` to tell the caller
/// that the process was not asked to exit and has to be force-killed instead.
#[cfg(windows)]
fn send_terminate_signal(_: &Child) -> bool {
    false
}
55
56/// Stop a child process with graceful termination and timeout.
57///
58/// 1. Sends SIGTERM (Unix) via `send_terminate_signal`
59/// 2. Waits for process exit within `grace` period
60/// 3. On timeout: force kills the process
61async fn stop_child_with_grace(
62    child: &mut Child,
63    handle: &InstanceHandle,
64    grace: Duration,
65    context: &str,
66) {
67    let pid = child.id();
68    let sent = send_terminate_signal(child);
69
70    tracing::debug!(
71        module = %handle.module,
72        instance_id = %handle.instance_id,
73        pid = ?pid,
74        graceful = sent,
75        "{context}: sent termination signal"
76    );
77
78    match tokio::time::timeout(grace, child.wait()).await {
79        Ok(Ok(status)) => {
80            tracing::debug!(
81                module = %handle.module,
82                instance_id = %handle.instance_id,
83                status = ?status,
84                "{context}: process exited gracefully"
85            );
86        }
87        Ok(Err(e)) => {
88            tracing::warn!(
89                module = %handle.module,
90                instance_id = %handle.instance_id,
91                error = %e,
92                "{context}: failed to wait for process"
93            );
94        }
95        Err(_) => {
96            tracing::debug!(
97                module = %handle.module,
98                instance_id = %handle.instance_id,
99                "{context}: grace period expired, force killing"
100            );
101            if let Err(e) = child.kill().await {
102                tracing::warn!(
103                    module = %handle.module,
104                    instance_id = %handle.instance_id,
105                    error = %e,
106                    "{context}: failed to force kill"
107                );
108            }
109        }
110    }
111}
112
113/// Wait for a log forwarder task to finish with timeout.
114async fn wait_forwarder(handle: Option<JoinHandle<()>>) {
115    if let Some(h) = handle {
116        let _ = tokio::time::timeout(FORWARDER_DRAIN_TIMEOUT, h).await;
117    }
118}
119
120/// Internal representation of a local process instance
121struct LocalInstance {
122    handle: InstanceHandle,
123    child: Child,
124    /// Task handle for stdout log forwarder
125    stdout_forwarder: Option<JoinHandle<()>>,
126    /// Task handle for stderr log forwarder
127    stderr_forwarder: Option<JoinHandle<()>>,
128}
129
130/// Map key type for instances - uses Uuid directly
131type InstanceMap = HashMap<Uuid, LocalInstance>;
132
133/// Backend that spawns modules as local child processes and manages their lifecycle.
134///
135/// When the cancellation token is triggered, the backend will:
136/// 1. Send termination signal to all processes (SIGTERM on Unix, `TerminateProcess` on Windows)
137/// 2. Wait up to 5 seconds for graceful shutdown
138/// 3. Force kill any remaining processes
139pub struct LocalProcessBackend {
140    instances: Arc<RwLock<InstanceMap>>,
141    cancel: CancellationToken,
142}
143
144impl LocalProcessBackend {
145    /// Create a new `LocalProcessBackend` with the given cancellation token.
146    ///
147    /// When the token is cancelled, all spawned processes will be gracefully stopped.
148    #[must_use]
149    pub fn new(cancel: CancellationToken) -> Self {
150        let backend = Self {
151            instances: Arc::new(RwLock::new(HashMap::new())),
152            cancel: cancel.clone(),
153        };
154
155        // Spawn background task to handle shutdown
156        let instances = Arc::clone(&backend.instances);
157        tokio::spawn(async move {
158            cancel.cancelled().await;
159            tracing::info!("LocalProcessBackend: shutdown signal received, stopping all processes");
160            Self::shutdown_all_instances(instances).await;
161        });
162
163        backend
164    }
165
166    /// Gracefully stop all tracked instances with timeout.
167    async fn shutdown_all_instances(instances: Arc<RwLock<InstanceMap>>) {
168        let mut all_instances: Vec<LocalInstance> = {
169            let mut guard = instances.write();
170            guard.drain().map(|(_, inst)| inst).collect()
171        };
172
173        if all_instances.is_empty() {
174            return;
175        }
176
177        tracing::info!(count = all_instances.len(), "Stopping OoP module processes");
178
179        // Stop all processes with grace period
180        for inst in &mut all_instances {
181            stop_child_with_grace(
182                &mut inst.child,
183                &inst.handle,
184                SHUTDOWN_GRACE_PERIOD,
185                "shutdown",
186            )
187            .await;
188        }
189
190        // Wait for forwarders to drain
191        for inst in all_instances {
192            wait_forwarder(inst.stdout_forwarder).await;
193            wait_forwarder(inst.stderr_forwarder).await;
194        }
195
196        tracing::info!("All OoP module processes stopped");
197    }
198}
199
200#[async_trait]
201impl ModuleRuntimeBackend for LocalProcessBackend {
202    async fn spawn_instance(&self, cfg: &OopModuleConfig) -> Result<InstanceHandle> {
203        // Verify backend kind
204        if cfg.backend != BackendKind::LocalProcess {
205            bail!(
206                "LocalProcessBackend can only spawn LocalProcess instances, got {:?}",
207                cfg.backend
208            );
209        }
210
211        // Ensure binary is set
212        let binary = cfg
213            .binary
214            .as_ref()
215            .context("executable_path must be set for LocalProcess backend")?;
216
217        // Generate unique instance ID using UUID v7
218        let instance_id = Uuid::now_v7();
219
220        // Build command
221        let mut cmd = Command::new(binary);
222        cmd.args(&cfg.args);
223        cmd.envs(&cfg.env);
224
225        // Pipe stdout/stderr for log forwarding
226        cmd.stdout(Stdio::piped());
227        cmd.stderr(Stdio::piped());
228
229        // Set working directory if specified
230        if let Some(ref working_dir) = cfg.working_directory {
231            let path = Path::new(working_dir);
232            if path.exists() && path.is_dir() {
233                cmd.current_dir(path);
234            } else {
235                tracing::warn!(
236                    module = %cfg.name,
237                    working_dir = %working_dir,
238                    "Working directory does not exist or is not a directory, using current dir"
239                );
240            }
241        }
242
243        // Spawn the process
244        let mut child = cmd
245            .spawn()
246            .with_context(|| format!("failed to spawn process: {}", binary.display()))?;
247
248        // Get PID
249        let pid = child.id();
250
251        // Spawn log forwarder tasks for stdout/stderr with cancellation support
252        let module_name = cfg.name.clone();
253        let cancel = self.cancel.clone();
254        let stdout_forwarder = child.stdout.take().map(|stdout| {
255            spawn_stream_forwarder(
256                stdout,
257                module_name.clone(),
258                instance_id,
259                cancel.clone(),
260                StreamKind::Stdout,
261            )
262        });
263        let stderr_forwarder = child.stderr.take().map(|stderr| {
264            spawn_stream_forwarder(
265                stderr,
266                module_name.clone(),
267                instance_id,
268                cancel.clone(),
269                StreamKind::Stderr,
270            )
271        });
272
273        tracing::info!(
274            module = %cfg.name,
275            instance_id = %instance_id,
276            pid = ?pid,
277            "Spawned OoP module with log forwarding"
278        );
279
280        // Create handle
281        let handle = InstanceHandle {
282            module: cfg.name.clone(),
283            instance_id,
284            backend: BackendKind::LocalProcess,
285            pid,
286            created_at: std::time::Instant::now(),
287        };
288
289        // Store in instances map
290        {
291            let mut instances = self.instances.write();
292            instances.insert(
293                instance_id,
294                LocalInstance {
295                    handle: handle.clone(),
296                    child,
297                    stdout_forwarder,
298                    stderr_forwarder,
299                },
300            );
301        }
302
303        Ok(handle)
304    }
305
306    async fn stop_instance(&self, handle: &InstanceHandle) -> Result<()> {
307        let local = {
308            let mut instances = self.instances.write();
309            instances.remove(&handle.instance_id)
310        };
311
312        if let Some(mut local) = local {
313            stop_child_with_grace(
314                &mut local.child,
315                &local.handle,
316                INSTANCE_STOP_GRACE_PERIOD,
317                "stop_instance",
318            )
319            .await;
320
321            // we do not await forwarders here, they'll stop on their own via CancellationToken and pipe close;
322            // shutdown_all_instances handles draining for global shutdown
323        } else {
324            tracing::debug!(
325                module = %handle.module,
326                instance_id = %handle.instance_id,
327                "stop_instance called for unknown instance, ignoring"
328            );
329        }
330
331        Ok(())
332    }
333
334    async fn list_instances(&self, module: &str) -> Result<Vec<InstanceHandle>> {
335        let instances = self.instances.read();
336
337        let result = instances
338            .values()
339            .filter(|inst| inst.handle.module == module)
340            .map(|inst| inst.handle.clone())
341            .collect();
342
343        Ok(result)
344    }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::time::Instant;

    fn test_backend() -> LocalProcessBackend {
        LocalProcessBackend::new(CancellationToken::new())
    }

    /// Build a `LocalProcess` config for `module` whose process keeps running for
    /// about ten seconds, long enough to be listed and then stopped by a test.
    ///
    /// Windows has no `sleep` executable, so `ping.exe -n 11` against the loopback
    /// address is used there instead (it pauses about one second between requests).
    fn long_running_config(module: &str) -> OopModuleConfig {
        let mut cfg = OopModuleConfig::new(module, BackendKind::LocalProcess);

        #[cfg(windows)]
        {
            cfg.binary = Some(PathBuf::from("C:\\Windows\\System32\\ping.exe"));
            cfg.args = vec!["-n".to_owned(), "11".to_owned(), "127.0.0.1".to_owned()];
        }
        #[cfg(not(windows))]
        {
            cfg.binary = Some(PathBuf::from("/bin/sleep"));
            cfg.args = vec!["10".to_owned()];
        }

        cfg
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_binary() {
        let backend = test_backend();
        let cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("executable_path must be set")
        );
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_correct_backend() {
        let backend = test_backend();
        let mut cfg = OopModuleConfig::new("test_module", BackendKind::K8s);
        cfg.binary = Some(PathBuf::from("/bin/echo"));

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("can only spawn LocalProcess")
        );
    }

    #[tokio::test]
    async fn test_spawn_list_stop_lifecycle() {
        let backend = test_backend();
        let cfg = long_running_config("test_module");

        // Spawn instance
        let handle = backend
            .spawn_instance(&cfg)
            .await
            .expect("should spawn instance");

        assert_eq!(handle.module, "test_module");
        assert!(!handle.instance_id.is_nil());
        assert_eq!(handle.backend, BackendKind::LocalProcess);

        // List instances
        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 1);
        assert_eq!(instances[0].module, "test_module");
        assert_eq!(instances[0].instance_id, handle.instance_id);

        // Stop instance
        backend
            .stop_instance(&handle)
            .await
            .expect("should stop instance");

        // Verify it's removed
        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    #[tokio::test]
    async fn test_list_instances_filters_by_module() {
        let backend = test_backend();

        let handle_a = backend
            .spawn_instance(&long_running_config("module_a"))
            .await
            .expect("should spawn module_a");

        let handle_b = backend
            .spawn_instance(&long_running_config("module_b"))
            .await
            .expect("should spawn module_b");

        // List module_a instances
        let instances_a = backend
            .list_instances("module_a")
            .await
            .expect("should list module_a");
        assert_eq!(instances_a.len(), 1);
        assert_eq!(instances_a[0].module, "module_a");

        // List module_b instances
        let instances_b = backend
            .list_instances("module_b")
            .await
            .expect("should list module_b");
        assert_eq!(instances_b.len(), 1);
        assert_eq!(instances_b[0].module, "module_b");

        // Clean up
        backend.stop_instance(&handle_a).await.ok();
        backend.stop_instance(&handle_b).await.ok();
    }

    #[tokio::test]
    async fn test_stop_nonexistent_instance() {
        let backend = test_backend();
        let handle = InstanceHandle {
            module: "test_module".to_owned(),
            instance_id: Uuid::new_v4(),
            backend: BackendKind::LocalProcess,
            pid: None,
            created_at: Instant::now(),
        };

        // Should not error even if instance doesn't exist
        let result = backend.stop_instance(&handle).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_list_instances_empty() {
        let backend = test_backend();
        let instances = backend
            .list_instances("nonexistent_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }
}