// yarli_cli/yarli-exec/src/runner.rs

1//! Command runner — spawns child processes and streams output.
2//!
3//! The [`CommandRunner`] trait abstracts over command execution so callers
4//! don't depend on OS process semantics directly. [`LocalCommandRunner`]
5//! implements it using `tokio::process::Command`.
6//!
7//! Integration points:
8//! - Uses `CommandExecution` entity from yarli-core for state tracking.
9//! - Emits `StreamChunk`s for stdout/stderr lines.
10//! - Respects `CancellationToken` from shutdown infrastructure.
11//! - Supports configurable timeouts.
12
13use std::fs;
14use std::io;
15use std::process::Stdio;
16use std::time::{Duration, Instant};
17
18use crate::yarli_observability::YarliMetrics;
19use chrono::Utc;
20use tokio::io::{AsyncBufReadExt, BufReader};
21use tokio::process::Command;
22use tokio_util::sync::CancellationToken;
23use tracing::{debug, info, warn};
24use uuid::Uuid;
25
26use crate::yarli_core::domain::{CommandClass, CorrelationId, RunId, TaskId};
27use crate::yarli_core::entities::command_execution::{
28    CommandExecution, CommandResourceUsage, StreamChunk, StreamType, TokenUsage,
29};
30use crate::yarli_core::fsm::command::CommandState;
31use crate::yarli_core::shutdown::ShutdownController;
32
33use crate::yarli_exec::error::ExecError;
34
35/// Derive a deterministic command ID from an idempotency key.
36///
37/// This keeps command entity IDs stable across retries/replays and lets callers
38/// pre-emit lifecycle events before runner completion.
39pub(crate) fn command_id_from_idempotency_key(idempotency_key: &str) -> Uuid {
40    let namespaced = format!("yarli:command:{idempotency_key}");
41    Uuid::new_v5(&Uuid::NAMESPACE_OID, namespaced.as_bytes())
42}
43
44pub(crate) fn command_id_for_request(idempotency_key: Option<&str>) -> Uuid {
45    idempotency_key
46        .map(command_id_from_idempotency_key)
47        .unwrap_or_else(Uuid::now_v7)
48}
49
/// Optional caps applied to a spawned command.
///
/// On unix they are installed as rlimits in the child before exec. The same
/// limits are also handed to a best-effort cgroup sandbox. A `None` field
/// leaves that resource unrestricted by this runner.
#[derive(Debug, Clone, Default)]
pub struct ResourceLimits {
    /// Cap on virtual address space in bytes (`RLIMIT_AS`).
    pub max_memory_bytes: Option<u64>,
    /// Cap on consumed CPU time in seconds (`RLIMIT_CPU`).
    pub max_cpu_seconds: Option<u64>,
    /// Cap on the number of open file descriptors (`RLIMIT_NOFILE`).
    pub max_open_files: Option<u64>,
    /// Cap on processes/threads (`RLIMIT_NPROC`). Note: per setrlimit(2) the
    /// kernel counts this per real user ID, not per command.
    pub max_pids: Option<u64>,
}
62
63/// Request to execute a command.
64#[derive(Debug, Clone)]
65pub struct CommandRequest {
66    /// Task that owns this command.
67    pub task_id: TaskId,
68    /// Run this command belongs to.
69    pub run_id: RunId,
70    /// The command string (program + args, shell-parsed).
71    pub command: String,
72    /// Working directory.
73    pub working_dir: String,
74    /// Command class for concurrency accounting.
75    pub command_class: CommandClass,
76    /// Correlation ID for tracing.
77    pub correlation_id: CorrelationId,
78    /// Optional idempotency key.
79    pub idempotency_key: Option<String>,
80    /// Timeout for the command (None = no timeout).
81    pub timeout: Option<Duration>,
82    /// Environment variables to set (in addition to inherited env).
83    pub env: Vec<(String, String)>,
84    /// Optional channel for live output streaming. Each chunk is sent as it arrives.
85    pub live_output_tx: Option<tokio::sync::mpsc::UnboundedSender<StreamChunk>>,
86    /// Optional resource limits (rlimits/cgroup) applied to the child process.
87    pub resource_limits: Option<ResourceLimits>,
88    /// Estimated prompt/context reload cost for this command, if available.
89    pub rehydration_tokens: Option<u64>,
90}
91
92/// Result of a completed command execution.
93#[derive(Debug)]
94pub struct CommandResult {
95    /// The final command execution entity (in a terminal state).
96    pub execution: CommandExecution,
97    /// All captured output chunks.
98    pub chunks: Vec<StreamChunk>,
99    /// Actor label for persisted journal events.
100    pub runner_actor: String,
101    /// Optional backend-specific metadata for audit/debug.
102    pub backend_metadata: Option<serde_json::Value>,
103}
104
105/// Trait for running commands. Implementations must be Send + Sync for
106/// use across async tasks.
107#[allow(async_fn_in_trait)]
108pub trait CommandRunner: Send + Sync {
109    /// Execute a command to completion, streaming output as it arrives.
110    ///
111    /// The runner should:
112    /// 1. Transition the command through CmdQueued → CmdStarted → CmdStreaming → terminal.
113    /// 2. Capture stdout/stderr as StreamChunks.
114    /// 3. Respect the cancellation token for graceful shutdown.
115    /// 4. Apply timeout if configured.
116    fn run(
117        &self,
118        request: CommandRequest,
119        cancel: CancellationToken,
120    ) -> impl std::future::Future<Output = Result<CommandResult, ExecError>> + Send;
121}
122
123/// Local command runner that spawns OS processes via `tokio::process`.
124#[derive(Debug, Clone)]
125pub struct LocalCommandRunner {
126    /// Default timeout if not specified per-command.
127    pub default_timeout: Option<Duration>,
128    /// Kill the child process if no stdout/stderr output for this duration.
129    /// Detects processes stuck in infinite read-think-compact loops.
130    pub idle_kill_timeout: Option<Duration>,
131    /// Optional metrics registry for telemetry.
132    pub metrics: Option<std::sync::Arc<YarliMetrics>>,
133    /// Optional shutdown controller for tracking child PIDs.
134    /// When set, spawned children are registered so `terminate_children()`
135    /// can clean them up on programmatic failure (not just Ctrl+C).
136    shutdown: Option<ShutdownController>,
137    #[cfg(feature = "chaos")]
138    chaos: Option<std::sync::Arc<crate::yarli_chaos::ChaosController>>,
139}
140
141impl LocalCommandRunner {
142    pub fn new() -> Self {
143        Self {
144            default_timeout: None,
145            idle_kill_timeout: None,
146            metrics: None,
147            shutdown: None,
148            #[cfg(feature = "chaos")]
149            chaos: None,
150        }
151    }
152
153    pub fn with_default_timeout(mut self, timeout: Duration) -> Self {
154        self.default_timeout = Some(timeout);
155        self
156    }
157
158    pub fn with_idle_kill_timeout(mut self, timeout: Duration) -> Self {
159        self.idle_kill_timeout = Some(timeout);
160        self
161    }
162
163    pub fn with_metrics(mut self, metrics: std::sync::Arc<YarliMetrics>) -> Self {
164        self.metrics = Some(metrics);
165        self
166    }
167
168    pub fn with_shutdown(mut self, shutdown: ShutdownController) -> Self {
169        self.shutdown = Some(shutdown);
170        self
171    }
172
173    #[cfg(feature = "chaos")]
174    pub fn with_chaos(
175        mut self,
176        chaos: std::sync::Arc<crate::yarli_chaos::ChaosController>,
177    ) -> Self {
178        self.chaos = Some(chaos);
179        self
180    }
181
182    fn record_overhead(&self, class: CommandClass, phase: &str, duration: Duration) {
183        if let Some(metrics) = &self.metrics {
184            let label = command_class_metric_label(class);
185            metrics.record_command_overhead_duration(label, phase, duration.as_secs_f64());
186        }
187    }
188
189    fn record_enforcement(&self, mechanism: &str, outcome: &str, reason: &str) {
190        if let Some(metrics) = &self.metrics {
191            metrics.record_enforcement_outcome(mechanism, outcome, reason);
192        }
193    }
194}
195
196impl Default for LocalCommandRunner {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
202impl CommandRunner for LocalCommandRunner {
203    #[tracing::instrument(
204        skip(self, request, cancel),
205        fields(
206            run_id = %request.run_id,
207            task_id = %request.task_id,
208            correlation_id = %request.correlation_id,
209            command = %request.command
210        )
211    )]
212    async fn run(
213        &self,
214        request: CommandRequest,
215        cancel: CancellationToken,
216    ) -> Result<CommandResult, ExecError> {
217        let timeout = request.timeout.or(self.default_timeout);
218        let idle_kill_timeout = self.idle_kill_timeout;
219        let live_tx = request.live_output_tx;
220
221        // Create entity in CmdQueued state.
222        let mut execution = CommandExecution::new(
223            request.task_id,
224            request.run_id,
225            &request.command,
226            &request.working_dir,
227            request.command_class,
228            request.correlation_id,
229        );
230        execution.id = command_id_for_request(request.idempotency_key.as_deref());
231        if let Some(key) = &request.idempotency_key {
232            execution = execution.with_idempotency_key(key);
233        }
234
235        let mut chunks: Vec<StreamChunk> = Vec::new();
236        let mut seq: u64 = 0;
237        let cmd_id = execution.id;
238
239        // Spawn the child process.
240        #[cfg(feature = "chaos")]
241        if let Some(chaos) = &self.chaos {
242            chaos
243                .inject("exec_command_spawn")
244                .await
245                .map_err(|e| ExecError::Io(std::io::Error::other(e.to_string())))?;
246        }
247
248        debug!(command = %request.command, working_dir = %request.working_dir, "spawning command");
249
250        let spawn_start = Instant::now();
251        let mut command = Command::new("sh");
252        command
253            .arg("-c")
254            .arg(&request.command)
255            .current_dir(&request.working_dir)
256            .stdin(Stdio::null())
257            .stdout(Stdio::piped())
258            .stderr(Stdio::piped())
259            .envs(request.env.iter().map(|(k, v)| (k.as_str(), v.as_str())))
260            .kill_on_drop(true);
261        #[cfg(unix)]
262        {
263            let rlimits = request.resource_limits.clone();
264            // Isolate each command in its own process group so cancellation tears
265            // down descendants (shell + child toolchain processes) reliably.
266            // Also apply rlimits if configured.
267            unsafe {
268                command.pre_exec(move || {
269                    if libc::setpgid(0, 0) != 0 {
270                        return Err(std::io::Error::last_os_error());
271                    }
272                    if let Some(ref limits) = rlimits {
273                        apply_rlimits(limits)?;
274                    }
275                    Ok(())
276                });
277            }
278        }
279        let mut child = command.spawn().map_err(ExecError::SpawnFailed)?;
280        if request.resource_limits.is_some() {
281            self.record_enforcement("rlimit", "applied", "pre_exec");
282        }
283        self.record_overhead(request.command_class, "spawn", spawn_start.elapsed());
284        let child_pid = child.id();
285        let process_group_id = child_pid.and_then(pid_to_process_group_id);
286
287        // Acquire a pidfd-backed handle (falls back to raw PID on older kernels).
288        #[cfg(unix)]
289        let process_handle = child_pid.map(crate::yarli_exec::pidfd::ProcessHandle::acquire);
290        #[cfg(not(unix))]
291        let process_handle: Option<crate::yarli_exec::pidfd::ProcessHandle> = None;
292        #[cfg(unix)]
293        if let Some(handle) = process_handle.as_ref() {
294            if handle.is_pidfd() {
295                self.record_enforcement("pidfd", "applied", "pidfd");
296            } else {
297                self.record_enforcement("pidfd", "fallback", "raw_pid");
298            }
299        }
300
301        // Create cgroup sandbox and add child PID (best-effort, non-fatal).
302        let _cgroup_sandbox =
303            if let (Some(pid), Some(ref limits)) = (child_pid, &request.resource_limits) {
304                let run_short = &request.run_id.to_string()[..8];
305                let task_short = &request.task_id.to_string()[..8];
306                let mgr = crate::yarli_exec::cgroup::LocalCgroupManager::new();
307                match crate::yarli_exec::cgroup::CgroupManager::create_sandbox(
308                    &mgr, run_short, task_short, limits,
309                ) {
310                    crate::yarli_exec::cgroup::CgroupSandboxOutcome::Attached(sb) => {
311                        if let Err(e) = sb.add_pid(pid) {
312                            self.record_enforcement(
313                                "cgroup",
314                                "failed",
315                                &format!("add_pid_{}", io_error_reason(&e)),
316                            );
317                            debug!(error = %e, "failed to add child pid to cgroup sandbox");
318                            None
319                        } else {
320                            self.record_enforcement("cgroup", "applied", "attached");
321                            Some(sb)
322                        }
323                    }
324                    crate::yarli_exec::cgroup::CgroupSandboxOutcome::Fallback(mode) => {
325                        self.record_enforcement("cgroup", "fallback", mode.as_label());
326                        None
327                    }
328                }
329            } else {
330                None
331            };
332
333        // Track child PID in shutdown controller so it can be killed on
334        // programmatic failure (RunFailed) — not just on Ctrl+C signal.
335        #[cfg(unix)]
336        if let (Some(pid), Some(shutdown)) = (child_pid, &self.shutdown) {
337            shutdown.track_child(pid);
338        }
339
340        let capture_start = Instant::now();
341        let monitor = child.id().map(spawn_resource_monitor);
342        self.record_overhead(
343            request.command_class,
344            "resource_capture_init",
345            capture_start.elapsed(),
346        );
347
348        // Transition: CmdQueued → CmdStarted
349        execution
350            .transition(
351                CommandState::CmdStarted,
352                "process spawned",
353                "local_runner",
354                None,
355            )
356            .map_err(ExecError::Transition)?;
357
358        info!(cmd_id = %cmd_id, pid = ?child.id(), "command started");
359
360        // Take stdout/stderr handles before moving child into wait.
361        let stdout = child.stdout.take();
362        let stderr = child.stderr.take();
363
364        // Transition: CmdStarted → CmdStreaming
365        execution
366            .transition(
367                CommandState::CmdStreaming,
368                "reading output",
369                "local_runner",
370                None,
371            )
372            .map_err(ExecError::Transition)?;
373
374        // Spawn tasks to read stdout and stderr concurrently.
375        let (stdout_tx, mut stdout_rx) = tokio::sync::mpsc::channel::<(StreamType, String)>(256);
376        let stderr_tx = stdout_tx.clone();
377
378        if let Some(out) = stdout {
379            tokio::spawn(async move {
380                let reader = BufReader::new(out);
381                let mut lines = reader.lines();
382                while let Ok(Some(line)) = lines.next_line().await {
383                    if stdout_tx.send((StreamType::Stdout, line)).await.is_err() {
384                        break;
385                    }
386                }
387            });
388        }
389
390        if let Some(err) = stderr {
391            tokio::spawn(async move {
392                let reader = BufReader::new(err);
393                let mut lines = reader.lines();
394                while let Ok(Some(line)) = lines.next_line().await {
395                    if stderr_tx.send((StreamType::Stderr, line)).await.is_err() {
396                        break;
397                    }
398                }
399            });
400        }
401
402        // Drop the last sender so the channel closes when readers finish.
403        // (stdout_tx was cloned to stderr_tx, and stdout_tx moved into the spawn,
404        //  but we still hold the original — we need to drop it here. Actually, both
405        //  senders were moved into spawns, so we just need to handle the channel.)
406
407        // Collect output while waiting for the process to exit.
408        let wait_result = if let Some(dur) = timeout {
409            tokio::select! {
410                biased;
411                _ = cancel.cancelled() => {
412                    warn!(cmd_id = %cmd_id, "command cancelled by shutdown");
413                    kill_child(
414                        &mut child,
415                        process_group_id,
416                        &process_handle,
417                        self.metrics.clone(),
418                    )
419                    .await;
420                    // Drain remaining output.
421                    drain_channel(&mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx).await;
422                    Err(ExecError::Killed { reason: "shutdown".into() })
423                }
424                _ = tokio::time::sleep(dur) => {
425                    warn!(cmd_id = %cmd_id, timeout = ?dur, "command timed out");
426                    kill_child(
427                        &mut child,
428                        process_group_id,
429                        &process_handle,
430                        self.metrics.clone(),
431                    )
432                    .await;
433                    drain_channel(&mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx).await;
434                    Err(ExecError::Timeout(dur))
435                }
436                result = collect_and_wait(&mut child, process_group_id, &mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx, idle_kill_timeout, &process_handle) => {
437                    result
438                }
439            }
440        } else {
441            tokio::select! {
442                biased;
443                _ = cancel.cancelled() => {
444                    warn!(cmd_id = %cmd_id, "command cancelled by shutdown");
445                    kill_child(
446                        &mut child,
447                        process_group_id,
448                        &process_handle,
449                        self.metrics.clone(),
450                    )
451                    .await;
452                    drain_channel(&mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx).await;
453                    Err(ExecError::Killed { reason: "shutdown".into() })
454                }
455                result = collect_and_wait(&mut child, process_group_id, &mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx, idle_kill_timeout, &process_handle) => {
456                    result
457                }
458            }
459        };
460        // Drop the live sender to signal streaming is done for this command.
461        drop(live_tx);
462
463        let resource_usage = if let Some((stop_tx, monitor_handle)) = monitor {
464            let _ = stop_tx.send(());
465            monitor_handle.await.unwrap_or_default()
466        } else {
467            None
468        };
469        execution.resource_usage = resource_usage;
470        execution.token_usage = Some(estimate_token_usage(
471            &execution.command,
472            &chunks,
473            request.rehydration_tokens,
474        ));
475
476        // Untrack child PID — process has exited (normally, timed out, or killed).
477        #[cfg(unix)]
478        if let (Some(pid), Some(shutdown)) = (child_pid, &self.shutdown) {
479            shutdown.untrack_child(pid);
480        }
481
482        // Apply terminal transition based on outcome.
483        match wait_result {
484            Ok(exit_code) => {
485                execution.chunk_count = seq;
486                execution
487                    .exit(exit_code, "local_runner", None)
488                    .map_err(ExecError::Transition)?;
489                info!(cmd_id = %cmd_id, exit_code, "command exited");
490                Ok(CommandResult {
491                    execution,
492                    chunks,
493                    runner_actor: "local_runner".to_string(),
494                    backend_metadata: None,
495                })
496            }
497            Err(ExecError::Timeout(dur)) => {
498                execution.chunk_count = seq;
499                execution
500                    .transition(
501                        CommandState::CmdTimedOut,
502                        format!("timeout after {dur:?}"),
503                        "local_runner",
504                        None,
505                    )
506                    .map_err(ExecError::Transition)?;
507                Ok(CommandResult {
508                    execution,
509                    chunks,
510                    runner_actor: "local_runner".to_string(),
511                    backend_metadata: None,
512                })
513            }
514            Err(ExecError::Killed { ref reason }) => {
515                execution.chunk_count = seq;
516                execution
517                    .transition(
518                        CommandState::CmdKilled,
519                        format!("killed: {reason}"),
520                        "local_runner",
521                        None,
522                    )
523                    .map_err(ExecError::Transition)?;
524                Ok(CommandResult {
525                    execution,
526                    chunks,
527                    runner_actor: "local_runner".to_string(),
528                    backend_metadata: None,
529                })
530            }
531            Err(e) => Err(e),
532        }
533    }
534}
535
536pub(crate) fn estimate_token_usage(
537    command: &str,
538    chunks: &[StreamChunk],
539    rehydration_tokens: Option<u64>,
540) -> TokenUsage {
541    let prompt_tokens = estimate_tokens(command);
542    let completion_chars: u64 = chunks.iter().map(|c| c.data.chars().count() as u64).sum();
543    let completion_tokens = if completion_chars == 0 {
544        0
545    } else {
546        completion_chars.div_ceil(4)
547    };
548    let total_tokens = prompt_tokens.saturating_add(completion_tokens);
549    TokenUsage {
550        prompt_tokens,
551        completion_tokens,
552        total_tokens,
553        rehydration_tokens,
554        source: "char_count_div4_estimate_v1".to_string(),
555    }
556}
557
/// Rough token estimate: one token per four Unicode scalar values, rounded up.
///
/// Empty input yields 0. This is a heuristic, not a real tokenizer.
pub(crate) fn estimate_tokens(input: &str) -> u64 {
    let char_count = input.chars().count() as u64;
    char_count.div_ceil(4)
}
566
567// TODO(phase2): eBPF-based resource tracking for zero-overhead in-kernel enforcement.
568//   - Attach BPF programs to task cgroup to track RSS, CPU, and I/O.
569//   - Use eBPF maps to enforce budgets and send signals when thresholds are breached.
570//   - Consider libbpf-rs or aya crate for safe Rust BPF program loading.
571//
572// TODO(phase2): DTrace-based resource tracking on macOS/Illumos/FreeBSD.
573//   - Use DTrace probes (proc:::, syscall:::, io:::) to instrument child processes.
574//   - On macOS where /proc is unavailable, DTrace is the primary introspection path.
575//   - Consider shelling out to `dtrace -n` with a D script or using libdtrace bindings.
576//   - Fall back gracefully when DTrace is not available or SIP restricts it.
577//
578// TODO(phase3): seccomp profiles per command class, namespace isolation.
579fn spawn_resource_monitor(
580    pid: u32,
581) -> (
582    tokio::sync::oneshot::Sender<()>,
583    tokio::task::JoinHandle<Option<CommandResourceUsage>>,
584) {
585    let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel::<()>();
586    let handle = tokio::spawn(async move {
587        let mut usage = CommandResourceUsage::default();
588        let mut saw_sample = false;
589
590        if let Some(sample) = read_process_sample(pid) {
591            saw_sample = true;
592            apply_sample(&mut usage, sample);
593        }
594
595        loop {
596            tokio::select! {
597                _ = tokio::time::sleep(Duration::from_millis(25)) => {
598                    if let Some(sample) = read_process_sample(pid) {
599                        saw_sample = true;
600                        apply_sample(&mut usage, sample);
601                    }
602                }
603                _ = &mut stop_rx => {
604                    break;
605                }
606            }
607        }
608
609        if let Some(sample) = read_process_sample(pid) {
610            saw_sample = true;
611            apply_sample(&mut usage, sample);
612        }
613
614        if saw_sample {
615            Some(usage)
616        } else {
617            None
618        }
619    });
620    (stop_tx, handle)
621}
622
/// One point-in-time reading of a process's resource counters.
///
/// A field is `None` when its source could not be read or parsed.
#[derive(Debug, Clone, Copy, Default)]
struct ProcessSample {
    /// Resident set size in bytes (Linux: `VmRSS` from `/proc/<pid>/status`, kB × 1024).
    rss_bytes: Option<u64>,
    /// User-mode CPU time in clock ticks (Linux: `utime` from `/proc/<pid>/stat`).
    cpu_user_ticks: Option<u64>,
    /// Kernel-mode CPU time in clock ticks (Linux: `stime` from `/proc/<pid>/stat`).
    cpu_system_ticks: Option<u64>,
    /// Bytes read from storage (Linux: `read_bytes` from `/proc/<pid>/io`).
    io_read_bytes: Option<u64>,
    /// Bytes written to storage (Linux: `write_bytes` from `/proc/<pid>/io`).
    io_write_bytes: Option<u64>,
}
631
632fn apply_sample(usage: &mut CommandResourceUsage, sample: ProcessSample) {
633    if let Some(rss_bytes) = sample.rss_bytes {
634        usage.max_rss_bytes = Some(usage.max_rss_bytes.unwrap_or(0).max(rss_bytes));
635    }
636    if sample.cpu_user_ticks.is_some() {
637        usage.cpu_user_ticks = sample.cpu_user_ticks;
638    }
639    if sample.cpu_system_ticks.is_some() {
640        usage.cpu_system_ticks = sample.cpu_system_ticks;
641    }
642    if sample.io_read_bytes.is_some() {
643        usage.io_read_bytes = sample.io_read_bytes;
644    }
645    if sample.io_write_bytes.is_some() {
646        usage.io_write_bytes = sample.io_write_bytes;
647    }
648}
649
650#[cfg(target_os = "linux")]
651fn read_process_sample(pid: u32) -> Option<ProcessSample> {
652    let mut sample = ProcessSample::default();
653
654    // /proc/<pid>/status -> VmRSS (kB)
655    let status_path = format!("/proc/{pid}/status");
656    let status = fs::read_to_string(&status_path).ok()?;
657    for line in status.lines() {
658        if let Some(rest) = line.strip_prefix("VmRSS:") {
659            let kb = rest
660                .split_whitespace()
661                .next()
662                .and_then(|n| n.parse::<u64>().ok());
663            sample.rss_bytes = kb.map(|v| v.saturating_mul(1024));
664            break;
665        }
666    }
667
668    // /proc/<pid>/stat -> utime/stime ticks.
669    let stat_path = format!("/proc/{pid}/stat");
670    if let Ok(stat_text) = fs::read_to_string(&stat_path) {
671        if let Some(end_comm) = stat_text.rfind(')') {
672            let after = stat_text.get(end_comm + 2..).unwrap_or("");
673            let fields: Vec<&str> = after.split_whitespace().collect();
674            // after starts at field #3, so utime (#14) => idx 11, stime (#15) => idx 12.
675            sample.cpu_user_ticks = fields.get(11).and_then(|v| v.parse::<u64>().ok());
676            sample.cpu_system_ticks = fields.get(12).and_then(|v| v.parse::<u64>().ok());
677        }
678    }
679
680    // /proc/<pid>/io -> read_bytes/write_bytes.
681    let io_path = format!("/proc/{pid}/io");
682    if let Ok(io_text) = fs::read_to_string(&io_path) {
683        for line in io_text.lines() {
684            if let Some(v) = line.strip_prefix("read_bytes:") {
685                sample.io_read_bytes = v.trim().parse::<u64>().ok();
686            } else if let Some(v) = line.strip_prefix("write_bytes:") {
687                sample.io_write_bytes = v.trim().parse::<u64>().ok();
688            }
689        }
690    }
691
692    Some(sample)
693}
694
// TODO: Implement macOS/BSD process sampling via DTrace or libproc.
//   - macOS: use `proc_pidinfo(PROC_PIDTASKINFO)` from libproc for RSS/CPU,
//     or DTrace probes for IO tracking.
//   - FreeBSD/Illumos: use kinfo_proc via sysctl or DTrace.
/// Sampling fallback for targets without Linux procfs.
///
/// Always returns `None`, so no resource usage is reported on these targets.
#[cfg(not(target_os = "linux"))]
fn read_process_sample(_pid: u32) -> Option<ProcessSample> {
    Option::None
}
703
704/// Collect output from the channel and wait for the process to exit.
705///
706/// If `idle_kill_timeout` is Some, the child will be killed if no output
707/// is received within the given duration (detects hung processes).
708#[allow(clippy::too_many_arguments)]
709async fn collect_and_wait(
710    child: &mut tokio::process::Child,
711    process_group_id: Option<i32>,
712    rx: &mut tokio::sync::mpsc::Receiver<(StreamType, String)>,
713    cmd_id: Uuid,
714    seq: &mut u64,
715    chunks: &mut Vec<StreamChunk>,
716    live_tx: &Option<tokio::sync::mpsc::UnboundedSender<StreamChunk>>,
717    idle_kill_timeout: Option<Duration>,
718    process_handle: &Option<crate::yarli_exec::pidfd::ProcessHandle>,
719) -> Result<i32, ExecError> {
720    let mut last_output_at = Instant::now();
721
722    // Read output lines until the channel closes (readers done).
723    // Meanwhile the child is running.
724    loop {
725        let idle_sleep = async {
726            match idle_kill_timeout {
727                Some(dur) => {
728                    let remaining = dur.saturating_sub(last_output_at.elapsed());
729                    tokio::time::sleep(remaining).await;
730                }
731                None => std::future::pending::<()>().await,
732            }
733        };
734
735        tokio::select! {
736            biased;
737            line = rx.recv() => {
738                match line {
739                    Some((stream, data)) => {
740                        last_output_at = Instant::now();
741                        *seq += 1;
742                        let chunk = StreamChunk {
743                            command_id: cmd_id,
744                            sequence: *seq,
745                            stream,
746                            data,
747                            captured_at: Utc::now(),
748                        };
749                        if let Some(tx) = live_tx {
750                            let _ = tx.send(chunk.clone());
751                        }
752                        chunks.push(chunk);
753                    }
754                    None => break, // channel closed, readers done
755                }
756            }
757            _ = idle_sleep => {
758                let dur = idle_kill_timeout.unwrap();
759                warn!(
760                    cmd_id = %cmd_id,
761                    idle_timeout = ?dur,
762                    "command produced no output for idle_kill_timeout — killing"
763                );
764                kill_child(child, process_group_id, process_handle, None).await;
765                drain_channel(rx, cmd_id, seq, chunks, live_tx).await;
766                return Err(ExecError::Timeout(dur));
767            }
768        }
769    }
770
771    // Wait for the process to fully exit.
772    let status = child.wait().await.map_err(ExecError::Io)?;
773    Ok(status.code().unwrap_or(-1))
774}
775
776/// Drain remaining output from the channel after killing.
777async fn drain_channel(
778    rx: &mut tokio::sync::mpsc::Receiver<(StreamType, String)>,
779    cmd_id: Uuid,
780    seq: &mut u64,
781    chunks: &mut Vec<StreamChunk>,
782    live_tx: &Option<tokio::sync::mpsc::UnboundedSender<StreamChunk>>,
783) {
784    // Close the channel and drain remaining messages.
785    rx.close();
786    while let Some((stream, data)) = rx.recv().await {
787        *seq += 1;
788        let chunk = StreamChunk {
789            command_id: cmd_id,
790            sequence: *seq,
791            stream,
792            data,
793            captured_at: Utc::now(),
794        };
795        if let Some(tx) = live_tx {
796            let _ = tx.send(chunk.clone());
797        }
798        chunks.push(chunk);
799    }
800}
801
802/// Apply rlimits to the current process (called in pre_exec context).
803///
804/// Sets RLIMIT_AS, RLIMIT_CPU, RLIMIT_NOFILE, and RLIMIT_NPROC based on
805/// the provided `ResourceLimits`. Only limits with `Some` values are applied.
806#[cfg(unix)]
807fn apply_rlimits(limits: &ResourceLimits) -> std::io::Result<()> {
808    fn set_rlimit(resource: libc::__rlimit_resource_t, value: u64) -> std::io::Result<()> {
809        let lim = libc::rlimit {
810            rlim_cur: value as libc::rlim_t,
811            rlim_max: value as libc::rlim_t,
812        };
813        let rc = unsafe { libc::setrlimit(resource, &lim) };
814        if rc == 0 {
815            Ok(())
816        } else {
817            Err(std::io::Error::last_os_error())
818        }
819    }
820
821    if let Some(bytes) = limits.max_memory_bytes {
822        set_rlimit(libc::RLIMIT_AS, bytes)?;
823    }
824    if let Some(secs) = limits.max_cpu_seconds {
825        set_rlimit(libc::RLIMIT_CPU, secs)?;
826    }
827    if let Some(nfiles) = limits.max_open_files {
828        set_rlimit(libc::RLIMIT_NOFILE, nfiles)?;
829    }
830    if let Some(nproc) = limits.max_pids {
831        set_rlimit(libc::RLIMIT_NPROC, nproc)?;
832    }
833    Ok(())
834}
835
/// Convert a child PID into the process-group ID used for group signaling.
///
/// Returns `None` when `pid` does not fit in an `i32` or when it is `0`.
/// Zero is rejected because a process-group id of 0 never names a child's
/// group: at the syscall level `kill(0, sig)` / `killpg(0, sig)` address the
/// *caller's* own process group, so forwarding it to a signaling helper could
/// signal the runner itself.
fn pid_to_process_group_id(pid: u32) -> Option<i32> {
    i32::try_from(pid).ok().filter(|&pgid| pgid > 0)
}
839
840fn command_class_metric_label(class: CommandClass) -> &'static str {
841    match class {
842        CommandClass::Io => "io",
843        CommandClass::Cpu => "cpu",
844        CommandClass::Git => "git",
845        CommandClass::Tool => "tool",
846    }
847}
848
849fn io_error_reason(err: &io::Error) -> &'static str {
850    if matches!(err.raw_os_error(), Some(code) if code == libc::EROFS) {
851        "read_only"
852    } else {
853        match err.kind() {
854            io::ErrorKind::PermissionDenied => "permission_denied",
855            io::ErrorKind::NotFound => "not_found",
856            io::ErrorKind::AlreadyExists => "already_exists",
857            io::ErrorKind::TimedOut => "timed_out",
858            io::ErrorKind::Interrupted => "interrupted",
859            io::ErrorKind::Unsupported => "unsupported",
860            io::ErrorKind::InvalidInput => "invalid_input",
861            io::ErrorKind::BrokenPipe => "broken_pipe",
862            io::ErrorKind::UnexpectedEof => "unexpected_eof",
863            _ => "other",
864        }
865    }
866}
867
868/// Kill a child process (best-effort).
869///
870/// Uses process-group signaling for SIGTERM/SIGKILL to tear down all descendants.
871/// The optional `ProcessHandle` provides race-free signal delivery via pidfd
872/// on supported kernels.
873async fn kill_child(
874    child: &mut tokio::process::Child,
875    process_group_id: Option<i32>,
876    #[allow(unused_variables)] handle: &Option<crate::yarli_exec::pidfd::ProcessHandle>,
877    metrics: Option<std::sync::Arc<YarliMetrics>>,
878) {
879    let record_failure = |reason: String| {
880        if let Some(metrics) = metrics.as_ref() {
881            metrics.record_enforcement_outcome("pid_termination", "failed", &reason);
882        }
883    };
884
885    #[cfg(unix)]
886    if let Some(pgid) = process_group_id {
887        if let Err(err) = crate::yarli_exec::pidfd::signal_process_group(pgid, libc::SIGTERM) {
888            record_failure(format!("sigterm_process_group_{}", io_error_reason(&err)));
889        }
890        for _ in 0..10 {
891            match child.try_wait() {
892                Ok(Some(_)) => return,
893                Ok(None) => tokio::time::sleep(Duration::from_millis(25)).await,
894                Err(err) => {
895                    record_failure(format!("try_wait_{}", io_error_reason(&err)));
896                    warn!(error = %err, "failed to poll child process status during cancellation");
897                    break;
898                }
899            }
900        }
901        // Escalate to SIGKILL via process group.
902        if let Err(err) = crate::yarli_exec::pidfd::signal_process_group(pgid, libc::SIGKILL) {
903            record_failure(format!("sigkill_process_group_{}", io_error_reason(&err)));
904        }
905        // Also send SIGKILL via pidfd handle for race-free delivery to the lead process.
906        if let Some(h) = handle {
907            if let Err(err) = h.send_signal(libc::SIGKILL) {
908                let mechanism = if h.is_pidfd() { "pidfd" } else { "raw_pid" };
909                record_failure(format!("sigkill_{mechanism}_{}", io_error_reason(&err)));
910            }
911        }
912    }
913    if let Err(e) = child.kill().await {
914        record_failure(format!("child_kill_{}", io_error_reason(&e)));
915        warn!(error = %e, "failed to kill child process");
916    }
917}
918
#[cfg(test)]
mod tests {
    // Tests for `LocalCommandRunner`. Each test spawns a real command string
    // (shell syntax such as `&&`, `>&2`, `&`, `$!` is used, so they presumably
    // run through a POSIX shell — TODO confirm) in `/tmp`, then checks the
    // resulting `CommandExecution` and the captured `StreamChunk`s.
    use super::*;
    // Used by `process_exists` below.
    #[cfg(unix)]
    use std::io;

    /// Build a minimal request that runs `cmd` in `/tmp` with fresh v7 ids and
    /// no idempotency key, timeout, env, live output, resource limits, or
    /// rehydration tokens. Tests override individual fields as needed.
    fn make_request(cmd: &str) -> CommandRequest {
        CommandRequest {
            task_id: Uuid::now_v7(),
            run_id: Uuid::now_v7(),
            command: cmd.to_string(),
            working_dir: "/tmp".to_string(),
            command_class: CommandClass::Io,
            correlation_id: Uuid::now_v7(),
            idempotency_key: None,
            timeout: None,
            env: vec![],
            live_output_tx: None,
            resource_limits: None,
            rehydration_tokens: None,
        }
    }

    // Happy path: one stdout line, timestamps set, sequence numbering starts at 1.
    #[tokio::test]
    async fn test_simple_echo() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("echo hello");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.execution.exit_code, Some(0));
        assert!(result.execution.started_at.is_some());
        assert!(result.execution.ended_at.is_some());
        assert!(!result.chunks.is_empty());
        assert_eq!(result.chunks[0].data, "hello");
        assert_eq!(result.chunks[0].stream, StreamType::Stdout);
        assert_eq!(result.chunks[0].sequence, 1);
    }

    // Each output line becomes its own chunk, in order, with consecutive sequences.
    #[tokio::test]
    async fn test_multiline_output() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("printf 'line1\nline2\nline3\n'");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.execution.exit_code, Some(0));
        assert_eq!(result.chunks.len(), 3);
        assert_eq!(result.chunks[0].data, "line1");
        assert_eq!(result.chunks[1].data, "line2");
        assert_eq!(result.chunks[2].data, "line3");
        // Sequences are monotonically increasing.
        assert_eq!(result.chunks[0].sequence, 1);
        assert_eq!(result.chunks[1].sequence, 2);
        assert_eq!(result.chunks[2].sequence, 3);
    }

    // Lines written to stderr are tagged `StreamType::Stderr`.
    #[tokio::test]
    async fn test_stderr_capture() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("echo err >&2");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.execution.exit_code, Some(0));
        assert!(!result.chunks.is_empty());
        assert_eq!(result.chunks[0].data, "err");
        assert_eq!(result.chunks[0].stream, StreamType::Stderr);
    }

    // Both streams are captured; only their presence is checked, not their
    // relative order.
    #[tokio::test]
    async fn test_mixed_stdout_stderr() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("echo out && echo err >&2");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.chunks.len(), 2);
        let has_stdout = result.chunks.iter().any(|c| c.stream == StreamType::Stdout);
        let has_stderr = result.chunks.iter().any(|c| c.stream == StreamType::Stderr);
        assert!(has_stdout);
        assert!(has_stderr);
    }

    // A non-zero exit is still a normal `CmdExited` result (not an `Err`), with
    // the exit code preserved.
    #[tokio::test]
    async fn test_nonzero_exit_code() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("exit 42");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.execution.exit_code, Some(42));
    }

    // Exceeding the per-request timeout returns `Ok` with `CmdTimedOut`.
    #[tokio::test]
    async fn test_timeout() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let mut req = make_request("sleep 60");
        req.timeout = Some(Duration::from_millis(100));
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdTimedOut);
        assert!(result.execution.ended_at.is_some());
    }

    // Cancelling the token mid-run returns `Ok` with `CmdKilled`.
    #[tokio::test]
    async fn test_cancellation() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("sleep 60");

        let cancel_clone = cancel.clone();
        // Cancel after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            cancel_clone.cancel();
        });

        let result = runner.run(req, cancel).await.unwrap();
        assert_eq!(result.execution.state, CommandState::CmdKilled);
        assert!(result.execution.ended_at.is_some());
    }

    // The command backgrounds `sleep 60` and prints its pid. Cancellation fires
    // as soon as that line shows up on the live channel. Afterwards the
    // backgrounded descendant must also be gone, not just the lead process
    // (presumably via process-group signaling in `kill_child`).
    #[cfg(unix)]
    #[tokio::test]
    async fn test_cancellation_terminates_descendant_processes() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let cancel_clone = cancel.clone();
        let (live_tx, mut live_rx) = tokio::sync::mpsc::unbounded_channel::<StreamChunk>();
        let mut req = make_request("sleep 60 & echo child:$!; wait");
        req.live_output_tx = Some(live_tx);

        tokio::spawn(async move {
            while let Some(chunk) = live_rx.recv().await {
                if chunk.data.starts_with("child:") {
                    cancel_clone.cancel();
                    break;
                }
            }
        });

        let result = runner.run(req, cancel).await.unwrap();
        assert_eq!(result.execution.state, CommandState::CmdKilled);

        let child_pid = result
            .chunks
            .iter()
            .find_map(|chunk| chunk.data.strip_prefix("child:"))
            .and_then(|raw| raw.trim().parse::<i32>().ok())
            .expect("expected child pid line in output");

        wait_for_process_exit(child_pid, Duration::from_secs(2))
            .await
            .expect("descendant process should be terminated by cancellation");
    }

    // Extra env vars from the request are visible to the command.
    #[tokio::test]
    async fn test_env_vars() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let mut req = make_request("echo $MY_VAR");
        req.env = vec![("MY_VAR".to_string(), "test_value".to_string())];
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.execution.exit_code, Some(0));
        assert!(!result.chunks.is_empty());
        assert_eq!(result.chunks[0].data, "test_value");
    }

    // The command runs in `working_dir` ("/tmp" from `make_request`).
    // NOTE(review): assumes `/tmp` is not a symlink; on macOS `/tmp` points to
    // `/private/tmp` and `pwd` may print the resolved path — confirm if this
    // test must pass there.
    #[tokio::test]
    async fn test_working_directory() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("pwd");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert!(!result.chunks.is_empty());
        assert_eq!(result.chunks[0].data, "/tmp");
    }

    // A nonexistent working directory surfaces as `ExecError::SpawnFailed`.
    #[tokio::test]
    async fn test_spawn_failure() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let mut req = make_request("true");
        req.working_dir = "/nonexistent_dir_that_should_not_exist".to_string();
        let result = runner.run(req, cancel).await;

        assert!(result.is_err());
        match result.unwrap_err() {
            ExecError::SpawnFailed(_) => {}
            other => panic!("expected SpawnFailed, got {other:?}"),
        }
    }

    // The request's idempotency key is copied onto the execution entity.
    #[tokio::test]
    async fn test_idempotency_key_preserved() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let mut req = make_request("echo ok");
        req.idempotency_key = Some("test-key-123".to_string());
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(
            result.execution.idempotency_key.as_deref(),
            Some("test-key-123")
        );
    }

    // `chunk_count` on the execution agrees with the chunks actually returned.
    #[tokio::test]
    async fn test_chunk_count_matches() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("printf 'a\nb\nc\n'");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.chunk_count, result.chunks.len() as u64);
    }

    // The runner-level default timeout applies when the request sets none.
    #[tokio::test]
    async fn test_default_timeout() {
        let runner = LocalCommandRunner::new().with_default_timeout(Duration::from_millis(100));
        let cancel = CancellationToken::new();
        let req = make_request("sleep 60");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdTimedOut);
    }

    // A shorter per-request timeout wins over a longer runner default.
    #[tokio::test]
    async fn test_per_command_timeout_overrides_default() {
        let runner = LocalCommandRunner::new().with_default_timeout(Duration::from_secs(60));
        let cancel = CancellationToken::new();
        let mut req = make_request("sleep 60");
        req.timeout = Some(Duration::from_millis(100));
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdTimedOut);
    }

    // Start and end timestamps are both set, so a (short) duration is available.
    #[tokio::test]
    async fn test_duration_populated() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("echo fast");
        let result = runner.run(req, cancel).await.unwrap();

        let dur = result.execution.duration();
        assert!(dur.is_some());
        // Should be very short for echo.
        assert!(dur.unwrap().num_seconds() < 5);
    }

    // A command with no output yields zero chunks and `chunk_count == 0`.
    #[tokio::test]
    async fn test_empty_output() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("true");
        let result = runner.run(req, cancel).await.unwrap();

        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.execution.exit_code, Some(0));
        assert!(result.chunks.is_empty());
        assert_eq!(result.execution.chunk_count, 0);
    }

    // Token usage is always attached, and its estimator source label is pinned.
    #[tokio::test]
    async fn test_token_usage_is_attached() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("echo token-test");
        let result = runner.run(req, cancel).await.unwrap();

        let usage = result
            .execution
            .token_usage
            .expect("token usage should exist");
        assert_eq!(usage.source, "char_count_div4_estimate_v1");
        assert!(usage.total_tokens >= usage.prompt_tokens);
    }

    // Linux-only: a command that stays alive for ~200ms should yield at least
    // one populated resource-usage metric.
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn test_resource_usage_is_captured_for_long_running_command() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let req = make_request("sleep 0.2");
        let result = runner.run(req, cancel).await.unwrap();

        let usage = result
            .execution
            .resource_usage
            .expect("resource usage should exist on linux for running command");

        let has_signal = usage.max_rss_bytes.is_some()
            || usage.cpu_user_ticks.is_some()
            || usage.cpu_system_ticks.is_some()
            || usage.io_read_bytes.is_some()
            || usage.io_write_bytes.is_some();
        assert!(has_signal, "expected at least one resource usage metric");
    }

    // Chunks are mirrored to `live_output_tx`. By the time `run` returns they are
    // already queued on the unbounded channel, so `try_recv` drains them
    // without awaiting.
    #[tokio::test]
    async fn test_live_output_tx_receives_chunks_as_they_arrive() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let (live_tx, mut live_rx) = tokio::sync::mpsc::unbounded_channel::<StreamChunk>();
        let mut req = make_request("printf 'line1\nline2\nline3\n'");
        req.live_output_tx = Some(live_tx);

        let result = runner.run(req, cancel).await.unwrap();
        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.chunks.len(), 3);

        // All chunks should also have been sent through the live channel.
        let mut live_chunks = Vec::new();
        while let Ok(chunk) = live_rx.try_recv() {
            live_chunks.push(chunk);
        }
        assert_eq!(live_chunks.len(), 3);
        assert_eq!(live_chunks[0].data, "line1");
        assert_eq!(live_chunks[1].data, "line2");
        assert_eq!(live_chunks[2].data, "line3");
        // Sequences must match.
        assert_eq!(live_chunks[0].sequence, result.chunks[0].sequence);
    }

    /// Whether `pid` currently exists.
    ///
    /// `kill(pid, 0)` performs the existence/permission check without sending a
    /// signal. Success, or failure with `EPERM` (the process exists but belongs
    /// to someone else), counts as "exists"; any other error (e.g. `ESRCH`)
    /// counts as "gone".
    #[cfg(unix)]
    fn process_exists(pid: i32) -> bool {
        let rc = unsafe { libc::kill(pid, 0) };
        if rc == 0 {
            return true;
        }
        let err = io::Error::last_os_error();
        matches!(err.raw_os_error(), Some(code) if code == libc::EPERM)
    }

    /// Poll `process_exists` every 25ms until `pid` is gone (`Ok(())`) or
    /// `timeout` has elapsed (`Err(())`).
    #[cfg(unix)]
    async fn wait_for_process_exit(pid: i32, timeout: Duration) -> Result<(), ()> {
        let started = Instant::now();
        loop {
            if !process_exists(pid) {
                return Ok(());
            }
            if started.elapsed() >= timeout {
                return Err(());
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    }

    // NOTE(review): if `python3` is not on PATH the command fails with a
    // non-zero exit and this assertion passes without exercising the limit.
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn rlimit_memory_kills_allocation() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        // Use python to try allocating 100MB in one shot. With RLIMIT_AS at
        // a small value, the python interpreter itself may fail to start or
        // the allocation will fail with MemoryError.
        let mut req = make_request("python3 -c 'b = bytearray(100_000_000); print(len(b))'");
        req.resource_limits = Some(ResourceLimits {
            max_memory_bytes: Some(50 * 1024 * 1024), // 50 MB virtual AS limit
            ..Default::default()
        });
        req.timeout = Some(Duration::from_secs(5));
        let result = runner.run(req, cancel).await.unwrap();
        // Python should fail to start or fail to allocate.
        let exit_code = result.execution.exit_code;
        let state = result.execution.state;
        assert!(
            exit_code != Some(0) || state == CommandState::CmdTimedOut,
            "expected non-zero exit or timeout with 50MB memory limit, got state={state:?} exit={exit_code:?}"
        );
    }

    // A 1-second CPU limit must stop an infinite busy loop well before the
    // 10-second wall-clock timeout.
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn rlimit_cpu_kills_busy_loop() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let mut req = make_request("while true; do :; done");
        req.resource_limits = Some(ResourceLimits {
            max_cpu_seconds: Some(1),
            ..Default::default()
        });
        req.timeout = Some(Duration::from_secs(10));
        let result = runner.run(req, cancel).await.unwrap();
        // SIGXCPU (signal 24) kills the process, so exit code should be non-zero.
        let exit_code = result.execution.exit_code;
        assert!(
            exit_code != Some(0),
            "expected non-zero exit from CPU limit, got {exit_code:?}"
        );
    }

    // RLIMIT_NPROC counts every process of the real user ID, not just this
    // command's children, so how many forks fail depends on the host. Only
    // "completes without hanging" is asserted.
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn rlimit_nproc_prevents_fork_bomb() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        // Try to fork 10 subprocesses; with nproc=2 most should fail.
        let mut req = make_request("for i in $(seq 1 10); do (echo $i) & done; wait; echo done");
        req.resource_limits = Some(ResourceLimits {
            max_pids: Some(2),
            ..Default::default()
        });
        req.timeout = Some(Duration::from_secs(5));
        let result = runner.run(req, cancel).await.unwrap();
        // Some forks should fail, but the main shell may still exit 0.
        // The important thing is we don't hang and the command completes.
        let state = result.execution.state;
        assert!(
            state == CommandState::CmdExited || state == CommandState::CmdTimedOut,
            "expected CmdExited or CmdTimedOut, got {state:?}"
        );
    }

    // With `resource_limits: None` the command runs normally.
    #[tokio::test]
    async fn no_rlimits_when_none() {
        let runner = LocalCommandRunner::new();
        let cancel = CancellationToken::new();
        let mut req = make_request("echo ok");
        req.resource_limits = None;
        let result = runner.run(req, cancel).await.unwrap();
        assert_eq!(result.execution.state, CommandState::CmdExited);
        assert_eq!(result.execution.exit_code, Some(0));
        assert!(!result.chunks.is_empty());
        assert_eq!(result.chunks[0].data, "ok");
    }
}