1use std::collections::HashMap;
4use std::fs;
5use std::sync::Arc;
6use std::sync::Mutex;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::bail;
11use crankshaft::config::backend;
12use crankshaft::engine::Task;
13use crankshaft::engine::service::name::GeneratorIterator;
14use crankshaft::engine::service::name::UniqueAlphanumeric;
15use crankshaft::engine::service::runner::Backend;
16use crankshaft::engine::service::runner::backend::docker;
17use crankshaft::engine::task::Execution;
18use crankshaft::engine::task::Input;
19use crankshaft::engine::task::Output;
20use crankshaft::engine::task::Resources;
21use crankshaft::engine::task::input::Contents;
22use crankshaft::engine::task::input::Type as InputType;
23use crankshaft::engine::task::output::Type as OutputType;
24use crankshaft::events::Event;
25use nonempty::NonEmpty;
26use tokio::sync::broadcast;
27use tokio::sync::oneshot;
28use tokio::sync::oneshot::Receiver;
29use tokio_util::sync::CancellationToken;
30use tracing::info;
31use tracing::warn;
32use url::Url;
33
34use super::TaskExecutionBackend;
35use super::TaskExecutionConstraints;
36use super::TaskExecutionResult;
37use super::TaskManager;
38use super::TaskManagerRequest;
39use super::TaskSpawnRequest;
40use crate::COMMAND_FILE_NAME;
41use crate::ONE_GIBIBYTE;
42use crate::PrimitiveValue;
43use crate::STDERR_FILE_NAME;
44use crate::STDOUT_FILE_NAME;
45use crate::Value;
46use crate::WORK_DIR_NAME;
47use crate::backend::INITIAL_EXPECTED_NAMES;
48use crate::config::Config;
49use crate::config::DEFAULT_TASK_SHELL;
50use crate::config::DockerBackendConfig;
51use crate::config::TaskResourceLimitBehavior;
52use crate::path::EvaluationPath;
53use crate::v1::container;
54use crate::v1::cpu;
55use crate::v1::gpu;
56use crate::v1::max_cpu;
57use crate::v1::max_memory;
58use crate::v1::memory;
59
/// The guest (in-container) directory under which task inputs are mounted.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The guest path of the task's working directory mount.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path at which the task's command file is mounted (read-only).
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path to which the task's stdout stream is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path to which the task's stderr stream is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
74
/// A request to run a single task on the Docker backend.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying task spawn request (attempt directory, command,
    /// inputs, and environment).
    inner: TaskSpawnRequest,
    /// The Crankshaft Docker backend used to run the task.
    backend: Arc<docker::Backend>,
    /// The unique name of the task (used as the Crankshaft task name).
    name: String,
    /// The container image to run the task in.
    container: String,
    /// The number of CPUs requested for the task.
    cpu: f64,
    /// The amount of memory requested for the task, in bytes.
    memory: u64,
    /// The optional CPU limit for the task (from hints).
    max_cpu: Option<f64>,
    /// The optional memory limit for the task, in bytes (from hints).
    max_memory: Option<u64>,
    /// The optional number of GPUs requested for the task.
    gpu: Option<u64>,
    /// The token used to cancel the task.
    token: CancellationToken,
}
102
impl TaskManagerRequest for DockerTaskRequest {
    /// Gets the number of CPUs requested by the task.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// Gets the amount of memory requested by the task, in bytes.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task in a container via the Crankshaft Docker backend.
    ///
    /// This creates the host-side work directory and command file, mounts
    /// them (along with every localized input) into the container, executes
    /// the task's command with the configured shell, and returns the exit
    /// code together with the paths of the captured stdout/stderr files.
    async fn run(self) -> Result<TaskExecutionResult> {
        // Create the host directory that backs the guest work directory
        // mount.
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        #[cfg(unix)]
        {
            use std::fs::Permissions;
            use std::fs::set_permissions;
            use std::os::unix::fs::PermissionsExt;
            // Make the work directory owner/group rwx (0o770); presumably so
            // that the user inside the container (which may differ from the
            // host user) can write to the mount — TODO confirm.
            set_permissions(&work_dir, Permissions::from_mode(0o770)).with_context(|| {
                format!(
                    "failed to set permissions for work directory `{path}`",
                    path = work_dir.display()
                )
            })?;
        }

        // Write the task's command to a file; it is mounted read-only into
        // the container below and passed to the shell as its sole argument.
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Reserve two extra slots for the work directory and command file
        // mounts pushed after the loop.
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            let guest_path = input.guest_path().expect("input should have guest path");
            let local_path = input.local_path().expect("input should be localized");

            // Refuse to mount inputs that don't exist on the host.
            if !local_path.exists() {
                bail!(
                    "cannot mount input `{path}` as it does not exist",
                    path = local_path.display()
                );
            }

            // Task inputs are mounted read-only.
            inputs.push(
                Input::builder()
                    .path(guest_path.as_str())
                    .contents(Contents::Path(local_path.into()))
                    .ty(input.kind())
                    .read_only(true)
                    .build(),
            );
        }

        // Mount the work directory read-write.
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Mount the command file read-only.
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        // The container's stdout/stderr are collected as outputs into files
        // in the attempt directory.
        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        // The task has a single execution: the configured (or default) shell
        // running the mounted command file from the guest work directory.
        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(self.container)
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    .args([GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env(self.inner.env().clone())
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                // Memory is stored in bytes; convert to GiB for the
                // resources builder.
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .maybe_gpu(self.gpu)
                    .build(),
            )
            .build();

        // The first `?` covers submission; the awaited `?` covers execution.
        let statuses = self.backend.run(task, self.token.clone())?.await?;

        // The task was built with exactly one execution, so there must be
        // exactly one exit status.
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
266
/// A task execution backend that runs each task in a Docker container via
/// the Crankshaft engine.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft Docker backend, shared with spawned task
    /// requests.
    inner: Arc<docker::Backend>,
    /// The maximum number of concurrently executing tasks (set from the
    /// backend's total CPU count in `new`).
    max_concurrency: u64,
    /// The maximum CPU count the execution backend supports for a task.
    max_cpu: u64,
    /// The maximum memory the execution backend supports for a task, in
    /// bytes.
    max_memory: u64,
    /// The task manager that queues and dispatches task requests.
    manager: TaskManager<DockerTaskRequest>,
    /// The generator of unique name suffixes, shared with the Crankshaft
    /// backend.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
284
285impl DockerBackend {
286 pub async fn new(
291 config: Arc<Config>,
292 backend_config: &DockerBackendConfig,
293 events: Option<broadcast::Sender<Event>>,
294 ) -> Result<Self> {
295 info!("initializing Docker backend");
296
297 let names = Arc::new(Mutex::new(GeneratorIterator::new(
298 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
299 INITIAL_EXPECTED_NAMES,
300 )));
301
302 let backend = docker::Backend::initialize_default_with(
303 backend::docker::Config::builder()
304 .cleanup(backend_config.cleanup)
305 .build(),
306 names.clone(),
307 events,
308 )
309 .await
310 .context("failed to initialize Docker backend")?;
311
312 let resources = *backend.resources();
313 let cpu = resources.cpu();
314 let max_cpu = resources.max_cpu();
315 let memory = resources.memory();
316 let max_memory = resources.max_memory();
317
318 let manager = if resources.use_service() {
322 TaskManager::new_unlimited(max_cpu, max_memory)
323 } else {
324 TaskManager::new(cpu, max_cpu, memory, max_memory)
325 };
326
327 Ok(Self {
328 config,
329 inner: Arc::new(backend),
330 max_concurrency: cpu,
331 max_cpu,
332 max_memory,
333 manager,
334 names,
335 })
336 }
337}
338
impl TaskExecutionBackend for DockerBackend {
    /// Gets the maximum number of tasks that may run concurrently.
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Resolves the execution constraints (container, CPU, memory, GPU) for
    /// a task from its requirements and hints.
    ///
    /// Requirements that exceed the backend's maximums are either clamped to
    /// the maximum (with a warning) or rejected, depending on the configured
    /// limit behavior.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container = container(requirements, self.config.task.container.as_deref());

        let mut cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            // The backend-specific suffix can be suppressed (e.g. for
            // reproducible output across environments).
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_cpu}",
                    max_cpu = self.max_cpu,
                )
            };
            match self.config.task.cpu_limit_behavior {
                // Clamp the request down to the backend's maximum.
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                    cpu = self.max_cpu as f64;
                }
                // Reject the task outright.
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                }
            }
        }

        let mut memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_memory} GiB",
                    max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
                )
            };
            match self.config.task.memory_limit_behavior {
                // Clamp the request down to the backend's maximum.
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                    memory = self.max_memory.try_into().unwrap_or(i64::MAX);
                }
                // Reject the task outright.
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                }
            }
        }

        // Expand the requested GPU count into placeholder device names.
        let gpu = gpu(requirements, hints)
            .map(|count| (0..count).map(|i| format!("nvidia-gpu-{i}")).collect())
            .unwrap_or_default();

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu,
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// Gets the guest directory into which task inputs are mounted.
    fn guest_inputs_dir(&self) -> Option<&'static str> {
        Some(GUEST_INPUTS_DIR)
    }

    /// Inputs must be localized to the host so they can be bind-mounted.
    fn needs_local_inputs(&self) -> bool {
        true
    }

    /// Spawns a task request on the task manager.
    ///
    /// Returns a receiver that resolves with the task's execution result
    /// once the manager has run it.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<Receiver<Result<TaskExecutionResult>>> {
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
        let mut cpu = cpu(requirements);
        // NOTE: unlike `constraints`, this clamps unconditionally under
        // `TryWithMax`, which also rounds fractional CPU requests up to a
        // whole CPU — presumably intentional; TODO confirm.
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
            cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
        }
        let mut memory = memory(requirements)? as u64;
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
            memory = std::cmp::min(memory, self.max_memory);
        }
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);
        let gpu = gpu(requirements, hints);

        // Combine the task identifier with a generated suffix so the name is
        // unique across spawns.
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .names
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                config: self.config.clone(),
                inner: request,
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                gpu,
                token,
            },
            completed_tx,
        );

        Ok(completed_rx)
    }

    /// Cleans up a task's work directory by running `chown -R` over it
    /// inside an `alpine:latest` container, transferring ownership of any
    /// container-created files to the current effective user and group.
    ///
    /// Failures are logged rather than returned, since the returned future
    /// produces `()`.
    #[cfg(unix)]
    fn cleanup<'a>(
        &'a self,
        work_dir: &'a EvaluationPath,
        token: CancellationToken,
    ) -> Option<futures::future::BoxFuture<'a, ()>> {
        use futures::FutureExt;
        use tracing::debug;

        // NOTE: this intentionally shadows the module-level
        // `GUEST_WORK_DIR`; the cleanup container uses its own mount point.
        const GUEST_WORK_DIR: &str = "/mnt/work";
        // Minimal resources for the cleanup container, in CPUs and GiB.
        const CLEANUP_CPU: f64 = 0.1;
        const CLEANUP_MEMORY: f64 = 0.05;

        let work_dir = work_dir.as_local().expect("path should be local");
        assert!(work_dir.is_absolute(), "work directory should be absolute");

        // Clone shared handles so the future is `'static`-independent of
        // `self`'s fields.
        let backend = self.inner.clone();
        let names = self.names.clone();

        Some(
            async move {
                // Run the cleanup in an inner async block so any error can
                // be logged in one place below.
                let result = async {
                    // SAFETY: `geteuid` and `getegid` have no preconditions
                    // and cannot fail.
                    let (uid, gid) = unsafe { (libc::geteuid(), libc::getegid()) };
                    let ownership = format!("{uid}:{gid}");

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = names
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_WORK_DIR.to_string(),
                                ])
                                .build(),
                        ))
                        // The work directory must be writable for `chown`.
                        .inputs([Input::builder()
                            .path(GUEST_WORK_DIR)
                            .contents(Contents::Path(work_dir.to_path_buf()))
                            .ty(InputType::Directory)
                            .read_only(false)
                            .build()])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    debug!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = work_dir.display(),
                    );

                    let statuses = backend
                        .run(task, token)
                        .context("failed to submit cleanup task")?
                        .await
                        .context("failed to run cleanup task")?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown task work directory `{path}`",
                            path = work_dir.display()
                        );
                    }
                }
                .await;

                // Cleanup is best-effort: log the failure and continue.
                if let Err(e) = result {
                    tracing::error!("Docker backend cleanup failed: {e:#}");
                }
            }
            .boxed(),
        )
    }
}