1use std::fs;
14use std::io;
15use std::process::Stdio;
16use std::time::{Duration, Instant};
17
18use crate::yarli_observability::YarliMetrics;
19use chrono::Utc;
20use tokio::io::{AsyncBufReadExt, BufReader};
21use tokio::process::Command;
22use tokio_util::sync::CancellationToken;
23use tracing::{debug, info, warn};
24use uuid::Uuid;
25
26use crate::yarli_core::domain::{CommandClass, CorrelationId, RunId, TaskId};
27use crate::yarli_core::entities::command_execution::{
28 CommandExecution, CommandResourceUsage, StreamChunk, StreamType, TokenUsage,
29};
30use crate::yarli_core::fsm::command::CommandState;
31use crate::yarli_core::shutdown::ShutdownController;
32
33use crate::yarli_exec::error::ExecError;
34
35pub(crate) fn command_id_from_idempotency_key(idempotency_key: &str) -> Uuid {
40 let namespaced = format!("yarli:command:{idempotency_key}");
41 Uuid::new_v5(&Uuid::NAMESPACE_OID, namespaced.as_bytes())
42}
43
44pub(crate) fn command_id_for_request(idempotency_key: Option<&str>) -> Uuid {
45 idempotency_key
46 .map(command_id_from_idempotency_key)
47 .unwrap_or_else(Uuid::now_v7)
48}
49
/// Optional per-command resource caps.
///
/// On unix these are applied by `apply_rlimits` in the child's `pre_exec` hook, where each
/// configured value becomes both the soft and the hard limit. They are also handed to the
/// cgroup sandbox created in `LocalCommandRunner::run`. A `None` field is simply skipped by
/// the rlimit path.
#[derive(Debug, Clone, Default)]
pub struct ResourceLimits {
    /// Address-space cap in bytes (`RLIMIT_AS`).
    pub max_memory_bytes: Option<u64>,
    /// CPU-time cap in seconds (`RLIMIT_CPU`).
    pub max_cpu_seconds: Option<u64>,
    /// Cap on open file descriptors (`RLIMIT_NOFILE`).
    pub max_open_files: Option<u64>,
    /// Process-count cap (`RLIMIT_NPROC`).
    /// NOTE(review): setrlimit(2) counts RLIMIT_NPROC per real user ID, not per command
    /// tree — confirm that is the intended meaning of "max pids".
    pub max_pids: Option<u64>,
}
62
/// Everything a [`CommandRunner`] needs to execute one command.
#[derive(Debug, Clone)]
pub struct CommandRequest {
    /// Task the command belongs to; copied into the execution record.
    pub task_id: TaskId,
    /// Run the command belongs to; copied into the execution record.
    pub run_id: RunId,
    /// Shell command line; the local runner executes it as `sh -c <command>`.
    pub command: String,
    /// Working directory of the child process.
    pub working_dir: String,
    /// Class of work; also used as the label for overhead metrics.
    pub command_class: CommandClass,
    /// Correlation id copied into the execution record and the tracing span.
    pub correlation_id: CorrelationId,
    /// When set, the command id is derived deterministically from this key and the key is
    /// stored on the execution; otherwise a fresh UUIDv7 id is generated.
    pub idempotency_key: Option<String>,
    /// Overall wall-clock limit; takes precedence over the runner's default timeout.
    pub timeout: Option<Duration>,
    /// Extra environment variables set for the child.
    pub env: Vec<(String, String)>,
    /// Optional sink that receives every captured chunk as it is recorded; send failures are
    /// ignored.
    pub live_output_tx: Option<tokio::sync::mpsc::UnboundedSender<StreamChunk>>,
    /// Optional rlimit / cgroup caps for the child.
    pub resource_limits: Option<ResourceLimits>,
    /// Passed through unchanged into the reported `TokenUsage`.
    pub rehydration_tokens: Option<u64>,
}
91
/// Outcome of a command that reached a recorded terminal state.
#[derive(Debug)]
pub struct CommandResult {
    /// Final execution record (state, exit code, timings, resource and token usage).
    pub execution: CommandExecution,
    /// Every captured output line, in sequence order.
    pub chunks: Vec<StreamChunk>,
    /// Name of the runner that produced the result (`"local_runner"` for the local runner).
    pub runner_actor: String,
    /// Backend-specific extra data; always `None` from `LocalCommandRunner`.
    /// NOTE(review): presumably filled in by non-local backends — confirm.
    pub backend_metadata: Option<serde_json::Value>,
}
104
105#[allow(async_fn_in_trait)]
108pub trait CommandRunner: Send + Sync {
109 fn run(
117 &self,
118 request: CommandRequest,
119 cancel: CancellationToken,
120 ) -> impl std::future::Future<Output = Result<CommandResult, ExecError>> + Send;
121}
122
/// [`CommandRunner`] that executes commands as local child processes through `sh -c`.
///
/// Construct with [`LocalCommandRunner::new`] and the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct LocalCommandRunner {
    /// Timeout used for requests that do not set their own `timeout`.
    pub default_timeout: Option<Duration>,
    /// Kill the command when it produces no output line for this long.
    pub idle_kill_timeout: Option<Duration>,
    /// Optional sink for spawn/capture overhead and enforcement-outcome metrics.
    pub metrics: Option<std::sync::Arc<YarliMetrics>>,
    /// When set (unix only), each spawned child pid is tracked with this controller while
    /// the command runs.
    shutdown: Option<ShutdownController>,
    /// Fault-injection hook consulted before spawning (`chaos` feature only).
    #[cfg(feature = "chaos")]
    chaos: Option<std::sync::Arc<crate::yarli_chaos::ChaosController>>,
}
140
141impl LocalCommandRunner {
142 pub fn new() -> Self {
143 Self {
144 default_timeout: None,
145 idle_kill_timeout: None,
146 metrics: None,
147 shutdown: None,
148 #[cfg(feature = "chaos")]
149 chaos: None,
150 }
151 }
152
153 pub fn with_default_timeout(mut self, timeout: Duration) -> Self {
154 self.default_timeout = Some(timeout);
155 self
156 }
157
158 pub fn with_idle_kill_timeout(mut self, timeout: Duration) -> Self {
159 self.idle_kill_timeout = Some(timeout);
160 self
161 }
162
163 pub fn with_metrics(mut self, metrics: std::sync::Arc<YarliMetrics>) -> Self {
164 self.metrics = Some(metrics);
165 self
166 }
167
168 pub fn with_shutdown(mut self, shutdown: ShutdownController) -> Self {
169 self.shutdown = Some(shutdown);
170 self
171 }
172
173 #[cfg(feature = "chaos")]
174 pub fn with_chaos(
175 mut self,
176 chaos: std::sync::Arc<crate::yarli_chaos::ChaosController>,
177 ) -> Self {
178 self.chaos = Some(chaos);
179 self
180 }
181
182 fn record_overhead(&self, class: CommandClass, phase: &str, duration: Duration) {
183 if let Some(metrics) = &self.metrics {
184 let label = command_class_metric_label(class);
185 metrics.record_command_overhead_duration(label, phase, duration.as_secs_f64());
186 }
187 }
188
189 fn record_enforcement(&self, mechanism: &str, outcome: &str, reason: &str) {
190 if let Some(metrics) = &self.metrics {
191 metrics.record_enforcement_outcome(mechanism, outcome, reason);
192 }
193 }
194}
195
196impl Default for LocalCommandRunner {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
202impl CommandRunner for LocalCommandRunner {
203 #[tracing::instrument(
204 skip(self, request, cancel),
205 fields(
206 run_id = %request.run_id,
207 task_id = %request.task_id,
208 correlation_id = %request.correlation_id,
209 command = %request.command
210 )
211 )]
212 async fn run(
213 &self,
214 request: CommandRequest,
215 cancel: CancellationToken,
216 ) -> Result<CommandResult, ExecError> {
217 let timeout = request.timeout.or(self.default_timeout);
218 let idle_kill_timeout = self.idle_kill_timeout;
219 let live_tx = request.live_output_tx;
220
221 let mut execution = CommandExecution::new(
223 request.task_id,
224 request.run_id,
225 &request.command,
226 &request.working_dir,
227 request.command_class,
228 request.correlation_id,
229 );
230 execution.id = command_id_for_request(request.idempotency_key.as_deref());
231 if let Some(key) = &request.idempotency_key {
232 execution = execution.with_idempotency_key(key);
233 }
234
235 let mut chunks: Vec<StreamChunk> = Vec::new();
236 let mut seq: u64 = 0;
237 let cmd_id = execution.id;
238
239 #[cfg(feature = "chaos")]
241 if let Some(chaos) = &self.chaos {
242 chaos
243 .inject("exec_command_spawn")
244 .await
245 .map_err(|e| ExecError::Io(std::io::Error::other(e.to_string())))?;
246 }
247
248 debug!(command = %request.command, working_dir = %request.working_dir, "spawning command");
249
250 let spawn_start = Instant::now();
251 let mut command = Command::new("sh");
252 command
253 .arg("-c")
254 .arg(&request.command)
255 .current_dir(&request.working_dir)
256 .stdin(Stdio::null())
257 .stdout(Stdio::piped())
258 .stderr(Stdio::piped())
259 .envs(request.env.iter().map(|(k, v)| (k.as_str(), v.as_str())))
260 .kill_on_drop(true);
261 #[cfg(unix)]
262 {
263 let rlimits = request.resource_limits.clone();
264 unsafe {
268 command.pre_exec(move || {
269 if libc::setpgid(0, 0) != 0 {
270 return Err(std::io::Error::last_os_error());
271 }
272 if let Some(ref limits) = rlimits {
273 apply_rlimits(limits)?;
274 }
275 Ok(())
276 });
277 }
278 }
279 let mut child = command.spawn().map_err(ExecError::SpawnFailed)?;
280 if request.resource_limits.is_some() {
281 self.record_enforcement("rlimit", "applied", "pre_exec");
282 }
283 self.record_overhead(request.command_class, "spawn", spawn_start.elapsed());
284 let child_pid = child.id();
285 let process_group_id = child_pid.and_then(pid_to_process_group_id);
286
287 #[cfg(unix)]
289 let process_handle = child_pid.map(crate::yarli_exec::pidfd::ProcessHandle::acquire);
290 #[cfg(not(unix))]
291 let process_handle: Option<crate::yarli_exec::pidfd::ProcessHandle> = None;
292 #[cfg(unix)]
293 if let Some(handle) = process_handle.as_ref() {
294 if handle.is_pidfd() {
295 self.record_enforcement("pidfd", "applied", "pidfd");
296 } else {
297 self.record_enforcement("pidfd", "fallback", "raw_pid");
298 }
299 }
300
301 let _cgroup_sandbox =
303 if let (Some(pid), Some(ref limits)) = (child_pid, &request.resource_limits) {
304 let run_short = &request.run_id.to_string()[..8];
305 let task_short = &request.task_id.to_string()[..8];
306 let mgr = crate::yarli_exec::cgroup::LocalCgroupManager::new();
307 match crate::yarli_exec::cgroup::CgroupManager::create_sandbox(
308 &mgr, run_short, task_short, limits,
309 ) {
310 crate::yarli_exec::cgroup::CgroupSandboxOutcome::Attached(sb) => {
311 if let Err(e) = sb.add_pid(pid) {
312 self.record_enforcement(
313 "cgroup",
314 "failed",
315 &format!("add_pid_{}", io_error_reason(&e)),
316 );
317 debug!(error = %e, "failed to add child pid to cgroup sandbox");
318 None
319 } else {
320 self.record_enforcement("cgroup", "applied", "attached");
321 Some(sb)
322 }
323 }
324 crate::yarli_exec::cgroup::CgroupSandboxOutcome::Fallback(mode) => {
325 self.record_enforcement("cgroup", "fallback", mode.as_label());
326 None
327 }
328 }
329 } else {
330 None
331 };
332
333 #[cfg(unix)]
336 if let (Some(pid), Some(shutdown)) = (child_pid, &self.shutdown) {
337 shutdown.track_child(pid);
338 }
339
340 let capture_start = Instant::now();
341 let monitor = child.id().map(spawn_resource_monitor);
342 self.record_overhead(
343 request.command_class,
344 "resource_capture_init",
345 capture_start.elapsed(),
346 );
347
348 execution
350 .transition(
351 CommandState::CmdStarted,
352 "process spawned",
353 "local_runner",
354 None,
355 )
356 .map_err(ExecError::Transition)?;
357
358 info!(cmd_id = %cmd_id, pid = ?child.id(), "command started");
359
360 let stdout = child.stdout.take();
362 let stderr = child.stderr.take();
363
364 execution
366 .transition(
367 CommandState::CmdStreaming,
368 "reading output",
369 "local_runner",
370 None,
371 )
372 .map_err(ExecError::Transition)?;
373
374 let (stdout_tx, mut stdout_rx) = tokio::sync::mpsc::channel::<(StreamType, String)>(256);
376 let stderr_tx = stdout_tx.clone();
377
378 if let Some(out) = stdout {
379 tokio::spawn(async move {
380 let reader = BufReader::new(out);
381 let mut lines = reader.lines();
382 while let Ok(Some(line)) = lines.next_line().await {
383 if stdout_tx.send((StreamType::Stdout, line)).await.is_err() {
384 break;
385 }
386 }
387 });
388 }
389
390 if let Some(err) = stderr {
391 tokio::spawn(async move {
392 let reader = BufReader::new(err);
393 let mut lines = reader.lines();
394 while let Ok(Some(line)) = lines.next_line().await {
395 if stderr_tx.send((StreamType::Stderr, line)).await.is_err() {
396 break;
397 }
398 }
399 });
400 }
401
402 let wait_result = if let Some(dur) = timeout {
409 tokio::select! {
410 biased;
411 _ = cancel.cancelled() => {
412 warn!(cmd_id = %cmd_id, "command cancelled by shutdown");
413 kill_child(
414 &mut child,
415 process_group_id,
416 &process_handle,
417 self.metrics.clone(),
418 )
419 .await;
420 drain_channel(&mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx).await;
422 Err(ExecError::Killed { reason: "shutdown".into() })
423 }
424 _ = tokio::time::sleep(dur) => {
425 warn!(cmd_id = %cmd_id, timeout = ?dur, "command timed out");
426 kill_child(
427 &mut child,
428 process_group_id,
429 &process_handle,
430 self.metrics.clone(),
431 )
432 .await;
433 drain_channel(&mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx).await;
434 Err(ExecError::Timeout(dur))
435 }
436 result = collect_and_wait(&mut child, process_group_id, &mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx, idle_kill_timeout, &process_handle) => {
437 result
438 }
439 }
440 } else {
441 tokio::select! {
442 biased;
443 _ = cancel.cancelled() => {
444 warn!(cmd_id = %cmd_id, "command cancelled by shutdown");
445 kill_child(
446 &mut child,
447 process_group_id,
448 &process_handle,
449 self.metrics.clone(),
450 )
451 .await;
452 drain_channel(&mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx).await;
453 Err(ExecError::Killed { reason: "shutdown".into() })
454 }
455 result = collect_and_wait(&mut child, process_group_id, &mut stdout_rx, cmd_id, &mut seq, &mut chunks, &live_tx, idle_kill_timeout, &process_handle) => {
456 result
457 }
458 }
459 };
460 drop(live_tx);
462
463 let resource_usage = if let Some((stop_tx, monitor_handle)) = monitor {
464 let _ = stop_tx.send(());
465 monitor_handle.await.unwrap_or_default()
466 } else {
467 None
468 };
469 execution.resource_usage = resource_usage;
470 execution.token_usage = Some(estimate_token_usage(
471 &execution.command,
472 &chunks,
473 request.rehydration_tokens,
474 ));
475
476 #[cfg(unix)]
478 if let (Some(pid), Some(shutdown)) = (child_pid, &self.shutdown) {
479 shutdown.untrack_child(pid);
480 }
481
482 match wait_result {
484 Ok(exit_code) => {
485 execution.chunk_count = seq;
486 execution
487 .exit(exit_code, "local_runner", None)
488 .map_err(ExecError::Transition)?;
489 info!(cmd_id = %cmd_id, exit_code, "command exited");
490 Ok(CommandResult {
491 execution,
492 chunks,
493 runner_actor: "local_runner".to_string(),
494 backend_metadata: None,
495 })
496 }
497 Err(ExecError::Timeout(dur)) => {
498 execution.chunk_count = seq;
499 execution
500 .transition(
501 CommandState::CmdTimedOut,
502 format!("timeout after {dur:?}"),
503 "local_runner",
504 None,
505 )
506 .map_err(ExecError::Transition)?;
507 Ok(CommandResult {
508 execution,
509 chunks,
510 runner_actor: "local_runner".to_string(),
511 backend_metadata: None,
512 })
513 }
514 Err(ExecError::Killed { ref reason }) => {
515 execution.chunk_count = seq;
516 execution
517 .transition(
518 CommandState::CmdKilled,
519 format!("killed: {reason}"),
520 "local_runner",
521 None,
522 )
523 .map_err(ExecError::Transition)?;
524 Ok(CommandResult {
525 execution,
526 chunks,
527 runner_actor: "local_runner".to_string(),
528 backend_metadata: None,
529 })
530 }
531 Err(e) => Err(e),
532 }
533 }
534}
535
536pub(crate) fn estimate_token_usage(
537 command: &str,
538 chunks: &[StreamChunk],
539 rehydration_tokens: Option<u64>,
540) -> TokenUsage {
541 let prompt_tokens = estimate_tokens(command);
542 let completion_chars: u64 = chunks.iter().map(|c| c.data.chars().count() as u64).sum();
543 let completion_tokens = if completion_chars == 0 {
544 0
545 } else {
546 completion_chars.div_ceil(4)
547 };
548 let total_tokens = prompt_tokens.saturating_add(completion_tokens);
549 TokenUsage {
550 prompt_tokens,
551 completion_tokens,
552 total_tokens,
553 rehydration_tokens,
554 source: "char_count_div4_estimate_v1".to_string(),
555 }
556}
557
/// Estimates the token count of `input` as its character count divided by four, rounded up.
///
/// Characters are Unicode scalar values, not bytes. The empty string yields `0`.
pub(crate) fn estimate_tokens(input: &str) -> u64 {
    let char_count = input.chars().count() as u64;
    char_count.div_ceil(4)
}
566
567fn spawn_resource_monitor(
580 pid: u32,
581) -> (
582 tokio::sync::oneshot::Sender<()>,
583 tokio::task::JoinHandle<Option<CommandResourceUsage>>,
584) {
585 let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel::<()>();
586 let handle = tokio::spawn(async move {
587 let mut usage = CommandResourceUsage::default();
588 let mut saw_sample = false;
589
590 if let Some(sample) = read_process_sample(pid) {
591 saw_sample = true;
592 apply_sample(&mut usage, sample);
593 }
594
595 loop {
596 tokio::select! {
597 _ = tokio::time::sleep(Duration::from_millis(25)) => {
598 if let Some(sample) = read_process_sample(pid) {
599 saw_sample = true;
600 apply_sample(&mut usage, sample);
601 }
602 }
603 _ = &mut stop_rx => {
604 break;
605 }
606 }
607 }
608
609 if let Some(sample) = read_process_sample(pid) {
610 saw_sample = true;
611 apply_sample(&mut usage, sample);
612 }
613
614 if saw_sample {
615 Some(usage)
616 } else {
617 None
618 }
619 });
620 (stop_tx, handle)
621}
622
/// One snapshot of a process's counters as read by `read_process_sample`.
///
/// Each field is `None` when the corresponding `/proc` entry was missing or unparsable.
#[derive(Debug, Clone, Copy, Default)]
struct ProcessSample {
    /// Resident set size in bytes (`VmRSS:` from `/proc/<pid>/status`, converted from kB).
    rss_bytes: Option<u64>,
    /// User CPU time in clock ticks (field 14 of `/proc/<pid>/stat`).
    cpu_user_ticks: Option<u64>,
    /// System CPU time in clock ticks (field 15 of `/proc/<pid>/stat`).
    cpu_system_ticks: Option<u64>,
    /// `read_bytes:` from `/proc/<pid>/io`.
    io_read_bytes: Option<u64>,
    /// `write_bytes:` from `/proc/<pid>/io`.
    io_write_bytes: Option<u64>,
}
631
632fn apply_sample(usage: &mut CommandResourceUsage, sample: ProcessSample) {
633 if let Some(rss_bytes) = sample.rss_bytes {
634 usage.max_rss_bytes = Some(usage.max_rss_bytes.unwrap_or(0).max(rss_bytes));
635 }
636 if sample.cpu_user_ticks.is_some() {
637 usage.cpu_user_ticks = sample.cpu_user_ticks;
638 }
639 if sample.cpu_system_ticks.is_some() {
640 usage.cpu_system_ticks = sample.cpu_system_ticks;
641 }
642 if sample.io_read_bytes.is_some() {
643 usage.io_read_bytes = sample.io_read_bytes;
644 }
645 if sample.io_write_bytes.is_some() {
646 usage.io_write_bytes = sample.io_write_bytes;
647 }
648}
649
650#[cfg(target_os = "linux")]
651fn read_process_sample(pid: u32) -> Option<ProcessSample> {
652 let mut sample = ProcessSample::default();
653
654 let status_path = format!("/proc/{pid}/status");
656 let status = fs::read_to_string(&status_path).ok()?;
657 for line in status.lines() {
658 if let Some(rest) = line.strip_prefix("VmRSS:") {
659 let kb = rest
660 .split_whitespace()
661 .next()
662 .and_then(|n| n.parse::<u64>().ok());
663 sample.rss_bytes = kb.map(|v| v.saturating_mul(1024));
664 break;
665 }
666 }
667
668 let stat_path = format!("/proc/{pid}/stat");
670 if let Ok(stat_text) = fs::read_to_string(&stat_path) {
671 if let Some(end_comm) = stat_text.rfind(')') {
672 let after = stat_text.get(end_comm + 2..).unwrap_or("");
673 let fields: Vec<&str> = after.split_whitespace().collect();
674 sample.cpu_user_ticks = fields.get(11).and_then(|v| v.parse::<u64>().ok());
676 sample.cpu_system_ticks = fields.get(12).and_then(|v| v.parse::<u64>().ok());
677 }
678 }
679
680 let io_path = format!("/proc/{pid}/io");
682 if let Ok(io_text) = fs::read_to_string(&io_path) {
683 for line in io_text.lines() {
684 if let Some(v) = line.strip_prefix("read_bytes:") {
685 sample.io_read_bytes = v.trim().parse::<u64>().ok();
686 } else if let Some(v) = line.strip_prefix("write_bytes:") {
687 sample.io_write_bytes = v.trim().parse::<u64>().ok();
688 }
689 }
690 }
691
692 Some(sample)
693}
694
/// Fallback for targets without the Linux `/proc` layout this module parses.
///
/// Never produces a sample, so the resource monitor reports no usage on these targets.
#[cfg(not(target_os = "linux"))]
fn read_process_sample(_pid: u32) -> Option<ProcessSample> {
    Option::None
}
703
704#[allow(clippy::too_many_arguments)]
709async fn collect_and_wait(
710 child: &mut tokio::process::Child,
711 process_group_id: Option<i32>,
712 rx: &mut tokio::sync::mpsc::Receiver<(StreamType, String)>,
713 cmd_id: Uuid,
714 seq: &mut u64,
715 chunks: &mut Vec<StreamChunk>,
716 live_tx: &Option<tokio::sync::mpsc::UnboundedSender<StreamChunk>>,
717 idle_kill_timeout: Option<Duration>,
718 process_handle: &Option<crate::yarli_exec::pidfd::ProcessHandle>,
719) -> Result<i32, ExecError> {
720 let mut last_output_at = Instant::now();
721
722 loop {
725 let idle_sleep = async {
726 match idle_kill_timeout {
727 Some(dur) => {
728 let remaining = dur.saturating_sub(last_output_at.elapsed());
729 tokio::time::sleep(remaining).await;
730 }
731 None => std::future::pending::<()>().await,
732 }
733 };
734
735 tokio::select! {
736 biased;
737 line = rx.recv() => {
738 match line {
739 Some((stream, data)) => {
740 last_output_at = Instant::now();
741 *seq += 1;
742 let chunk = StreamChunk {
743 command_id: cmd_id,
744 sequence: *seq,
745 stream,
746 data,
747 captured_at: Utc::now(),
748 };
749 if let Some(tx) = live_tx {
750 let _ = tx.send(chunk.clone());
751 }
752 chunks.push(chunk);
753 }
754 None => break, }
756 }
757 _ = idle_sleep => {
758 let dur = idle_kill_timeout.unwrap();
759 warn!(
760 cmd_id = %cmd_id,
761 idle_timeout = ?dur,
762 "command produced no output for idle_kill_timeout — killing"
763 );
764 kill_child(child, process_group_id, process_handle, None).await;
765 drain_channel(rx, cmd_id, seq, chunks, live_tx).await;
766 return Err(ExecError::Timeout(dur));
767 }
768 }
769 }
770
771 let status = child.wait().await.map_err(ExecError::Io)?;
773 Ok(status.code().unwrap_or(-1))
774}
775
776async fn drain_channel(
778 rx: &mut tokio::sync::mpsc::Receiver<(StreamType, String)>,
779 cmd_id: Uuid,
780 seq: &mut u64,
781 chunks: &mut Vec<StreamChunk>,
782 live_tx: &Option<tokio::sync::mpsc::UnboundedSender<StreamChunk>>,
783) {
784 rx.close();
786 while let Some((stream, data)) = rx.recv().await {
787 *seq += 1;
788 let chunk = StreamChunk {
789 command_id: cmd_id,
790 sequence: *seq,
791 stream,
792 data,
793 captured_at: Utc::now(),
794 };
795 if let Some(tx) = live_tx {
796 let _ = tx.send(chunk.clone());
797 }
798 chunks.push(chunk);
799 }
800}
801
802#[cfg(unix)]
807fn apply_rlimits(limits: &ResourceLimits) -> std::io::Result<()> {
808 fn set_rlimit(resource: libc::__rlimit_resource_t, value: u64) -> std::io::Result<()> {
809 let lim = libc::rlimit {
810 rlim_cur: value as libc::rlim_t,
811 rlim_max: value as libc::rlim_t,
812 };
813 let rc = unsafe { libc::setrlimit(resource, &lim) };
814 if rc == 0 {
815 Ok(())
816 } else {
817 Err(std::io::Error::last_os_error())
818 }
819 }
820
821 if let Some(bytes) = limits.max_memory_bytes {
822 set_rlimit(libc::RLIMIT_AS, bytes)?;
823 }
824 if let Some(secs) = limits.max_cpu_seconds {
825 set_rlimit(libc::RLIMIT_CPU, secs)?;
826 }
827 if let Some(nfiles) = limits.max_open_files {
828 set_rlimit(libc::RLIMIT_NOFILE, nfiles)?;
829 }
830 if let Some(nproc) = limits.max_pids {
831 set_rlimit(libc::RLIMIT_NPROC, nproc)?;
832 }
833 Ok(())
834}
835
/// Converts a child pid into the process-group id used for group signalling.
///
/// The child calls `setpgid(0, 0)` before `exec`, so its pgid equals its pid. Pids that do
/// not fit in an `i32` yield `None`.
fn pid_to_process_group_id(pid: u32) -> Option<i32> {
    pid.try_into().ok()
}
839
840fn command_class_metric_label(class: CommandClass) -> &'static str {
841 match class {
842 CommandClass::Io => "io",
843 CommandClass::Cpu => "cpu",
844 CommandClass::Git => "git",
845 CommandClass::Tool => "tool",
846 }
847}
848
849fn io_error_reason(err: &io::Error) -> &'static str {
850 if matches!(err.raw_os_error(), Some(code) if code == libc::EROFS) {
851 "read_only"
852 } else {
853 match err.kind() {
854 io::ErrorKind::PermissionDenied => "permission_denied",
855 io::ErrorKind::NotFound => "not_found",
856 io::ErrorKind::AlreadyExists => "already_exists",
857 io::ErrorKind::TimedOut => "timed_out",
858 io::ErrorKind::Interrupted => "interrupted",
859 io::ErrorKind::Unsupported => "unsupported",
860 io::ErrorKind::InvalidInput => "invalid_input",
861 io::ErrorKind::BrokenPipe => "broken_pipe",
862 io::ErrorKind::UnexpectedEof => "unexpected_eof",
863 _ => "other",
864 }
865 }
866}
867
/// Terminates `child` and, on unix, the process group it leads.
///
/// With a known `process_group_id` on unix:
/// 1. SIGTERM the whole group.
/// 2. Poll `try_wait` up to 10 times, 25ms apart (about 250ms of grace). Return as soon as
///    the leader is seen to exit.
/// 3. Otherwise SIGKILL the group and, when a `handle` exists, the child itself through
///    that handle.
///
/// In every path that does not return early, `child.kill()` is awaited last. Failures are
/// recorded on `metrics` as `pid_termination` / `failed` outcomes with a reason label;
/// nothing is returned to the caller.
///
/// NOTE(review): when the leader (`sh`) exits during the grace period the function returns
/// without SIGKILLing the group, so descendants that ignore or outlive SIGTERM may survive.
/// Confirm this is acceptable.
async fn kill_child(
    child: &mut tokio::process::Child,
    process_group_id: Option<i32>,
    // Only read on unix; the attribute silences the warning on other targets.
    #[allow(unused_variables)] handle: &Option<crate::yarli_exec::pidfd::ProcessHandle>,
    metrics: Option<std::sync::Arc<YarliMetrics>>,
) {
    // Best-effort: termination failures are reported as metrics, never returned.
    let record_failure = |reason: String| {
        if let Some(metrics) = metrics.as_ref() {
            metrics.record_enforcement_outcome("pid_termination", "failed", &reason);
        }
    };

    #[cfg(unix)]
    if let Some(pgid) = process_group_id {
        // Ask the whole process group to exit first.
        if let Err(err) = crate::yarli_exec::pidfd::signal_process_group(pgid, libc::SIGTERM) {
            record_failure(format!("sigterm_process_group_{}", io_error_reason(&err)));
        }
        // Grace period: up to 10 polls, 25ms apart.
        for _ in 0..10 {
            match child.try_wait() {
                Ok(Some(_)) => return,
                Ok(None) => tokio::time::sleep(Duration::from_millis(25)).await,
                Err(err) => {
                    record_failure(format!("try_wait_{}", io_error_reason(&err)));
                    warn!(error = %err, "failed to poll child process status during cancellation");
                    break;
                }
            }
        }
        // Leader still running (or its status is unknown): force-kill the group...
        if let Err(err) = crate::yarli_exec::pidfd::signal_process_group(pgid, libc::SIGKILL) {
            record_failure(format!("sigkill_process_group_{}", io_error_reason(&err)));
        }
        // ...and the leader itself through its process handle.
        if let Some(h) = handle {
            if let Err(err) = h.send_signal(libc::SIGKILL) {
                let mechanism = if h.is_pidfd() { "pidfd" } else { "raw_pid" };
                record_failure(format!("sigkill_{mechanism}_{}", io_error_reason(&err)));
            }
        }
    }
    // Final kill through tokio, which also waits for the child.
    if let Err(e) = child.kill().await {
        record_failure(format!("child_kill_{}", io_error_reason(&e)));
        warn!(error = %e, "failed to kill child process");
    }
}
918
919#[cfg(test)]
920mod tests {
921 use super::*;
922 #[cfg(unix)]
923 use std::io;
924
925 fn make_request(cmd: &str) -> CommandRequest {
926 CommandRequest {
927 task_id: Uuid::now_v7(),
928 run_id: Uuid::now_v7(),
929 command: cmd.to_string(),
930 working_dir: "/tmp".to_string(),
931 command_class: CommandClass::Io,
932 correlation_id: Uuid::now_v7(),
933 idempotency_key: None,
934 timeout: None,
935 env: vec![],
936 live_output_tx: None,
937 resource_limits: None,
938 rehydration_tokens: None,
939 }
940 }
941
942 #[tokio::test]
943 async fn test_simple_echo() {
944 let runner = LocalCommandRunner::new();
945 let cancel = CancellationToken::new();
946 let req = make_request("echo hello");
947 let result = runner.run(req, cancel).await.unwrap();
948
949 assert_eq!(result.execution.state, CommandState::CmdExited);
950 assert_eq!(result.execution.exit_code, Some(0));
951 assert!(result.execution.started_at.is_some());
952 assert!(result.execution.ended_at.is_some());
953 assert!(!result.chunks.is_empty());
954 assert_eq!(result.chunks[0].data, "hello");
955 assert_eq!(result.chunks[0].stream, StreamType::Stdout);
956 assert_eq!(result.chunks[0].sequence, 1);
957 }
958
959 #[tokio::test]
960 async fn test_multiline_output() {
961 let runner = LocalCommandRunner::new();
962 let cancel = CancellationToken::new();
963 let req = make_request("printf 'line1\nline2\nline3\n'");
964 let result = runner.run(req, cancel).await.unwrap();
965
966 assert_eq!(result.execution.state, CommandState::CmdExited);
967 assert_eq!(result.execution.exit_code, Some(0));
968 assert_eq!(result.chunks.len(), 3);
969 assert_eq!(result.chunks[0].data, "line1");
970 assert_eq!(result.chunks[1].data, "line2");
971 assert_eq!(result.chunks[2].data, "line3");
972 assert_eq!(result.chunks[0].sequence, 1);
974 assert_eq!(result.chunks[1].sequence, 2);
975 assert_eq!(result.chunks[2].sequence, 3);
976 }
977
978 #[tokio::test]
979 async fn test_stderr_capture() {
980 let runner = LocalCommandRunner::new();
981 let cancel = CancellationToken::new();
982 let req = make_request("echo err >&2");
983 let result = runner.run(req, cancel).await.unwrap();
984
985 assert_eq!(result.execution.state, CommandState::CmdExited);
986 assert_eq!(result.execution.exit_code, Some(0));
987 assert!(!result.chunks.is_empty());
988 assert_eq!(result.chunks[0].data, "err");
989 assert_eq!(result.chunks[0].stream, StreamType::Stderr);
990 }
991
992 #[tokio::test]
993 async fn test_mixed_stdout_stderr() {
994 let runner = LocalCommandRunner::new();
995 let cancel = CancellationToken::new();
996 let req = make_request("echo out && echo err >&2");
997 let result = runner.run(req, cancel).await.unwrap();
998
999 assert_eq!(result.execution.state, CommandState::CmdExited);
1000 assert_eq!(result.chunks.len(), 2);
1001 let has_stdout = result.chunks.iter().any(|c| c.stream == StreamType::Stdout);
1002 let has_stderr = result.chunks.iter().any(|c| c.stream == StreamType::Stderr);
1003 assert!(has_stdout);
1004 assert!(has_stderr);
1005 }
1006
1007 #[tokio::test]
1008 async fn test_nonzero_exit_code() {
1009 let runner = LocalCommandRunner::new();
1010 let cancel = CancellationToken::new();
1011 let req = make_request("exit 42");
1012 let result = runner.run(req, cancel).await.unwrap();
1013
1014 assert_eq!(result.execution.state, CommandState::CmdExited);
1015 assert_eq!(result.execution.exit_code, Some(42));
1016 }
1017
1018 #[tokio::test]
1019 async fn test_timeout() {
1020 let runner = LocalCommandRunner::new();
1021 let cancel = CancellationToken::new();
1022 let mut req = make_request("sleep 60");
1023 req.timeout = Some(Duration::from_millis(100));
1024 let result = runner.run(req, cancel).await.unwrap();
1025
1026 assert_eq!(result.execution.state, CommandState::CmdTimedOut);
1027 assert!(result.execution.ended_at.is_some());
1028 }
1029
1030 #[tokio::test]
1031 async fn test_cancellation() {
1032 let runner = LocalCommandRunner::new();
1033 let cancel = CancellationToken::new();
1034 let req = make_request("sleep 60");
1035
1036 let cancel_clone = cancel.clone();
1037 tokio::spawn(async move {
1039 tokio::time::sleep(Duration::from_millis(100)).await;
1040 cancel_clone.cancel();
1041 });
1042
1043 let result = runner.run(req, cancel).await.unwrap();
1044 assert_eq!(result.execution.state, CommandState::CmdKilled);
1045 assert!(result.execution.ended_at.is_some());
1046 }
1047
1048 #[cfg(unix)]
1049 #[tokio::test]
1050 async fn test_cancellation_terminates_descendant_processes() {
1051 let runner = LocalCommandRunner::new();
1052 let cancel = CancellationToken::new();
1053 let cancel_clone = cancel.clone();
1054 let (live_tx, mut live_rx) = tokio::sync::mpsc::unbounded_channel::<StreamChunk>();
1055 let mut req = make_request("sleep 60 & echo child:$!; wait");
1056 req.live_output_tx = Some(live_tx);
1057
1058 tokio::spawn(async move {
1059 while let Some(chunk) = live_rx.recv().await {
1060 if chunk.data.starts_with("child:") {
1061 cancel_clone.cancel();
1062 break;
1063 }
1064 }
1065 });
1066
1067 let result = runner.run(req, cancel).await.unwrap();
1068 assert_eq!(result.execution.state, CommandState::CmdKilled);
1069
1070 let child_pid = result
1071 .chunks
1072 .iter()
1073 .find_map(|chunk| chunk.data.strip_prefix("child:"))
1074 .and_then(|raw| raw.trim().parse::<i32>().ok())
1075 .expect("expected child pid line in output");
1076
1077 wait_for_process_exit(child_pid, Duration::from_secs(2))
1078 .await
1079 .expect("descendant process should be terminated by cancellation");
1080 }
1081
1082 #[tokio::test]
1083 async fn test_env_vars() {
1084 let runner = LocalCommandRunner::new();
1085 let cancel = CancellationToken::new();
1086 let mut req = make_request("echo $MY_VAR");
1087 req.env = vec![("MY_VAR".to_string(), "test_value".to_string())];
1088 let result = runner.run(req, cancel).await.unwrap();
1089
1090 assert_eq!(result.execution.state, CommandState::CmdExited);
1091 assert_eq!(result.execution.exit_code, Some(0));
1092 assert!(!result.chunks.is_empty());
1093 assert_eq!(result.chunks[0].data, "test_value");
1094 }
1095
1096 #[tokio::test]
1097 async fn test_working_directory() {
1098 let runner = LocalCommandRunner::new();
1099 let cancel = CancellationToken::new();
1100 let req = make_request("pwd");
1101 let result = runner.run(req, cancel).await.unwrap();
1102
1103 assert_eq!(result.execution.state, CommandState::CmdExited);
1104 assert!(!result.chunks.is_empty());
1105 assert_eq!(result.chunks[0].data, "/tmp");
1106 }
1107
1108 #[tokio::test]
1109 async fn test_spawn_failure() {
1110 let runner = LocalCommandRunner::new();
1111 let cancel = CancellationToken::new();
1112 let mut req = make_request("true");
1113 req.working_dir = "/nonexistent_dir_that_should_not_exist".to_string();
1114 let result = runner.run(req, cancel).await;
1115
1116 assert!(result.is_err());
1117 match result.unwrap_err() {
1118 ExecError::SpawnFailed(_) => {}
1119 other => panic!("expected SpawnFailed, got {other:?}"),
1120 }
1121 }
1122
1123 #[tokio::test]
1124 async fn test_idempotency_key_preserved() {
1125 let runner = LocalCommandRunner::new();
1126 let cancel = CancellationToken::new();
1127 let mut req = make_request("echo ok");
1128 req.idempotency_key = Some("test-key-123".to_string());
1129 let result = runner.run(req, cancel).await.unwrap();
1130
1131 assert_eq!(
1132 result.execution.idempotency_key.as_deref(),
1133 Some("test-key-123")
1134 );
1135 }
1136
1137 #[tokio::test]
1138 async fn test_chunk_count_matches() {
1139 let runner = LocalCommandRunner::new();
1140 let cancel = CancellationToken::new();
1141 let req = make_request("printf 'a\nb\nc\n'");
1142 let result = runner.run(req, cancel).await.unwrap();
1143
1144 assert_eq!(result.execution.chunk_count, result.chunks.len() as u64);
1145 }
1146
1147 #[tokio::test]
1148 async fn test_default_timeout() {
1149 let runner = LocalCommandRunner::new().with_default_timeout(Duration::from_millis(100));
1150 let cancel = CancellationToken::new();
1151 let req = make_request("sleep 60");
1152 let result = runner.run(req, cancel).await.unwrap();
1153
1154 assert_eq!(result.execution.state, CommandState::CmdTimedOut);
1155 }
1156
1157 #[tokio::test]
1158 async fn test_per_command_timeout_overrides_default() {
1159 let runner = LocalCommandRunner::new().with_default_timeout(Duration::from_secs(60));
1160 let cancel = CancellationToken::new();
1161 let mut req = make_request("sleep 60");
1162 req.timeout = Some(Duration::from_millis(100));
1163 let result = runner.run(req, cancel).await.unwrap();
1164
1165 assert_eq!(result.execution.state, CommandState::CmdTimedOut);
1166 }
1167
1168 #[tokio::test]
1169 async fn test_duration_populated() {
1170 let runner = LocalCommandRunner::new();
1171 let cancel = CancellationToken::new();
1172 let req = make_request("echo fast");
1173 let result = runner.run(req, cancel).await.unwrap();
1174
1175 let dur = result.execution.duration();
1176 assert!(dur.is_some());
1177 assert!(dur.unwrap().num_seconds() < 5);
1179 }
1180
1181 #[tokio::test]
1182 async fn test_empty_output() {
1183 let runner = LocalCommandRunner::new();
1184 let cancel = CancellationToken::new();
1185 let req = make_request("true");
1186 let result = runner.run(req, cancel).await.unwrap();
1187
1188 assert_eq!(result.execution.state, CommandState::CmdExited);
1189 assert_eq!(result.execution.exit_code, Some(0));
1190 assert!(result.chunks.is_empty());
1191 assert_eq!(result.execution.chunk_count, 0);
1192 }
1193
1194 #[tokio::test]
1195 async fn test_token_usage_is_attached() {
1196 let runner = LocalCommandRunner::new();
1197 let cancel = CancellationToken::new();
1198 let req = make_request("echo token-test");
1199 let result = runner.run(req, cancel).await.unwrap();
1200
1201 let usage = result
1202 .execution
1203 .token_usage
1204 .expect("token usage should exist");
1205 assert_eq!(usage.source, "char_count_div4_estimate_v1");
1206 assert!(usage.total_tokens >= usage.prompt_tokens);
1207 }
1208
1209 #[cfg(target_os = "linux")]
1210 #[tokio::test]
1211 async fn test_resource_usage_is_captured_for_long_running_command() {
1212 let runner = LocalCommandRunner::new();
1213 let cancel = CancellationToken::new();
1214 let req = make_request("sleep 0.2");
1215 let result = runner.run(req, cancel).await.unwrap();
1216
1217 let usage = result
1218 .execution
1219 .resource_usage
1220 .expect("resource usage should exist on linux for running command");
1221
1222 let has_signal = usage.max_rss_bytes.is_some()
1223 || usage.cpu_user_ticks.is_some()
1224 || usage.cpu_system_ticks.is_some()
1225 || usage.io_read_bytes.is_some()
1226 || usage.io_write_bytes.is_some();
1227 assert!(has_signal, "expected at least one resource usage metric");
1228 }
1229
1230 #[tokio::test]
1231 async fn test_live_output_tx_receives_chunks_as_they_arrive() {
1232 let runner = LocalCommandRunner::new();
1233 let cancel = CancellationToken::new();
1234 let (live_tx, mut live_rx) = tokio::sync::mpsc::unbounded_channel::<StreamChunk>();
1235 let mut req = make_request("printf 'line1\nline2\nline3\n'");
1236 req.live_output_tx = Some(live_tx);
1237
1238 let result = runner.run(req, cancel).await.unwrap();
1239 assert_eq!(result.execution.state, CommandState::CmdExited);
1240 assert_eq!(result.chunks.len(), 3);
1241
1242 let mut live_chunks = Vec::new();
1244 while let Ok(chunk) = live_rx.try_recv() {
1245 live_chunks.push(chunk);
1246 }
1247 assert_eq!(live_chunks.len(), 3);
1248 assert_eq!(live_chunks[0].data, "line1");
1249 assert_eq!(live_chunks[1].data, "line2");
1250 assert_eq!(live_chunks[2].data, "line3");
1251 assert_eq!(live_chunks[0].sequence, result.chunks[0].sequence);
1253 }
1254
1255 #[cfg(unix)]
1256 fn process_exists(pid: i32) -> bool {
1257 let rc = unsafe { libc::kill(pid, 0) };
1258 if rc == 0 {
1259 return true;
1260 }
1261 let err = io::Error::last_os_error();
1262 matches!(err.raw_os_error(), Some(code) if code == libc::EPERM)
1263 }
1264
1265 #[cfg(unix)]
1266 async fn wait_for_process_exit(pid: i32, timeout: Duration) -> Result<(), ()> {
1267 let started = Instant::now();
1268 loop {
1269 if !process_exists(pid) {
1270 return Ok(());
1271 }
1272 if started.elapsed() >= timeout {
1273 return Err(());
1274 }
1275 tokio::time::sleep(Duration::from_millis(25)).await;
1276 }
1277 }
1278
1279 #[cfg(target_os = "linux")]
1280 #[tokio::test]
1281 async fn rlimit_memory_kills_allocation() {
1282 let runner = LocalCommandRunner::new();
1283 let cancel = CancellationToken::new();
1284 let mut req = make_request("python3 -c 'b = bytearray(100_000_000); print(len(b))'");
1288 req.resource_limits = Some(ResourceLimits {
1289 max_memory_bytes: Some(50 * 1024 * 1024), ..Default::default()
1291 });
1292 req.timeout = Some(Duration::from_secs(5));
1293 let result = runner.run(req, cancel).await.unwrap();
1294 let exit_code = result.execution.exit_code;
1296 let state = result.execution.state;
1297 assert!(
1298 exit_code != Some(0) || state == CommandState::CmdTimedOut,
1299 "expected non-zero exit or timeout with 50MB memory limit, got state={state:?} exit={exit_code:?}"
1300 );
1301 }
1302
1303 #[cfg(target_os = "linux")]
1304 #[tokio::test]
1305 async fn rlimit_cpu_kills_busy_loop() {
1306 let runner = LocalCommandRunner::new();
1307 let cancel = CancellationToken::new();
1308 let mut req = make_request("while true; do :; done");
1309 req.resource_limits = Some(ResourceLimits {
1310 max_cpu_seconds: Some(1),
1311 ..Default::default()
1312 });
1313 req.timeout = Some(Duration::from_secs(10));
1314 let result = runner.run(req, cancel).await.unwrap();
1315 let exit_code = result.execution.exit_code;
1317 assert!(
1318 exit_code != Some(0),
1319 "expected non-zero exit from CPU limit, got {exit_code:?}"
1320 );
1321 }
1322
1323 #[cfg(target_os = "linux")]
1324 #[tokio::test]
1325 async fn rlimit_nproc_prevents_fork_bomb() {
1326 let runner = LocalCommandRunner::new();
1327 let cancel = CancellationToken::new();
1328 let mut req = make_request("for i in $(seq 1 10); do (echo $i) & done; wait; echo done");
1330 req.resource_limits = Some(ResourceLimits {
1331 max_pids: Some(2),
1332 ..Default::default()
1333 });
1334 req.timeout = Some(Duration::from_secs(5));
1335 let result = runner.run(req, cancel).await.unwrap();
1336 let state = result.execution.state;
1339 assert!(
1340 state == CommandState::CmdExited || state == CommandState::CmdTimedOut,
1341 "expected CmdExited or CmdTimedOut, got {state:?}"
1342 );
1343 }
1344
1345 #[tokio::test]
1346 async fn no_rlimits_when_none() {
1347 let runner = LocalCommandRunner::new();
1348 let cancel = CancellationToken::new();
1349 let mut req = make_request("echo ok");
1350 req.resource_limits = None;
1351 let result = runner.run(req, cancel).await.unwrap();
1352 assert_eq!(result.execution.state, CommandState::CmdExited);
1353 assert_eq!(result.execution.exit_code, Some(0));
1354 assert!(!result.chunks.is_empty());
1355 assert_eq!(result.chunks[0].data, "ok");
1356 }
1357}