code_executor/
runner.rs

1use std::{
2    io,
3    path::Path,
4    process::{self, Stdio},
5    sync::Arc,
6    time::Duration,
7};
8
9use cached::proc_macro::cached;
10use cgroups_rs::{Cgroup, CgroupPid, cgroup_builder::CgroupBuilder, hierarchies};
11use tokio::{
12    io::{AsyncReadExt, AsyncWriteExt},
13    process::Command,
14    time::{Instant, sleep},
15};
16
17use crate::{CommandArgs, ExitStatus, Result, metrics::Metrics};
18
19#[cached(result = true)]
20fn create_cgroup(memory_limit: i64, process_count_limit: usize) -> Result<Cgroup> {
21    let cgroup_name = format!("runner/{}-{}", memory_limit, process_count_limit);
22    let hier = hierarchies::auto();
23    let cgroup = CgroupBuilder::new(&cgroup_name)
24        .memory()
25        .memory_swap_limit(memory_limit)
26        .memory_soft_limit(memory_limit)
27        .memory_hard_limit(memory_limit)
28        .done()
29        .pid()
30        .maximum_number_of_processes(cgroups_rs::MaxValue::Value(process_count_limit as i64))
31        .done()
32        .build(hier)?;
33
34    Ok(cgroup)
35}
36
37#[derive(Debug)]
38pub struct Runner<'a> {
39    pub args: CommandArgs<'a>,
40    pub project_path: &'a Path,
41    pub time_limit: Duration,
42    pub cgroup: Arc<Cgroup>,
43}
44
45impl<'a> Runner<'a> {
46    #[tracing::instrument(err)]
47    pub fn new(
48        args: CommandArgs<'a>,
49        project_path: &'a Path,
50        time_limit: Duration,
51        memory_limit: i64,
52        process_count_limit: usize,
53    ) -> Result<Self> {
54        let cgroup = create_cgroup(memory_limit, process_count_limit)?;
55
56        Ok(Self {
57            args,
58            project_path,
59            cgroup: Arc::new(cgroup),
60            time_limit,
61        })
62    }
63
64    #[tracing::instrument(err)]
65    pub async fn run(&self, input: &[u8]) -> Result<Metrics> {
66        let CommandArgs { binary, args } = self.args;
67
68        let cgroup = self.cgroup.clone();
69
70        let mut child = Command::new(binary);
71        let child = child
72            .current_dir(self.project_path)
73            .args(args)
74            .stdin(Stdio::piped())
75            .stdout(Stdio::piped())
76            .stderr(Stdio::piped());
77        let child = unsafe {
78            child.pre_exec(move || {
79                cgroup
80                    .add_task_by_tgid(CgroupPid::from(process::id() as u64))
81                    .map_err(std::io::Error::other)
82            })
83        };
84        let start = Instant::now();
85        let mut child = child.spawn()?;
86        let mut stdin = child.stdin.take().unwrap();
87        let mut stdout = child.stdout.take().unwrap();
88        let mut stderr = child.stderr.take().unwrap();
89
90        let stdout_observer = async move {
91            let mut buffer = Vec::new();
92            stdout.read_to_end(&mut buffer).await?;
93
94            Ok::<_, io::Error>(buffer)
95        };
96        let stderr_observer = async move {
97            let mut buffer = Vec::new();
98            stderr.read_to_end(&mut buffer).await?;
99            Ok::<_, io::Error>(buffer)
100        };
101
102        let exit_status = tokio::select! {
103            exit_status = async {
104                stdin.write_all(input).await?;
105                let exit_status = child.wait().await?;
106
107                Ok::<_, io::Error>(exit_status)
108            } => {
109                exit_status.map(|raw| raw.into())
110            }
111            _ = sleep(self.time_limit) => {
112                child.kill().await?;
113                child.wait().await?;
114
115                Ok(ExitStatus::Timeout)
116            }
117        }?;
118
119        let (stdout, stderr) = tokio::try_join!(stdout_observer, stderr_observer)?;
120
121        Ok(Metrics {
122            exit_status,
123            stdout,
124            stderr,
125            run_time: start.elapsed(),
126        })
127    }
128}