1use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::process::{Child, Command};
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14use uuid::Uuid;
15
16use super::log_forwarder::{StreamKind, spawn_stream_forwarder};
17use super::{BackendKind, InstanceHandle, ModuleRuntimeBackend, OopModuleConfig};
18
/// Grace period given to each child process during backend-wide shutdown
/// (passed to `stop_child_with_grace` by `shutdown_all_instances`).
const SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(5);

/// Grace period given to a child process when a single instance is stopped
/// (passed to `stop_child_with_grace` by `stop_instance`).
const INSTANCE_STOP_GRACE_PERIOD: Duration = Duration::from_secs(2);

/// Maximum time `wait_forwarder` waits for one log-forwarder task to finish.
const FORWARDER_DRAIN_TIMEOUT: Duration = Duration::from_millis(100);
27
28#[cfg(unix)]
41fn send_terminate_signal(child: &Child) -> bool {
42 use nix::sys::signal::{Signal, kill};
43 use nix::unistd::Pid;
44
45 let Some(pid) = child.id() else {
46 return false;
47 };
48
49 let Ok(pid_i32) = i32::try_from(pid) else {
50 tracing::warn!(
51 pid = pid,
52 "Failed to convert PID to i32, cannot send SIGTERM (PID exceeds i32::MAX: {})",
53 i32::MAX
54 );
55 return false;
56 };
57
58 kill(Pid::from_raw(pid_i32), Signal::SIGTERM).is_ok()
59}
60
/// Windows counterpart of the Unix `send_terminate_signal`.
///
/// This implementation sends nothing and always returns `false`, so callers
/// see "no graceful signal was delivered" and must rely on their own
/// fallback (e.g. a force kill).
#[cfg(windows)]
fn send_terminate_signal(_child: &Child) -> bool {
    false
}
73
74async fn stop_child_with_grace(
80 child: &mut Child,
81 handle: &InstanceHandle,
82 grace: Duration,
83 context: &str,
84) {
85 let pid = child.id();
86 let sent = send_terminate_signal(child);
87
88 if !sent && pid.is_some() {
90 tracing::debug!(
91 module = %handle.module,
92 instance_id = %handle.instance_id,
93 pid = ?pid,
94 "{context}: graceful termination not available, will force kill"
95 );
96 }
97
98 tracing::debug!(
99 module = %handle.module,
100 instance_id = %handle.instance_id,
101 pid = ?pid,
102 graceful = sent,
103 "{context}: sent termination signal"
104 );
105
106 match tokio::time::timeout(grace, child.wait()).await {
107 Ok(Ok(status)) => {
108 tracing::debug!(
109 module = %handle.module,
110 instance_id = %handle.instance_id,
111 status = ?status,
112 "{context}: process exited gracefully"
113 );
114 }
115 Ok(Err(e)) => {
116 tracing::warn!(
117 module = %handle.module,
118 instance_id = %handle.instance_id,
119 error = %e,
120 "{context}: failed to wait for process"
121 );
122 }
123 Err(_) => {
124 tracing::debug!(
125 module = %handle.module,
126 instance_id = %handle.instance_id,
127 "{context}: grace period expired, force killing"
128 );
129 if let Err(e) = child.kill().await {
130 tracing::warn!(
131 module = %handle.module,
132 instance_id = %handle.instance_id,
133 error = %e,
134 "{context}: failed to force kill"
135 );
136 }
137 }
138 }
139}
140
141async fn wait_forwarder(
143 handle: Option<JoinHandle<()>>,
144 module: &str,
145 instance_id: uuid::Uuid,
146 stream: &str,
147) {
148 let Some(h) = handle else { return };
149 match tokio::time::timeout(FORWARDER_DRAIN_TIMEOUT, h).await {
150 Ok(Ok(())) => {}
151 Ok(Err(e)) => {
152 if e.is_panic() {
153 tracing::warn!(module, %instance_id, stream, error = %e, "log forwarder task panicked");
154 } else {
155 tracing::warn!(module, %instance_id, stream, error = %e, "log forwarder task cancelled");
156 }
157 }
158 Err(_) => {
159 tracing::warn!(
160 module,
161 %instance_id,
162 stream,
163 timeout_ms = FORWARDER_DRAIN_TIMEOUT.as_millis(),
164 "log forwarder did not finish within drain timeout",
165 );
166 }
167 }
168}
169
/// Bookkeeping for one spawned child process.
struct LocalInstance {
    /// Public handle describing the instance; a clone is returned to the
    /// caller of `spawn_instance`.
    handle: InstanceHandle,
    /// The running child process.
    child: Child,
    /// Task forwarding the child's stdout; `None` when no stdout pipe could
    /// be taken from the child.
    stdout_forwarder: Option<JoinHandle<()>>,
    /// Task forwarding the child's stderr; `None` when no stderr pipe could
    /// be taken from the child.
    stderr_forwarder: Option<JoinHandle<()>>,
}
179
/// Tracked instances, keyed by instance id.
type InstanceMap = HashMap<Uuid, LocalInstance>;
182
/// Runtime backend that runs out-of-process modules as local child processes.
pub struct LocalProcessBackend {
    /// All currently tracked instances. Shared with the background task
    /// started in `new`, which drains the map when `cancel` fires.
    instances: Arc<RwLock<InstanceMap>>,
    /// Cancellation token; a clone is handed to every log forwarder spawned
    /// by `spawn_instance`.
    cancel: CancellationToken,
}
193
194impl LocalProcessBackend {
195 #[must_use]
199 pub fn new(cancel: CancellationToken) -> Self {
200 let backend = Self {
201 instances: Arc::new(RwLock::new(HashMap::new())),
202 cancel: cancel.clone(),
203 };
204
205 let instances = Arc::clone(&backend.instances);
207 tokio::spawn(async move {
208 cancel.cancelled().await;
209 tracing::info!("LocalProcessBackend: shutdown signal received, stopping all processes");
210 Self::shutdown_all_instances(instances).await;
211 });
212
213 backend
214 }
215
216 async fn shutdown_all_instances(instances: Arc<RwLock<InstanceMap>>) {
218 let mut all_instances: Vec<LocalInstance> = {
219 let mut guard = instances.write();
220 guard.drain().map(|(_, inst)| inst).collect()
221 };
222
223 if all_instances.is_empty() {
224 return;
225 }
226
227 tracing::info!(count = all_instances.len(), "Stopping OoP module processes");
228
229 for inst in &mut all_instances {
231 stop_child_with_grace(
232 &mut inst.child,
233 &inst.handle,
234 SHUTDOWN_GRACE_PERIOD,
235 "shutdown",
236 )
237 .await;
238 }
239
240 for inst in all_instances {
242 wait_forwarder(
243 inst.stdout_forwarder,
244 &inst.handle.module,
245 inst.handle.instance_id,
246 "stdout",
247 )
248 .await;
249 wait_forwarder(
250 inst.stderr_forwarder,
251 &inst.handle.module,
252 inst.handle.instance_id,
253 "stderr",
254 )
255 .await;
256 }
257
258 tracing::info!("All OoP module processes stopped");
259 }
260}
261
262#[async_trait]
263impl ModuleRuntimeBackend for LocalProcessBackend {
264 async fn spawn_instance(&self, cfg: &OopModuleConfig) -> Result<InstanceHandle> {
265 if cfg.backend != BackendKind::LocalProcess {
267 bail!(
268 "LocalProcessBackend can only spawn LocalProcess instances, got {:?}",
269 cfg.backend
270 );
271 }
272
273 let binary = cfg
275 .binary
276 .as_ref()
277 .context("executable_path must be set for LocalProcess backend")?;
278
279 let instance_id = Uuid::now_v7();
281
282 let mut cmd = Command::new(binary);
284 cmd.args(&cfg.args);
285 cmd.envs(&cfg.env);
286
287 cmd.stdout(Stdio::piped());
289 cmd.stderr(Stdio::piped());
290
291 if let Some(ref working_dir) = cfg.working_directory {
293 let path = Path::new(working_dir);
294 if path.exists() && path.is_dir() {
295 cmd.current_dir(path);
296 } else {
297 tracing::warn!(
298 module = %cfg.name,
299 working_dir = %working_dir,
300 "Working directory does not exist or is not a directory, using current dir"
301 );
302 }
303 }
304
305 let mut child = cmd
307 .spawn()
308 .with_context(|| format!("failed to spawn process: {}", binary.display()))?;
309
310 let pid = child.id();
312
313 let module_name = cfg.name.clone();
315 let cancel = self.cancel.clone();
316 let stdout_forwarder = child.stdout.take().map(|stdout| {
317 spawn_stream_forwarder(
318 stdout,
319 module_name.clone(),
320 instance_id,
321 cancel.clone(),
322 StreamKind::Stdout,
323 )
324 });
325 let stderr_forwarder = child.stderr.take().map(|stderr| {
326 spawn_stream_forwarder(
327 stderr,
328 module_name.clone(),
329 instance_id,
330 cancel.clone(),
331 StreamKind::Stderr,
332 )
333 });
334
335 tracing::info!(
336 module = %cfg.name,
337 instance_id = %instance_id,
338 pid = ?pid,
339 "Spawned OoP module with log forwarding"
340 );
341
342 let handle = InstanceHandle {
344 module: cfg.name.clone(),
345 instance_id,
346 backend: BackendKind::LocalProcess,
347 pid,
348 created_at: std::time::Instant::now(),
349 };
350
351 {
353 let mut instances = self.instances.write();
354 instances.insert(
355 instance_id,
356 LocalInstance {
357 handle: handle.clone(),
358 child,
359 stdout_forwarder,
360 stderr_forwarder,
361 },
362 );
363 }
364
365 Ok(handle)
366 }
367
368 async fn stop_instance(&self, handle: &InstanceHandle) -> Result<()> {
369 let local = {
370 let mut instances = self.instances.write();
371 instances.remove(&handle.instance_id)
372 };
373
374 if let Some(mut local) = local {
375 stop_child_with_grace(
376 &mut local.child,
377 &local.handle,
378 INSTANCE_STOP_GRACE_PERIOD,
379 "stop_instance",
380 )
381 .await;
382
383 } else {
386 tracing::debug!(
387 module = %handle.module,
388 instance_id = %handle.instance_id,
389 "stop_instance called for unknown instance, ignoring"
390 );
391 }
392
393 Ok(())
394 }
395
396 async fn list_instances(&self, module: &str) -> Result<Vec<InstanceHandle>> {
397 let instances = self.instances.read();
398
399 let result = instances
400 .values()
401 .filter(|inst| inst.handle.module == module)
402 .map(|inst| inst.handle.clone())
403 .collect();
404
405 Ok(result)
406 }
407}
408
/// Tests for `LocalProcessBackend` and `send_terminate_signal`.
///
/// Several tests spawn real processes (`/bin/sleep` on non-Windows targets,
/// `cmd.exe` on Windows).
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::time::Instant;

    /// Builds a backend with a fresh cancellation token that is never triggered.
    fn test_backend() -> LocalProcessBackend {
        LocalProcessBackend::new(CancellationToken::new())
    }

    /// Spawning without a configured binary must fail with a descriptive error.
    #[tokio::test]
    async fn test_spawn_instance_requires_binary() {
        let backend = test_backend();
        let cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("executable_path must be set")
        );
    }

    /// A config targeting a different backend kind must be rejected.
    #[tokio::test]
    async fn test_spawn_instance_requires_correct_backend() {
        let backend = test_backend();
        let mut cfg = OopModuleConfig::new("test_module", BackendKind::K8s);
        cfg.binary = Some(PathBuf::from("/bin/echo"));

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("can only spawn LocalProcess")
        );
    }

    /// Full lifecycle: spawn one instance, see it listed, stop it, see it gone.
    #[tokio::test]
    async fn test_spawn_list_stop_lifecycle() {
        let backend = test_backend();

        let mut cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);

        // Platform-specific binary used as the child process.
        #[cfg(windows)]
        let binary = PathBuf::from("C:\\Windows\\System32\\cmd.exe");
        #[cfg(not(windows))]
        let binary = PathBuf::from("/bin/sleep");

        cfg.binary = Some(binary);
        cfg.args = vec!["10".to_owned()];

        let handle = backend
            .spawn_instance(&cfg)
            .await
            .expect("should spawn instance");

        assert_eq!(handle.module, "test_module");
        assert!(!handle.instance_id.is_nil());
        assert_eq!(handle.backend, BackendKind::LocalProcess);

        // The new instance must be listed under its module name.
        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 1);
        assert_eq!(instances[0].module, "test_module");
        assert_eq!(instances[0].instance_id, handle.instance_id);

        backend
            .stop_instance(&handle)
            .await
            .expect("should stop instance");

        // After stopping, the instance must no longer be tracked.
        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    /// `list_instances` must only return instances of the requested module.
    #[tokio::test]
    async fn test_list_instances_filters_by_module() {
        let backend = test_backend();

        #[cfg(windows)]
        let binary = PathBuf::from("C:\\Windows\\System32\\cmd.exe");
        #[cfg(not(windows))]
        let binary = PathBuf::from("/bin/sleep");

        let mut cfg_a = OopModuleConfig::new("module_a", BackendKind::LocalProcess);
        cfg_a.binary = Some(binary.clone());
        cfg_a.args = vec!["10".to_owned()];

        let handle_a = backend
            .spawn_instance(&cfg_a)
            .await
            .expect("should spawn module_a");

        let mut cfg_b = OopModuleConfig::new("module_b", BackendKind::LocalProcess);
        cfg_b.binary = Some(binary);
        cfg_b.args = vec!["10".to_owned()];

        let handle_b = backend
            .spawn_instance(&cfg_b)
            .await
            .expect("should spawn module_b");

        let instances_a = backend
            .list_instances("module_a")
            .await
            .expect("should list module_a");
        assert_eq!(instances_a.len(), 1);
        assert_eq!(instances_a[0].module, "module_a");

        let instances_b = backend
            .list_instances("module_b")
            .await
            .expect("should list module_b");
        assert_eq!(instances_b.len(), 1);
        assert_eq!(instances_b[0].module, "module_b");

        // Best-effort cleanup; failures here are irrelevant to the assertions above.
        backend.stop_instance(&handle_a).await.ok();
        backend.stop_instance(&handle_b).await.ok();
    }

    /// Stopping an instance the backend does not know about must succeed.
    #[tokio::test]
    async fn test_stop_nonexistent_instance() {
        let backend = test_backend();
        let handle = InstanceHandle {
            module: "test_module".to_owned(),
            instance_id: Uuid::new_v4(),
            backend: BackendKind::LocalProcess,
            pid: None,
            created_at: Instant::now(),
        };

        let result = backend.stop_instance(&handle).await;
        assert!(result.is_ok());
    }

    /// Listing a module with no instances yields an empty list.
    #[tokio::test]
    async fn test_list_instances_empty() {
        let backend = test_backend();
        let instances = backend
            .list_instances("nonexistent_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    /// Unix-only tests for `send_terminate_signal` and the PID conversion it relies on.
    mod send_terminate_signal_tests {
        #[cfg(unix)]
        use {super::send_terminate_signal, std::time::Duration};

        /// A running process must accept SIGTERM and exit shortly afterwards.
        #[cfg(unix)]
        #[tokio::test]
        async fn test_send_terminate_signal_to_valid_process() {
            let mut cmd = tokio::process::Command::new("sleep");
            cmd.args(["30"]);

            let mut child = cmd.spawn().expect("should spawn test process");

            let result = send_terminate_signal(&child);

            assert!(result, "Should successfully send SIGTERM to valid process");

            tokio::time::timeout(Duration::from_secs(1), child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");
        }

        /// Once the child has been waited on, signalling it must report `false`.
        #[cfg(unix)]
        #[tokio::test]
        async fn test_send_terminate_signal_to_exited_process() {
            let mut cmd = tokio::process::Command::new("/bin/sh");
            cmd.args(["-c", "exit 0"]);
            let mut child = cmd.spawn().expect("should spawn test process");

            tokio::time::timeout(Duration::from_millis(100), child.wait())
                .await
                .expect("process should exit within timeout")
                .expect("wait should succeed");

            let result = send_terminate_signal(&child);

            assert!(!result, "Should return false for already-exited process");
        }

        /// Records that a `u32` PID above `i32::MAX` cannot be converted to
        /// `i32`, which is the case `send_terminate_signal` guards against.
        #[cfg(unix)]
        #[test]
        fn test_pid_conversion_edge_case_documentation() {
            let max_u32_pid: u32 = u32::MAX;

            let result = i32::try_from(max_u32_pid);
            assert!(result.is_err(), "u32::MAX should not fit in i32");
        }

        /// An ordinary PID value converts to `i32` without loss.
        #[cfg(unix)]
        #[test]
        fn test_pid_conversion_normal_range() {
            let normal_pid: u32 = 12345;
            let result = i32::try_from(normal_pid);
            assert!(result.is_ok(), "Normal PID should convert to i32");
            assert_eq!(result.unwrap(), 12345);
        }
    }
}