1use crate::error::JudgeCoreError;
2use crate::judge::common::run_checker;
3use crate::judge::result::{check_user_result, JudgeVerdict};
4use crate::run::executor::Executor;
5use crate::run::process_listener::{ProcessExitMessage, ProcessListener};
6use crate::run::sandbox::{RawRunResultInfo, Sandbox, SCRIPT_LIMIT_CONFIG};
7use crate::utils::get_pathbuf_str;
8
9use nix::errno::Errno;
10use nix::fcntl::{fcntl, FcntlArg, OFlag};
11use nix::sys::epoll::{
12 epoll_create1, epoll_ctl, epoll_wait, EpollCreateFlags, EpollEvent, EpollFlags, EpollOp,
13};
14use nix::unistd::{pipe, read, write};
15use std::fs::File;
16use std::os::unix::io::{AsRawFd, RawFd};
17use std::path::PathBuf;
18use std::time::Duration;
19
20use super::result::JudgeResultInfo;
21use super::JudgeConfig;
22
23const USER_EXIT_SIGNAL: u8 = 41u8;
24const INTERACTOR_EXIT_SIGNAL: u8 = 42u8;
25
26fn set_fd_non_blocking(fd: RawFd) -> Result<libc::c_int, JudgeCoreError> {
27 log::debug!("Setting fd={} to non blocking", fd);
28 Ok(fcntl(fd, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))?)
29}
30
31fn pump_proxy_pipe(from: RawFd, to: RawFd, output: RawFd) -> Result<(), JudgeCoreError> {
34 set_fd_non_blocking(from)?;
35
36 let mut buf = [0; 1024];
37 loop {
38 match read(from, &mut buf) {
39 Ok(nread) => {
40 log::debug!("{} read. {} -> {}", nread, from, to);
41 write(to, &buf[..nread])?;
42 write(output, &buf[..nread])?;
43 }
44 Err(e) => {
45 if e == Errno::EAGAIN || e == Errno::EWOULDBLOCK {
46 return Ok(());
47 }
48 panic!("failed to read from pipe");
49 }
50 }
51 }
52}
53
54fn read_string_from_fd(from: RawFd) -> Result<String, JudgeCoreError> {
56 set_fd_non_blocking(from)?;
57
58 let mut res_buf = Vec::new();
59 let mut buf = [0; 1024];
60 log::debug!("Reading from fd={}", from);
61 loop {
62 log::debug!("Reading from fd={}", from);
63 match read(from, &mut buf) {
64 Ok(nread) => {
65 log::debug!("{} read. {}", nread, from);
66 res_buf.extend_from_slice(&buf[..nread]);
67 }
68 Err(e) => {
69 if e == Errno::EAGAIN || e == Errno::EWOULDBLOCK {
70 let buf_string = String::from_utf8(res_buf)?;
71 return Ok(buf_string);
72 }
73 panic!("failed to read from pipe");
74 }
75 }
76 }
77}
78
79fn read_msg_from_fd(from: RawFd) -> Result<ProcessExitMessage, JudgeCoreError> {
80 let buf_string = read_string_from_fd(from as RawFd)?;
81 log::debug!("Raw Result info: {}", buf_string);
82 let msg: ProcessExitMessage = serde_json::from_str(&buf_string)?;
83 Ok(msg)
84}
85
86fn add_epoll_fd(epoll_fd: RawFd, fd: RawFd) -> Result<(), JudgeCoreError> {
87 let mut event = EpollEvent::new(EpollFlags::EPOLLIN, fd as u64);
88 log::debug!("Adding fd={} to epoll", fd);
89 Ok(epoll_ctl(
90 epoll_fd,
91 EpollOp::EpollCtlAdd,
92 fd,
93 Some(&mut event),
94 )?)
95}
96
97pub fn run_interact(
98 config: &JudgeConfig,
99 mut interactor_executor: Executor,
100 output_path: &String,
101) -> Result<Option<JudgeResultInfo>, JudgeCoreError> {
102 log::debug!("Creating epoll");
103 let epoll_fd = epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC)?;
104
105 log::debug!("Creating interact pipes");
106 let (proxy_read_user, user_write_proxy) = pipe()?;
107 let (proxy_read_interactor, interactor_write_proxy) = pipe()?;
108 let (user_read_proxy, proxy_write_user) = pipe()?;
109 let (interactor_read_proxy, proxy_write_interactor) = pipe()?;
110
111 log::debug!("Adding read proxy fds to epoll");
112 add_epoll_fd(epoll_fd, proxy_read_user)?;
113 add_epoll_fd(epoll_fd, proxy_read_interactor)?;
114
115 log::debug!("Creating exit report pipes with epoll");
116 let (user_exit_read, user_exit_write) = pipe()?;
117 let (interactor_exit_read, interactor_exit_write) = pipe()?;
118 add_epoll_fd(epoll_fd, user_exit_read)?;
119 add_epoll_fd(epoll_fd, interactor_exit_read)?;
120
121 let mut user_listener = ProcessListener::new()?;
122 let mut interact_listener = ProcessListener::new()?;
123 user_listener.setup_exit_report(user_exit_write, USER_EXIT_SIGNAL);
124 interact_listener.setup_exit_report(interactor_exit_write, INTERACTOR_EXIT_SIGNAL);
125
126 if !PathBuf::from(&output_path).exists() {
127 File::create(output_path)?;
128 }
129 let output_file = File::options()
130 .write(true)
131 .truncate(true) .open(output_path)?;
133 let output_raw_fd: RawFd = output_file.as_raw_fd();
134
135 let mut user_sandbox = Sandbox::new(
136 config.program.executor.clone(),
137 config.runtime.rlimit_configs.clone(),
138 Some(user_read_proxy),
139 Some(user_write_proxy),
140 true,
141 )?;
142 user_listener.spawn_with_sandbox(&mut user_sandbox)?;
143
144 let first_args: String = String::from("");
145 let interact_args = vec![
146 first_args,
147 get_pathbuf_str(&config.test_data.input_file_path)?,
148 get_pathbuf_str(&config.program.output_file_path)?,
149 get_pathbuf_str(&config.test_data.answer_file_path)?,
150 ];
151 interactor_executor.set_additional_args(interact_args);
152 let mut interact_sandbox = Sandbox::new(
153 interactor_executor,
154 SCRIPT_LIMIT_CONFIG,
155 Some(interactor_read_proxy),
156 Some(interactor_write_proxy),
157 false,
158 )?;
159 interact_listener.spawn_with_sandbox(&mut interact_sandbox)?;
160
161 log::debug!("Starting epoll");
162 let mut events = [EpollEvent::empty(); 128];
163 let mut user_exited = false;
164 let mut interactor_exited = false;
165 let mut option_user_result: Option<RawRunResultInfo> = None;
166 loop {
167 let num_events = epoll_wait(epoll_fd, &mut events, -1)?;
168 log::debug!("{} events found!", num_events);
169
170 for event in events.iter().take(num_events) {
171 log::debug!("Event: {:?}", event);
172 let fd = event.data() as RawFd;
173 if fd == user_exit_read {
174 log::debug!("{:?} user fd exited", fd);
175 user_exited = true;
176 let exit_msg = read_msg_from_fd(fd)?;
177 option_user_result = exit_msg.option_run_result;
178 }
179 if fd == interactor_exit_read {
180 log::debug!("{:?} interactor fd exited", fd);
181 interactor_exited = true;
182 let _interactor_result: ProcessExitMessage = read_msg_from_fd(fd)?;
183 }
184 if fd == proxy_read_user {
185 log::debug!("proxy_read_user {} fd read", fd);
186 pump_proxy_pipe(proxy_read_user, proxy_write_interactor, output_raw_fd)?;
187 }
188 if fd == proxy_read_interactor {
189 log::debug!("proxy_read_interactor {} fd read", fd);
190 pump_proxy_pipe(proxy_read_interactor, proxy_write_user, output_raw_fd)?;
191 }
192 }
193 if user_exited && interactor_exited {
194 log::debug!("Both user and interactor exited");
195 break;
196 }
197 }
198 log::debug!("Epoll finished!");
199
200 if let Some(user_result) = option_user_result {
201 let option_user_verdict = check_user_result(&user_result);
202 if let Some(verdict) = option_user_verdict {
203 return Ok(Some(JudgeResultInfo {
204 verdict,
205 time_usage: user_result.real_time_cost,
206 memory_usage_bytes: user_result.resource_usage.max_rss,
207 exit_status: user_result.exit_status,
208 checker_exit_status: 0,
209 }));
210 }
211 log::debug!("Running checker process");
212 if let Some(_checker_executor) = config.checker.executor.clone() {
213 let (verdict, checker_exit_status) = run_checker(config)?;
214 Ok(Some(JudgeResultInfo {
215 verdict,
216 time_usage: user_result.real_time_cost,
217 memory_usage_bytes: user_result.resource_usage.max_rss,
218 exit_status: user_result.exit_status,
219 checker_exit_status,
220 }))
221 } else {
222 Err(JudgeCoreError::AnyhowError(anyhow::anyhow!(
223 "Checker path is not provided"
224 )))
225 }
226 } else {
227 Ok(Some(JudgeResultInfo {
229 verdict: JudgeVerdict::IdlenessLimitExceeded,
230 time_usage: Duration::new(0, 0),
231 memory_usage_bytes: 0,
232 exit_status: 0,
233 checker_exit_status: 0,
234 }))
235 }
236}
237
#[cfg(test)]
pub mod interact_judge_test {
    //! Integration-style test for `run_interact`. It relies on prebuilt
    //! binaries under `../test-collection/dist` and scratch files under
    //! `../tmp`, both addressed relative to the working directory.

    use std::path::PathBuf;

    use crate::{
        compiler::Language,
        judge::{
            result::JudgeVerdict, CheckerConfig, JudgeConfig, ProgramConfig, RuntimeConfig,
            TestDataConfig,
        },
        run::{executor::Executor, sandbox::RlimitConfigs},
    };

    use super::run_interact;

    /// Resource limits applied to the user program in the test.
    const TEST_CONFIG: RlimitConfigs = RlimitConfigs {
        stack_limit: Some((64 * 1024 * 1024, 64 * 1024 * 1024)),
        as_limit: Some((64 * 1024 * 1024, 64 * 1024 * 1024)),
        cpu_limit: Some((1, 2)),
        nproc_limit: Some((1, 1)),
        fsize_limit: Some((1024, 1024)),
    };

    /// Installs a debug-level logger; errors from repeated initialisation
    /// are ignored so several tests can call this.
    fn init() {
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Debug)
            .try_init();
    }

    /// Builds a `JudgeConfig` that runs `program_executor` with the `lcmp`
    /// checker and the fixture paths under `../tmp`.
    fn build_test_config(program_executor: Executor) -> JudgeConfig {
        JudgeConfig {
            runtime: RuntimeConfig {
                rlimit_configs: TEST_CONFIG,
            },
            test_data: TestDataConfig {
                input_file_path: PathBuf::from("../tmp/in"),
                answer_file_path: PathBuf::from("../tmp/ans"),
            },
            checker: CheckerConfig {
                executor: Some(
                    Executor::new(
                        Language::Cpp,
                        PathBuf::from("./../test-collection/dist/checkers/lcmp"),
                    )
                    .unwrap(),
                ),
                output_file_path: PathBuf::from("../tmp/check"),
            },
            program: ProgramConfig {
                executor: program_executor,
                output_file_path: PathBuf::from("../tmp/out"),
            },
        }
    }

    #[test]
    fn test_run_interact() {
        init();

        let interactor_executor = Executor::new(
            Language::Cpp,
            PathBuf::from("./../test-collection/dist/checkers/interactor-echo"),
        )
        .unwrap();
        let program_executor = Executor::new(
            Language::Cpp,
            PathBuf::from("./../test-collection/dist/programs/read_and_write"),
        )
        .unwrap();
        let runner_config = build_test_config(program_executor);
        let result = run_interact(
            &runner_config,
            interactor_executor,
            &String::from("../tmp/interactor"),
        );
        match result {
            Ok(Some(result)) => {
                log::debug!("{:?}", result);
                // Include the whole result so a failure shows what came back.
                assert!(
                    result.verdict == JudgeVerdict::Accepted,
                    "unexpected judge result: {:?}",
                    result
                );
            }
            Ok(None) => {
                log::debug!("Ignoring this result, for it's from a fork child process");
            }
            Err(e) => {
                log::error!("meet error: {:?}", e);
                // Fail with the actual error rather than a bare `assert!(false)`.
                panic!("run_interact returned an error: {:?}", e);
            }
        }
    }
}