1use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::process::{Child, Command};
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14use uuid::Uuid;
15
16use super::log_forwarder::{StreamKind, spawn_stream_forwarder};
17use super::{BackendKind, InstanceHandle, ModuleRuntimeBackend, OopModuleConfig};
18
/// Time each process gets to exit on its own after being asked to terminate
/// during backend-wide shutdown, before it is force-killed.
const SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_millis(5_000);

/// Time a single process gets to exit after being asked to terminate in
/// `stop_instance`, before it is force-killed.
const INSTANCE_STOP_GRACE_PERIOD: Duration = Duration::from_millis(2_000);

/// Upper bound on waiting for a log-forwarder task to finish once its process
/// has been stopped.
const FORWARDER_DRAIN_TIMEOUT: Duration = Duration::from_millis(100);
27
28#[cfg(unix)]
34fn send_terminate_signal(child: &Child) -> bool {
35 use nix::sys::signal::{Signal, kill};
36 use nix::unistd::Pid;
37
38 if let Some(pid) = child.id() {
39 let pid_i32 = i32::try_from(pid).unwrap_or(0);
40 kill(Pid::from_raw(pid_i32), Signal::SIGTERM).is_ok()
41 } else {
42 false
43 }
44}
45
46#[cfg(windows)]
52fn send_terminate_signal(_child: &Child) -> bool {
53 false
54}
55
56async fn stop_child_with_grace(
62 child: &mut Child,
63 handle: &InstanceHandle,
64 grace: Duration,
65 context: &str,
66) {
67 let pid = child.id();
68 let sent = send_terminate_signal(child);
69
70 tracing::debug!(
71 module = %handle.module,
72 instance_id = %handle.instance_id,
73 pid = ?pid,
74 graceful = sent,
75 "{context}: sent termination signal"
76 );
77
78 match tokio::time::timeout(grace, child.wait()).await {
79 Ok(Ok(status)) => {
80 tracing::debug!(
81 module = %handle.module,
82 instance_id = %handle.instance_id,
83 status = ?status,
84 "{context}: process exited gracefully"
85 );
86 }
87 Ok(Err(e)) => {
88 tracing::warn!(
89 module = %handle.module,
90 instance_id = %handle.instance_id,
91 error = %e,
92 "{context}: failed to wait for process"
93 );
94 }
95 Err(_) => {
96 tracing::debug!(
97 module = %handle.module,
98 instance_id = %handle.instance_id,
99 "{context}: grace period expired, force killing"
100 );
101 if let Err(e) = child.kill().await {
102 tracing::warn!(
103 module = %handle.module,
104 instance_id = %handle.instance_id,
105 error = %e,
106 "{context}: failed to force kill"
107 );
108 }
109 }
110 }
111}
112
113async fn wait_forwarder(handle: Option<JoinHandle<()>>) {
115 if let Some(h) = handle {
116 let _ = tokio::time::timeout(FORWARDER_DRAIN_TIMEOUT, h).await;
117 }
118}
119
120struct LocalInstance {
122 handle: InstanceHandle,
123 child: Child,
124 stdout_forwarder: Option<JoinHandle<()>>,
126 stderr_forwarder: Option<JoinHandle<()>>,
128}
129
130type InstanceMap = HashMap<Uuid, LocalInstance>;
132
133pub struct LocalProcessBackend {
140 instances: Arc<RwLock<InstanceMap>>,
141 cancel: CancellationToken,
142}
143
144impl LocalProcessBackend {
145 #[must_use]
149 pub fn new(cancel: CancellationToken) -> Self {
150 let backend = Self {
151 instances: Arc::new(RwLock::new(HashMap::new())),
152 cancel: cancel.clone(),
153 };
154
155 let instances = Arc::clone(&backend.instances);
157 tokio::spawn(async move {
158 cancel.cancelled().await;
159 tracing::info!("LocalProcessBackend: shutdown signal received, stopping all processes");
160 Self::shutdown_all_instances(instances).await;
161 });
162
163 backend
164 }
165
166 async fn shutdown_all_instances(instances: Arc<RwLock<InstanceMap>>) {
168 let mut all_instances: Vec<LocalInstance> = {
169 let mut guard = instances.write();
170 guard.drain().map(|(_, inst)| inst).collect()
171 };
172
173 if all_instances.is_empty() {
174 return;
175 }
176
177 tracing::info!(count = all_instances.len(), "Stopping OoP module processes");
178
179 for inst in &mut all_instances {
181 stop_child_with_grace(
182 &mut inst.child,
183 &inst.handle,
184 SHUTDOWN_GRACE_PERIOD,
185 "shutdown",
186 )
187 .await;
188 }
189
190 for inst in all_instances {
192 wait_forwarder(inst.stdout_forwarder).await;
193 wait_forwarder(inst.stderr_forwarder).await;
194 }
195
196 tracing::info!("All OoP module processes stopped");
197 }
198}
199
200#[async_trait]
201impl ModuleRuntimeBackend for LocalProcessBackend {
202 async fn spawn_instance(&self, cfg: &OopModuleConfig) -> Result<InstanceHandle> {
203 if cfg.backend != BackendKind::LocalProcess {
205 bail!(
206 "LocalProcessBackend can only spawn LocalProcess instances, got {:?}",
207 cfg.backend
208 );
209 }
210
211 let binary = cfg
213 .binary
214 .as_ref()
215 .context("executable_path must be set for LocalProcess backend")?;
216
217 let instance_id = Uuid::now_v7();
219
220 let mut cmd = Command::new(binary);
222 cmd.args(&cfg.args);
223 cmd.envs(&cfg.env);
224
225 cmd.stdout(Stdio::piped());
227 cmd.stderr(Stdio::piped());
228
229 if let Some(ref working_dir) = cfg.working_directory {
231 let path = Path::new(working_dir);
232 if path.exists() && path.is_dir() {
233 cmd.current_dir(path);
234 } else {
235 tracing::warn!(
236 module = %cfg.name,
237 working_dir = %working_dir,
238 "Working directory does not exist or is not a directory, using current dir"
239 );
240 }
241 }
242
243 let mut child = cmd
245 .spawn()
246 .with_context(|| format!("failed to spawn process: {}", binary.display()))?;
247
248 let pid = child.id();
250
251 let module_name = cfg.name.clone();
253 let cancel = self.cancel.clone();
254 let stdout_forwarder = child.stdout.take().map(|stdout| {
255 spawn_stream_forwarder(
256 stdout,
257 module_name.clone(),
258 instance_id,
259 cancel.clone(),
260 StreamKind::Stdout,
261 )
262 });
263 let stderr_forwarder = child.stderr.take().map(|stderr| {
264 spawn_stream_forwarder(
265 stderr,
266 module_name.clone(),
267 instance_id,
268 cancel.clone(),
269 StreamKind::Stderr,
270 )
271 });
272
273 tracing::info!(
274 module = %cfg.name,
275 instance_id = %instance_id,
276 pid = ?pid,
277 "Spawned OoP module with log forwarding"
278 );
279
280 let handle = InstanceHandle {
282 module: cfg.name.clone(),
283 instance_id,
284 backend: BackendKind::LocalProcess,
285 pid,
286 created_at: std::time::Instant::now(),
287 };
288
289 {
291 let mut instances = self.instances.write();
292 instances.insert(
293 instance_id,
294 LocalInstance {
295 handle: handle.clone(),
296 child,
297 stdout_forwarder,
298 stderr_forwarder,
299 },
300 );
301 }
302
303 Ok(handle)
304 }
305
306 async fn stop_instance(&self, handle: &InstanceHandle) -> Result<()> {
307 let local = {
308 let mut instances = self.instances.write();
309 instances.remove(&handle.instance_id)
310 };
311
312 if let Some(mut local) = local {
313 stop_child_with_grace(
314 &mut local.child,
315 &local.handle,
316 INSTANCE_STOP_GRACE_PERIOD,
317 "stop_instance",
318 )
319 .await;
320
321 } else {
324 tracing::debug!(
325 module = %handle.module,
326 instance_id = %handle.instance_id,
327 "stop_instance called for unknown instance, ignoring"
328 );
329 }
330
331 Ok(())
332 }
333
334 async fn list_instances(&self, module: &str) -> Result<Vec<InstanceHandle>> {
335 let instances = self.instances.read();
336
337 let result = instances
338 .values()
339 .filter(|inst| inst.handle.module == module)
340 .map(|inst| inst.handle.clone())
341 .collect();
342
343 Ok(result)
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::time::Instant;

    fn test_backend() -> LocalProcessBackend {
        LocalProcessBackend::new(CancellationToken::new())
    }

    /// Binary that stays alive long enough to be listed and then stopped.
    /// The tests run it with the single argument `"10"`.
    fn long_running_binary() -> PathBuf {
        if cfg!(windows) {
            PathBuf::from("C:\\Windows\\System32\\cmd.exe")
        } else {
            PathBuf::from("/bin/sleep")
        }
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_binary() {
        let backend = test_backend();
        let cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("executable_path must be set")
        );
    }

    #[tokio::test]
    async fn test_spawn_instance_requires_correct_backend() {
        let backend = test_backend();
        let mut cfg = OopModuleConfig::new("test_module", BackendKind::K8s);
        cfg.binary = Some(PathBuf::from("/bin/echo"));

        let result = backend.spawn_instance(&cfg).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("can only spawn LocalProcess")
        );
    }

    #[tokio::test]
    async fn test_spawn_list_stop_lifecycle() {
        let backend = test_backend();

        let mut cfg = OopModuleConfig::new("test_module", BackendKind::LocalProcess);
        cfg.binary = Some(long_running_binary());
        cfg.args = vec!["10".to_owned()];

        let handle = backend
            .spawn_instance(&cfg)
            .await
            .expect("should spawn instance");

        assert_eq!(handle.module, "test_module");
        assert!(!handle.instance_id.is_nil());
        assert_eq!(handle.backend, BackendKind::LocalProcess);

        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 1);
        assert_eq!(instances[0].module, "test_module");
        assert_eq!(instances[0].instance_id, handle.instance_id);

        backend
            .stop_instance(&handle)
            .await
            .expect("should stop instance");

        let instances = backend
            .list_instances("test_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }

    #[tokio::test]
    async fn test_list_instances_filters_by_module() {
        let backend = test_backend();
        let binary = long_running_binary();

        let mut cfg_a = OopModuleConfig::new("module_a", BackendKind::LocalProcess);
        cfg_a.binary = Some(binary.clone());
        cfg_a.args = vec!["10".to_owned()];

        let handle_a = backend
            .spawn_instance(&cfg_a)
            .await
            .expect("should spawn module_a");

        let mut cfg_b = OopModuleConfig::new("module_b", BackendKind::LocalProcess);
        cfg_b.binary = Some(binary);
        cfg_b.args = vec!["10".to_owned()];

        let handle_b = backend
            .spawn_instance(&cfg_b)
            .await
            .expect("should spawn module_b");

        let instances_a = backend
            .list_instances("module_a")
            .await
            .expect("should list module_a");
        assert_eq!(instances_a.len(), 1);
        assert_eq!(instances_a[0].module, "module_a");

        let instances_b = backend
            .list_instances("module_b")
            .await
            .expect("should list module_b");
        assert_eq!(instances_b.len(), 1);
        assert_eq!(instances_b[0].module, "module_b");

        backend
            .stop_instance(&handle_a)
            .await
            .expect("should stop module_a");
        backend
            .stop_instance(&handle_b)
            .await
            .expect("should stop module_b");
    }

    #[tokio::test]
    async fn test_stop_nonexistent_instance() {
        let backend = test_backend();
        // Same UUID version as production code, so no extra uuid feature is needed.
        let handle = InstanceHandle {
            module: "test_module".to_owned(),
            instance_id: Uuid::now_v7(),
            backend: BackendKind::LocalProcess,
            pid: None,
            created_at: Instant::now(),
        };

        let result = backend.stop_instance(&handle).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_list_instances_empty() {
        let backend = test_backend();
        let instances = backend
            .list_instances("nonexistent_module")
            .await
            .expect("should list instances");
        assert_eq!(instances.len(), 0);
    }
}