1use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::process::{Child, Command};
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14use uuid::Uuid;
15
16use super::log_forwarder::{StreamKind, spawn_stream_forwarder};
17use super::{BackendKind, InstanceHandle, ModuleRuntimeBackend, OopModuleConfig};
18
/// How long each process gets to exit after a termination request during
/// backend-wide shutdown, before it is force-killed.
const SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_millis(5_000);

/// How long a single process gets to exit after a termination request in
/// `stop_instance`, before it is force-killed.
const INSTANCE_STOP_GRACE_PERIOD: Duration = Duration::from_millis(2_000);

/// Upper bound on waiting for a log-forwarder task to finish draining output.
const FORWARDER_DRAIN_TIMEOUT: Duration = Duration::from_millis(100);
27
28#[cfg(unix)]
41fn send_terminate_signal(child: &Child) -> bool {
42 use nix::sys::signal::{Signal, kill};
43 use nix::unistd::Pid;
44
45 let Some(pid) = child.id() else {
46 return false;
47 };
48
49 let Ok(pid_i32) = i32::try_from(pid) else {
50 tracing::warn!(
51 pid = pid,
52 "Failed to convert PID to i32, cannot send SIGTERM (PID exceeds i32::MAX: {})",
53 i32::MAX
54 );
55 return false;
56 };
57
58 kill(Pid::from_raw(pid_i32), Signal::SIGTERM).is_ok()
59}
60
/// Fallback for platforms without POSIX signals (e.g. Windows).
///
/// There is no portable way to ask a child to shut down gracefully here, so
/// this always returns `false`; callers must fall back to a force kill.
///
/// `not(unix)` (rather than `windows`) makes this the exact complement of the
/// Unix implementation, so exactly one definition exists on every target.
#[cfg(not(unix))]
fn send_terminate_signal(_child: &Child) -> bool {
    false
}
73
74async fn stop_child_with_grace(
80 child: &mut Child,
81 handle: &InstanceHandle,
82 grace: Duration,
83 context: &str,
84) {
85 let pid = child.id();
86 let sent = send_terminate_signal(child);
87
88 if !sent && pid.is_some() {
90 tracing::debug!(
91 module = %handle.module,
92 instance_id = %handle.instance_id,
93 pid = ?pid,
94 "{context}: graceful termination not available, will force kill"
95 );
96 }
97
98 tracing::debug!(
99 module = %handle.module,
100 instance_id = %handle.instance_id,
101 pid = ?pid,
102 graceful = sent,
103 "{context}: sent termination signal"
104 );
105
106 match tokio::time::timeout(grace, child.wait()).await {
107 Ok(Ok(status)) => {
108 tracing::debug!(
109 module = %handle.module,
110 instance_id = %handle.instance_id,
111 status = ?status,
112 "{context}: process exited gracefully"
113 );
114 }
115 Ok(Err(e)) => {
116 tracing::warn!(
117 module = %handle.module,
118 instance_id = %handle.instance_id,
119 error = %e,
120 "{context}: failed to wait for process"
121 );
122 }
123 Err(_) => {
124 tracing::debug!(
125 module = %handle.module,
126 instance_id = %handle.instance_id,
127 "{context}: grace period expired, force killing"
128 );
129 if let Err(e) = child.kill().await {
130 tracing::warn!(
131 module = %handle.module,
132 instance_id = %handle.instance_id,
133 error = %e,
134 "{context}: failed to force kill"
135 );
136 }
137 }
138 }
139}
140
141async fn wait_forwarder(handle: Option<JoinHandle<()>>) {
143 if let Some(h) = handle {
144 let _ = tokio::time::timeout(FORWARDER_DRAIN_TIMEOUT, h).await;
145 }
146}
147
/// A process spawned by `LocalProcessBackend`, together with the tasks that
/// forward its output to the log.
struct LocalInstance {
    /// Handle returned from `spawn_instance`; its `instance_id` is the key
    /// under which this instance is stored in the `InstanceMap`.
    handle: InstanceHandle,
    /// The spawned OS process.
    child: Child,
    /// Task forwarding the child's stdout; `None` if stdout could not be taken.
    stdout_forwarder: Option<JoinHandle<()>>,
    /// Task forwarding the child's stderr; `None` if stderr could not be taken.
    stderr_forwarder: Option<JoinHandle<()>>,
}

/// Live instances keyed by their `InstanceHandle::instance_id`.
type InstanceMap = HashMap<Uuid, LocalInstance>;
160
/// Runtime backend that runs out-of-process (OoP) modules as local child
/// processes, forwarding their stdout/stderr to the log.
///
/// All tracked children are stopped once the cancellation token passed to
/// `LocalProcessBackend::new` is triggered.
pub struct LocalProcessBackend {
    /// Live instances; shared with the background shutdown task spawned by `new`.
    instances: Arc<RwLock<InstanceMap>>,
    /// Token that triggers shutdown; clones are handed to each log forwarder.
    cancel: CancellationToken,
}
171
172impl LocalProcessBackend {
173 #[must_use]
177 pub fn new(cancel: CancellationToken) -> Self {
178 let backend = Self {
179 instances: Arc::new(RwLock::new(HashMap::new())),
180 cancel: cancel.clone(),
181 };
182
183 let instances = Arc::clone(&backend.instances);
185 tokio::spawn(async move {
186 cancel.cancelled().await;
187 tracing::info!("LocalProcessBackend: shutdown signal received, stopping all processes");
188 Self::shutdown_all_instances(instances).await;
189 });
190
191 backend
192 }
193
194 async fn shutdown_all_instances(instances: Arc<RwLock<InstanceMap>>) {
196 let mut all_instances: Vec<LocalInstance> = {
197 let mut guard = instances.write();
198 guard.drain().map(|(_, inst)| inst).collect()
199 };
200
201 if all_instances.is_empty() {
202 return;
203 }
204
205 tracing::info!(count = all_instances.len(), "Stopping OoP module processes");
206
207 for inst in &mut all_instances {
209 stop_child_with_grace(
210 &mut inst.child,
211 &inst.handle,
212 SHUTDOWN_GRACE_PERIOD,
213 "shutdown",
214 )
215 .await;
216 }
217
218 for inst in all_instances {
220 wait_forwarder(inst.stdout_forwarder).await;
221 wait_forwarder(inst.stderr_forwarder).await;
222 }
223
224 tracing::info!("All OoP module processes stopped");
225 }
226}
227
228#[async_trait]
229impl ModuleRuntimeBackend for LocalProcessBackend {
230 async fn spawn_instance(&self, cfg: &OopModuleConfig) -> Result<InstanceHandle> {
231 if cfg.backend != BackendKind::LocalProcess {
233 bail!(
234 "LocalProcessBackend can only spawn LocalProcess instances, got {:?}",
235 cfg.backend
236 );
237 }
238
239 let binary = cfg
241 .binary
242 .as_ref()
243 .context("executable_path must be set for LocalProcess backend")?;
244
245 let instance_id = Uuid::now_v7();
247
248 let mut cmd = Command::new(binary);
250 cmd.args(&cfg.args);
251 cmd.envs(&cfg.env);
252
253 cmd.stdout(Stdio::piped());
255 cmd.stderr(Stdio::piped());
256
257 if let Some(ref working_dir) = cfg.working_directory {
259 let path = Path::new(working_dir);
260 if path.exists() && path.is_dir() {
261 cmd.current_dir(path);
262 } else {
263 tracing::warn!(
264 module = %cfg.name,
265 working_dir = %working_dir,
266 "Working directory does not exist or is not a directory, using current dir"
267 );
268 }
269 }
270
271 let mut child = cmd
273 .spawn()
274 .with_context(|| format!("failed to spawn process: {}", binary.display()))?;
275
276 let pid = child.id();
278
279 let module_name = cfg.name.clone();
281 let cancel = self.cancel.clone();
282 let stdout_forwarder = child.stdout.take().map(|stdout| {
283 spawn_stream_forwarder(
284 stdout,
285 module_name.clone(),
286 instance_id,
287 cancel.clone(),
288 StreamKind::Stdout,
289 )
290 });
291 let stderr_forwarder = child.stderr.take().map(|stderr| {
292 spawn_stream_forwarder(
293 stderr,
294 module_name.clone(),
295 instance_id,
296 cancel.clone(),
297 StreamKind::Stderr,
298 )
299 });
300
301 tracing::info!(
302 module = %cfg.name,
303 instance_id = %instance_id,
304 pid = ?pid,
305 "Spawned OoP module with log forwarding"
306 );
307
308 let handle = InstanceHandle {
310 module: cfg.name.clone(),
311 instance_id,
312 backend: BackendKind::LocalProcess,
313 pid,
314 created_at: std::time::Instant::now(),
315 };
316
317 {
319 let mut instances = self.instances.write();
320 instances.insert(
321 instance_id,
322 LocalInstance {
323 handle: handle.clone(),
324 child,
325 stdout_forwarder,
326 stderr_forwarder,
327 },
328 );
329 }
330
331 Ok(handle)
332 }
333
334 async fn stop_instance(&self, handle: &InstanceHandle) -> Result<()> {
335 let local = {
336 let mut instances = self.instances.write();
337 instances.remove(&handle.instance_id)
338 };
339
340 if let Some(mut local) = local {
341 stop_child_with_grace(
342 &mut local.child,
343 &local.handle,
344 INSTANCE_STOP_GRACE_PERIOD,
345 "stop_instance",
346 )
347 .await;
348
349 } else {
352 tracing::debug!(
353 module = %handle.module,
354 instance_id = %handle.instance_id,
355 "stop_instance called for unknown instance, ignoring"
356 );
357 }
358
359 Ok(())
360 }
361
362 async fn list_instances(&self, module: &str) -> Result<Vec<InstanceHandle>> {
363 let instances = self.instances.read();
364
365 let result = instances
366 .values()
367 .filter(|inst| inst.handle.module == module)
368 .map(|inst| inst.handle.clone())
369 .collect();
370
371 Ok(result)
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::time::Instant;

    fn test_backend() -> LocalProcessBackend {
        LocalProcessBackend::new(CancellationToken::new())
    }

    /// A binary that keeps running for a while when given the argument `"10"`.
    fn long_running_binary() -> PathBuf {
        if cfg!(windows) {
            PathBuf::from("C:\\Windows\\System32\\cmd.exe")
        } else {
            PathBuf::from("/bin/sleep")
        }
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_binary() {
        let backend = test_backend();
        let cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("executable_path must be set")
        );
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_correct_backend() {
        let backend = test_backend();
        let mut cfg = OopModuleConfig::new("test_module", BackendKind::K8s);
        cfg.binary = Some(PathBuf::from("/bin/echo"));

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("can only spawn LocalProcess")
        );
    }

    #[tokio::test]
    async fn test_spawn_list_stop_lifecycle() {
        let backend = test_backend();

        let mut cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);
        cfg.binary = Some(long_running_binary());
        cfg.args = vec!["10".to_owned()];

        let handle = backend
            .spawn_instance(&cfg)
            .await
            .expect("should spawn instance");

        assert_eq!(handle.module, "test_module");
        assert!(!handle.instance_id.is_nil());
        assert_eq!(handle.backend, BackendKind::LocalProcess);

        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 1);
        assert_eq!(instances[0].module, "test_module");
        assert_eq!(instances[0].instance_id, handle.instance_id);

        backend
            .stop_instance(&handle)
            .await
            .expect("should stop instance");

        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    #[tokio::test]
    async fn test_list_instances_filters_by_module() {
        let backend = test_backend();
        let binary = long_running_binary();

        let mut cfg_a = OopModuleConfig::new("module_a", BackendKind::LocalProcess);
        cfg_a.binary = Some(binary.clone());
        cfg_a.args = vec!["10".to_owned()];

        let handle_a = backend
            .spawn_instance(&cfg_a)
            .await
            .expect("should spawn module_a");

        let mut cfg_b = OopModuleConfig::new("module_b", BackendKind::LocalProcess);
        cfg_b.binary = Some(binary);
        cfg_b.args = vec!["10".to_owned()];

        let handle_b = backend
            .spawn_instance(&cfg_b)
            .await
            .expect("should spawn module_b");

        let instances_a = backend
            .list_instances("module_a")
            .await
            .expect("should list module_a");
        assert_eq!(instances_a.len(), 1);
        assert_eq!(instances_a[0].module, "module_a");

        let instances_b = backend
            .list_instances("module_b")
            .await
            .expect("should list module_b");
        assert_eq!(instances_b.len(), 1);
        assert_eq!(instances_b[0].module, "module_b");

        // Best-effort cleanup; the assertions above are what this test checks.
        backend.stop_instance(&handle_a).await.ok();
        backend.stop_instance(&handle_b).await.ok();
    }

    #[tokio::test]
    async fn test_stop_nonexistent_instance() {
        let backend = test_backend();
        let handle = InstanceHandle {
            module: "test_module".to_owned(),
            instance_id: Uuid::new_v4(),
            backend: BackendKind::LocalProcess,
            pid: None,
            created_at: Instant::now(),
        };

        let result = backend.stop_instance(&handle).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_list_instances_empty() {
        let backend = test_backend();
        let instances = backend
            .list_instances("nonexistent_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    mod send_terminate_signal_tests {
        #[cfg(unix)]
        use {super::send_terminate_signal, std::time::Duration};

        /// Upper bound for a test process to exit. Generous so the tests do not
        /// flake on loaded CI machines; the happy path finishes in milliseconds.
        #[cfg(unix)]
        const EXIT_TIMEOUT: Duration = Duration::from_secs(5);

        #[cfg(unix)]
        #[tokio::test]
        async fn test_send_terminate_signal_to_valid_process() {
            use std::os::unix::process::ExitStatusExt;

            // Signal `sleep` directly rather than via `sh -c`, so that no
            // grandchild process is left running after the test.
            let mut child = tokio::process::Command::new("/bin/sleep")
                .arg("30")
                .spawn()
                .expect("should spawn test process");

            let result = send_terminate_signal(&child);
            assert!(result, "Should successfully send SIGTERM to valid process");

            let status = tokio::time::timeout(EXIT_TIMEOUT, child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");
            assert_eq!(
                status.signal(),
                Some(nix::sys::signal::Signal::SIGTERM as i32),
                "process should have been terminated by SIGTERM"
            );
        }

        #[cfg(unix)]
        #[tokio::test]
        async fn test_send_terminate_signal_to_exited_process() {
            let mut child = tokio::process::Command::new("/bin/sh")
                .args(["-c", "exit 0"])
                .spawn()
                .expect("should spawn test process");

            tokio::time::timeout(EXIT_TIMEOUT, child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");

            let result = send_terminate_signal(&child);
            assert!(!result, "Should return false for already-exited process");
        }

        #[cfg(unix)]
        #[test]
        fn test_pid_conversion_edge_case_documentation() {
            // `send_terminate_signal` refuses PIDs that do not fit in the i32
            // that kill(2) expects; u32::MAX is the extreme case.
            let max_u32_pid: u32 = u32::MAX;
            let result = i32::try_from(max_u32_pid);
            assert!(result.is_err(), "u32::MAX should not fit in i32");
        }

        #[cfg(unix)]
        #[test]
        fn test_pid_conversion_normal_range() {
            let normal_pid: u32 = 12345;
            let result = i32::try_from(normal_pid);
            assert!(result.is_ok(), "Normal PID should convert to i32");
            assert_eq!(result.unwrap(), 12345);
        }
    }
}