1use std::collections::HashMap;
4use std::fs;
5use std::sync::Arc;
6use std::sync::Mutex;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::bail;
11use crankshaft::config::backend;
12use crankshaft::engine::Task;
13use crankshaft::engine::service::name::GeneratorIterator;
14use crankshaft::engine::service::name::UniqueAlphanumeric;
15use crankshaft::engine::service::runner::Backend;
16use crankshaft::engine::service::runner::backend::docker;
17use crankshaft::engine::task::Execution;
18use crankshaft::engine::task::Input;
19use crankshaft::engine::task::Output;
20use crankshaft::engine::task::Resources;
21use crankshaft::engine::task::input::Contents;
22use crankshaft::engine::task::input::Type as InputType;
23use crankshaft::engine::task::output::Type as OutputType;
24use crankshaft::events::Event;
25use nonempty::NonEmpty;
26use tokio::sync::broadcast;
27use tokio::sync::oneshot;
28use tokio::sync::oneshot::Receiver;
29use tokio_util::sync::CancellationToken;
30use tracing::info;
31use tracing::warn;
32use url::Url;
33
34use super::TaskExecutionBackend;
35use super::TaskExecutionConstraints;
36use super::TaskExecutionResult;
37use super::TaskManager;
38use super::TaskManagerRequest;
39use super::TaskSpawnRequest;
40use crate::COMMAND_FILE_NAME;
41use crate::ONE_GIBIBYTE;
42use crate::PrimitiveValue;
43use crate::STDERR_FILE_NAME;
44use crate::STDOUT_FILE_NAME;
45use crate::Value;
46use crate::WORK_DIR_NAME;
47use crate::backend::INITIAL_EXPECTED_NAMES;
48use crate::config::Config;
49use crate::config::DEFAULT_TASK_SHELL;
50use crate::config::DockerBackendConfig;
51use crate::config::TaskResourceLimitBehavior;
52use crate::path::EvaluationPath;
53use crate::v1::container;
54use crate::v1::cpu;
55use crate::v1::max_cpu;
56use crate::v1::max_memory;
57use crate::v1::memory;
58
/// The root guest (in-container) directory for task inputs.
///
/// This is the value reported by `DockerBackend::guest_inputs_dir`.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The guest path where the task's host work directory is mounted.
///
/// It is also used as the working directory of the task's execution.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path where the task's command file is mounted (read-only).
///
/// It is passed as the sole argument to the task shell.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path that the task's stdout is written to.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path that the task's stderr is written to.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
73
/// Represents a request to run a single task with the Docker backend.
///
/// The request carries everything needed to build and submit the Crankshaft
/// task, including the CPU and memory reservations that are reported to the
/// task manager through the `TaskManagerRequest` implementation.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The engine configuration (used here to select the task shell).
    config: Arc<Config>,
    /// The inner task spawn request (attempt directory, command, inputs and
    /// environment).
    inner: TaskSpawnRequest,
    /// The underlying Crankshaft Docker backend used to run the task.
    backend: Arc<docker::Backend>,
    /// The name given to the Crankshaft task.
    name: String,
    /// The container image to run the task in.
    container: String,
    /// The requested CPU reservation for the task.
    cpu: f64,
    /// The requested memory reservation for the task.
    ///
    /// The value is divided by `ONE_GIBIBYTE` before being handed to
    /// Crankshaft, so it is presumably expressed in bytes.
    memory: u64,
    /// The optional CPU limit for the task.
    max_cpu: Option<f64>,
    /// The optional memory limit for the task (same unit as `memory`).
    max_memory: Option<u64>,
    /// The cancellation token passed to the backend when running the task.
    token: CancellationToken,
}
99
100impl TaskManagerRequest for DockerTaskRequest {
101 fn cpu(&self) -> f64 {
102 self.cpu
103 }
104
105 fn memory(&self) -> u64 {
106 self.memory
107 }
108
109 async fn run(self) -> Result<TaskExecutionResult> {
110 let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
112 fs::create_dir_all(&work_dir).with_context(|| {
113 format!(
114 "failed to create directory `{path}`",
115 path = work_dir.display()
116 )
117 })?;
118
119 #[cfg(unix)]
123 {
124 use std::fs::Permissions;
125 use std::fs::set_permissions;
126 use std::os::unix::fs::PermissionsExt;
127 set_permissions(&work_dir, Permissions::from_mode(0o770)).with_context(|| {
128 format!(
129 "failed to set permissions for work directory `{path}`",
130 path = work_dir.display()
131 )
132 })?;
133 }
134
135 let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
138 fs::write(&command_path, self.inner.command()).with_context(|| {
139 format!(
140 "failed to write command contents to `{path}`",
141 path = command_path.display()
142 )
143 })?;
144
145 let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
148 for input in self.inner.inputs().iter() {
149 let guest_path = input.guest_path().expect("input should have guest path");
150 let local_path = input.local_path().expect("input should be localized");
151
152 if !local_path.exists() {
154 bail!(
155 "cannot mount input `{path}` as it does not exist",
156 path = local_path.display()
157 );
158 }
159
160 inputs.push(
161 Input::builder()
162 .path(guest_path.as_str())
163 .contents(Contents::Path(local_path.into()))
164 .ty(input.kind())
165 .read_only(true)
166 .build(),
167 );
168 }
169
170 inputs.push(
172 Input::builder()
173 .path(GUEST_WORK_DIR)
174 .contents(Contents::Path(work_dir.to_path_buf()))
175 .ty(InputType::Directory)
176 .read_only(false)
177 .build(),
178 );
179
180 inputs.push(
182 Input::builder()
183 .path(GUEST_COMMAND_PATH)
184 .contents(Contents::Path(command_path.to_path_buf()))
185 .ty(InputType::File)
186 .read_only(true)
187 .build(),
188 );
189
190 let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
191 let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);
192
193 let outputs = vec![
194 Output::builder()
195 .path(GUEST_STDOUT_PATH)
196 .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
197 .ty(OutputType::File)
198 .build(),
199 Output::builder()
200 .path(GUEST_STDERR_PATH)
201 .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
202 .ty(OutputType::File)
203 .build(),
204 ];
205
206 let task = Task::builder()
207 .name(self.name)
208 .executions(NonEmpty::new(
209 Execution::builder()
210 .image(self.container)
211 .program(
212 self.config
213 .task
214 .shell
215 .as_deref()
216 .unwrap_or(DEFAULT_TASK_SHELL),
217 )
218 .args([GUEST_COMMAND_PATH.to_string()])
219 .work_dir(GUEST_WORK_DIR)
220 .env(self.inner.env().clone())
221 .stdout(GUEST_STDOUT_PATH)
222 .stderr(GUEST_STDERR_PATH)
223 .build(),
224 ))
225 .inputs(inputs)
226 .outputs(outputs)
227 .resources(
228 Resources::builder()
229 .cpu(self.cpu)
230 .maybe_cpu_limit(self.max_cpu)
231 .ram(self.memory as f64 / ONE_GIBIBYTE)
232 .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
233 .build(),
234 )
235 .build();
236
237 let statuses = self.backend.run(task, self.token.clone())?.await?;
238
239 assert_eq!(statuses.len(), 1, "there should only be one exit status");
240 let status = statuses.first();
241
242 Ok(TaskExecutionResult {
243 exit_code: status.code().expect("should have exit code"),
244 work_dir: EvaluationPath::Local(work_dir),
245 stdout: PrimitiveValue::new_file(
246 stdout_path
247 .into_os_string()
248 .into_string()
249 .expect("path should be UTF-8"),
250 )
251 .into(),
252 stderr: PrimitiveValue::new_file(
253 stderr_path
254 .into_os_string()
255 .into_string()
256 .expect("path should be UTF-8"),
257 )
258 .into(),
259 })
260 }
261}
262
/// Represents the Docker task execution backend.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft Docker backend.
    inner: Arc<docker::Backend>,
    /// The maximum amount of concurrency supported.
    ///
    /// This is initialized from the CPU count reported by the backend's
    /// resources.
    max_concurrency: u64,
    /// The maximum CPUs reported by the backend's resources.
    max_cpu: u64,
    /// The maximum memory reported by the backend's resources.
    max_memory: u64,
    /// The task manager that receives the spawned task requests.
    manager: TaskManager<DockerTaskRequest>,
    /// The generator of unique name suffixes for tasks.
    ///
    /// It is shared with the underlying Crankshaft backend.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
280
281impl DockerBackend {
282 pub async fn new(
287 config: Arc<Config>,
288 backend_config: &DockerBackendConfig,
289 events: Option<broadcast::Sender<Event>>,
290 ) -> Result<Self> {
291 info!("initializing Docker backend");
292
293 let names = Arc::new(Mutex::new(GeneratorIterator::new(
294 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
295 INITIAL_EXPECTED_NAMES,
296 )));
297
298 let backend = docker::Backend::initialize_default_with(
299 backend::docker::Config::builder()
300 .cleanup(backend_config.cleanup)
301 .build(),
302 names.clone(),
303 events,
304 )
305 .await
306 .context("failed to initialize Docker backend")?;
307
308 let resources = *backend.resources();
309 let cpu = resources.cpu();
310 let max_cpu = resources.max_cpu();
311 let memory = resources.memory();
312 let max_memory = resources.max_memory();
313
314 let manager = if resources.use_service() {
318 TaskManager::new_unlimited(max_cpu, max_memory)
319 } else {
320 TaskManager::new(cpu, max_cpu, memory, max_memory)
321 };
322
323 Ok(Self {
324 config,
325 inner: Arc::new(backend),
326 max_concurrency: cpu,
327 max_cpu,
328 max_memory,
329 manager,
330 names,
331 })
332 }
333}
334
335impl TaskExecutionBackend for DockerBackend {
336 fn max_concurrency(&self) -> u64 {
337 self.max_concurrency
338 }
339
340 fn constraints(
341 &self,
342 requirements: &HashMap<String, Value>,
343 _: &HashMap<String, Value>,
344 ) -> Result<TaskExecutionConstraints> {
345 let container = container(requirements, self.config.task.container.as_deref());
346
347 let mut cpu = cpu(requirements);
348 if (self.max_cpu as f64) < cpu {
349 let env_specific = if self.config.suppress_env_specific_output {
350 String::new()
351 } else {
352 format!(
353 ", but the execution backend has a maximum of {max_cpu}",
354 max_cpu = self.max_cpu,
355 )
356 };
357 match self.config.task.cpu_limit_behavior {
358 TaskResourceLimitBehavior::TryWithMax => {
359 warn!(
360 "task requires at least {cpu} CPU{s}{env_specific}",
361 s = if cpu == 1.0 { "" } else { "s" },
362 );
363 cpu = self.max_cpu as f64;
365 }
366 TaskResourceLimitBehavior::Deny => {
367 bail!(
368 "task requires at least {cpu} CPU{s}{env_specific}",
369 s = if cpu == 1.0 { "" } else { "s" },
370 );
371 }
372 }
373 }
374
375 let mut memory = memory(requirements)?;
376 if self.max_memory < memory as u64 {
377 let env_specific = if self.config.suppress_env_specific_output {
378 String::new()
379 } else {
380 format!(
381 ", but the execution backend has a maximum of {max_memory} GiB",
382 max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
383 )
384 };
385 match self.config.task.memory_limit_behavior {
386 TaskResourceLimitBehavior::TryWithMax => {
387 warn!(
388 "task requires at least {memory} GiB of memory{env_specific}",
389 memory = memory as f64 / ONE_GIBIBYTE,
391 );
392 memory = self.max_memory.try_into().unwrap_or(i64::MAX);
394 }
395 TaskResourceLimitBehavior::Deny => {
396 bail!(
397 "task requires at least {memory} GiB of memory{env_specific}",
398 memory = memory as f64 / ONE_GIBIBYTE,
400 );
401 }
402 }
403 }
404
405 Ok(TaskExecutionConstraints {
406 container: Some(container.into_owned()),
407 cpu,
408 memory,
409 gpu: Default::default(),
410 fpga: Default::default(),
411 disks: Default::default(),
412 })
413 }
414
415 fn guest_inputs_dir(&self) -> Option<&'static str> {
416 Some(GUEST_INPUTS_DIR)
417 }
418
419 fn needs_local_inputs(&self) -> bool {
420 true
421 }
422
423 fn spawn(
424 &self,
425 request: TaskSpawnRequest,
426 token: CancellationToken,
427 ) -> Result<Receiver<Result<TaskExecutionResult>>> {
428 let (completed_tx, completed_rx) = oneshot::channel();
429
430 let requirements = request.requirements();
431 let hints = request.hints();
432
433 let container = container(requirements, self.config.task.container.as_deref()).into_owned();
434 let mut cpu = cpu(requirements);
435 if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
436 cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
437 }
438 let mut memory = memory(requirements)? as u64;
439 if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
440 memory = std::cmp::min(memory, self.max_memory);
441 }
442 let max_cpu = max_cpu(hints);
443 let max_memory = max_memory(hints)?.map(|i| i as u64);
444
445 let name = format!(
446 "{id}-{generated}",
447 id = request.id(),
448 generated = self
449 .names
450 .lock()
451 .expect("generator should always acquire")
452 .next()
453 .expect("generator should never be exhausted")
454 );
455 self.manager.send(
456 DockerTaskRequest {
457 config: self.config.clone(),
458 inner: request,
459 backend: self.inner.clone(),
460 name,
461 container,
462 cpu,
463 memory,
464 max_cpu,
465 max_memory,
466 token,
467 },
468 completed_tx,
469 );
470
471 Ok(completed_rx)
472 }
473
474 #[cfg(unix)]
475 fn cleanup<'a>(
476 &'a self,
477 work_dir: &'a EvaluationPath,
478 token: CancellationToken,
479 ) -> Option<futures::future::BoxFuture<'a, ()>> {
480 use futures::FutureExt;
481 use tracing::debug;
482
483 const GUEST_WORK_DIR: &str = "/mnt/work";
485 const CLEANUP_CPU: f64 = 0.1;
487 const CLEANUP_MEMORY: f64 = 0.05;
489
490 let work_dir = work_dir.as_local().expect("path should be local");
492 assert!(work_dir.is_absolute(), "work directory should be absolute");
493
494 let backend = self.inner.clone();
495 let names = self.names.clone();
496
497 Some(
498 async move {
499 let result = async {
500 let (uid, gid) = unsafe { (libc::geteuid(), libc::getegid()) };
501 let ownership = format!("{uid}:{gid}");
502
503 let name = format!(
504 "docker-backend-cleanup-{id}",
505 id = names
506 .lock()
507 .expect("generator should always acquire")
508 .next()
509 .expect("generator should never be exhausted")
510 );
511
512 let task = Task::builder()
513 .name(&name)
514 .executions(NonEmpty::new(
515 Execution::builder()
516 .image("alpine:latest")
517 .program("chown")
518 .args([
519 "-R".to_string(),
520 ownership.clone(),
521 GUEST_WORK_DIR.to_string(),
522 ])
523 .build(),
524 ))
525 .inputs([Input::builder()
526 .path(GUEST_WORK_DIR)
527 .contents(Contents::Path(work_dir.to_path_buf()))
528 .ty(InputType::Directory)
529 .read_only(false)
531 .build()])
532 .resources(
533 Resources::builder()
534 .cpu(CLEANUP_CPU)
535 .ram(CLEANUP_MEMORY)
536 .build(),
537 )
538 .build();
539
540 debug!(
541 "running cleanup task `{name}` to change ownership of `{path}` to \
542 `{ownership}`",
543 path = work_dir.display(),
544 );
545
546 let statuses = backend
547 .run(task, token)
548 .context("failed to submit cleanup task")?
549 .await
550 .context("failed to run cleanup task")?;
551 let status = statuses.first();
552 if status.success() {
553 Ok(())
554 } else {
555 bail!(
556 "failed to chown task work directory `{path}`",
557 path = work_dir.display()
558 );
559 }
560 }
561 .await;
562
563 if let Err(e) = result {
564 tracing::error!("Docker backend cleanup failed: {e:#}");
565 }
566 }
567 .boxed(),
568 )
569 }
570}