Skip to main content

execgo_runtime/
runtime.rs

1use std::{
2    collections::BTreeMap,
3    fs::{self, File, OpenOptions},
4    io::{Read, Write},
5    os::unix::{fs::OpenOptionsExt, process::CommandExt},
6    path::{Path, PathBuf},
7    process::{Command as StdCommand, Stdio},
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use axum::{http::StatusCode, response::IntoResponse};
13use chrono::Utc;
14use nix::sys::{
15    resource::{rlim_t, setrlimit, Resource},
16    signal::{kill, killpg, Signal},
17};
18use nix::unistd::{setpgid, Pid};
19use reqwest::Client;
20use tokio::{sync::Notify, task::JoinHandle, time::sleep};
21use tracing::{info, warn};
22
23#[cfg(target_os = "linux")]
24use nix::unistd::{chdir, chroot};
25
26use crate::{
27    capabilities::{probe_runtime_capabilities, CapabilityProbeInput},
28    cli::{Cli, Command, InternalShimArgs, RemoteTaskArgs, ServeArgs, StatusArgs, WaitArgs},
29    error::{AppError, AppResult},
30    ledger::ResourceLedger,
31    metrics::render_prometheus,
32    policy::resolve_execution_plan,
33    repo::{generate_task_id, CompletionUpdate, Repository, TaskRecord},
34    server::build_router,
35    types::{
36        resolve_workspace_dir, ActiveTaskReservation, CapabilityMode, ErrorCode, EventRecord,
37        ExecutionKind, ExecutionPlan, ResourceCapacity, ResourceEnforcementPlan, ResourceUsage,
38        RuntimeCapabilities, RuntimeConfigResponse, RuntimeErrorInfo, RuntimeInfoResponse,
39        RuntimeResourcesResponse, SubmitTaskRequest, SubmitTaskResponse, TaskArtifacts,
40        TaskResourceReservation, TaskStatus, TaskStatusResponse,
41    },
42};
43
44#[derive(Debug, Clone)]
45pub struct Settings {
46    pub runtime_id: String,
47    pub listen_addr: String,
48    pub data_dir: PathBuf,
49    pub tasks_dir: PathBuf,
50    pub database_path: PathBuf,
51    pub max_running_tasks: usize,
52    pub max_queued_tasks: usize,
53    pub termination_grace: Duration,
54    pub result_retention: Duration,
55    pub gc_interval: Duration,
56    pub dispatch_poll_interval: Duration,
57    pub cgroup_root: PathBuf,
58    pub default_capability_mode: CapabilityMode,
59    pub disable_linux_sandbox: bool,
60    pub disable_cgroup: bool,
61    pub capacity_memory_bytes: Option<u64>,
62    pub capacity_pids: Option<u64>,
63}
64
65impl Settings {
66    pub fn from_args(args: &ServeArgs) -> Self {
67        let data_dir = args.data_dir.clone();
68        let tasks_dir = data_dir.join("tasks");
69        let database_path = data_dir.join("runtime.db");
70        Self {
71            runtime_id: args
72                .runtime_id
73                .clone()
74                .unwrap_or_else(|| default_runtime_id(&args.listen_addr)),
75            listen_addr: args.listen_addr.clone(),
76            data_dir,
77            tasks_dir,
78            database_path,
79            max_running_tasks: args.max_running_tasks,
80            max_queued_tasks: args.max_queued_tasks,
81            termination_grace: Duration::from_millis(args.termination_grace_ms),
82            result_retention: Duration::from_secs(args.result_retention_secs),
83            gc_interval: Duration::from_millis(args.gc_interval_ms),
84            dispatch_poll_interval: Duration::from_millis(args.dispatch_poll_interval_ms),
85            cgroup_root: args.cgroup_root.clone(),
86            default_capability_mode: args.default_capability_mode,
87            disable_linux_sandbox: args.disable_linux_sandbox,
88            disable_cgroup: args.disable_cgroup,
89            capacity_memory_bytes: args.capacity_memory_bytes,
90            capacity_pids: args.capacity_pids,
91        }
92    }
93}
94
95#[derive(Clone)]
96pub struct RuntimeService {
97    settings: Arc<Settings>,
98    repo: Repository,
99    capabilities: Arc<RuntimeCapabilities>,
100    ledger: Arc<ResourceLedger>,
101    started_at: chrono::DateTime<Utc>,
102    dispatcher_notify: Arc<Notify>,
103}
104
105impl RuntimeService {
106    pub async fn new(settings: Settings) -> AppResult<Self> {
107        let started_at = Utc::now();
108        fs::create_dir_all(&settings.data_dir)?;
109        fs::create_dir_all(&settings.tasks_dir)?;
110        let repo = Repository::new(settings.database_path.clone());
111        repo.init()?;
112        let capabilities = probe_runtime_capabilities(&CapabilityProbeInput {
113            runtime_id: settings.runtime_id.clone(),
114            data_dir: settings.data_dir.clone(),
115            cgroup_root: settings.cgroup_root.clone(),
116            max_running_tasks: settings.max_running_tasks,
117            disable_linux_sandbox: settings.disable_linux_sandbox,
118            disable_cgroup: settings.disable_cgroup,
119            capacity_memory_bytes: settings.capacity_memory_bytes,
120            capacity_pids: settings.capacity_pids,
121        });
122        let ledger = ResourceLedger::new(capabilities.resources.capacity.clone());
123        Ok(Self {
124            settings: Arc::new(settings),
125            repo,
126            capabilities: Arc::new(capabilities),
127            ledger: Arc::new(ledger),
128            started_at,
129            dispatcher_notify: Arc::new(Notify::new()),
130        })
131    }
132
133    pub fn settings(&self) -> Arc<Settings> {
134        self.settings.clone()
135    }
136
137    pub fn repo(&self) -> &Repository {
138        &self.repo
139    }
140
141    pub fn capabilities(&self) -> Arc<RuntimeCapabilities> {
142        self.capabilities.clone()
143    }
144
145    pub async fn submit_task(&self, request: SubmitTaskRequest) -> AppResult<SubmitTaskResponse> {
146        request.validate()?;
147        let execution_plan = resolve_execution_plan(
148            &request,
149            &self.capabilities,
150            self.settings.default_capability_mode,
151        )?;
152        let requested_reservation = TaskResourceReservation::from_limits(&request.limits);
153        self.ledger.ensure_within_capacity(&requested_reservation)?;
154
155        if self.repo.count_accepted()? >= self.settings.max_queued_tasks as u64 {
156            return Err(AppError::QueueFull);
157        }
158
159        let task_id = request.task_id.clone().unwrap_or_else(generate_task_id);
160        let control_context = request.control_context.clone();
161        let task_dir = self.settings.tasks_dir.join(&task_id);
162        let workspace_dir = resolve_workspace_dir(&task_dir, &request.sandbox)?;
163        let request_path = task_dir.join("request.json");
164        let result_path = task_dir.join("result.json");
165        let stdout_path = task_dir.join("stdout.log");
166        let stderr_path = task_dir.join("stderr.log");
167        let script_path = if matches!(request.execution.kind, ExecutionKind::Script) {
168            Some(
169                task_dir.join(infer_script_name(
170                    request
171                        .execution
172                        .interpreter
173                        .as_ref()
174                        .and_then(|items| items.first())
175                        .map(String::as_str),
176                )),
177            )
178        } else {
179            None
180        };
181
182        fs::create_dir_all(&workspace_dir)?;
183        fs::create_dir_all(&task_dir)?;
184        write_json_file(&request_path, &request)?;
185        touch_file(&stdout_path)?;
186        touch_file(&stderr_path)?;
187
188        self.repo.insert_task(&crate::repo::NewTaskRecord {
189            task_id: task_id.clone(),
190            request,
191            task_dir,
192            workspace_dir,
193            request_path,
194            result_path,
195            stdout_path,
196            stderr_path,
197            script_path,
198            execution_plan,
199            control_context,
200        })?;
201        self.dispatcher_notify.notify_one();
202
203        Ok(SubmitTaskResponse {
204            handle_id: task_id.clone(),
205            task_id,
206            status: TaskStatus::Accepted,
207        })
208    }
209
210    pub async fn get_task_status(&self, task_id: &str) -> AppResult<TaskStatusResponse> {
211        let task = self.repo.get_task(task_id)?;
212        build_status_response(&task)
213    }
214
215    pub async fn get_events(&self, task_id: &str) -> AppResult<Vec<EventRecord>> {
216        self.repo.list_events(task_id)
217    }
218
219    pub async fn kill_task(&self, task_id: &str) -> AppResult<TaskStatusResponse> {
220        let task = self.repo.get_task(task_id)?;
221        if task.status.is_terminal() {
222            return build_status_response(&task);
223        }
224
225        let updated = self.repo.set_cancel_requested(task_id)?;
226        if updated.status == TaskStatus::Accepted {
227            self.repo.cancel_accepted_task(
228                task_id,
229                RuntimeErrorInfo {
230                    code: ErrorCode::Cancelled,
231                    message: "task cancelled before execution".into(),
232                    details: None,
233                },
234            )?;
235        } else {
236            signal_task_termination(&updated, Signal::SIGTERM)?;
237            self.spawn_escalation(task_id.to_string(), updated.pgid);
238        }
239        self.dispatcher_notify.notify_one();
240        self.get_task_status(task_id).await
241    }
242
243    pub async fn ready(&self) -> AppResult<()> {
244        self.repo.init()?;
245        Ok(())
246    }
247
248    pub async fn metrics(&self) -> impl IntoResponse {
249        match self.repo.metrics_snapshot() {
250            Ok(snapshot) => (
251                StatusCode::OK,
252                [("content-type", "text/plain; version=0.0.4")],
253                render_prometheus(&snapshot),
254            )
255                .into_response(),
256            Err(err) => (
257                StatusCode::INTERNAL_SERVER_ERROR,
258                [("content-type", "text/plain")],
259                format!("metrics_error {err}\n"),
260            )
261                .into_response(),
262        }
263    }
264
265    pub async fn runtime_info(&self) -> RuntimeInfoResponse {
266        RuntimeInfoResponse {
267            runtime_id: self.settings.runtime_id.clone(),
268            version: env!("CARGO_PKG_VERSION").to_string(),
269            started_at: self.started_at,
270            snapshot_version: self.capabilities.snapshot_version.clone(),
271            platform: self.capabilities.platform.clone(),
272        }
273    }
274
275    pub async fn runtime_capabilities(&self) -> RuntimeCapabilities {
276        (*self.capabilities).clone()
277    }
278
279    pub async fn runtime_config(&self) -> RuntimeConfigResponse {
280        RuntimeConfigResponse {
281            runtime_id: self.settings.runtime_id.clone(),
282            listen_addr: self.settings.listen_addr.clone(),
283            data_dir: self.settings.data_dir.to_string_lossy().to_string(),
284            max_running_tasks: self.settings.max_running_tasks,
285            max_queued_tasks: self.settings.max_queued_tasks,
286            termination_grace_ms: self.settings.termination_grace.as_millis() as u64,
287            result_retention_secs: self.settings.result_retention.as_secs(),
288            gc_interval_ms: self.settings.gc_interval.as_millis() as u64,
289            dispatch_poll_interval_ms: self.settings.dispatch_poll_interval.as_millis() as u64,
290            cgroup_root: self.settings.cgroup_root.to_string_lossy().to_string(),
291            default_capability_mode: self.settings.default_capability_mode,
292            cgroup_enabled: !self.settings.disable_cgroup,
293        }
294    }
295
296    pub async fn runtime_resources(&self) -> AppResult<RuntimeResourcesResponse> {
297        let active_tasks = self.repo.list_active_reservations()?;
298        let reservations: Vec<TaskResourceReservation> = active_tasks
299            .iter()
300            .filter_map(|task| task.reservation.clone())
301            .collect();
302        let reserved = self.ledger.reserved_capacity(reservations.iter());
303        let available = self.ledger.available_capacity(&reserved);
304        let active_reservations = active_tasks
305            .into_iter()
306            .filter_map(|task| {
307                task.reservation.map(|reservation| ActiveTaskReservation {
308                    task_id: task.task_id,
309                    status: task.status,
310                    reservation,
311                    reserved_at: task.reserved_at,
312                })
313            })
314            .collect();
315
316        Ok(RuntimeResourcesResponse {
317            runtime_id: self.settings.runtime_id.clone(),
318            capacity: self.ledger.capacity().clone(),
319            reserved,
320            available,
321            active_reservations,
322            accepted_waiting_tasks: self.repo.count_accepted_waiting()?,
323        })
324    }
325
326    pub fn start_background_loops(&self) {
327        let dispatcher_service = self.clone();
328        tokio::spawn(async move {
329            dispatcher_service.dispatcher_loop().await;
330        });
331
332        let gc_service = self.clone();
333        tokio::spawn(async move {
334            gc_service.gc_loop().await;
335        });
336    }
337
338    pub async fn recover(&self) -> AppResult<()> {
339        for task in self.repo.list_non_terminal()? {
340            match task.status {
341                TaskStatus::Accepted => {
342                    if task.has_active_reservation() {
343                        self.repo.release_resources(
344                            &task.task_id,
345                            "orphan accepted-task reservation released during recovery",
346                        )?;
347                    }
348                }
349                TaskStatus::Running => {
350                    if let Some(shim_pid) = task.shim_pid {
351                        if process_exists(shim_pid as i32) {
352                            if !task.has_active_reservation() {
353                                let reservation =
354                                    TaskResourceReservation::from_limits(&task.limits);
355                                self.repo.reserve_resources(
356                                    &task.task_id,
357                                    &reservation,
358                                    "resource reservation reconstructed during recovery",
359                                )?;
360                            }
361                            self.repo.mark_recovered(&task.task_id)?;
362                        } else {
363                            self.repo.mark_recovery_lost(&task.task_id)?;
364                            persist_latest_result(&self.repo, &task.task_id)?;
365                        }
366                    } else {
367                        self.repo.mark_recovery_lost(&task.task_id)?;
368                        persist_latest_result(&self.repo, &task.task_id)?;
369                    }
370                }
371                TaskStatus::Success | TaskStatus::Failed | TaskStatus::Cancelled => {}
372            }
373        }
374        self.dispatcher_notify.notify_one();
375        Ok(())
376    }
377
378    async fn dispatcher_loop(&self) {
379        loop {
380            if let Err(err) = self.dispatch_once().await {
381                warn!(error = %err, "dispatcher iteration failed");
382            }
383            tokio::select! {
384                _ = self.dispatcher_notify.notified() => {},
385                _ = sleep(self.settings.dispatch_poll_interval) => {},
386            }
387        }
388    }
389
390    async fn dispatch_once(&self) -> AppResult<()> {
391        let active_reservations = self.repo.list_active_reservations()?;
392        let mut current_reserved = self.ledger.reserved_capacity(
393            active_reservations
394                .iter()
395                .filter_map(|task| task.reservation.as_ref()),
396        );
397        let tasks = self.repo.list_accepted(self.settings.max_queued_tasks)?;
398        for task in tasks {
399            if task.kill_requested {
400                self.repo.cancel_accepted_task(
401                    &task.task_id,
402                    RuntimeErrorInfo {
403                        code: ErrorCode::Cancelled,
404                        message: "task cancelled before execution".into(),
405                        details: None,
406                    },
407                )?;
408                persist_latest_result(&self.repo, &task.task_id)?;
409                continue;
410            }
411
412            if task.has_active_reservation() {
413                continue;
414            }
415
416            let reservation = task
417                .reservation
418                .clone()
419                .unwrap_or_else(|| TaskResourceReservation::from_limits(&task.limits));
420            if !self.ledger.can_reserve(&current_reserved, &reservation) {
421                continue;
422            }
423
424            self.repo
425                .reserve_resources(&task.task_id, &reservation, "task resources reserved")?;
426            add_reservation(&mut current_reserved, &reservation);
427
428            let exe = std::env::current_exe()
429                .map_err(|err| AppError::LaunchFailed(format!("resolve current exe: {err}")))?;
430            let mut child = StdCommand::new(exe);
431            child
432                .arg("internal-shim")
433                .arg("--database")
434                .arg(self.settings.database_path.as_os_str())
435                .arg("--data-dir")
436                .arg(self.settings.data_dir.as_os_str())
437                .arg("--task-id")
438                .arg(&task.task_id)
439                .arg("--termination-grace-ms")
440                .arg(self.settings.termination_grace.as_millis().to_string())
441                .arg("--cgroup-root")
442                .arg(self.settings.cgroup_root.as_os_str())
443                .stdin(Stdio::null())
444                .stdout(Stdio::null())
445                .stderr(Stdio::null());
446
447            match child.spawn() {
448                Ok(handle) => {
449                    self.repo.mark_dispatched(&task.task_id, handle.id())?;
450                    info!(task_id = %task.task_id, shim_pid = handle.id(), "task dispatched");
451                }
452                Err(err) => {
453                    let update = CompletionUpdate {
454                        status: TaskStatus::Failed,
455                        finished_at: Utc::now(),
456                        duration_ms: Some(0),
457                        exit_code: None,
458                        exit_signal: None,
459                        error: Some(RuntimeErrorInfo {
460                            code: ErrorCode::LaunchFailed,
461                            message: format!("failed to spawn shim: {err}"),
462                            details: None,
463                        }),
464                        usage: None,
465                        result_json: None,
466                    };
467                    self.repo.complete_task(&task.task_id, &update)?;
468                    subtract_reservation(&mut current_reserved, &reservation);
469                    persist_latest_result(&self.repo, &task.task_id)?;
470                }
471            }
472        }
473        Ok(())
474    }
475
476    async fn gc_loop(&self) {
477        loop {
478            sleep(self.settings.gc_interval).await;
479            let cutoff = match chrono::Duration::from_std(self.settings.result_retention) {
480                Ok(duration) => Utc::now() - duration,
481                Err(_) => Utc::now(),
482            };
483            match self.repo.list_gc_candidates(cutoff) {
484                Ok(tasks) => {
485                    for task in tasks {
486                        if let Err(err) = fs::remove_dir_all(&task.task_dir) {
487                            if err.kind() != std::io::ErrorKind::NotFound {
488                                warn!(task_id = %task.task_id, error = %err, "failed to remove task directory during gc");
489                                continue;
490                            }
491                        }
492                        if let Err(err) = self.repo.delete_task(&task.task_id) {
493                            warn!(task_id = %task.task_id, error = %err, "failed to delete task row during gc");
494                        }
495                    }
496                }
497                Err(err) => warn!(error = %err, "gc iteration failed"),
498            }
499        }
500    }
501
502    fn spawn_escalation(&self, task_id: String, pgid: Option<i32>) {
503        let repo = self.repo.clone();
504        let grace = self.settings.termination_grace;
505        tokio::spawn(async move {
506            sleep(grace).await;
507            if let Ok(task) = repo.get_task(&task_id) {
508                if task.status == TaskStatus::Running {
509                    if let Some(pgid) = pgid.or(task.pgid) {
510                        let _ = killpg(Pid::from_raw(pgid), Signal::SIGKILL);
511                    } else if let Some(pid) = task.pid {
512                        let _ = kill(Pid::from_raw(pid as i32), Signal::SIGKILL);
513                    }
514                }
515            }
516        });
517    }
518}
519
520pub async fn run(cli: Cli) -> AppResult<()> {
521    init_tracing();
522    match cli.command {
523        Command::Serve(args) => run_server(args).await,
524        Command::Submit(args) => submit_remote(args).await,
525        Command::Status(args) => status_remote(args).await,
526        Command::Kill(args) => kill_remote(args).await,
527        Command::Wait(args) => wait_remote(args).await,
528        Command::Run(args) => run_remote(args).await,
529        Command::InternalShim(args) => run_internal_shim(args).await,
530    }
531}
532
533async fn run_server(args: ServeArgs) -> AppResult<()> {
534    let service = RuntimeService::new(Settings::from_args(&args)).await?;
535    service.recover().await?;
536    service.start_background_loops();
537
538    let listener = tokio::net::TcpListener::bind(&service.settings.listen_addr)
539        .await
540        .map_err(|err| AppError::Internal(format!("bind failed: {err}")))?;
541    info!(listen_addr = %service.settings.listen_addr, "execgo-runtime listening");
542    axum::serve(listener, build_router(service))
543        .await
544        .map_err(|err| AppError::Internal(format!("server error: {err}")))
545}
546
547async fn submit_remote(args: RemoteTaskArgs) -> AppResult<()> {
548    let client = http_client();
549    let request = load_request(&args)?;
550    let response = client
551        .post(format!("{}/api/v1/tasks", trim_server(&args.server)))
552        .json(&request)
553        .send()
554        .await?;
555    print_json_response(response).await
556}
557
558async fn status_remote(args: StatusArgs) -> AppResult<()> {
559    let client = http_client();
560    let response = client
561        .get(format!(
562            "{}/api/v1/tasks/{}",
563            trim_server(&args.server),
564            args.task_id
565        ))
566        .send()
567        .await?;
568    print_json_response(response).await
569}
570
571async fn kill_remote(args: StatusArgs) -> AppResult<()> {
572    let client = http_client();
573    let response = client
574        .post(format!(
575            "{}/api/v1/tasks/{}/kill",
576            trim_server(&args.server),
577            args.task_id
578        ))
579        .send()
580        .await?;
581    print_json_response(response).await
582}
583
584async fn wait_remote(args: WaitArgs) -> AppResult<()> {
585    let client = http_client();
586    let start = Instant::now();
587    loop {
588        let response = client
589            .get(format!(
590                "{}/api/v1/tasks/{}",
591                trim_server(&args.server),
592                args.task_id
593            ))
594            .send()
595            .await?;
596
597        if !response.status().is_success() {
598            return print_json_response(response).await;
599        }
600
601        let payload: TaskStatusResponse = response.json().await?;
602        if payload.status.is_terminal() {
603            println!("{}", serde_json::to_string_pretty(&payload)?);
604            return Ok(());
605        }
606        if let Some(timeout) = args.timeout() {
607            if start.elapsed() >= timeout {
608                return Err(AppError::Internal("wait timeout exceeded".into()));
609            }
610        }
611        sleep(Duration::from_millis(args.poll_interval_ms)).await;
612    }
613}
614
615async fn run_remote(args: RemoteTaskArgs) -> AppResult<()> {
616    let request = load_request(&args)?;
617    let client = http_client();
618    let response = client
619        .post(format!("{}/api/v1/tasks", trim_server(&args.server)))
620        .json(&request)
621        .send()
622        .await?;
623    if !response.status().is_success() {
624        return print_json_response(response).await;
625    }
626    let payload: SubmitTaskResponse = response.json().await?;
627    wait_remote(WaitArgs {
628        server: args.server,
629        task_id: payload.task_id,
630        timeout_ms: args.timeout_ms,
631        poll_interval_ms: args.poll_interval_ms,
632    })
633    .await
634}
635
636async fn run_internal_shim(args: InternalShimArgs) -> AppResult<()> {
637    let repo = Repository::new(args.database.clone());
638    repo.init()?;
639    let mut task = repo.get_task(&args.task_id)?;
640    if task.status.is_terminal() {
641        return Ok(());
642    }
643    if task.kill_requested && task.pid.is_none() {
644        repo.complete_task(
645            &task.task_id,
646            &CompletionUpdate {
647                status: TaskStatus::Cancelled,
648                finished_at: Utc::now(),
649                duration_ms: Some(0),
650                exit_code: None,
651                exit_signal: None,
652                error: Some(RuntimeErrorInfo {
653                    code: ErrorCode::Cancelled,
654                    message: "task cancelled before process launch".into(),
655                    details: None,
656                }),
657                usage: None,
658                result_json: None,
659            },
660        )?;
661        persist_latest_result(&repo, &task.task_id)?;
662        return Ok(());
663    }
664
665    let execution_plan = task
666        .execution_plan
667        .clone()
668        .unwrap_or_else(|| legacy_execution_plan(&task));
669
670    match spawn_task_process(&task, &execution_plan, &args.cgroup_root) {
671        Ok(spawned) => {
672            repo.mark_started(
673                &task.task_id,
674                spawned.pid,
675                spawned.pgid,
676                spawned.script_path.as_deref(),
677            )?;
678            task = repo.get_task(&task.task_id)?;
679            let wait_handle = tokio::task::spawn_blocking(move || wait_for_pid(spawned.pid as i32));
680            let outcome = supervise_wait(
681                &repo,
682                &task,
683                wait_handle,
684                args.termination_grace_ms,
685                execution_plan.resource_enforcement.wall_time_ms,
686                spawned.pgid,
687                spawned.cgroup_dir.as_deref(),
688            )
689            .await?;
690            repo.complete_task(&task.task_id, &outcome.completion)?;
691            persist_latest_result(&repo, &task.task_id)?;
692        }
693        Err(err) => {
694            repo.complete_task(
695                &task.task_id,
696                &CompletionUpdate {
697                    status: TaskStatus::Failed,
698                    finished_at: Utc::now(),
699                    duration_ms: Some(0),
700                    exit_code: None,
701                    exit_signal: None,
702                    error: Some(match err {
703                        AppError::SandboxSetup(message) => RuntimeErrorInfo {
704                            code: ErrorCode::SandboxSetupFailed,
705                            message,
706                            details: None,
707                        },
708                        AppError::LaunchFailed(message) => RuntimeErrorInfo {
709                            code: ErrorCode::LaunchFailed,
710                            message,
711                            details: None,
712                        },
713                        other => RuntimeErrorInfo {
714                            code: ErrorCode::Internal,
715                            message: other.to_string(),
716                            details: None,
717                        },
718                    }),
719                    usage: None,
720                    result_json: None,
721                },
722            )?;
723            persist_latest_result(&repo, &task.task_id)?;
724        }
725    }
726    Ok(())
727}
728
/// Identifiers for a freshly launched task process.
#[derive(Debug)]
struct SpawnedProcess {
    /// OS process id of the task process.
    pid: u32,
    /// Process group id; the child makes itself group leader, so this equals `pid`.
    pgid: i32,
    /// Script file written for script tasks, if any.
    script_path: Option<PathBuf>,
    /// Cgroup directory created for the task when cgroups are enforced.
    cgroup_dir: Option<PathBuf>,
}
736
737#[derive(Debug)]
738struct WaitOutcome {
739    completion: CompletionUpdate,
740}
741
742fn spawn_task_process(
743    task: &TaskRecord,
744    execution_plan: &ExecutionPlan,
745    _cgroup_root: &Path,
746) -> AppResult<SpawnedProcess> {
747    let stdout_file = open_output_file(&task.stdout_path)?;
748    let stderr_file = open_output_file(&task.stderr_path)?;
749    let (mut command, script_path) = build_command(task, stdout_file, stderr_file)?;
750
751    let resource_enforcement = execution_plan.resource_enforcement.clone();
752    let sandbox = execution_plan.effective_sandbox.clone();
753    let _rootfs = sandbox.rootfs.clone();
754    unsafe {
755        command.pre_exec(move || {
756            setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(nix_to_io)?;
757            apply_resource_enforcement(&resource_enforcement).map_err(nix_to_io)?;
758            #[cfg(target_os = "linux")]
759            apply_linux_sandbox(&sandbox, _rootfs.as_deref()).map_err(nix_to_io)?;
760            Ok(())
761        });
762    }
763
764    let child = command
765        .spawn()
766        .map_err(|err| AppError::LaunchFailed(format!("spawn process: {err}")))?;
767    let pid = child.id();
768    let pgid = pid as i32;
769    drop(child);
770
771    let cgroup_dir = if execution_plan.resource_enforcement.cgroup_enforced {
772        #[cfg(target_os = "linux")]
773        {
774            let dir = setup_cgroup(
775                _cgroup_root,
776                &task.task_id,
777                pid as i32,
778                &execution_plan.resource_enforcement,
779            )
780            .map_err(|err| AppError::SandboxSetup(format!("configure cgroup: {err}")))?;
781            Some(dir)
782        }
783        #[cfg(not(target_os = "linux"))]
784        {
785            None
786        }
787    } else {
788        None
789    };
790
791    Ok(SpawnedProcess {
792        pid,
793        pgid,
794        script_path,
795        cgroup_dir,
796    })
797}
798
799fn build_command(
800    task: &TaskRecord,
801    stdout_file: File,
802    stderr_file: File,
803) -> AppResult<(StdCommand, Option<PathBuf>)> {
804    let env = minimal_env(&task.execution.env);
805    let workspace = task.workspace_dir.clone();
806    fs::create_dir_all(&workspace)?;
807
808    let (mut command, script_path) = match task.execution.kind {
809        ExecutionKind::Command => {
810            let program = task
811                .execution
812                .program
813                .as_deref()
814                .ok_or_else(|| AppError::InvalidInput("execution.program missing".into()))?;
815            let mut cmd = StdCommand::new(program);
816            cmd.args(&task.execution.args);
817            (cmd, None)
818        }
819        ExecutionKind::Script => {
820            let script = task
821                .execution
822                .script
823                .as_ref()
824                .ok_or_else(|| AppError::InvalidInput("execution.script missing".into()))?;
825            let path = task
826                .script_path
827                .clone()
828                .unwrap_or_else(|| task.task_dir.join("script.sh"));
829            write_script_file(&path, script)?;
830
831            let cmd = if let Some(interpreter) = &task.execution.interpreter {
832                let mut command = StdCommand::new(&interpreter[0]);
833                command.args(&interpreter[1..]);
834                command.arg(&path);
835                command
836            } else {
837                let mut command = StdCommand::new("/bin/sh");
838                command.arg("-c").arg(script);
839                command
840            };
841            (cmd, Some(path))
842        }
843    };
844
845    command
846        .stdin(Stdio::null())
847        .stdout(Stdio::from(stdout_file))
848        .stderr(Stdio::from(stderr_file))
849        .current_dir(workspace)
850        .env_clear();
851
852    for (key, value) in env {
853        command.env(key, value);
854    }
855    Ok((command, script_path))
856}
857
858async fn supervise_wait(
859    repo: &Repository,
860    task: &TaskRecord,
861    wait_handle: JoinHandle<AppResult<WaitUsage>>,
862    termination_grace_ms: u64,
863    wall_time_ms: u64,
864    pgid: i32,
865    cgroup_dir: Option<&Path>,
866) -> AppResult<WaitOutcome> {
867    let start = Instant::now();
868    let mut wait_handle = std::pin::pin!(wait_handle);
869    let mut poll = tokio::time::interval(Duration::from_millis(250));
870    let mut timeout_started: Option<Instant> = None;
871    let mut cancel_started: Option<Instant> = None;
872    let mut term_sent = false;
873    let mut kill_sent = false;
874
875    loop {
876        tokio::select! {
877            result = &mut wait_handle => {
878                let usage = result
879                    .map_err(|err| AppError::Internal(format!("wait join error: {err}")))??;
880                let duration_ms = start.elapsed().as_millis() as u64;
881                let cancel_requested = cancel_started.is_some() || repo.is_cancel_requested(&task.task_id)?;
882                return Ok(WaitOutcome {
883                    completion: classify_completion(
884                        task,
885                        usage,
886                        duration_ms,
887                        timeout_started.is_some(),
888                        cancel_requested,
889                        cgroup_dir,
890                    ),
891                });
892            }
893            _ = poll.tick() => {
894                let cancel_requested = repo.is_cancel_requested(&task.task_id)?;
895                if timeout_started.is_none() && start.elapsed() >= Duration::from_millis(wall_time_ms) {
896                    repo.mark_timeout_triggered(&task.task_id)?;
897                    timeout_started = Some(Instant::now());
898                }
899                if cancel_requested && cancel_started.is_none() {
900                    cancel_started = Some(Instant::now());
901                }
902
903                if (timeout_started.is_some() || cancel_started.is_some()) && !term_sent {
904                    let _ = killpg(Pid::from_raw(pgid), Signal::SIGTERM);
905                    term_sent = true;
906                }
907
908                let should_escalate = timeout_started
909                    .map(|started| started.elapsed() >= Duration::from_millis(termination_grace_ms))
910                    .unwrap_or(false)
911                    || cancel_started
912                        .map(|started| started.elapsed() >= Duration::from_millis(termination_grace_ms))
913                        .unwrap_or(false);
914
915                if should_escalate && !kill_sent {
916                    let _ = killpg(Pid::from_raw(pgid), Signal::SIGKILL);
917                    kill_sent = true;
918                }
919            }
920        }
921    }
922}
923
/// Exit status and resource usage of a finished task process, as populated by
/// `wait_for_pid` from the results of `wait4(2)`.
#[derive(Debug)]
struct WaitUsage {
    /// Exit status when the process exited normally (`WIFEXITED`).
    exit_code: Option<i32>,
    /// Number of the terminating signal when the process was killed by one (`WIFSIGNALED`).
    exit_signal: Option<i32>,
    /// User-mode CPU time in milliseconds (from `ru_utime`).
    user_cpu_ms: Option<u64>,
    /// Kernel-mode CPU time in milliseconds (from `ru_stime`).
    system_cpu_ms: Option<u64>,
    /// Peak resident set size in bytes (from `ru_maxrss`, converted by `convert_max_rss`).
    max_rss_bytes: Option<u64>,
}
932
933fn classify_completion(
934    _task: &TaskRecord,
935    usage: WaitUsage,
936    duration_ms: u64,
937    timed_out: bool,
938    cancelled: bool,
939    cgroup_dir: Option<&Path>,
940) -> CompletionUpdate {
941    let mut usage_payload = ResourceUsage {
942        duration_ms,
943        user_cpu_ms: usage.user_cpu_ms,
944        system_cpu_ms: usage.system_cpu_ms,
945        max_rss_bytes: usage.max_rss_bytes,
946        memory_peak_bytes: read_memory_peak_bytes(cgroup_dir),
947    };
948
949    let (status, error) = if timed_out {
950        (
951            TaskStatus::Failed,
952            Some(RuntimeErrorInfo {
953                code: ErrorCode::Timeout,
954                message: "task exceeded wall_time_ms".into(),
955                details: None,
956            }),
957        )
958    } else if cancelled {
959        (
960            TaskStatus::Cancelled,
961            Some(RuntimeErrorInfo {
962                code: ErrorCode::Cancelled,
963                message: "task cancelled".into(),
964                details: None,
965            }),
966        )
967    } else if oom_killed(cgroup_dir) {
968        (
969            TaskStatus::Failed,
970            Some(RuntimeErrorInfo {
971                code: ErrorCode::MemoryLimitExceeded,
972                message: "task exceeded memory limit".into(),
973                details: None,
974            }),
975        )
976    } else if usage.exit_signal == Some(libc::SIGXCPU) {
977        (
978            TaskStatus::Failed,
979            Some(RuntimeErrorInfo {
980                code: ErrorCode::CpuLimitExceeded,
981                message: "task exceeded cpu_time_sec".into(),
982                details: None,
983            }),
984        )
985    } else if usage.exit_code == Some(0) {
986        (TaskStatus::Success, None)
987    } else if let Some(code) = usage.exit_code {
988        (
989            TaskStatus::Failed,
990            Some(RuntimeErrorInfo {
991                code: ErrorCode::ExitNonZero,
992                message: format!("task exited with code {code}"),
993                details: None,
994            }),
995        )
996    } else if let Some(signal) = usage.exit_signal {
997        (
998            TaskStatus::Failed,
999            Some(RuntimeErrorInfo {
1000                code: ErrorCode::Internal,
1001                message: format!("task terminated by signal {signal}"),
1002                details: None,
1003            }),
1004        )
1005    } else {
1006        (
1007            TaskStatus::Failed,
1008            Some(RuntimeErrorInfo {
1009                code: ErrorCode::Internal,
1010                message: "task failed with unknown outcome".into(),
1011                details: None,
1012            }),
1013        )
1014    };
1015
1016    if usage_payload.duration_ms == 0 {
1017        usage_payload.duration_ms = duration_ms;
1018    }
1019
1020    CompletionUpdate {
1021        status,
1022        finished_at: Utc::now(),
1023        duration_ms: Some(duration_ms),
1024        exit_code: usage.exit_code,
1025        exit_signal: usage.exit_signal,
1026        error,
1027        usage: Some(usage_payload),
1028        result_json: None,
1029    }
1030}
1031
1032fn build_status_response(task: &TaskRecord) -> AppResult<TaskStatusResponse> {
1033    let (stdout, stdout_truncated) = read_output_preview(&task.stdout_path, task.stdout_max_bytes)?;
1034    let (stderr, stderr_truncated) = read_output_preview(&task.stderr_path, task.stderr_max_bytes)?;
1035    let duration_ms = task.duration_ms.or_else(|| {
1036        task.started_at
1037            .map(|started_at| (Utc::now() - started_at).num_milliseconds().max(0) as u64)
1038    });
1039    Ok(TaskStatusResponse {
1040        task_id: task.task_id.clone(),
1041        handle_id: task.handle_id.clone(),
1042        status: task.status.clone(),
1043        created_at: task.created_at,
1044        updated_at: task.updated_at,
1045        started_at: task.started_at,
1046        finished_at: task.finished_at,
1047        duration_ms,
1048        shim_pid: task.shim_pid,
1049        pid: task.pid,
1050        pgid: task.pgid,
1051        exit_code: task.exit_code,
1052        exit_signal: task.exit_signal,
1053        stdout,
1054        stderr,
1055        stdout_truncated,
1056        stderr_truncated,
1057        error: task.error.clone(),
1058        usage: task.usage.clone().or_else(|| {
1059            duration_ms.map(|value| ResourceUsage {
1060                duration_ms: value,
1061                user_cpu_ms: None,
1062                system_cpu_ms: None,
1063                max_rss_bytes: None,
1064                memory_peak_bytes: None,
1065            })
1066        }),
1067        execution_plan: task
1068            .execution_plan
1069            .clone()
1070            .or_else(|| Some(legacy_execution_plan(task))),
1071        reservation: task.reservation.clone(),
1072        artifacts: TaskArtifacts {
1073            task_dir: task.task_dir.to_string_lossy().to_string(),
1074            request_path: task.request_path.to_string_lossy().to_string(),
1075            result_path: task.result_path.to_string_lossy().to_string(),
1076            stdout_path: task.stdout_path.to_string_lossy().to_string(),
1077            stderr_path: task.stderr_path.to_string_lossy().to_string(),
1078            script_path: task
1079                .script_path
1080                .as_ref()
1081                .map(|path| path.to_string_lossy().to_string()),
1082        },
1083        metadata: task.metadata.clone(),
1084    })
1085}
1086
1087fn legacy_execution_plan(task: &TaskRecord) -> ExecutionPlan {
1088    ExecutionPlan::legacy(task.sandbox.clone(), task.limits.clone())
1089}
1090
1091fn add_reservation(current: &mut ResourceCapacity, reservation: &TaskResourceReservation) {
1092    current.task_slots = current.task_slots.saturating_add(reservation.task_slots);
1093    if let Some(value) = reservation.memory_bytes {
1094        current.memory_bytes = Some(current.memory_bytes.unwrap_or(0).saturating_add(value));
1095    }
1096    if let Some(value) = reservation.pids {
1097        current.pids = Some(current.pids.unwrap_or(0).saturating_add(value));
1098    }
1099}
1100
1101fn subtract_reservation(current: &mut ResourceCapacity, reservation: &TaskResourceReservation) {
1102    current.task_slots = current.task_slots.saturating_sub(reservation.task_slots);
1103    if let Some(value) = reservation.memory_bytes {
1104        current.memory_bytes = current
1105            .memory_bytes
1106            .map(|reserved| reserved.saturating_sub(value));
1107    }
1108    if let Some(value) = reservation.pids {
1109        current.pids = current.pids.map(|reserved| reserved.saturating_sub(value));
1110    }
1111}
1112
1113fn persist_latest_result(repo: &Repository, task_id: &str) -> AppResult<()> {
1114    let task = repo.get_task(task_id)?;
1115    let response = build_status_response(&task)?;
1116    write_json_file(&task.result_path, &response)?;
1117    Ok(())
1118}
1119
1120fn signal_task_termination(task: &TaskRecord, signal: Signal) -> AppResult<()> {
1121    if let Some(pgid) = task.pgid {
1122        killpg(Pid::from_raw(pgid), signal)
1123            .map_err(|err| AppError::Internal(format!("signal process group: {err}")))?;
1124    } else if let Some(pid) = task.pid {
1125        kill(Pid::from_raw(pid as i32), signal)
1126            .map_err(|err| AppError::Internal(format!("signal process: {err}")))?;
1127    }
1128    Ok(())
1129}
1130
/// Chooses a script file name whose extension suits the given interpreter.
///
/// Matching is by substring, tried in the order python → `script.py`,
/// bash → `script.sh`, zsh → `script.zsh`, node → `script.js`. The first hit
/// wins. No interpreter, or an unrecognised one, yields `script.sh`.
fn infer_script_name(interpreter: Option<&str>) -> &'static str {
    const BY_INTERPRETER: [(&str, &str); 4] = [
        ("python", "script.py"),
        ("bash", "script.sh"),
        ("zsh", "script.zsh"),
        ("node", "script.js"),
    ];

    let name = interpreter.unwrap_or("");
    BY_INTERPRETER
        .iter()
        .find(|&&(needle, _)| name.contains(needle))
        .map_or("script.sh", |&(_, file)| file)
}
1140
1141fn write_script_file(path: &Path, script: &str) -> AppResult<()> {
1142    if let Some(parent) = path.parent() {
1143        fs::create_dir_all(parent)?;
1144    }
1145    let mut file = OpenOptions::new()
1146        .create(true)
1147        .truncate(true)
1148        .write(true)
1149        .mode(0o700)
1150        .open(path)?;
1151    file.write_all(script.as_bytes())?;
1152    Ok(())
1153}
1154
1155fn read_output_preview(path: &Path, max_bytes: u64) -> AppResult<(String, bool)> {
1156    match File::open(path) {
1157        Ok(mut file) => {
1158            let len = file.metadata()?.len();
1159            let mut buffer = vec![0; max_bytes as usize];
1160            let read = file.read(&mut buffer)?;
1161            buffer.truncate(read);
1162            Ok((
1163                String::from_utf8_lossy(&buffer).to_string(),
1164                len > max_bytes,
1165            ))
1166        }
1167        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok((String::new(), false)),
1168        Err(err) => Err(AppError::Io(err)),
1169    }
1170}
1171
1172fn write_json_file(path: &Path, value: &impl serde::Serialize) -> AppResult<()> {
1173    if let Some(parent) = path.parent() {
1174        fs::create_dir_all(parent)?;
1175    }
1176    let json = serde_json::to_vec_pretty(value)?;
1177    fs::write(path, json)?;
1178    Ok(())
1179}
1180
1181fn touch_file(path: &Path) -> AppResult<()> {
1182    if let Some(parent) = path.parent() {
1183        fs::create_dir_all(parent)?;
1184    }
1185    let _ = OpenOptions::new().create(true).append(true).open(path)?;
1186    Ok(())
1187}
1188
1189fn open_output_file(path: &Path) -> AppResult<File> {
1190    if let Some(parent) = path.parent() {
1191        fs::create_dir_all(parent)?;
1192    }
1193    OpenOptions::new()
1194        .create(true)
1195        .truncate(true)
1196        .write(true)
1197        .mode(0o644)
1198        .open(path)
1199        .map_err(AppError::Io)
1200}
1201
/// Builds the environment for a task process.
///
/// A small allow-list of variables (`PATH`, `HOME`, `LANG`, `TMPDIR`, `USER`)
/// is inherited from the runtime, if they are set and valid Unicode. Every
/// entry of `extra` is then layered on top, so caller values win.
fn minimal_env(extra: &std::collections::HashMap<String, String>) -> BTreeMap<String, String> {
    const INHERITED: [&str; 5] = ["PATH", "HOME", "LANG", "TMPDIR", "USER"];

    let mut vars: BTreeMap<String, String> = INHERITED
        .iter()
        .filter_map(|&name| {
            std::env::var(name)
                .ok()
                .map(|value| (name.to_owned(), value))
        })
        .collect();
    vars.extend(extra.iter().map(|(k, v)| (k.clone(), v.clone())));
    vars
}
1216
1217fn apply_resource_enforcement(enforcement: &ResourceEnforcementPlan) -> nix::Result<()> {
1218    if enforcement.cpu_time_enforced {
1219        if let Some(cpu_time_sec) = enforcement.cpu_time_sec {
1220            setrlimit(
1221                Resource::RLIMIT_CPU,
1222                cpu_time_sec as rlim_t,
1223                cpu_time_sec as rlim_t,
1224            )?;
1225        }
1226    }
1227    if enforcement.memory_enforced {
1228        if let Some(memory_bytes) = enforcement.memory_bytes {
1229            setrlimit(
1230                Resource::RLIMIT_AS,
1231                memory_bytes as rlim_t,
1232                memory_bytes as rlim_t,
1233            )?;
1234        }
1235    }
1236    Ok(())
1237}
1238
1239#[allow(dead_code)]
1240fn apply_rlimits(limits: &crate::types::ResourceLimits) -> nix::Result<()> {
1241    if let Some(cpu_time_sec) = limits.cpu_time_sec {
1242        setrlimit(
1243            Resource::RLIMIT_CPU,
1244            cpu_time_sec as rlim_t,
1245            cpu_time_sec as rlim_t,
1246        )?;
1247    }
1248    if let Some(memory_bytes) = limits.memory_bytes {
1249        setrlimit(
1250            Resource::RLIMIT_AS,
1251            memory_bytes as rlim_t,
1252            memory_bytes as rlim_t,
1253        )?;
1254    }
1255    Ok(())
1256}
1257
1258#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
1259#[cfg(target_os = "linux")]
1260fn apply_linux_sandbox(
1261    sandbox: &crate::types::SandboxPolicy,
1262    rootfs: Option<&str>,
1263) -> nix::Result<()> {
1264    use nix::sched::{unshare, CloneFlags};
1265
1266    if matches!(sandbox.profile, crate::types::SandboxProfile::LinuxSandbox) {
1267        let namespaces = sandbox.effective_namespaces();
1268        let mut flags = CloneFlags::empty();
1269        if namespaces.mount {
1270            flags |= CloneFlags::CLONE_NEWNS;
1271        }
1272        if namespaces.pid {
1273            flags |= CloneFlags::CLONE_NEWPID;
1274        }
1275        if namespaces.uts {
1276            flags |= CloneFlags::CLONE_NEWUTS;
1277        }
1278        if namespaces.ipc {
1279            flags |= CloneFlags::CLONE_NEWIPC;
1280        }
1281        if namespaces.net {
1282            flags |= CloneFlags::CLONE_NEWNET;
1283        }
1284        if !flags.is_empty() {
1285            unshare(flags)?;
1286        }
1287        if sandbox.chroot {
1288            if let Some(root) = rootfs {
1289                chroot(root)?;
1290                chdir("/")?;
1291            }
1292        }
1293    }
1294    Ok(())
1295}
1296
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
#[cfg(not(target_os = "linux"))]
/// Non-Linux stand-in for the Linux sandbox setup.
///
/// Namespaces and chroot are only applied on Linux. On other hosts this
/// always succeeds without changing anything.
fn apply_linux_sandbox(
    _policy: &crate::types::SandboxPolicy,
    _root: Option<&str>,
) -> nix::Result<()> {
    Ok(())
}
1305
1306#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
1307#[cfg(target_os = "linux")]
1308fn setup_cgroup(
1309    cgroup_root: &Path,
1310    task_id: &str,
1311    pid: i32,
1312    enforcement: &ResourceEnforcementPlan,
1313) -> AppResult<PathBuf> {
1314    let dir = cgroup_root.join(task_id);
1315    fs::create_dir_all(&dir)?;
1316    if enforcement.memory_enforced {
1317        if let Some(memory_bytes) = enforcement.memory_bytes {
1318            fs::write(dir.join("memory.max"), memory_bytes.to_string())?;
1319        }
1320    }
1321    if enforcement.pids_enforced {
1322        if let Some(pids_max) = enforcement.pids_max {
1323            fs::write(dir.join("pids.max"), pids_max.to_string())?;
1324        }
1325    }
1326    fs::write(dir.join("cgroup.procs"), pid.to_string())?;
1327    Ok(dir)
1328}
1329
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
#[cfg(not(target_os = "linux"))]
/// Non-Linux stand-in for cgroup setup: cgroups are Linux-only, so this
/// always fails with `AppError::SandboxSetup`.
fn setup_cgroup(
    _root: &Path,
    _task: &str,
    _pid: i32,
    _plan: &ResourceEnforcementPlan,
) -> AppResult<PathBuf> {
    let reason = "linux-sandbox requires a Linux host";
    Err(AppError::SandboxSetup(reason.into()))
}
1342
/// Reads the peak memory usage recorded in a cgroup's `memory.peak` file.
///
/// Returns `None` in these cases:
/// * no cgroup directory is given;
/// * the file cannot be read;
/// * its trimmed contents are not a `u64`;
/// * the host is not Linux.
fn read_memory_peak_bytes(_cgroup_dir: Option<&Path>) -> Option<u64> {
    #[cfg(target_os = "linux")]
    let peak = _cgroup_dir
        .and_then(|dir| fs::read_to_string(dir.join("memory.peak")).ok())
        .and_then(|raw| raw.trim().parse::<u64>().ok());
    #[cfg(not(target_os = "linux"))]
    let peak = None;
    peak
}
1356
/// Reports whether the cgroup's `memory.events` file shows at least one
/// `oom_kill` event.
///
/// A missing directory, an unreadable file, an unparsable counter or a
/// non-Linux host all count as "no OOM kill".
fn oom_killed(_cgroup_dir: Option<&Path>) -> bool {
    #[cfg(target_os = "linux")]
    let killed = _cgroup_dir
        .and_then(|dir| fs::read_to_string(dir.join("memory.events")).ok())
        .is_some_and(|events| {
            events.lines().any(|line| {
                let mut fields = line.split_whitespace();
                fields.next() == Some("oom_kill")
                    && fields
                        .next()
                        .and_then(|count| count.parse::<u64>().ok())
                        .unwrap_or_default()
                        > 0
            })
        });
    #[cfg(not(target_os = "linux"))]
    let killed = false;
    killed
}
1381
/// Blocks until child process `pid` terminates, then returns its exit status and
/// resource usage, both collected by a single `wait4(2)` call.
///
/// `wait4` is retried when interrupted (`EINTR`); any other failure is returned
/// as `AppError::Io`. No `WUNTRACED`/`WCONTINUED` options are passed, so the
/// status describes a termination. Normally exactly one of `exit_code` (normal
/// exit) or `exit_signal` (killed by a signal) is therefore set.
///
/// NOTE(review): `pid` must be an unreaped child of this process. The spawn path
/// drops its `std::process::Child` without waiting, presumably so that this
/// function is the only reaper. Confirm nothing else waits on the same pid.
fn wait_for_pid(pid: i32) -> AppResult<WaitUsage> {
    let mut status: libc::c_int = 0;
    // Zero-initialised: `rusage` consists of plain integers/`timeval`s, for which
    // the all-zero bit pattern is a valid value.
    let mut usage = std::mem::MaybeUninit::<libc::rusage>::zeroed();
    let wait_result = loop {
        // SAFETY: `status` and `usage` are live, properly aligned, writable locals
        // for the whole call; `wait4` only writes through these two pointers.
        let rc = unsafe { libc::wait4(pid, &mut status, 0, usage.as_mut_ptr()) };
        if rc == -1 {
            let err = std::io::Error::last_os_error();
            if err.kind() == std::io::ErrorKind::Interrupted {
                continue;
            }
            return Err(AppError::Io(err));
        }
        break rc;
    };

    // Without WNOHANG a successful wait4 returns the (positive) child pid; this is
    // a defensive check.
    if wait_result <= 0 {
        return Err(AppError::Internal("wait4 returned no child".into()));
    }

    // SAFETY: the buffer started zero-initialised (a valid `rusage`) and, on this
    // success path, has been filled in by `wait4`.
    let usage = unsafe { usage.assume_init() };
    let exit_code = if libc::WIFEXITED(status) {
        Some(libc::WEXITSTATUS(status))
    } else {
        None
    };
    let exit_signal = if libc::WIFSIGNALED(status) {
        Some(libc::WTERMSIG(status))
    } else {
        None
    };

    Ok(WaitUsage {
        exit_code,
        exit_signal,
        user_cpu_ms: Some(timeval_to_ms(usage.ru_utime)),
        system_cpu_ms: Some(timeval_to_ms(usage.ru_stime)),
        max_rss_bytes: Some(convert_max_rss(usage.ru_maxrss)),
    })
}
1421
1422fn timeval_to_ms(tv: libc::timeval) -> u64 {
1423    (tv.tv_sec.max(0) as u64)
1424        .saturating_mul(1000)
1425        .saturating_add((tv.tv_usec.max(0) as u64) / 1000)
1426}
1427
1428#[cfg(target_os = "linux")]
1429fn convert_max_rss(value: libc::c_long) -> u64 {
1430    (value.max(0) as u64).saturating_mul(1024)
1431}
1432
1433#[cfg(not(target_os = "linux"))]
1434fn convert_max_rss(value: libc::c_long) -> u64 {
1435    value.max(0) as u64
1436}
1437
1438fn load_request(args: &RemoteTaskArgs) -> AppResult<SubmitTaskRequest> {
1439    let raw = if let Some(file) = &args.file {
1440        fs::read_to_string(file)?
1441    } else {
1442        args.json.clone().unwrap_or_default()
1443    };
1444    Ok(serde_json::from_str(&raw)?)
1445}
1446
1447fn http_client() -> Client {
1448    Client::builder().build().unwrap_or_else(|_| Client::new())
1449}
1450
/// Strips every trailing `/` from a server base URL, so paths can be appended
/// with a single separator.
fn trim_server(server: &str) -> &str {
    server.trim_end_matches(|ch: char| ch == '/')
}
1454
1455fn default_runtime_id(listen_addr: &str) -> String {
1456    let host = std::env::var("HOSTNAME")
1457        .ok()
1458        .filter(|value| !value.trim().is_empty())
1459        .unwrap_or_else(|| "execgo-runtime".into());
1460    format!(
1461        "{}-{}",
1462        sanitize_runtime_id(&host),
1463        sanitize_runtime_id(listen_addr)
1464    )
1465}
1466
/// Makes `value` safe for use in a runtime id.
///
/// ASCII letters, digits, `-`, `_` and `.` are kept. Every other character,
/// including each non-ASCII char, becomes `-`.
fn sanitize_runtime_id(value: &str) -> String {
    let is_allowed = |ch: char| ch.is_ascii_alphanumeric() || "-_.".contains(ch);
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        out.push(if is_allowed(ch) { ch } else { '-' });
    }
    out
}
1479
1480async fn print_json_response(response: reqwest::Response) -> AppResult<()> {
1481    let status = response.status();
1482    let body = response.text().await?;
1483    println!("{body}");
1484    if status.is_success() {
1485        Ok(())
1486    } else {
1487        Err(AppError::Internal(format!(
1488            "request failed with status {status}"
1489        )))
1490    }
1491}
1492
1493fn process_exists(pid: i32) -> bool {
1494    if pid <= 0 {
1495        return false;
1496    }
1497    kill(Pid::from_raw(pid), None).is_ok()
1498}
1499
1500fn nix_to_io(err: nix::Error) -> std::io::Error {
1501    std::io::Error::other(err.to_string())
1502}
1503
1504fn init_tracing() {
1505    let _ = tracing_subscriber::fmt()
1506        .with_env_filter(
1507            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
1508        )
1509        .json()
1510        .try_init();
1511}
1512
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};

    use tempfile::TempDir;

    use super::*;
    use crate::types::{ExecutionKind, ExecutionSpec, ResourceLimits, SandboxPolicy};

    #[test]
    fn minimal_env_keeps_common_keys_and_overrides() {
        let mut extra = HashMap::new();
        extra.insert("FOO".to_string(), "bar".to_string());
        extra.insert("PATH".to_string(), "/custom/bin".to_string());
        let env = minimal_env(&extra);
        assert_eq!(env.get("FOO").map(String::as_str), Some("bar"));
        // Caller-supplied values override inherited ones.
        assert_eq!(env.get("PATH").map(String::as_str), Some("/custom/bin"));
        // Nothing outside the allow-list (plus the extras) leaks through.
        assert!(env.keys().all(|key| {
            ["PATH", "HOME", "LANG", "TMPDIR", "USER", "FOO"].contains(&key.as_str())
        }));
    }

    #[tokio::test]
    async fn status_response_reads_inline_output() {
        let temp = TempDir::new().unwrap();
        let stdout_path = temp.path().join("stdout.log");
        let stderr_path = temp.path().join("stderr.log");
        fs::write(&stdout_path, "hello world").unwrap();
        fs::write(&stderr_path, "oops").unwrap();

        let task = TaskRecord {
            task_id: "t1".into(),
            handle_id: "t1".into(),
            status: TaskStatus::Success,
            execution: ExecutionSpec {
                kind: ExecutionKind::Command,
                program: Some("echo".into()),
                args: vec![],
                script: None,
                interpreter: None,
                env: HashMap::new(),
            },
            limits: ResourceLimits::default(),
            sandbox: SandboxPolicy::default(),
            metadata: BTreeMap::new(),
            created_at: Utc::now(),
            updated_at: Utc::now(),
            started_at: None,
            finished_at: None,
            duration_ms: Some(1),
            shim_pid: None,
            pid: None,
            pgid: None,
            exit_code: Some(0),
            exit_signal: None,
            error_code: None,
            error: None,
            usage: None,
            task_dir: temp.path().to_path_buf(),
            workspace_dir: temp.path().join("workspace"),
            request_path: temp.path().join("request.json"),
            result_path: temp.path().join("result.json"),
            stdout_path,
            stderr_path,
            script_path: None,
            stdout_max_bytes: 1024,
            stderr_max_bytes: 1024,
            kill_requested: false,
            kill_requested_at: None,
            timeout_triggered: false,
            result_json: None,
            execution_plan: None,
            control_context: None,
            reservation: None,
            reserved_at: None,
            released_at: None,
        };

        let response = build_status_response(&task).unwrap();
        assert_eq!(response.stdout, "hello world");
        assert_eq!(response.stderr, "oops");
    }

    #[test]
    fn read_output_preview_caps_and_flags_truncation() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("out.log");
        fs::write(&path, "hello world").unwrap();

        let (preview, truncated) = read_output_preview(&path, 5).unwrap();
        assert_eq!(preview, "hello");
        assert!(truncated);

        let (preview, truncated) = read_output_preview(&path, 64).unwrap();
        assert_eq!(preview, "hello world");
        assert!(!truncated);
    }

    #[test]
    fn read_output_preview_treats_missing_file_as_empty() {
        let temp = TempDir::new().unwrap();
        let (preview, truncated) =
            read_output_preview(&temp.path().join("absent.log"), 16).unwrap();
        assert_eq!(preview, "");
        assert!(!truncated);
    }

    #[test]
    fn infer_script_name_follows_interpreter() {
        assert_eq!(infer_script_name(Some("python3")), "script.py");
        assert_eq!(infer_script_name(Some("bash")), "script.sh");
        assert_eq!(infer_script_name(Some("zsh")), "script.zsh");
        assert_eq!(infer_script_name(Some("node")), "script.js");
        assert_eq!(infer_script_name(None), "script.sh");
    }

    #[test]
    fn sanitize_runtime_id_replaces_disallowed_characters() {
        assert_eq!(sanitize_runtime_id("127.0.0.1:8080"), "127.0.0.1-8080");
        assert_eq!(sanitize_runtime_id("node_a-1"), "node_a-1");
    }

    #[test]
    fn trim_server_strips_trailing_slashes() {
        assert_eq!(trim_server("http://localhost:8080//"), "http://localhost:8080");
        assert_eq!(trim_server("http://localhost:8080"), "http://localhost:8080");
    }
}