1use std::{
2 io,
3 path::Path,
4 process::{self, Stdio},
5 sync::Arc,
6 time::Duration,
7};
8
9use cached::proc_macro::cached;
10use cgroups_rs::{Cgroup, CgroupPid, cgroup_builder::CgroupBuilder, hierarchies};
11use tokio::{
12 io::{AsyncReadExt, AsyncWriteExt},
13 process::Command,
14 time::{Instant, sleep},
15};
16
17use crate::{CommandArgs, ExitStatus, Result, metrics::Metrics};
18
19#[cached(result = true)]
20fn create_cgroup(memory_limit: i64, process_count_limit: usize) -> Result<Cgroup> {
21 let cgroup_name = format!("runner/{}-{}", memory_limit, process_count_limit);
22 let hier = hierarchies::auto();
23 let cgroup = CgroupBuilder::new(&cgroup_name)
24 .memory()
25 .memory_swap_limit(memory_limit)
26 .memory_soft_limit(memory_limit)
27 .memory_hard_limit(memory_limit)
28 .done()
29 .pid()
30 .maximum_number_of_processes(cgroups_rs::MaxValue::Value(process_count_limit as i64))
31 .done()
32 .build(hier)?;
33
34 Ok(cgroup)
35}
36
37#[derive(Debug)]
38pub struct Runner<'a> {
39 pub args: CommandArgs<'a>,
40 pub project_path: &'a Path,
41 pub time_limit: Duration,
42 pub cgroup: Arc<Cgroup>,
43}
44
45impl<'a> Runner<'a> {
46 #[tracing::instrument(err)]
47 pub fn new(
48 args: CommandArgs<'a>,
49 project_path: &'a Path,
50 time_limit: Duration,
51 memory_limit: i64,
52 process_count_limit: usize,
53 ) -> Result<Self> {
54 let cgroup = create_cgroup(memory_limit, process_count_limit)?;
55
56 Ok(Self {
57 args,
58 project_path,
59 cgroup: Arc::new(cgroup),
60 time_limit,
61 })
62 }
63
64 #[tracing::instrument(err)]
65 pub async fn run(&self, input: &[u8]) -> Result<Metrics> {
66 let CommandArgs { binary, args } = self.args;
67
68 let cgroup = self.cgroup.clone();
69
70 let mut child = Command::new(binary);
71 let child = child
72 .current_dir(self.project_path)
73 .args(args)
74 .stdin(Stdio::piped())
75 .stdout(Stdio::piped())
76 .stderr(Stdio::piped());
77 let child = unsafe {
78 child.pre_exec(move || {
79 cgroup
80 .add_task_by_tgid(CgroupPid::from(process::id() as u64))
81 .map_err(std::io::Error::other)
82 })
83 };
84 let start = Instant::now();
85 let mut child = child.spawn()?;
86 let mut stdin = child.stdin.take().unwrap();
87 let mut stdout = child.stdout.take().unwrap();
88 let mut stderr = child.stderr.take().unwrap();
89
90 let stdout_observer = async move {
91 let mut buffer = Vec::new();
92 stdout.read_to_end(&mut buffer).await?;
93
94 Ok::<_, io::Error>(buffer)
95 };
96 let stderr_observer = async move {
97 let mut buffer = Vec::new();
98 stderr.read_to_end(&mut buffer).await?;
99 Ok::<_, io::Error>(buffer)
100 };
101
102 let exit_status = tokio::select! {
103 exit_status = async {
104 stdin.write_all(input).await?;
105 let exit_status = child.wait().await?;
106
107 Ok::<_, io::Error>(exit_status)
108 } => {
109 exit_status.map(|raw| raw.into())
110 }
111 _ = sleep(self.time_limit) => {
112 child.kill().await?;
113 child.wait().await?;
114
115 Ok(ExitStatus::Timeout)
116 }
117 }?;
118
119 let (stdout, stderr) = tokio::try_join!(stdout_observer, stderr_observer)?;
120
121 Ok(Metrics {
122 exit_status,
123 stdout,
124 stderr,
125 run_time: start.elapsed(),
126 })
127 }
128}