1use std::{
2 collections::BTreeMap,
3 fs::{self, File, OpenOptions},
4 io::{Read, Write},
5 os::unix::{fs::OpenOptionsExt, process::CommandExt},
6 path::{Path, PathBuf},
7 process::{Command as StdCommand, Stdio},
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use axum::{http::StatusCode, response::IntoResponse};
13use chrono::Utc;
14use nix::sys::{
15 resource::{rlim_t, setrlimit, Resource},
16 signal::{kill, killpg, Signal},
17};
18use nix::unistd::{setpgid, Pid};
19use reqwest::Client;
20use tokio::{sync::Notify, task::JoinHandle, time::sleep};
21use tracing::{info, warn};
22
23#[cfg(target_os = "linux")]
24use nix::unistd::{chdir, chroot};
25
26use crate::{
27 capabilities::{probe_runtime_capabilities, CapabilityProbeInput},
28 cli::{Cli, Command, InternalShimArgs, RemoteTaskArgs, ServeArgs, StatusArgs, WaitArgs},
29 error::{AppError, AppResult},
30 ledger::ResourceLedger,
31 metrics::render_prometheus,
32 policy::resolve_execution_plan,
33 repo::{generate_task_id, CompletionUpdate, Repository, TaskRecord},
34 server::build_router,
35 types::{
36 resolve_workspace_dir, ActiveTaskReservation, CapabilityMode, ErrorCode, EventRecord,
37 ExecutionKind, ExecutionPlan, ResourceCapacity, ResourceEnforcementPlan, ResourceUsage,
38 RuntimeCapabilities, RuntimeConfigResponse, RuntimeErrorInfo, RuntimeInfoResponse,
39 RuntimeResourcesResponse, SubmitTaskRequest, SubmitTaskResponse, TaskArtifacts,
40 TaskResourceReservation, TaskStatus, TaskStatusResponse,
41 },
42};
43
/// Server-side configuration resolved from the `serve` command-line arguments.
///
/// Built by `Settings::from_args` and shared with the service behind an `Arc`.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Identifier reported by the runtime info/config/resources endpoints.
    pub runtime_id: String,
    /// Address the HTTP listener binds to.
    pub listen_addr: String,
    /// Root directory for all runtime state.
    pub data_dir: PathBuf,
    /// Directory holding one sub-directory per task (`<data_dir>/tasks`).
    pub tasks_dir: PathBuf,
    /// Path of the task database (`<data_dir>/runtime.db`).
    pub database_path: PathBuf,
    /// Running-task limit, forwarded to the capability probe.
    pub max_running_tasks: usize,
    /// Submissions fail with `QueueFull` once this many tasks are accepted; also caps how
    /// many accepted tasks one dispatcher pass lists.
    pub max_queued_tasks: usize,
    /// Delay between SIGTERM and the SIGKILL escalation when stopping a task.
    pub termination_grace: Duration,
    /// Age used to compute the garbage-collection cutoff.
    pub result_retention: Duration,
    /// Sleep between garbage-collection passes.
    pub gc_interval: Duration,
    /// Longest the dispatcher idles between passes when it is not notified.
    pub dispatch_poll_interval: Duration,
    /// cgroup root handed to the capability probe and to the shim.
    pub cgroup_root: PathBuf,
    /// Default capability mode passed to `resolve_execution_plan`.
    pub default_capability_mode: CapabilityMode,
    /// Disables the Linux sandbox in the capability probe.
    pub disable_linux_sandbox: bool,
    /// Disables cgroup usage (reported as `cgroup_enabled: false`).
    pub disable_cgroup: bool,
    /// Memory capacity in bytes handed to the capability probe, if configured.
    pub capacity_memory_bytes: Option<u64>,
    /// PID capacity handed to the capability probe, if configured.
    pub capacity_pids: Option<u64>,
}
64
65impl Settings {
66 pub fn from_args(args: &ServeArgs) -> Self {
67 let data_dir = args.data_dir.clone();
68 let tasks_dir = data_dir.join("tasks");
69 let database_path = data_dir.join("runtime.db");
70 Self {
71 runtime_id: args
72 .runtime_id
73 .clone()
74 .unwrap_or_else(|| default_runtime_id(&args.listen_addr)),
75 listen_addr: args.listen_addr.clone(),
76 data_dir,
77 tasks_dir,
78 database_path,
79 max_running_tasks: args.max_running_tasks,
80 max_queued_tasks: args.max_queued_tasks,
81 termination_grace: Duration::from_millis(args.termination_grace_ms),
82 result_retention: Duration::from_secs(args.result_retention_secs),
83 gc_interval: Duration::from_millis(args.gc_interval_ms),
84 dispatch_poll_interval: Duration::from_millis(args.dispatch_poll_interval_ms),
85 cgroup_root: args.cgroup_root.clone(),
86 default_capability_mode: args.default_capability_mode,
87 disable_linux_sandbox: args.disable_linux_sandbox,
88 disable_cgroup: args.disable_cgroup,
89 capacity_memory_bytes: args.capacity_memory_bytes,
90 capacity_pids: args.capacity_pids,
91 }
92 }
93}
94
/// Shared state of the runtime server.
///
/// Cloned into the background loops (`start_background_loops`) and handed to
/// `build_router`; the settings, capabilities, ledger and notifier are behind `Arc`.
#[derive(Clone)]
pub struct RuntimeService {
    /// Configuration resolved at startup.
    settings: Arc<Settings>,
    /// Persistent task store.
    repo: Repository,
    /// Capabilities probed once in `new`.
    capabilities: Arc<RuntimeCapabilities>,
    /// Resource accounting seeded with the probed capacity.
    ledger: Arc<ResourceLedger>,
    /// Time the service was constructed, reported by `runtime_info`.
    started_at: chrono::DateTime<Utc>,
    /// Wakes the dispatcher loop early (after submissions, kills and recovery).
    dispatcher_notify: Arc<Notify>,
}
104
105impl RuntimeService {
106 pub async fn new(settings: Settings) -> AppResult<Self> {
107 let started_at = Utc::now();
108 fs::create_dir_all(&settings.data_dir)?;
109 fs::create_dir_all(&settings.tasks_dir)?;
110 let repo = Repository::new(settings.database_path.clone());
111 repo.init()?;
112 let capabilities = probe_runtime_capabilities(&CapabilityProbeInput {
113 runtime_id: settings.runtime_id.clone(),
114 data_dir: settings.data_dir.clone(),
115 cgroup_root: settings.cgroup_root.clone(),
116 max_running_tasks: settings.max_running_tasks,
117 disable_linux_sandbox: settings.disable_linux_sandbox,
118 disable_cgroup: settings.disable_cgroup,
119 capacity_memory_bytes: settings.capacity_memory_bytes,
120 capacity_pids: settings.capacity_pids,
121 });
122 let ledger = ResourceLedger::new(capabilities.resources.capacity.clone());
123 Ok(Self {
124 settings: Arc::new(settings),
125 repo,
126 capabilities: Arc::new(capabilities),
127 ledger: Arc::new(ledger),
128 started_at,
129 dispatcher_notify: Arc::new(Notify::new()),
130 })
131 }
132
133 pub fn settings(&self) -> Arc<Settings> {
134 self.settings.clone()
135 }
136
137 pub fn repo(&self) -> &Repository {
138 &self.repo
139 }
140
141 pub fn capabilities(&self) -> Arc<RuntimeCapabilities> {
142 self.capabilities.clone()
143 }
144
145 pub async fn submit_task(&self, request: SubmitTaskRequest) -> AppResult<SubmitTaskResponse> {
146 request.validate()?;
147 let execution_plan = resolve_execution_plan(
148 &request,
149 &self.capabilities,
150 self.settings.default_capability_mode,
151 )?;
152 let requested_reservation = TaskResourceReservation::from_limits(&request.limits);
153 self.ledger.ensure_within_capacity(&requested_reservation)?;
154
155 if self.repo.count_accepted()? >= self.settings.max_queued_tasks as u64 {
156 return Err(AppError::QueueFull);
157 }
158
159 let task_id = request.task_id.clone().unwrap_or_else(generate_task_id);
160 let control_context = request.control_context.clone();
161 let task_dir = self.settings.tasks_dir.join(&task_id);
162 let workspace_dir = resolve_workspace_dir(&task_dir, &request.sandbox)?;
163 let request_path = task_dir.join("request.json");
164 let result_path = task_dir.join("result.json");
165 let stdout_path = task_dir.join("stdout.log");
166 let stderr_path = task_dir.join("stderr.log");
167 let script_path = if matches!(request.execution.kind, ExecutionKind::Script) {
168 Some(
169 task_dir.join(infer_script_name(
170 request
171 .execution
172 .interpreter
173 .as_ref()
174 .and_then(|items| items.first())
175 .map(String::as_str),
176 )),
177 )
178 } else {
179 None
180 };
181
182 fs::create_dir_all(&workspace_dir)?;
183 fs::create_dir_all(&task_dir)?;
184 write_json_file(&request_path, &request)?;
185 touch_file(&stdout_path)?;
186 touch_file(&stderr_path)?;
187
188 self.repo.insert_task(&crate::repo::NewTaskRecord {
189 task_id: task_id.clone(),
190 request,
191 task_dir,
192 workspace_dir,
193 request_path,
194 result_path,
195 stdout_path,
196 stderr_path,
197 script_path,
198 execution_plan,
199 control_context,
200 })?;
201 self.dispatcher_notify.notify_one();
202
203 Ok(SubmitTaskResponse {
204 handle_id: task_id.clone(),
205 task_id,
206 status: TaskStatus::Accepted,
207 })
208 }
209
210 pub async fn get_task_status(&self, task_id: &str) -> AppResult<TaskStatusResponse> {
211 let task = self.repo.get_task(task_id)?;
212 build_status_response(&task)
213 }
214
215 pub async fn get_events(&self, task_id: &str) -> AppResult<Vec<EventRecord>> {
216 self.repo.list_events(task_id)
217 }
218
219 pub async fn kill_task(&self, task_id: &str) -> AppResult<TaskStatusResponse> {
220 let task = self.repo.get_task(task_id)?;
221 if task.status.is_terminal() {
222 return build_status_response(&task);
223 }
224
225 let updated = self.repo.set_cancel_requested(task_id)?;
226 if updated.status == TaskStatus::Accepted {
227 self.repo.cancel_accepted_task(
228 task_id,
229 RuntimeErrorInfo {
230 code: ErrorCode::Cancelled,
231 message: "task cancelled before execution".into(),
232 details: None,
233 },
234 )?;
235 } else {
236 signal_task_termination(&updated, Signal::SIGTERM)?;
237 self.spawn_escalation(task_id.to_string(), updated.pgid);
238 }
239 self.dispatcher_notify.notify_one();
240 self.get_task_status(task_id).await
241 }
242
243 pub async fn ready(&self) -> AppResult<()> {
244 self.repo.init()?;
245 Ok(())
246 }
247
248 pub async fn metrics(&self) -> impl IntoResponse {
249 match self.repo.metrics_snapshot() {
250 Ok(snapshot) => (
251 StatusCode::OK,
252 [("content-type", "text/plain; version=0.0.4")],
253 render_prometheus(&snapshot),
254 )
255 .into_response(),
256 Err(err) => (
257 StatusCode::INTERNAL_SERVER_ERROR,
258 [("content-type", "text/plain")],
259 format!("metrics_error {err}\n"),
260 )
261 .into_response(),
262 }
263 }
264
265 pub async fn runtime_info(&self) -> RuntimeInfoResponse {
266 RuntimeInfoResponse {
267 runtime_id: self.settings.runtime_id.clone(),
268 version: env!("CARGO_PKG_VERSION").to_string(),
269 started_at: self.started_at,
270 snapshot_version: self.capabilities.snapshot_version.clone(),
271 platform: self.capabilities.platform.clone(),
272 }
273 }
274
275 pub async fn runtime_capabilities(&self) -> RuntimeCapabilities {
276 (*self.capabilities).clone()
277 }
278
279 pub async fn runtime_config(&self) -> RuntimeConfigResponse {
280 RuntimeConfigResponse {
281 runtime_id: self.settings.runtime_id.clone(),
282 listen_addr: self.settings.listen_addr.clone(),
283 data_dir: self.settings.data_dir.to_string_lossy().to_string(),
284 max_running_tasks: self.settings.max_running_tasks,
285 max_queued_tasks: self.settings.max_queued_tasks,
286 termination_grace_ms: self.settings.termination_grace.as_millis() as u64,
287 result_retention_secs: self.settings.result_retention.as_secs(),
288 gc_interval_ms: self.settings.gc_interval.as_millis() as u64,
289 dispatch_poll_interval_ms: self.settings.dispatch_poll_interval.as_millis() as u64,
290 cgroup_root: self.settings.cgroup_root.to_string_lossy().to_string(),
291 default_capability_mode: self.settings.default_capability_mode,
292 cgroup_enabled: !self.settings.disable_cgroup,
293 }
294 }
295
296 pub async fn runtime_resources(&self) -> AppResult<RuntimeResourcesResponse> {
297 let active_tasks = self.repo.list_active_reservations()?;
298 let reservations: Vec<TaskResourceReservation> = active_tasks
299 .iter()
300 .filter_map(|task| task.reservation.clone())
301 .collect();
302 let reserved = self.ledger.reserved_capacity(reservations.iter());
303 let available = self.ledger.available_capacity(&reserved);
304 let active_reservations = active_tasks
305 .into_iter()
306 .filter_map(|task| {
307 task.reservation.map(|reservation| ActiveTaskReservation {
308 task_id: task.task_id,
309 status: task.status,
310 reservation,
311 reserved_at: task.reserved_at,
312 })
313 })
314 .collect();
315
316 Ok(RuntimeResourcesResponse {
317 runtime_id: self.settings.runtime_id.clone(),
318 capacity: self.ledger.capacity().clone(),
319 reserved,
320 available,
321 active_reservations,
322 accepted_waiting_tasks: self.repo.count_accepted_waiting()?,
323 })
324 }
325
326 pub fn start_background_loops(&self) {
327 let dispatcher_service = self.clone();
328 tokio::spawn(async move {
329 dispatcher_service.dispatcher_loop().await;
330 });
331
332 let gc_service = self.clone();
333 tokio::spawn(async move {
334 gc_service.gc_loop().await;
335 });
336 }
337
338 pub async fn recover(&self) -> AppResult<()> {
339 for task in self.repo.list_non_terminal()? {
340 match task.status {
341 TaskStatus::Accepted => {
342 if task.has_active_reservation() {
343 self.repo.release_resources(
344 &task.task_id,
345 "orphan accepted-task reservation released during recovery",
346 )?;
347 }
348 }
349 TaskStatus::Running => {
350 if let Some(shim_pid) = task.shim_pid {
351 if process_exists(shim_pid as i32) {
352 if !task.has_active_reservation() {
353 let reservation =
354 TaskResourceReservation::from_limits(&task.limits);
355 self.repo.reserve_resources(
356 &task.task_id,
357 &reservation,
358 "resource reservation reconstructed during recovery",
359 )?;
360 }
361 self.repo.mark_recovered(&task.task_id)?;
362 } else {
363 self.repo.mark_recovery_lost(&task.task_id)?;
364 persist_latest_result(&self.repo, &task.task_id)?;
365 }
366 } else {
367 self.repo.mark_recovery_lost(&task.task_id)?;
368 persist_latest_result(&self.repo, &task.task_id)?;
369 }
370 }
371 TaskStatus::Success | TaskStatus::Failed | TaskStatus::Cancelled => {}
372 }
373 }
374 self.dispatcher_notify.notify_one();
375 Ok(())
376 }
377
378 async fn dispatcher_loop(&self) {
379 loop {
380 if let Err(err) = self.dispatch_once().await {
381 warn!(error = %err, "dispatcher iteration failed");
382 }
383 tokio::select! {
384 _ = self.dispatcher_notify.notified() => {},
385 _ = sleep(self.settings.dispatch_poll_interval) => {},
386 }
387 }
388 }
389
390 async fn dispatch_once(&self) -> AppResult<()> {
391 let active_reservations = self.repo.list_active_reservations()?;
392 let mut current_reserved = self.ledger.reserved_capacity(
393 active_reservations
394 .iter()
395 .filter_map(|task| task.reservation.as_ref()),
396 );
397 let tasks = self.repo.list_accepted(self.settings.max_queued_tasks)?;
398 for task in tasks {
399 if task.kill_requested {
400 self.repo.cancel_accepted_task(
401 &task.task_id,
402 RuntimeErrorInfo {
403 code: ErrorCode::Cancelled,
404 message: "task cancelled before execution".into(),
405 details: None,
406 },
407 )?;
408 persist_latest_result(&self.repo, &task.task_id)?;
409 continue;
410 }
411
412 if task.has_active_reservation() {
413 continue;
414 }
415
416 let reservation = task
417 .reservation
418 .clone()
419 .unwrap_or_else(|| TaskResourceReservation::from_limits(&task.limits));
420 if !self.ledger.can_reserve(¤t_reserved, &reservation) {
421 continue;
422 }
423
424 self.repo
425 .reserve_resources(&task.task_id, &reservation, "task resources reserved")?;
426 add_reservation(&mut current_reserved, &reservation);
427
428 let exe = std::env::current_exe()
429 .map_err(|err| AppError::LaunchFailed(format!("resolve current exe: {err}")))?;
430 let mut child = StdCommand::new(exe);
431 child
432 .arg("internal-shim")
433 .arg("--database")
434 .arg(self.settings.database_path.as_os_str())
435 .arg("--data-dir")
436 .arg(self.settings.data_dir.as_os_str())
437 .arg("--task-id")
438 .arg(&task.task_id)
439 .arg("--termination-grace-ms")
440 .arg(self.settings.termination_grace.as_millis().to_string())
441 .arg("--cgroup-root")
442 .arg(self.settings.cgroup_root.as_os_str())
443 .stdin(Stdio::null())
444 .stdout(Stdio::null())
445 .stderr(Stdio::null());
446
447 match child.spawn() {
448 Ok(handle) => {
449 self.repo.mark_dispatched(&task.task_id, handle.id())?;
450 info!(task_id = %task.task_id, shim_pid = handle.id(), "task dispatched");
451 }
452 Err(err) => {
453 let update = CompletionUpdate {
454 status: TaskStatus::Failed,
455 finished_at: Utc::now(),
456 duration_ms: Some(0),
457 exit_code: None,
458 exit_signal: None,
459 error: Some(RuntimeErrorInfo {
460 code: ErrorCode::LaunchFailed,
461 message: format!("failed to spawn shim: {err}"),
462 details: None,
463 }),
464 usage: None,
465 result_json: None,
466 };
467 self.repo.complete_task(&task.task_id, &update)?;
468 subtract_reservation(&mut current_reserved, &reservation);
469 persist_latest_result(&self.repo, &task.task_id)?;
470 }
471 }
472 }
473 Ok(())
474 }
475
476 async fn gc_loop(&self) {
477 loop {
478 sleep(self.settings.gc_interval).await;
479 let cutoff = match chrono::Duration::from_std(self.settings.result_retention) {
480 Ok(duration) => Utc::now() - duration,
481 Err(_) => Utc::now(),
482 };
483 match self.repo.list_gc_candidates(cutoff) {
484 Ok(tasks) => {
485 for task in tasks {
486 if let Err(err) = fs::remove_dir_all(&task.task_dir) {
487 if err.kind() != std::io::ErrorKind::NotFound {
488 warn!(task_id = %task.task_id, error = %err, "failed to remove task directory during gc");
489 continue;
490 }
491 }
492 if let Err(err) = self.repo.delete_task(&task.task_id) {
493 warn!(task_id = %task.task_id, error = %err, "failed to delete task row during gc");
494 }
495 }
496 }
497 Err(err) => warn!(error = %err, "gc iteration failed"),
498 }
499 }
500 }
501
502 fn spawn_escalation(&self, task_id: String, pgid: Option<i32>) {
503 let repo = self.repo.clone();
504 let grace = self.settings.termination_grace;
505 tokio::spawn(async move {
506 sleep(grace).await;
507 if let Ok(task) = repo.get_task(&task_id) {
508 if task.status == TaskStatus::Running {
509 if let Some(pgid) = pgid.or(task.pgid) {
510 let _ = killpg(Pid::from_raw(pgid), Signal::SIGKILL);
511 } else if let Some(pid) = task.pid {
512 let _ = kill(Pid::from_raw(pid as i32), Signal::SIGKILL);
513 }
514 }
515 }
516 });
517 }
518}
519
520pub async fn run(cli: Cli) -> AppResult<()> {
521 init_tracing();
522 match cli.command {
523 Command::Serve(args) => run_server(args).await,
524 Command::Submit(args) => submit_remote(args).await,
525 Command::Status(args) => status_remote(args).await,
526 Command::Kill(args) => kill_remote(args).await,
527 Command::Wait(args) => wait_remote(args).await,
528 Command::Run(args) => run_remote(args).await,
529 Command::InternalShim(args) => run_internal_shim(args).await,
530 }
531}
532
533async fn run_server(args: ServeArgs) -> AppResult<()> {
534 let service = RuntimeService::new(Settings::from_args(&args)).await?;
535 service.recover().await?;
536 service.start_background_loops();
537
538 let listener = tokio::net::TcpListener::bind(&service.settings.listen_addr)
539 .await
540 .map_err(|err| AppError::Internal(format!("bind failed: {err}")))?;
541 info!(listen_addr = %service.settings.listen_addr, "execgo-runtime listening");
542 axum::serve(listener, build_router(service))
543 .await
544 .map_err(|err| AppError::Internal(format!("server error: {err}")))
545}
546
547async fn submit_remote(args: RemoteTaskArgs) -> AppResult<()> {
548 let client = http_client();
549 let request = load_request(&args)?;
550 let response = client
551 .post(format!("{}/api/v1/tasks", trim_server(&args.server)))
552 .json(&request)
553 .send()
554 .await?;
555 print_json_response(response).await
556}
557
558async fn status_remote(args: StatusArgs) -> AppResult<()> {
559 let client = http_client();
560 let response = client
561 .get(format!(
562 "{}/api/v1/tasks/{}",
563 trim_server(&args.server),
564 args.task_id
565 ))
566 .send()
567 .await?;
568 print_json_response(response).await
569}
570
571async fn kill_remote(args: StatusArgs) -> AppResult<()> {
572 let client = http_client();
573 let response = client
574 .post(format!(
575 "{}/api/v1/tasks/{}/kill",
576 trim_server(&args.server),
577 args.task_id
578 ))
579 .send()
580 .await?;
581 print_json_response(response).await
582}
583
584async fn wait_remote(args: WaitArgs) -> AppResult<()> {
585 let client = http_client();
586 let start = Instant::now();
587 loop {
588 let response = client
589 .get(format!(
590 "{}/api/v1/tasks/{}",
591 trim_server(&args.server),
592 args.task_id
593 ))
594 .send()
595 .await?;
596
597 if !response.status().is_success() {
598 return print_json_response(response).await;
599 }
600
601 let payload: TaskStatusResponse = response.json().await?;
602 if payload.status.is_terminal() {
603 println!("{}", serde_json::to_string_pretty(&payload)?);
604 return Ok(());
605 }
606 if let Some(timeout) = args.timeout() {
607 if start.elapsed() >= timeout {
608 return Err(AppError::Internal("wait timeout exceeded".into()));
609 }
610 }
611 sleep(Duration::from_millis(args.poll_interval_ms)).await;
612 }
613}
614
615async fn run_remote(args: RemoteTaskArgs) -> AppResult<()> {
616 let request = load_request(&args)?;
617 let client = http_client();
618 let response = client
619 .post(format!("{}/api/v1/tasks", trim_server(&args.server)))
620 .json(&request)
621 .send()
622 .await?;
623 if !response.status().is_success() {
624 return print_json_response(response).await;
625 }
626 let payload: SubmitTaskResponse = response.json().await?;
627 wait_remote(WaitArgs {
628 server: args.server,
629 task_id: payload.task_id,
630 timeout_ms: args.timeout_ms,
631 poll_interval_ms: args.poll_interval_ms,
632 })
633 .await
634}
635
636async fn run_internal_shim(args: InternalShimArgs) -> AppResult<()> {
637 let repo = Repository::new(args.database.clone());
638 repo.init()?;
639 let mut task = repo.get_task(&args.task_id)?;
640 if task.status.is_terminal() {
641 return Ok(());
642 }
643 if task.kill_requested && task.pid.is_none() {
644 repo.complete_task(
645 &task.task_id,
646 &CompletionUpdate {
647 status: TaskStatus::Cancelled,
648 finished_at: Utc::now(),
649 duration_ms: Some(0),
650 exit_code: None,
651 exit_signal: None,
652 error: Some(RuntimeErrorInfo {
653 code: ErrorCode::Cancelled,
654 message: "task cancelled before process launch".into(),
655 details: None,
656 }),
657 usage: None,
658 result_json: None,
659 },
660 )?;
661 persist_latest_result(&repo, &task.task_id)?;
662 return Ok(());
663 }
664
665 let execution_plan = task
666 .execution_plan
667 .clone()
668 .unwrap_or_else(|| legacy_execution_plan(&task));
669
670 match spawn_task_process(&task, &execution_plan, &args.cgroup_root) {
671 Ok(spawned) => {
672 repo.mark_started(
673 &task.task_id,
674 spawned.pid,
675 spawned.pgid,
676 spawned.script_path.as_deref(),
677 )?;
678 task = repo.get_task(&task.task_id)?;
679 let wait_handle = tokio::task::spawn_blocking(move || wait_for_pid(spawned.pid as i32));
680 let outcome = supervise_wait(
681 &repo,
682 &task,
683 wait_handle,
684 args.termination_grace_ms,
685 execution_plan.resource_enforcement.wall_time_ms,
686 spawned.pgid,
687 spawned.cgroup_dir.as_deref(),
688 )
689 .await?;
690 repo.complete_task(&task.task_id, &outcome.completion)?;
691 persist_latest_result(&repo, &task.task_id)?;
692 }
693 Err(err) => {
694 repo.complete_task(
695 &task.task_id,
696 &CompletionUpdate {
697 status: TaskStatus::Failed,
698 finished_at: Utc::now(),
699 duration_ms: Some(0),
700 exit_code: None,
701 exit_signal: None,
702 error: Some(match err {
703 AppError::SandboxSetup(message) => RuntimeErrorInfo {
704 code: ErrorCode::SandboxSetupFailed,
705 message,
706 details: None,
707 },
708 AppError::LaunchFailed(message) => RuntimeErrorInfo {
709 code: ErrorCode::LaunchFailed,
710 message,
711 details: None,
712 },
713 other => RuntimeErrorInfo {
714 code: ErrorCode::Internal,
715 message: other.to_string(),
716 details: None,
717 },
718 }),
719 usage: None,
720 result_json: None,
721 },
722 )?;
723 persist_latest_result(&repo, &task.task_id)?;
724 }
725 }
726 Ok(())
727}
728
/// Identifiers of a task process started by `spawn_task_process`.
#[derive(Debug)]
struct SpawnedProcess {
    /// Pid of the launched process.
    pid: u32,
    /// Process-group id; equal to `pid` because the child calls `setpgid(0, 0)`.
    pgid: i32,
    /// Script path returned by `build_command` (set for script executions).
    script_path: Option<PathBuf>,
    /// cgroup directory the process was attached to, when cgroups are enforced.
    cgroup_dir: Option<PathBuf>,
}
736
/// Result of supervising a task process until it exits.
#[derive(Debug)]
struct WaitOutcome {
    /// Completion record to persist for the task.
    completion: CompletionUpdate,
}
741
742fn spawn_task_process(
743 task: &TaskRecord,
744 execution_plan: &ExecutionPlan,
745 _cgroup_root: &Path,
746) -> AppResult<SpawnedProcess> {
747 let stdout_file = open_output_file(&task.stdout_path)?;
748 let stderr_file = open_output_file(&task.stderr_path)?;
749 let (mut command, script_path) = build_command(task, stdout_file, stderr_file)?;
750
751 let resource_enforcement = execution_plan.resource_enforcement.clone();
752 let sandbox = execution_plan.effective_sandbox.clone();
753 let _rootfs = sandbox.rootfs.clone();
754 unsafe {
755 command.pre_exec(move || {
756 setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(nix_to_io)?;
757 apply_resource_enforcement(&resource_enforcement).map_err(nix_to_io)?;
758 #[cfg(target_os = "linux")]
759 apply_linux_sandbox(&sandbox, _rootfs.as_deref()).map_err(nix_to_io)?;
760 Ok(())
761 });
762 }
763
764 let child = command
765 .spawn()
766 .map_err(|err| AppError::LaunchFailed(format!("spawn process: {err}")))?;
767 let pid = child.id();
768 let pgid = pid as i32;
769 drop(child);
770
771 let cgroup_dir = if execution_plan.resource_enforcement.cgroup_enforced {
772 #[cfg(target_os = "linux")]
773 {
774 let dir = setup_cgroup(
775 _cgroup_root,
776 &task.task_id,
777 pid as i32,
778 &execution_plan.resource_enforcement,
779 )
780 .map_err(|err| AppError::SandboxSetup(format!("configure cgroup: {err}")))?;
781 Some(dir)
782 }
783 #[cfg(not(target_os = "linux"))]
784 {
785 None
786 }
787 } else {
788 None
789 };
790
791 Ok(SpawnedProcess {
792 pid,
793 pgid,
794 script_path,
795 cgroup_dir,
796 })
797}
798
799fn build_command(
800 task: &TaskRecord,
801 stdout_file: File,
802 stderr_file: File,
803) -> AppResult<(StdCommand, Option<PathBuf>)> {
804 let env = minimal_env(&task.execution.env);
805 let workspace = task.workspace_dir.clone();
806 fs::create_dir_all(&workspace)?;
807
808 let (mut command, script_path) = match task.execution.kind {
809 ExecutionKind::Command => {
810 let program = task
811 .execution
812 .program
813 .as_deref()
814 .ok_or_else(|| AppError::InvalidInput("execution.program missing".into()))?;
815 let mut cmd = StdCommand::new(program);
816 cmd.args(&task.execution.args);
817 (cmd, None)
818 }
819 ExecutionKind::Script => {
820 let script = task
821 .execution
822 .script
823 .as_ref()
824 .ok_or_else(|| AppError::InvalidInput("execution.script missing".into()))?;
825 let path = task
826 .script_path
827 .clone()
828 .unwrap_or_else(|| task.task_dir.join("script.sh"));
829 write_script_file(&path, script)?;
830
831 let cmd = if let Some(interpreter) = &task.execution.interpreter {
832 let mut command = StdCommand::new(&interpreter[0]);
833 command.args(&interpreter[1..]);
834 command.arg(&path);
835 command
836 } else {
837 let mut command = StdCommand::new("/bin/sh");
838 command.arg("-c").arg(script);
839 command
840 };
841 (cmd, Some(path))
842 }
843 };
844
845 command
846 .stdin(Stdio::null())
847 .stdout(Stdio::from(stdout_file))
848 .stderr(Stdio::from(stderr_file))
849 .current_dir(workspace)
850 .env_clear();
851
852 for (key, value) in env {
853 command.env(key, value);
854 }
855 Ok((command, script_path))
856}
857
858async fn supervise_wait(
859 repo: &Repository,
860 task: &TaskRecord,
861 wait_handle: JoinHandle<AppResult<WaitUsage>>,
862 termination_grace_ms: u64,
863 wall_time_ms: u64,
864 pgid: i32,
865 cgroup_dir: Option<&Path>,
866) -> AppResult<WaitOutcome> {
867 let start = Instant::now();
868 let mut wait_handle = std::pin::pin!(wait_handle);
869 let mut poll = tokio::time::interval(Duration::from_millis(250));
870 let mut timeout_started: Option<Instant> = None;
871 let mut cancel_started: Option<Instant> = None;
872 let mut term_sent = false;
873 let mut kill_sent = false;
874
875 loop {
876 tokio::select! {
877 result = &mut wait_handle => {
878 let usage = result
879 .map_err(|err| AppError::Internal(format!("wait join error: {err}")))??;
880 let duration_ms = start.elapsed().as_millis() as u64;
881 let cancel_requested = cancel_started.is_some() || repo.is_cancel_requested(&task.task_id)?;
882 return Ok(WaitOutcome {
883 completion: classify_completion(
884 task,
885 usage,
886 duration_ms,
887 timeout_started.is_some(),
888 cancel_requested,
889 cgroup_dir,
890 ),
891 });
892 }
893 _ = poll.tick() => {
894 let cancel_requested = repo.is_cancel_requested(&task.task_id)?;
895 if timeout_started.is_none() && start.elapsed() >= Duration::from_millis(wall_time_ms) {
896 repo.mark_timeout_triggered(&task.task_id)?;
897 timeout_started = Some(Instant::now());
898 }
899 if cancel_requested && cancel_started.is_none() {
900 cancel_started = Some(Instant::now());
901 }
902
903 if (timeout_started.is_some() || cancel_started.is_some()) && !term_sent {
904 let _ = killpg(Pid::from_raw(pgid), Signal::SIGTERM);
905 term_sent = true;
906 }
907
908 let should_escalate = timeout_started
909 .map(|started| started.elapsed() >= Duration::from_millis(termination_grace_ms))
910 .unwrap_or(false)
911 || cancel_started
912 .map(|started| started.elapsed() >= Duration::from_millis(termination_grace_ms))
913 .unwrap_or(false);
914
915 if should_escalate && !kill_sent {
916 let _ = killpg(Pid::from_raw(pgid), Signal::SIGKILL);
917 kill_sent = true;
918 }
919 }
920 }
921 }
922}
923
/// Exit status and resource usage of a reaped task process, as returned by `wait_for_pid`.
#[derive(Debug)]
struct WaitUsage {
    /// Exit code; `Some(0)` is classified as success.
    exit_code: Option<i32>,
    /// Terminating signal number, if any (`SIGXCPU` is classified as a CPU-limit failure).
    exit_signal: Option<i32>,
    /// User CPU time, in milliseconds as the name indicates.
    user_cpu_ms: Option<u64>,
    /// System CPU time, in milliseconds as the name indicates.
    system_cpu_ms: Option<u64>,
    /// Peak resident set size, in bytes as the name indicates.
    max_rss_bytes: Option<u64>,
}
932
933fn classify_completion(
934 _task: &TaskRecord,
935 usage: WaitUsage,
936 duration_ms: u64,
937 timed_out: bool,
938 cancelled: bool,
939 cgroup_dir: Option<&Path>,
940) -> CompletionUpdate {
941 let mut usage_payload = ResourceUsage {
942 duration_ms,
943 user_cpu_ms: usage.user_cpu_ms,
944 system_cpu_ms: usage.system_cpu_ms,
945 max_rss_bytes: usage.max_rss_bytes,
946 memory_peak_bytes: read_memory_peak_bytes(cgroup_dir),
947 };
948
949 let (status, error) = if timed_out {
950 (
951 TaskStatus::Failed,
952 Some(RuntimeErrorInfo {
953 code: ErrorCode::Timeout,
954 message: "task exceeded wall_time_ms".into(),
955 details: None,
956 }),
957 )
958 } else if cancelled {
959 (
960 TaskStatus::Cancelled,
961 Some(RuntimeErrorInfo {
962 code: ErrorCode::Cancelled,
963 message: "task cancelled".into(),
964 details: None,
965 }),
966 )
967 } else if oom_killed(cgroup_dir) {
968 (
969 TaskStatus::Failed,
970 Some(RuntimeErrorInfo {
971 code: ErrorCode::MemoryLimitExceeded,
972 message: "task exceeded memory limit".into(),
973 details: None,
974 }),
975 )
976 } else if usage.exit_signal == Some(libc::SIGXCPU) {
977 (
978 TaskStatus::Failed,
979 Some(RuntimeErrorInfo {
980 code: ErrorCode::CpuLimitExceeded,
981 message: "task exceeded cpu_time_sec".into(),
982 details: None,
983 }),
984 )
985 } else if usage.exit_code == Some(0) {
986 (TaskStatus::Success, None)
987 } else if let Some(code) = usage.exit_code {
988 (
989 TaskStatus::Failed,
990 Some(RuntimeErrorInfo {
991 code: ErrorCode::ExitNonZero,
992 message: format!("task exited with code {code}"),
993 details: None,
994 }),
995 )
996 } else if let Some(signal) = usage.exit_signal {
997 (
998 TaskStatus::Failed,
999 Some(RuntimeErrorInfo {
1000 code: ErrorCode::Internal,
1001 message: format!("task terminated by signal {signal}"),
1002 details: None,
1003 }),
1004 )
1005 } else {
1006 (
1007 TaskStatus::Failed,
1008 Some(RuntimeErrorInfo {
1009 code: ErrorCode::Internal,
1010 message: "task failed with unknown outcome".into(),
1011 details: None,
1012 }),
1013 )
1014 };
1015
1016 if usage_payload.duration_ms == 0 {
1017 usage_payload.duration_ms = duration_ms;
1018 }
1019
1020 CompletionUpdate {
1021 status,
1022 finished_at: Utc::now(),
1023 duration_ms: Some(duration_ms),
1024 exit_code: usage.exit_code,
1025 exit_signal: usage.exit_signal,
1026 error,
1027 usage: Some(usage_payload),
1028 result_json: None,
1029 }
1030}
1031
1032fn build_status_response(task: &TaskRecord) -> AppResult<TaskStatusResponse> {
1033 let (stdout, stdout_truncated) = read_output_preview(&task.stdout_path, task.stdout_max_bytes)?;
1034 let (stderr, stderr_truncated) = read_output_preview(&task.stderr_path, task.stderr_max_bytes)?;
1035 let duration_ms = task.duration_ms.or_else(|| {
1036 task.started_at
1037 .map(|started_at| (Utc::now() - started_at).num_milliseconds().max(0) as u64)
1038 });
1039 Ok(TaskStatusResponse {
1040 task_id: task.task_id.clone(),
1041 handle_id: task.handle_id.clone(),
1042 status: task.status.clone(),
1043 created_at: task.created_at,
1044 updated_at: task.updated_at,
1045 started_at: task.started_at,
1046 finished_at: task.finished_at,
1047 duration_ms,
1048 shim_pid: task.shim_pid,
1049 pid: task.pid,
1050 pgid: task.pgid,
1051 exit_code: task.exit_code,
1052 exit_signal: task.exit_signal,
1053 stdout,
1054 stderr,
1055 stdout_truncated,
1056 stderr_truncated,
1057 error: task.error.clone(),
1058 usage: task.usage.clone().or_else(|| {
1059 duration_ms.map(|value| ResourceUsage {
1060 duration_ms: value,
1061 user_cpu_ms: None,
1062 system_cpu_ms: None,
1063 max_rss_bytes: None,
1064 memory_peak_bytes: None,
1065 })
1066 }),
1067 execution_plan: task
1068 .execution_plan
1069 .clone()
1070 .or_else(|| Some(legacy_execution_plan(task))),
1071 reservation: task.reservation.clone(),
1072 artifacts: TaskArtifacts {
1073 task_dir: task.task_dir.to_string_lossy().to_string(),
1074 request_path: task.request_path.to_string_lossy().to_string(),
1075 result_path: task.result_path.to_string_lossy().to_string(),
1076 stdout_path: task.stdout_path.to_string_lossy().to_string(),
1077 stderr_path: task.stderr_path.to_string_lossy().to_string(),
1078 script_path: task
1079 .script_path
1080 .as_ref()
1081 .map(|path| path.to_string_lossy().to_string()),
1082 },
1083 metadata: task.metadata.clone(),
1084 })
1085}
1086
1087fn legacy_execution_plan(task: &TaskRecord) -> ExecutionPlan {
1088 ExecutionPlan::legacy(task.sandbox.clone(), task.limits.clone())
1089}
1090
1091fn add_reservation(current: &mut ResourceCapacity, reservation: &TaskResourceReservation) {
1092 current.task_slots = current.task_slots.saturating_add(reservation.task_slots);
1093 if let Some(value) = reservation.memory_bytes {
1094 current.memory_bytes = Some(current.memory_bytes.unwrap_or(0).saturating_add(value));
1095 }
1096 if let Some(value) = reservation.pids {
1097 current.pids = Some(current.pids.unwrap_or(0).saturating_add(value));
1098 }
1099}
1100
1101fn subtract_reservation(current: &mut ResourceCapacity, reservation: &TaskResourceReservation) {
1102 current.task_slots = current.task_slots.saturating_sub(reservation.task_slots);
1103 if let Some(value) = reservation.memory_bytes {
1104 current.memory_bytes = current
1105 .memory_bytes
1106 .map(|reserved| reserved.saturating_sub(value));
1107 }
1108 if let Some(value) = reservation.pids {
1109 current.pids = current.pids.map(|reserved| reserved.saturating_sub(value));
1110 }
1111}
1112
1113fn persist_latest_result(repo: &Repository, task_id: &str) -> AppResult<()> {
1114 let task = repo.get_task(task_id)?;
1115 let response = build_status_response(&task)?;
1116 write_json_file(&task.result_path, &response)?;
1117 Ok(())
1118}
1119
1120fn signal_task_termination(task: &TaskRecord, signal: Signal) -> AppResult<()> {
1121 if let Some(pgid) = task.pgid {
1122 killpg(Pid::from_raw(pgid), signal)
1123 .map_err(|err| AppError::Internal(format!("signal process group: {err}")))?;
1124 } else if let Some(pid) = task.pid {
1125 kill(Pid::from_raw(pid as i32), signal)
1126 .map_err(|err| AppError::Internal(format!("signal process: {err}")))?;
1127 }
1128 Ok(())
1129}
1130
/// Picks the on-disk file name for an inline script from its interpreter.
///
/// The interpreter string is matched by substring, so `/usr/bin/python3` maps to
/// `script.py`. The first matching entry wins. Unknown or missing interpreters
/// default to `script.sh`.
fn infer_script_name(interpreter: Option<&str>) -> &'static str {
    // Order matters: earlier needles take priority over later ones.
    const BY_INTERPRETER: [(&str, &str); 4] = [
        ("python", "script.py"),
        ("bash", "script.sh"),
        ("zsh", "script.zsh"),
        ("node", "script.js"),
    ];
    let name = interpreter.unwrap_or("");
    BY_INTERPRETER
        .iter()
        .find(|(needle, _)| name.contains(*needle))
        .map(|&(_, file_name)| file_name)
        .unwrap_or("script.sh")
}
1140
1141fn write_script_file(path: &Path, script: &str) -> AppResult<()> {
1142 if let Some(parent) = path.parent() {
1143 fs::create_dir_all(parent)?;
1144 }
1145 let mut file = OpenOptions::new()
1146 .create(true)
1147 .truncate(true)
1148 .write(true)
1149 .mode(0o700)
1150 .open(path)?;
1151 file.write_all(script.as_bytes())?;
1152 Ok(())
1153}
1154
1155fn read_output_preview(path: &Path, max_bytes: u64) -> AppResult<(String, bool)> {
1156 match File::open(path) {
1157 Ok(mut file) => {
1158 let len = file.metadata()?.len();
1159 let mut buffer = vec![0; max_bytes as usize];
1160 let read = file.read(&mut buffer)?;
1161 buffer.truncate(read);
1162 Ok((
1163 String::from_utf8_lossy(&buffer).to_string(),
1164 len > max_bytes,
1165 ))
1166 }
1167 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok((String::new(), false)),
1168 Err(err) => Err(AppError::Io(err)),
1169 }
1170}
1171
1172fn write_json_file(path: &Path, value: &impl serde::Serialize) -> AppResult<()> {
1173 if let Some(parent) = path.parent() {
1174 fs::create_dir_all(parent)?;
1175 }
1176 let json = serde_json::to_vec_pretty(value)?;
1177 fs::write(path, json)?;
1178 Ok(())
1179}
1180
1181fn touch_file(path: &Path) -> AppResult<()> {
1182 if let Some(parent) = path.parent() {
1183 fs::create_dir_all(parent)?;
1184 }
1185 let _ = OpenOptions::new().create(true).append(true).open(path)?;
1186 Ok(())
1187}
1188
1189fn open_output_file(path: &Path) -> AppResult<File> {
1190 if let Some(parent) = path.parent() {
1191 fs::create_dir_all(parent)?;
1192 }
1193 OpenOptions::new()
1194 .create(true)
1195 .truncate(true)
1196 .write(true)
1197 .mode(0o644)
1198 .open(path)
1199 .map_err(AppError::Io)
1200}
1201
/// Builds a minimal child environment.
///
/// Only `PATH`, `HOME`, `LANG`, `TMPDIR` and `USER` are inherited from the current
/// process (when set and valid UTF-8). Every entry in `extra` is then layered on top,
/// so caller-supplied values override inherited ones.
fn minimal_env(extra: &std::collections::HashMap<String, String>) -> BTreeMap<String, String> {
    const INHERITED: [&str; 5] = ["PATH", "HOME", "LANG", "TMPDIR", "USER"];

    let inherited = INHERITED.iter().copied().filter_map(|name| {
        std::env::var(name)
            .ok()
            .map(|value| (name.to_string(), value))
    });
    let overrides = extra.iter().map(|(name, value)| (name.clone(), value.clone()));

    // Later pairs win on key collisions when collecting into a BTreeMap.
    inherited.chain(overrides).collect()
}
1216
1217fn apply_resource_enforcement(enforcement: &ResourceEnforcementPlan) -> nix::Result<()> {
1218 if enforcement.cpu_time_enforced {
1219 if let Some(cpu_time_sec) = enforcement.cpu_time_sec {
1220 setrlimit(
1221 Resource::RLIMIT_CPU,
1222 cpu_time_sec as rlim_t,
1223 cpu_time_sec as rlim_t,
1224 )?;
1225 }
1226 }
1227 if enforcement.memory_enforced {
1228 if let Some(memory_bytes) = enforcement.memory_bytes {
1229 setrlimit(
1230 Resource::RLIMIT_AS,
1231 memory_bytes as rlim_t,
1232 memory_bytes as rlim_t,
1233 )?;
1234 }
1235 }
1236 Ok(())
1237}
1238
1239#[allow(dead_code)]
1240fn apply_rlimits(limits: &crate::types::ResourceLimits) -> nix::Result<()> {
1241 if let Some(cpu_time_sec) = limits.cpu_time_sec {
1242 setrlimit(
1243 Resource::RLIMIT_CPU,
1244 cpu_time_sec as rlim_t,
1245 cpu_time_sec as rlim_t,
1246 )?;
1247 }
1248 if let Some(memory_bytes) = limits.memory_bytes {
1249 setrlimit(
1250 Resource::RLIMIT_AS,
1251 memory_bytes as rlim_t,
1252 memory_bytes as rlim_t,
1253 )?;
1254 }
1255 Ok(())
1256}
1257
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
#[cfg(target_os = "linux")]
/// Applies the Linux namespace/chroot sandbox to the calling process.
///
/// Does nothing unless `sandbox.profile` is `SandboxProfile::LinuxSandbox`. For that
/// profile it unshares every namespace enabled in `sandbox.effective_namespaces()`
/// (mount, pid, uts, ipc, net) in a single `unshare` call. Then, if `sandbox.chroot`
/// is set and a `rootfs` was supplied, it chroots into it and changes directory to
/// the new `/`.
///
/// NOTE(review): when `sandbox.chroot` is true but `rootfs` is `None`, the chroot is
/// skipped silently. Confirm that upstream planning guarantees a rootfs in that case.
/// NOTE(review): per unshare(2), CLONE_NEWPID does not move the caller itself into
/// the new PID namespace; only children created afterwards join it. Confirm the
/// caller forks after this (or does not rely on PID isolation of itself).
///
/// # Errors
/// Returns the `nix` error from the first failing `unshare`, `chroot`, or `chdir`.
fn apply_linux_sandbox(
    sandbox: &crate::types::SandboxPolicy,
    rootfs: Option<&str>,
) -> nix::Result<()> {
    use nix::sched::{unshare, CloneFlags};

    if matches!(sandbox.profile, crate::types::SandboxProfile::LinuxSandbox) {
        // Accumulate all requested namespaces so they are created by one unshare.
        let namespaces = sandbox.effective_namespaces();
        let mut flags = CloneFlags::empty();
        if namespaces.mount {
            flags |= CloneFlags::CLONE_NEWNS;
        }
        if namespaces.pid {
            flags |= CloneFlags::CLONE_NEWPID;
        }
        if namespaces.uts {
            flags |= CloneFlags::CLONE_NEWUTS;
        }
        if namespaces.ipc {
            flags |= CloneFlags::CLONE_NEWIPC;
        }
        if namespaces.net {
            flags |= CloneFlags::CLONE_NEWNET;
        }
        if !flags.is_empty() {
            unshare(flags)?;
        }
        // Namespaces are unshared before the chroot; keep this order.
        if sandbox.chroot {
            if let Some(root) = rootfs {
                chroot(root)?;
                // Move into the new root so the cwd does not point outside it.
                chdir("/")?;
            }
        }
    }
    Ok(())
}
1296
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
#[cfg(not(target_os = "linux"))]
/// Non-Linux stand-in for the namespace/chroot sandbox.
///
/// There is nothing to apply on this platform, so it always succeeds and leaves the
/// process untouched.
fn apply_linux_sandbox(
    sandbox: &crate::types::SandboxPolicy,
    rootfs: Option<&str>,
) -> nix::Result<()> {
    // Both inputs are intentionally ignored here.
    let _ = (sandbox, rootfs);
    Ok(())
}
1305
1306#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
1307#[cfg(target_os = "linux")]
1308fn setup_cgroup(
1309 cgroup_root: &Path,
1310 task_id: &str,
1311 pid: i32,
1312 enforcement: &ResourceEnforcementPlan,
1313) -> AppResult<PathBuf> {
1314 let dir = cgroup_root.join(task_id);
1315 fs::create_dir_all(&dir)?;
1316 if enforcement.memory_enforced {
1317 if let Some(memory_bytes) = enforcement.memory_bytes {
1318 fs::write(dir.join("memory.max"), memory_bytes.to_string())?;
1319 }
1320 }
1321 if enforcement.pids_enforced {
1322 if let Some(pids_max) = enforcement.pids_max {
1323 fs::write(dir.join("pids.max"), pids_max.to_string())?;
1324 }
1325 }
1326 fs::write(dir.join("cgroup.procs"), pid.to_string())?;
1327 Ok(dir)
1328}
1329
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
#[cfg(not(target_os = "linux"))]
/// Non-Linux stand-in for cgroup setup: cgroups are unavailable, so this always
/// fails with `AppError::SandboxSetup`.
fn setup_cgroup(
    _cgroup_root: &Path,
    _task_id: &str,
    _pid: i32,
    _enforcement: &ResourceEnforcementPlan,
) -> AppResult<PathBuf> {
    let reason = "linux-sandbox requires a Linux host";
    Err(AppError::SandboxSetup(reason.into()))
}
1342
/// Reads the peak memory usage recorded in a cgroup's `memory.peak` file.
///
/// Returns `None` when no cgroup directory is given, when the file cannot be read,
/// when its trimmed contents are not a `u64`, or on non-Linux hosts.
fn read_memory_peak_bytes(_cgroup_dir: Option<&Path>) -> Option<u64> {
    #[cfg(target_os = "linux")]
    {
        let contents = _cgroup_dir.and_then(|dir| fs::read_to_string(dir.join("memory.peak")).ok());
        if let Some(raw) = contents {
            return raw.trim().parse::<u64>().ok();
        }
    }
    None
}
1356
/// Reports whether a cgroup's `memory.events` records at least one `oom_kill`.
///
/// A line counts only when its first whitespace-separated field is exactly
/// `oom_kill` and the next field parses as a `u64` greater than zero. Returns
/// `false` when no directory is given, the file is unreadable, or on non-Linux hosts.
fn oom_killed(_cgroup_dir: Option<&Path>) -> bool {
    #[cfg(target_os = "linux")]
    {
        let events = _cgroup_dir.and_then(|dir| fs::read_to_string(dir.join("memory.events")).ok());
        if let Some(events) = events {
            return events.lines().any(|line| {
                let mut fields = line.split_whitespace();
                fields.next() == Some("oom_kill")
                    && fields
                        .next()
                        .and_then(|count| count.parse::<u64>().ok())
                        .unwrap_or(0)
                        > 0
            });
        }
    }
    false
}
1381
/// Blocks until child `pid` terminates and returns its exit status plus resource
/// usage as reported by `wait4(2)`.
///
/// `EINTR` is retried. Any other `wait4` failure (for example `ECHILD` when `pid` is
/// not an unreaped child of this process) is returned as `AppError::Io`.
/// `exit_code` is set only for a normal exit; `exit_signal` is set only when the
/// child was terminated by a signal.
///
/// NOTE(review): this is a blocking syscall. Presumably callers run it off the async
/// executor (e.g. on a dedicated thread) — confirm.
fn wait_for_pid(pid: i32) -> AppResult<WaitUsage> {
    let mut status: libc::c_int = 0;
    // Zero-initialised so the later `assume_init` is sound regardless of what
    // `wait4` writes.
    let mut usage = std::mem::MaybeUninit::<libc::rusage>::zeroed();
    let wait_result = loop {
        // SAFETY: `status` is a live, writable c_int and `usage.as_mut_ptr()` points
        // to storage for exactly one `libc::rusage`; both outlive the call.
        let rc = unsafe { libc::wait4(pid, &mut status, 0, usage.as_mut_ptr()) };
        if rc == -1 {
            let err = std::io::Error::last_os_error();
            // Interrupted by a signal before the child changed state: retry.
            if err.kind() == std::io::ErrorKind::Interrupted {
                continue;
            }
            return Err(AppError::Io(err));
        }
        break rc;
    };

    // With `options == 0` (no WNOHANG) wait4 is not expected to return 0; this is a
    // defensive guard.
    if wait_result <= 0 {
        return Err(AppError::Internal("wait4 returned no child".into()));
    }

    // SAFETY: the buffer was created with `MaybeUninit::zeroed`, and `rusage` holds
    // only integer/timeval fields for which all-zero is a valid value; the successful
    // `wait4` above has also filled it in.
    let usage = unsafe { usage.assume_init() };
    let exit_code = if libc::WIFEXITED(status) {
        Some(libc::WEXITSTATUS(status))
    } else {
        None
    };
    let exit_signal = if libc::WIFSIGNALED(status) {
        Some(libc::WTERMSIG(status))
    } else {
        None
    };

    Ok(WaitUsage {
        exit_code,
        exit_signal,
        user_cpu_ms: Some(timeval_to_ms(usage.ru_utime)),
        system_cpu_ms: Some(timeval_to_ms(usage.ru_stime)),
        max_rss_bytes: Some(convert_max_rss(usage.ru_maxrss)),
    })
}
1421
1422fn timeval_to_ms(tv: libc::timeval) -> u64 {
1423 (tv.tv_sec.max(0) as u64)
1424 .saturating_mul(1000)
1425 .saturating_add((tv.tv_usec.max(0) as u64) / 1000)
1426}
1427
1428#[cfg(target_os = "linux")]
1429fn convert_max_rss(value: libc::c_long) -> u64 {
1430 (value.max(0) as u64).saturating_mul(1024)
1431}
1432
1433#[cfg(not(target_os = "linux"))]
1434fn convert_max_rss(value: libc::c_long) -> u64 {
1435 value.max(0) as u64
1436}
1437
1438fn load_request(args: &RemoteTaskArgs) -> AppResult<SubmitTaskRequest> {
1439 let raw = if let Some(file) = &args.file {
1440 fs::read_to_string(file)?
1441 } else {
1442 args.json.clone().unwrap_or_default()
1443 };
1444 Ok(serde_json::from_str(&raw)?)
1445}
1446
1447fn http_client() -> Client {
1448 Client::builder().build().unwrap_or_else(|_| Client::new())
1449}
1450
/// Strips every trailing `/` from a server base URL so paths can be appended with a
/// single leading slash.
fn trim_server(server: &str) -> &str {
    let mut base = server;
    while let Some(stripped) = base.strip_suffix('/') {
        base = stripped;
    }
    base
}
1454
1455fn default_runtime_id(listen_addr: &str) -> String {
1456 let host = std::env::var("HOSTNAME")
1457 .ok()
1458 .filter(|value| !value.trim().is_empty())
1459 .unwrap_or_else(|| "execgo-runtime".into());
1460 format!(
1461 "{}-{}",
1462 sanitize_runtime_id(&host),
1463 sanitize_runtime_id(listen_addr)
1464 )
1465}
1466
/// Replaces every character outside `[A-Za-z0-9._-]` with `-`, one `-` per
/// replaced character (so non-ASCII characters become a single `-` each).
fn sanitize_runtime_id(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.';
        sanitized.push(if allowed { ch } else { '-' });
    }
    sanitized
}
1479
1480async fn print_json_response(response: reqwest::Response) -> AppResult<()> {
1481 let status = response.status();
1482 let body = response.text().await?;
1483 println!("{body}");
1484 if status.is_success() {
1485 Ok(())
1486 } else {
1487 Err(AppError::Internal(format!(
1488 "request failed with status {status}"
1489 )))
1490 }
1491}
1492
1493fn process_exists(pid: i32) -> bool {
1494 if pid <= 0 {
1495 return false;
1496 }
1497 kill(Pid::from_raw(pid), None).is_ok()
1498}
1499
1500fn nix_to_io(err: nix::Error) -> std::io::Error {
1501 std::io::Error::other(err.to_string())
1502}
1503
1504fn init_tracing() {
1505 let _ = tracing_subscriber::fmt()
1506 .with_env_filter(
1507 tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
1508 )
1509 .json()
1510 .try_init();
1511}
1512
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};

    use tempfile::TempDir;

    use super::*;
    use crate::types::{ExecutionKind, ExecutionSpec, ResourceLimits, SandboxPolicy};

    #[test]
    fn minimal_env_keeps_common_keys_and_overrides() {
        let mut extra = HashMap::new();
        extra.insert("FOO".to_string(), "bar".to_string());
        // Caller-supplied values must take precedence over inherited variables.
        extra.insert("PATH".to_string(), "/custom/bin".to_string());
        let env = minimal_env(&extra);
        assert_eq!(env.get("FOO").map(String::as_str), Some("bar"));
        assert_eq!(env.get("PATH").map(String::as_str), Some("/custom/bin"));
        // Nothing outside the inherited allow-list plus the extras leaks through.
        assert!(env.keys().all(|key| {
            ["PATH", "HOME", "LANG", "TMPDIR", "USER", "FOO"].contains(&key.as_str())
        }));
    }

    #[tokio::test]
    async fn status_response_reads_inline_output() {
        let temp = TempDir::new().unwrap();
        let stdout_path = temp.path().join("stdout.log");
        let stderr_path = temp.path().join("stderr.log");
        fs::write(&stdout_path, "hello world").unwrap();
        fs::write(&stderr_path, "oops").unwrap();

        let task = TaskRecord {
            task_id: "t1".into(),
            handle_id: "t1".into(),
            status: TaskStatus::Success,
            execution: ExecutionSpec {
                kind: ExecutionKind::Command,
                program: Some("echo".into()),
                args: vec![],
                script: None,
                interpreter: None,
                env: HashMap::new(),
            },
            limits: ResourceLimits::default(),
            sandbox: SandboxPolicy::default(),
            metadata: BTreeMap::new(),
            created_at: Utc::now(),
            updated_at: Utc::now(),
            started_at: None,
            finished_at: None,
            duration_ms: Some(1),
            shim_pid: None,
            pid: None,
            pgid: None,
            exit_code: Some(0),
            exit_signal: None,
            error_code: None,
            error: None,
            usage: None,
            task_dir: temp.path().to_path_buf(),
            workspace_dir: temp.path().join("workspace"),
            request_path: temp.path().join("request.json"),
            result_path: temp.path().join("result.json"),
            stdout_path,
            stderr_path,
            script_path: None,
            stdout_max_bytes: 1024,
            stderr_max_bytes: 1024,
            kill_requested: false,
            kill_requested_at: None,
            timeout_triggered: false,
            result_json: None,
            execution_plan: None,
            control_context: None,
            reservation: None,
            reserved_at: None,
            released_at: None,
        };

        let response = build_status_response(&task).unwrap();
        assert_eq!(response.stdout, "hello world");
        assert_eq!(response.stderr, "oops");
    }

    #[test]
    fn read_output_preview_caps_and_flags_truncation() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("out.log");
        fs::write(&path, "abcdef").unwrap();

        let (preview, truncated) = read_output_preview(&path, 3).unwrap();
        assert_eq!(preview, "abc");
        assert!(truncated);

        let (preview, truncated) = read_output_preview(&path, 64).unwrap();
        assert_eq!(preview, "abcdef");
        assert!(!truncated);

        // A missing log is not an error: it simply has no output yet.
        let (preview, truncated) =
            read_output_preview(&temp.path().join("missing.log"), 64).unwrap();
        assert!(preview.is_empty());
        assert!(!truncated);
    }

    #[test]
    fn infer_script_name_follows_interpreter() {
        assert_eq!(infer_script_name(Some("python3")), "script.py");
        assert_eq!(infer_script_name(Some("bash")), "script.sh");
        assert_eq!(infer_script_name(Some("zsh")), "script.zsh");
        assert_eq!(infer_script_name(Some("node")), "script.js");
        assert_eq!(infer_script_name(None), "script.sh");
    }

    #[test]
    fn sanitize_runtime_id_replaces_disallowed_characters() {
        assert_eq!(sanitize_runtime_id("127.0.0.1:8080"), "127.0.0.1-8080");
        assert_eq!(sanitize_runtime_id("host_a-b.c"), "host_a-b.c");
    }

    #[test]
    fn trim_server_strips_trailing_slashes() {
        assert_eq!(trim_server("http://localhost:8080//"), "http://localhost:8080");
        assert_eq!(trim_server("http://localhost:8080"), "http://localhost:8080");
    }
}