// wdl_engine/backend/docker.rs
1//! Implementation of the Docker backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::sync::Arc;
6use std::sync::Mutex;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::bail;
11use crankshaft::config::backend;
12use crankshaft::engine::Task;
13use crankshaft::engine::service::name::GeneratorIterator;
14use crankshaft::engine::service::name::UniqueAlphanumeric;
15use crankshaft::engine::service::runner::Backend;
16use crankshaft::engine::service::runner::backend::TaskRunError;
17use crankshaft::engine::service::runner::backend::docker;
18use crankshaft::engine::task::Execution;
19use crankshaft::engine::task::Input;
20use crankshaft::engine::task::Output;
21use crankshaft::engine::task::Resources;
22use crankshaft::engine::task::input::Contents;
23use crankshaft::engine::task::input::Type as InputType;
24use crankshaft::engine::task::output::Type as OutputType;
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use nonempty::NonEmpty;
28use tracing::debug;
29use tracing::info;
30use tracing::warn;
31use url::Url;
32
33use super::TaskExecutionBackend;
34use super::TaskExecutionConstraints;
35use super::TaskExecutionResult;
36use crate::CancellationContext;
37use crate::EvaluationPath;
38use crate::Events;
39use crate::ONE_GIBIBYTE;
40use crate::PrimitiveValue;
41use crate::TaskInputs;
42use crate::Value;
43use crate::backend::ExecuteTaskRequest;
44use crate::backend::INITIAL_EXPECTED_NAMES;
45use crate::backend::manager::TaskManager;
46use crate::config::Config;
47use crate::config::DEFAULT_TASK_SHELL;
48use crate::config::TaskResourceLimitBehavior;
49use crate::http::Transferer;
50use crate::v1::DEFAULT_DISK_MOUNT_POINT;
51use crate::v1::hints;
52use crate::v1::requirements;
53use crate::v1::requirements::ContainerSource;
54
/// The guest working directory.
///
/// Mounted read-write from the host task work directory.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path for the command file.
///
/// Mounted read-only; the container shell executes this file.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The path to the container's stdout.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The path to the container's stderr.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// Amount of CPU to request for the cleanup task.
#[cfg(unix)]
const CLEANUP_TASK_CPU: f64 = 0.1;

/// Amount of memory to request for the cleanup task, in bytes (4 MiB).
///
/// The Docker daemon requires memory values to be at least 4MiB.
#[cfg(unix)]
const CLEANUP_TASK_MEMORY: u64 = 4096 * 1024;
76
77/// Represents a task that runs with a Docker container.
/// Represents a task that runs with a Docker container.
struct DockerTask<'a> {
    /// The engine configuration.
    config: Arc<Config>,
    /// The task execution request.
    request: ExecuteTaskRequest<'a>,
    /// The underlying Crankshaft backend.
    backend: Arc<docker::Backend>,
    /// The name of the task (unique per submission; see `DockerBackend::execute`).
    name: String,
    /// The requested maximum CPU limit for the task.
    max_cpu: Option<f64>,
    /// The requested maximum memory limit for the task, in bytes.
    max_memory: Option<u64>,
    /// The requested GPU count for the task.
    gpu: Option<u64>,
    /// The evaluation cancellation context.
    cancellation: CancellationContext,
}
96
impl<'a> DockerTask<'a> {
    /// Runs the docker task.
    ///
    /// Prepares the host-side work directory and command file, builds the
    /// Crankshaft [`Task`] (mounts, outputs, resources), and awaits its
    /// execution.
    ///
    /// Returns `Ok(None)` if the task was canceled.
    async fn run(self) -> Result<Option<TaskExecutionResult>> {
        // Create the working directory
        let work_dir = self.request.work_dir();
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // On Unix, the work directory must be group writable in case the container uses
        // a different user/group; the Crankshaft docker backend will automatically add
        // the current user's egid to the container
        #[cfg(unix)]
        {
            use std::fs::Permissions;
            use std::fs::set_permissions;
            use std::os::unix::fs::PermissionsExt;
            set_permissions(&work_dir, Permissions::from_mode(0o770)).with_context(|| {
                format!(
                    "failed to set permissions for work directory `{path}`",
                    path = work_dir.display()
                )
            })?;
        }

        // Write the evaluated command to disk
        // This is done even for remote execution so that a copy exists locally
        let command_path = self.request.command_path();
        fs::write(&command_path, self.request.command).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Allocate the inputs, which will always be, at most, the number of inputs plus
        // the working directory and command
        let mut inputs = Vec::with_capacity(self.request.backend_inputs.len() + 2);
        for input in self.request.backend_inputs.iter() {
            let guest_path = input.guest_path().expect("input should have guest path");
            let local_path = input.local_path().expect("input should be localized");

            // The local path must exist for Docker to mount
            if !local_path.exists() {
                bail!(
                    "cannot mount input `{path}` as it does not exist",
                    path = local_path.display()
                );
            }

            // Task inputs are mounted read-only into the container
            inputs.push(
                Input::builder()
                    .path(guest_path.as_str())
                    .contents(Contents::Path(local_path.into()))
                    .ty(input.kind())
                    .read_only(true)
                    .build(),
            );
        }

        // Add an input for the work directory; this one is read-write so the
        // command can create its output files
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Add an input for the command
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        let stdout_path = self.request.stdout_path();
        let stderr_path = self.request.stderr_path();

        // Map the container's stdout/stderr guest paths back to local files
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        let volumes = self
            .request
            .constraints
            .disks
            .keys()
            .filter_map(|mp| {
                // NOTE: the root mount point is already handled by the work
                // directory mount, so we filter it here to avoid duplicate volume
                // mapping.
                if mp == DEFAULT_DISK_MOUNT_POINT {
                    None
                } else {
                    Some(mp.clone())
                }
            })
            .collect::<Vec<_>>();

        if !volumes.is_empty() {
            debug!(
                "disk size constraints cannot be enforced by the Docker backend; mount points \
                 will be created but sizes will not be limited"
            );
        }

        let task = Task::builder()
            .name(&self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(
                        self.request
                            .constraints
                            .container
                            .as_ref()
                            .expect("must have container")
                            .to_string(),
                    )
                    // The shell runs the command file; fall back to the default
                    // shell when the configuration does not specify one
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    .args([GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env(self.request.env.clone())
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                // RAM values are passed to Crankshaft in GiB
                Resources::builder()
                    .cpu(self.request.constraints.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    .ram(self.request.constraints.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .maybe_gpu(self.gpu)
                    .build(),
            )
            .volumes(volumes)
            .build();

        let statuses = match self.backend.run(task, self.cancellation.second())?.await {
            Ok(statuses) => statuses,
            Err(TaskRunError::Canceled) => return Ok(None),
            Err(e) => return Err(e.into()),
        };

        // The task has exactly one execution, so exactly one status is expected
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(Some(TaskExecutionResult {
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::from_local_path(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        }))
    }
}
291
/// Represents a cleanup task that is run upon successful completion of a Docker
/// task.
///
/// On Unix systems, this is used to recursively run `chown` on the work
/// directory so that files created by a container user (e.g. `root`) are
/// changed to be owned by the user performing evaluation.
#[cfg(unix)]
struct CleanupTask {
    /// The name of the task.
    name: String,
    /// The work directory to `chown`.
    work_dir: EvaluationPath,
    /// The underlying Crankshaft backend.
    backend: Arc<docker::Backend>,
    /// The evaluation cancellation context.
    cancellation: CancellationContext,
}
309
310#[cfg(unix)]
311impl CleanupTask {
312    /// Runs the cleanup task.
313    ///
314    /// Returns `Ok(None)` if the task was canceled.
315    async fn run(self) -> Result<Option<()>> {
316        use crankshaft::engine::service::runner::backend::TaskRunError;
317        use tracing::debug;
318
319        // SAFETY: the work directory is always local for the Docker backend
320        let work_dir = self.work_dir.as_local().expect("path should be local");
321        assert!(work_dir.is_absolute(), "work directory should be absolute");
322
323        let (uid, gid) = unsafe { (libc::geteuid(), libc::getegid()) };
324        let ownership = format!("{uid}:{gid}");
325
326        let task = Task::builder()
327            .name(&self.name)
328            .executions(NonEmpty::new(
329                Execution::builder()
330                    .image("alpine:latest")
331                    .program("chown")
332                    .args([
333                        "-R".to_string(),
334                        ownership.clone(),
335                        GUEST_WORK_DIR.to_string(),
336                    ])
337                    .build(),
338            ))
339            .inputs([Input::builder()
340                .path(GUEST_WORK_DIR)
341                .contents(Contents::Path(work_dir.to_path_buf()))
342                .ty(InputType::Directory)
343                // need write access to chown
344                .read_only(false)
345                .build()])
346            .resources(
347                Resources::builder()
348                    .cpu(CLEANUP_TASK_CPU)
349                    .ram(CLEANUP_TASK_MEMORY as f64 / ONE_GIBIBYTE)
350                    .build(),
351            )
352            .build();
353
354        debug!(
355            "running cleanup task `{name}` to change ownership of `{path}` to `{ownership}`",
356            name = self.name,
357            path = work_dir.display(),
358        );
359
360        match self
361            .backend
362            .run(task, self.cancellation.second())
363            .context("failed to submit cleanup task")?
364            .await
365        {
366            Ok(statuses) => {
367                let status = statuses.first();
368                if status.success() {
369                    Ok(Some(()))
370                } else {
371                    bail!(
372                        "failed to chown task work directory `{path}`",
373                        path = work_dir.display()
374                    );
375                }
376            }
377            Err(TaskRunError::Canceled) => Ok(None),
378            Err(e) => Err(e).context("failed to run cleanup task"),
379        }
380    }
381}
382
/// Represents the Docker backend.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft backend.
    inner: Arc<docker::Backend>,
    /// The evaluation cancellation context.
    cancellation: CancellationContext,
    /// The maximum CPUs for any one node.
    max_cpu: f64,
    /// The maximum memory for any one node, in bytes.
    max_memory: u64,
    /// The task manager for the backend.
    manager: TaskManager,
    /// The name generator for tasks.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
400
impl DockerBackend {
    /// Constructs a new Docker task execution backend with the given
    /// configuration.
    ///
    /// The provided configuration is expected to have already been validated.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured backend is not Docker or if the
    /// underlying Crankshaft Docker backend fails to initialize.
    pub async fn new(
        config: Arc<Config>,
        events: Events,
        cancellation: CancellationContext,
    ) -> Result<Self> {
        info!("initializing Docker backend");

        // Shared with Crankshaft so generated container names stay unique
        let names = Arc::new(Mutex::new(GeneratorIterator::new(
            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
            INITIAL_EXPECTED_NAMES,
        )));

        let backend_config = config.backend()?;
        let backend_config = backend_config
            .as_docker()
            .context("configured backend is not Docker")?;

        let backend = docker::Backend::initialize_default_with(
            backend::docker::Config::builder()
                .cleanup(backend_config.cleanup)
                .build(),
            names.clone(),
            events.crankshaft().clone(),
        )
        .await
        .context("failed to initialize Docker backend")?;

        // Snapshot the resources reported by the Docker daemon
        let resources = *backend.resources();
        let cpu = resources.cpu() as f64;
        let max_cpu = resources.max_cpu() as f64;
        let memory = resources.memory();
        let max_memory = resources.max_memory();

        // If a service is being used, then we're going to be spawning into a cluster
        // For the purposes of resource tracking, treat it as unlimited resources and
        // let Docker handle resource allocation
        let manager = if resources.use_service() {
            TaskManager::new_unlimited(max_cpu, max_memory)
        } else {
            TaskManager::new(
                cpu,
                max_cpu,
                memory,
                max_memory,
                events,
                cancellation.clone(),
            )
        };

        Ok(Self {
            config,
            inner: Arc::new(backend),
            cancellation,
            max_cpu,
            max_memory,
            manager,
            names,
        })
    }
}
466
467impl TaskExecutionBackend for DockerBackend {
468    fn constraints(
469        &self,
470        inputs: &TaskInputs,
471        requirements: &HashMap<String, Value>,
472        hints: &HashMap<String, Value>,
473    ) -> Result<TaskExecutionConstraints> {
474        let container =
475            requirements::container(inputs, requirements, self.config.task.container.as_deref());
476        match &container {
477            ContainerSource::Docker(_) => {}
478            ContainerSource::Library(_) | ContainerSource::Oras(_) => {
479                bail!(
480                    "Docker backend does not support `{container:#}`; use a Docker registry image \
481                     instead"
482                )
483            }
484            ContainerSource::SifFile(_) => {
485                bail!(
486                    "Docker backend does not support local SIF file `{container:#}`; use a Docker \
487                     registry image instead"
488                )
489            }
490            ContainerSource::Unknown(_) => {
491                bail!("Docker backend does not support unknown container source `{container:#}`")
492            }
493        };
494
495        let mut cpu = requirements::cpu(inputs, requirements);
496        if self.max_cpu < cpu {
497            let env_specific = if self.config.suppress_env_specific_output {
498                String::new()
499            } else {
500                format!(
501                    ", but the execution backend has a maximum of {max_cpu}",
502                    max_cpu = self.max_cpu,
503                )
504            };
505            match self.config.task.cpu_limit_behavior {
506                TaskResourceLimitBehavior::TryWithMax => {
507                    warn!(
508                        "task requires at least {cpu} CPU{s}{env_specific}",
509                        s = if cpu == 1.0 { "" } else { "s" },
510                    );
511                    // clamp the reported constraint to what's available
512                    cpu = self.max_cpu;
513                }
514                TaskResourceLimitBehavior::Deny => {
515                    bail!(
516                        "task requires at least {cpu} CPU{s}{env_specific}",
517                        s = if cpu == 1.0 { "" } else { "s" },
518                    );
519                }
520            }
521        }
522
523        let mut memory = requirements::memory(inputs, requirements)? as u64;
524        if self.max_memory < memory as u64 {
525            let env_specific = if self.config.suppress_env_specific_output {
526                String::new()
527            } else {
528                format!(
529                    ", but the execution backend has a maximum of {max_memory} GiB",
530                    max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
531                )
532            };
533            match self.config.task.memory_limit_behavior {
534                TaskResourceLimitBehavior::TryWithMax => {
535                    warn!(
536                        "task requires at least {memory} GiB of memory{env_specific}",
537                        // Display the error in GiB, as it is the most common unit for memory
538                        memory = memory as f64 / ONE_GIBIBYTE,
539                    );
540                    // clamp the reported constraint to what's available
541                    memory = self.max_memory;
542                }
543                TaskResourceLimitBehavior::Deny => {
544                    bail!(
545                        "task requires at least {memory} GiB of memory{env_specific}",
546                        // Display the error in GiB, as it is the most common unit for memory
547                        memory = memory as f64 / ONE_GIBIBYTE,
548                    );
549                }
550            }
551        }
552
553        // Generate GPU specification strings in the format "<type>-gpu-<index>".
554        // Each string represents one allocated GPU, indexed from 0. The type prefix
555        // (e.g., "nvidia", "amd", "intel") identifies the GPU vendor/driver.
556        // This is the first backend to populate the gpu field; other backends should
557        // follow this format for consistency.
558        let gpu = requirements::gpu(inputs, requirements, hints)
559            .map(|count| (0..count).map(|i| format!("nvidia-gpu-{i}")).collect())
560            .unwrap_or_default();
561
562        let disks = requirements::disks(inputs, requirements, hints)?
563            .into_iter()
564            .map(|(mount_point, disk)| (mount_point.to_string(), disk.size))
565            .collect();
566
567        Ok(TaskExecutionConstraints {
568            container: Some(container),
569            cpu,
570            memory,
571            gpu,
572            fpga: Default::default(),
573            disks,
574        })
575    }
576
577    fn execute<'a>(
578        &'a self,
579        _: &'a Arc<dyn Transferer>,
580        request: ExecuteTaskRequest<'a>,
581    ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
582        async move {
583            let cpu = request.constraints.cpu;
584            let memory = request.constraints.memory;
585            // NOTE: in the Docker backend, we clamp `max_cpu` and `max_memory`
586            // to what is reported by the backend, as the Docker daemon does not
587            // respond gracefully to over-subscribing these.
588            let max_cpu =
589                hints::max_cpu(request.inputs, request.hints).map(|m| m.min(self.max_cpu));
590            let max_memory = hints::max_memory(request.inputs, request.hints)?
591                .map(|i| (i as u64).min(self.max_memory));
592            let gpu = requirements::gpu(request.inputs, request.requirements, request.hints);
593
594            let name = format!(
595                "{id}-{generated}",
596                id = request.id,
597                generated = self
598                    .names
599                    .lock()
600                    .expect("generator should always acquire")
601                    .next()
602                    .expect("generator should never be exhausted")
603            );
604
605            let task = DockerTask {
606                config: self.config.clone(),
607                request,
608                backend: self.inner.clone(),
609                name,
610                max_cpu,
611                max_memory,
612                gpu,
613                cancellation: self.cancellation.clone(),
614            };
615
616            match self.manager.run(cpu, memory, task.run()).await? {
617                Some(res) => {
618                    // The task completed, perform cleanup on unix platforms
619                    #[cfg(unix)]
620                    {
621                        let name = format!(
622                            "docker-chown-{id}",
623                            id = self
624                                .names
625                                .lock()
626                                .expect("generator should always acquire")
627                                .next()
628                                .expect("generator should never be exhausted")
629                        );
630
631                        let task = CleanupTask {
632                            name,
633                            work_dir: res.work_dir.clone(),
634                            backend: self.inner.clone(),
635                            cancellation: self.cancellation.clone(),
636                        };
637
638                        if let Err(e) = self
639                            .manager
640                            .run(CLEANUP_TASK_CPU, CLEANUP_TASK_MEMORY, task.run())
641                            .await
642                        {
643                            tracing::error!("Docker backend cleanup failed: {e:#}");
644                        }
645                    }
646
647                    Ok(Some(res))
648                }
649                None => Ok(None),
650            }
651        }
652        .boxed()
653    }
654}