1use std::collections::HashMap;
4use std::fs;
5use std::sync::Arc;
6use std::sync::Mutex;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::bail;
11use crankshaft::config::backend;
12use crankshaft::engine::Task;
13use crankshaft::engine::service::name::GeneratorIterator;
14use crankshaft::engine::service::name::UniqueAlphanumeric;
15use crankshaft::engine::service::runner::Backend;
16use crankshaft::engine::service::runner::backend::TaskRunError;
17use crankshaft::engine::service::runner::backend::docker;
18use crankshaft::engine::task::Execution;
19use crankshaft::engine::task::Input;
20use crankshaft::engine::task::Output;
21use crankshaft::engine::task::Resources;
22use crankshaft::engine::task::input::Contents;
23use crankshaft::engine::task::input::Type as InputType;
24use crankshaft::engine::task::output::Type as OutputType;
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use nonempty::NonEmpty;
28use tracing::debug;
29use tracing::info;
30use tracing::warn;
31use url::Url;
32
33use super::TaskExecutionBackend;
34use super::TaskExecutionConstraints;
35use super::TaskExecutionResult;
36use crate::CancellationContext;
37use crate::EvaluationPath;
38use crate::Events;
39use crate::ONE_GIBIBYTE;
40use crate::PrimitiveValue;
41use crate::TaskInputs;
42use crate::Value;
43use crate::backend::ExecuteTaskRequest;
44use crate::backend::INITIAL_EXPECTED_NAMES;
45use crate::backend::manager::TaskManager;
46use crate::config::Config;
47use crate::config::DEFAULT_TASK_SHELL;
48use crate::config::TaskResourceLimitBehavior;
49use crate::http::Transferer;
50use crate::v1::DEFAULT_DISK_MOUNT_POINT;
51use crate::v1::hints;
52use crate::v1::requirements;
53use crate::v1::requirements::ContainerSource;
54
/// The path inside the container where the task's work directory is mounted.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The path inside the container where the evaluated command script is
/// mounted (read-only).
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The path inside the container to which the task's stdout is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The path inside the container to which the task's stderr is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// The CPU reservation for the post-run ownership cleanup task (Unix only).
#[cfg(unix)]
const CLEANUP_TASK_CPU: f64 = 0.1;

/// The memory reservation, in bytes (4 MiB), for the post-run ownership
/// cleanup task (Unix only).
#[cfg(unix)]
const CLEANUP_TASK_MEMORY: u64 = 4096 * 1024;
76
/// A task prepared for execution on the Docker backend.
struct DockerTask<'a> {
    /// The engine configuration.
    config: Arc<Config>,
    /// The request describing the task to execute.
    request: ExecuteTaskRequest<'a>,
    /// The underlying Crankshaft Docker backend used to run the task.
    backend: Arc<docker::Backend>,
    /// The generated, unique name for the task's container.
    name: String,
    /// The CPU limit derived from task hints, if any; never exceeds the
    /// backend's maximum.
    max_cpu: Option<f64>,
    /// The memory limit (in bytes) derived from task hints, if any.
    max_memory: Option<u64>,
    /// The requested GPU count, if any (passed to the resource builder's
    /// `maybe_gpu`).
    gpu: Option<u64>,
    /// Context used to observe cancellation of the run.
    cancellation: CancellationContext,
}
96
impl<'a> DockerTask<'a> {
    /// Runs the task to completion on the Docker backend.
    ///
    /// Returns `Ok(None)` if the run was canceled; otherwise returns the
    /// task's execution result (exit code, work directory, and stdout/stderr
    /// file paths).
    ///
    /// # Errors
    ///
    /// Returns an error if the host-side work directory or command file
    /// cannot be created, if an input's local path does not exist, or if the
    /// backend fails to run the task.
    async fn run(self) -> Result<Option<TaskExecutionResult>> {
        // Create the host-side work directory that will be mounted into the
        // container at `GUEST_WORK_DIR`.
        let work_dir = self.request.work_dir();
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        #[cfg(unix)]
        {
            use std::fs::Permissions;
            use std::fs::set_permissions;
            use std::os::unix::fs::PermissionsExt;
            // Make the work directory user/group-accessible (0o770) —
            // presumably so the container's user can write to the mount;
            // TODO(review): confirm intent against container user mapping.
            set_permissions(&work_dir, Permissions::from_mode(0o770)).with_context(|| {
                format!(
                    "failed to set permissions for work directory `{path}`",
                    path = work_dir.display()
                )
            })?;
        }

        // Write the evaluated command script to the host; it is mounted
        // read-only at `GUEST_COMMAND_PATH` below.
        let command_path = self.request.command_path();
        fs::write(&command_path, self.request.command).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Capacity: one mount per backend input plus the work directory and
        // the command file.
        let mut inputs = Vec::with_capacity(self.request.backend_inputs.len() + 2);
        for input in self.request.backend_inputs.iter() {
            let guest_path = input.guest_path().expect("input should have guest path");
            let local_path = input.local_path().expect("input should be localized");

            // Fail early with a clear message rather than letting Docker
            // surface a mount error.
            if !local_path.exists() {
                bail!(
                    "cannot mount input `{path}` as it does not exist",
                    path = local_path.display()
                );
            }

            // Task inputs are mounted read-only at their guest paths.
            inputs.push(
                Input::builder()
                    .path(guest_path.as_str())
                    .contents(Contents::Path(local_path.into()))
                    .ty(input.kind())
                    .read_only(true)
                    .build(),
            );
        }

        // The work directory is the only writable mount.
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Mount the command script read-only.
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        let stdout_path = self.request.stdout_path();
        let stderr_path = self.request.stderr_path();

        // Declare stdout/stderr as outputs so the backend copies them back
        // to the host paths after the run.
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        // Every requested disk mount point other than the default becomes a
        // Docker volume; the default mount point is the work dir itself.
        let volumes = self
            .request
            .constraints
            .disks
            .keys()
            .filter_map(|mp| {
                if mp == DEFAULT_DISK_MOUNT_POINT {
                    None
                } else {
                    Some(mp.clone())
                }
            })
            .collect::<Vec<_>>();

        if !volumes.is_empty() {
            debug!(
                "disk size constraints cannot be enforced by the Docker backend; mount points \
                 will be created but sizes will not be limited"
            );
        }

        // Build the single-execution Crankshaft task: the configured shell
        // runs the mounted command script inside the requested container.
        let task = Task::builder()
            .name(&self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(
                        self.request
                            .constraints
                            .container
                            .as_ref()
                            .expect("must have container")
                            .to_string(),
                    )
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    .args([GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env(self.request.env.clone())
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                // RAM values are converted from bytes to GiB for the builder.
                Resources::builder()
                    .cpu(self.request.constraints.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    .ram(self.request.constraints.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .maybe_gpu(self.gpu)
                    .build(),
            )
            .volumes(volumes)
            .build();

        // Cancellation is treated as a non-error `None` result; all other
        // run errors propagate.
        let statuses = match self.backend.run(task, self.cancellation.second())?.await {
            Ok(statuses) => statuses,
            Err(TaskRunError::Canceled) => return Ok(None),
            Err(e) => return Err(e.into()),
        };

        // There is exactly one execution in the task, so exactly one status.
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(Some(TaskExecutionResult {
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::from_local_path(work_dir),
            // Host stdout/stderr paths are stored as `File` values; they must
            // be valid UTF-8.
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        }))
    }
}
291
/// A post-run task that changes ownership of a task's work directory back to
/// the current host user (Unix only).
#[cfg(unix)]
struct CleanupTask {
    /// The generated, unique name for the cleanup task's container.
    name: String,
    /// The work directory whose ownership will be changed.
    work_dir: EvaluationPath,
    /// The Crankshaft Docker backend used to run the cleanup container.
    backend: Arc<docker::Backend>,
    /// Context used to observe cancellation of the run.
    cancellation: CancellationContext,
}
309
#[cfg(unix)]
impl CleanupTask {
    /// Runs the cleanup task.
    ///
    /// Runs `chown -R <uid>:<gid> /mnt/task/work` in an `alpine:latest`
    /// container, with the task's work directory mounted read-write, to set
    /// ownership of its contents to the current effective user and group.
    ///
    /// Returns `Ok(None)` if the run was canceled.
    ///
    /// # Errors
    ///
    /// Returns an error if the cleanup task fails to submit, fails to run, or
    /// exits unsuccessfully.
    async fn run(self) -> Result<Option<()>> {
        use crankshaft::engine::service::runner::backend::TaskRunError;
        use tracing::debug;

        let work_dir = self.work_dir.as_local().expect("path should be local");
        assert!(work_dir.is_absolute(), "work directory should be absolute");

        // SAFETY: `geteuid` and `getegid` take no arguments, have no
        // preconditions, and cannot fail.
        let (uid, gid) = unsafe { (libc::geteuid(), libc::getegid()) };
        let ownership = format!("{uid}:{gid}");

        let task = Task::builder()
            .name(&self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image("alpine:latest")
                    .program("chown")
                    // `ownership` is cloned here because it is also used in
                    // the `debug!` message below.
                    .args([
                        "-R".to_string(),
                        ownership.clone(),
                        GUEST_WORK_DIR.to_string(),
                    ])
                    .build(),
            ))
            // The work directory must be writable for `chown` to take effect.
            .inputs([Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build()])
            .resources(
                // Minimal reservation: the cleanup task is tiny.
                Resources::builder()
                    .cpu(CLEANUP_TASK_CPU)
                    .ram(CLEANUP_TASK_MEMORY as f64 / ONE_GIBIBYTE)
                    .build(),
            )
            .build();

        debug!(
            "running cleanup task `{name}` to change ownership of `{path}` to `{ownership}`",
            name = self.name,
            path = work_dir.display(),
        );

        // Cancellation maps to `Ok(None)`; a failed `chown` exit status is an
        // error even though the run itself succeeded.
        match self
            .backend
            .run(task, self.cancellation.second())
            .context("failed to submit cleanup task")?
            .await
        {
            Ok(statuses) => {
                let status = statuses.first();
                if status.success() {
                    Ok(Some(()))
                } else {
                    bail!(
                        "failed to chown task work directory `{path}`",
                        path = work_dir.display()
                    );
                }
            }
            Err(TaskRunError::Canceled) => Ok(None),
            Err(e) => Err(e).context("failed to run cleanup task"),
        }
    }
}
382
/// A task execution backend that runs tasks in local Docker containers via
/// Crankshaft.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft Docker backend.
    inner: Arc<docker::Backend>,
    /// Context used to observe cancellation of task runs.
    cancellation: CancellationContext,
    /// The maximum CPUs reported by the backend's resources.
    max_cpu: f64,
    /// The maximum memory reported by the backend's resources (in bytes —
    /// it is rendered as GiB by dividing by `ONE_GIBIBYTE`).
    max_memory: u64,
    /// The manager that admits tasks against available CPU/memory.
    manager: TaskManager,
    /// Shared generator of unique alphanumeric name suffixes for containers.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
400
impl DockerBackend {
    /// Constructs a new Docker task execution backend.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured backend is not the Docker backend
    /// or if the underlying Crankshaft Docker backend fails to initialize.
    pub async fn new(
        config: Arc<Config>,
        events: Events,
        cancellation: CancellationContext,
    ) -> Result<Self> {
        info!("initializing Docker backend");

        // Shared generator used to produce unique container name suffixes;
        // also handed to the Crankshaft backend below.
        let names = Arc::new(Mutex::new(GeneratorIterator::new(
            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
            INITIAL_EXPECTED_NAMES,
        )));

        // Ensure the configured backend really is Docker before proceeding.
        let backend_config = config.backend()?;
        let backend_config = backend_config
            .as_docker()
            .context("configured backend is not Docker")?;

        let backend = docker::Backend::initialize_default_with(
            backend::docker::Config::builder()
                .cleanup(backend_config.cleanup)
                .build(),
            names.clone(),
            events.crankshaft().clone(),
        )
        .await
        .context("failed to initialize Docker backend")?;

        // Snapshot the resources the backend reports so task requests can be
        // validated against them in `constraints`.
        let resources = *backend.resources();
        let cpu = resources.cpu() as f64;
        let max_cpu = resources.max_cpu() as f64;
        let memory = resources.memory();
        let max_memory = resources.max_memory();

        // When the backend reports a service is in use, the manager does not
        // enforce local resource limits; otherwise it admits tasks against
        // the reported CPU/memory.
        let manager = if resources.use_service() {
            TaskManager::new_unlimited(max_cpu, max_memory)
        } else {
            TaskManager::new(
                cpu,
                max_cpu,
                memory,
                max_memory,
                events,
                cancellation.clone(),
            )
        };

        Ok(Self {
            config,
            inner: Arc::new(backend),
            cancellation,
            max_cpu,
            max_memory,
            manager,
            names,
        })
    }
}
466
467impl TaskExecutionBackend for DockerBackend {
468 fn constraints(
469 &self,
470 inputs: &TaskInputs,
471 requirements: &HashMap<String, Value>,
472 hints: &HashMap<String, Value>,
473 ) -> Result<TaskExecutionConstraints> {
474 let container =
475 requirements::container(inputs, requirements, self.config.task.container.as_deref());
476 match &container {
477 ContainerSource::Docker(_) => {}
478 ContainerSource::Library(_) | ContainerSource::Oras(_) => {
479 bail!(
480 "Docker backend does not support `{container:#}`; use a Docker registry image \
481 instead"
482 )
483 }
484 ContainerSource::SifFile(_) => {
485 bail!(
486 "Docker backend does not support local SIF file `{container:#}`; use a Docker \
487 registry image instead"
488 )
489 }
490 ContainerSource::Unknown(_) => {
491 bail!("Docker backend does not support unknown container source `{container:#}`")
492 }
493 };
494
495 let mut cpu = requirements::cpu(inputs, requirements);
496 if self.max_cpu < cpu {
497 let env_specific = if self.config.suppress_env_specific_output {
498 String::new()
499 } else {
500 format!(
501 ", but the execution backend has a maximum of {max_cpu}",
502 max_cpu = self.max_cpu,
503 )
504 };
505 match self.config.task.cpu_limit_behavior {
506 TaskResourceLimitBehavior::TryWithMax => {
507 warn!(
508 "task requires at least {cpu} CPU{s}{env_specific}",
509 s = if cpu == 1.0 { "" } else { "s" },
510 );
511 cpu = self.max_cpu;
513 }
514 TaskResourceLimitBehavior::Deny => {
515 bail!(
516 "task requires at least {cpu} CPU{s}{env_specific}",
517 s = if cpu == 1.0 { "" } else { "s" },
518 );
519 }
520 }
521 }
522
523 let mut memory = requirements::memory(inputs, requirements)? as u64;
524 if self.max_memory < memory as u64 {
525 let env_specific = if self.config.suppress_env_specific_output {
526 String::new()
527 } else {
528 format!(
529 ", but the execution backend has a maximum of {max_memory} GiB",
530 max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
531 )
532 };
533 match self.config.task.memory_limit_behavior {
534 TaskResourceLimitBehavior::TryWithMax => {
535 warn!(
536 "task requires at least {memory} GiB of memory{env_specific}",
537 memory = memory as f64 / ONE_GIBIBYTE,
539 );
540 memory = self.max_memory;
542 }
543 TaskResourceLimitBehavior::Deny => {
544 bail!(
545 "task requires at least {memory} GiB of memory{env_specific}",
546 memory = memory as f64 / ONE_GIBIBYTE,
548 );
549 }
550 }
551 }
552
553 let gpu = requirements::gpu(inputs, requirements, hints)
559 .map(|count| (0..count).map(|i| format!("nvidia-gpu-{i}")).collect())
560 .unwrap_or_default();
561
562 let disks = requirements::disks(inputs, requirements, hints)?
563 .into_iter()
564 .map(|(mount_point, disk)| (mount_point.to_string(), disk.size))
565 .collect();
566
567 Ok(TaskExecutionConstraints {
568 container: Some(container),
569 cpu,
570 memory,
571 gpu,
572 fpga: Default::default(),
573 disks,
574 })
575 }
576
577 fn execute<'a>(
578 &'a self,
579 _: &'a Arc<dyn Transferer>,
580 request: ExecuteTaskRequest<'a>,
581 ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
582 async move {
583 let cpu = request.constraints.cpu;
584 let memory = request.constraints.memory;
585 let max_cpu =
589 hints::max_cpu(request.inputs, request.hints).map(|m| m.min(self.max_cpu));
590 let max_memory = hints::max_memory(request.inputs, request.hints)?
591 .map(|i| (i as u64).min(self.max_memory));
592 let gpu = requirements::gpu(request.inputs, request.requirements, request.hints);
593
594 let name = format!(
595 "{id}-{generated}",
596 id = request.id,
597 generated = self
598 .names
599 .lock()
600 .expect("generator should always acquire")
601 .next()
602 .expect("generator should never be exhausted")
603 );
604
605 let task = DockerTask {
606 config: self.config.clone(),
607 request,
608 backend: self.inner.clone(),
609 name,
610 max_cpu,
611 max_memory,
612 gpu,
613 cancellation: self.cancellation.clone(),
614 };
615
616 match self.manager.run(cpu, memory, task.run()).await? {
617 Some(res) => {
618 #[cfg(unix)]
620 {
621 let name = format!(
622 "docker-chown-{id}",
623 id = self
624 .names
625 .lock()
626 .expect("generator should always acquire")
627 .next()
628 .expect("generator should never be exhausted")
629 );
630
631 let task = CleanupTask {
632 name,
633 work_dir: res.work_dir.clone(),
634 backend: self.inner.clone(),
635 cancellation: self.cancellation.clone(),
636 };
637
638 if let Err(e) = self
639 .manager
640 .run(CLEANUP_TASK_CPU, CLEANUP_TASK_MEMORY, task.run())
641 .await
642 {
643 tracing::error!("Docker backend cleanup failed: {e:#}");
644 }
645 }
646
647 Ok(Some(res))
648 }
649 None => Ok(None),
650 }
651 }
652 .boxed()
653 }
654}