wdl_engine/backend/
docker.rs

1//! Implementation of the Docker backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::sync::Arc;
6use std::sync::Mutex;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::bail;
11use crankshaft::config::backend;
12use crankshaft::engine::Task;
13use crankshaft::engine::service::name::GeneratorIterator;
14use crankshaft::engine::service::name::UniqueAlphanumeric;
15use crankshaft::engine::service::runner::Backend;
16use crankshaft::engine::service::runner::backend::docker;
17use crankshaft::engine::task::Execution;
18use crankshaft::engine::task::Input;
19use crankshaft::engine::task::Output;
20use crankshaft::engine::task::Resources;
21use crankshaft::engine::task::input::Contents;
22use crankshaft::engine::task::input::Type as InputType;
23use crankshaft::engine::task::output::Type as OutputType;
24use crankshaft::events::Event;
25use nonempty::NonEmpty;
26use tokio::sync::broadcast;
27use tokio::sync::oneshot;
28use tokio::sync::oneshot::Receiver;
29use tokio_util::sync::CancellationToken;
30use tracing::info;
31use tracing::warn;
32use url::Url;
33
34use super::TaskExecutionBackend;
35use super::TaskExecutionConstraints;
36use super::TaskExecutionResult;
37use super::TaskManager;
38use super::TaskManagerRequest;
39use super::TaskSpawnRequest;
40use crate::COMMAND_FILE_NAME;
41use crate::ONE_GIBIBYTE;
42use crate::PrimitiveValue;
43use crate::STDERR_FILE_NAME;
44use crate::STDOUT_FILE_NAME;
45use crate::Value;
46use crate::WORK_DIR_NAME;
47use crate::backend::INITIAL_EXPECTED_NAMES;
48use crate::config::Config;
49use crate::config::DEFAULT_TASK_SHELL;
50use crate::config::DockerBackendConfig;
51use crate::config::TaskResourceLimitBehavior;
52use crate::path::EvaluationPath;
53use crate::v1::container;
54use crate::v1::cpu;
55use crate::v1::gpu;
56use crate::v1::max_cpu;
57use crate::v1::max_memory;
58use crate::v1::memory;
59
/// The root guest path for inputs.
///
/// Localized host inputs are mounted under this directory inside the
/// container.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The guest working directory (the host work directory is mounted here).
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path for the command file (mounted read-only from the host).
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The path to the container's stdout.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The path to the container's stderr.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
74
/// A task execution request handled by the Docker backend's task manager.
///
/// Carries the task spawn request along with the resolved container image
/// and the CPU/memory/GPU reservations and limits to apply when the task is
/// run.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The engine configuration.
    config: Arc<Config>,
    /// The inner task spawn request.
    inner: TaskSpawnRequest,
    /// The underlying Crankshaft backend.
    backend: Arc<docker::Backend>,
    /// The name of the task.
    name: String,
    /// The requested container for the task.
    container: String,
    /// The requested CPU reservation for the task.
    cpu: f64,
    /// The requested memory reservation for the task, in bytes.
    memory: u64,
    /// The requested maximum CPU limit for the task.
    max_cpu: Option<f64>,
    /// The requested maximum memory limit for the task, in bytes.
    max_memory: Option<u64>,
    /// The requested GPU count for the task.
    gpu: Option<u64>,
    /// The cancellation token for the request.
    token: CancellationToken,
}
102
103impl TaskManagerRequest for DockerTaskRequest {
104    fn cpu(&self) -> f64 {
105        self.cpu
106    }
107
108    fn memory(&self) -> u64 {
109        self.memory
110    }
111
112    async fn run(self) -> Result<TaskExecutionResult> {
113        // Create the working directory
114        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
115        fs::create_dir_all(&work_dir).with_context(|| {
116            format!(
117                "failed to create directory `{path}`",
118                path = work_dir.display()
119            )
120        })?;
121
122        // On Unix, the work directory must be group writable in case the container uses
123        // a different user/group; the Crankshaft docker backend will automatically add
124        // the current user's egid to the container
125        #[cfg(unix)]
126        {
127            use std::fs::Permissions;
128            use std::fs::set_permissions;
129            use std::os::unix::fs::PermissionsExt;
130            set_permissions(&work_dir, Permissions::from_mode(0o770)).with_context(|| {
131                format!(
132                    "failed to set permissions for work directory `{path}`",
133                    path = work_dir.display()
134                )
135            })?;
136        }
137
138        // Write the evaluated command to disk
139        // This is done even for remote execution so that a copy exists locally
140        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
141        fs::write(&command_path, self.inner.command()).with_context(|| {
142            format!(
143                "failed to write command contents to `{path}`",
144                path = command_path.display()
145            )
146        })?;
147
148        // Allocate the inputs, which will always be, at most, the number of inputs plus
149        // the working directory and command
150        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
151        for input in self.inner.inputs().iter() {
152            let guest_path = input.guest_path().expect("input should have guest path");
153            let local_path = input.local_path().expect("input should be localized");
154
155            // The local path must exist for Docker to mount
156            if !local_path.exists() {
157                bail!(
158                    "cannot mount input `{path}` as it does not exist",
159                    path = local_path.display()
160                );
161            }
162
163            inputs.push(
164                Input::builder()
165                    .path(guest_path.as_str())
166                    .contents(Contents::Path(local_path.into()))
167                    .ty(input.kind())
168                    .read_only(true)
169                    .build(),
170            );
171        }
172
173        // Add an input for the work directory
174        inputs.push(
175            Input::builder()
176                .path(GUEST_WORK_DIR)
177                .contents(Contents::Path(work_dir.to_path_buf()))
178                .ty(InputType::Directory)
179                .read_only(false)
180                .build(),
181        );
182
183        // Add an input for the command
184        inputs.push(
185            Input::builder()
186                .path(GUEST_COMMAND_PATH)
187                .contents(Contents::Path(command_path.to_path_buf()))
188                .ty(InputType::File)
189                .read_only(true)
190                .build(),
191        );
192
193        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
194        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);
195
196        let outputs = vec![
197            Output::builder()
198                .path(GUEST_STDOUT_PATH)
199                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
200                .ty(OutputType::File)
201                .build(),
202            Output::builder()
203                .path(GUEST_STDERR_PATH)
204                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
205                .ty(OutputType::File)
206                .build(),
207        ];
208
209        let task = Task::builder()
210            .name(self.name)
211            .executions(NonEmpty::new(
212                Execution::builder()
213                    .image(self.container)
214                    .program(
215                        self.config
216                            .task
217                            .shell
218                            .as_deref()
219                            .unwrap_or(DEFAULT_TASK_SHELL),
220                    )
221                    .args([GUEST_COMMAND_PATH.to_string()])
222                    .work_dir(GUEST_WORK_DIR)
223                    .env(self.inner.env().clone())
224                    .stdout(GUEST_STDOUT_PATH)
225                    .stderr(GUEST_STDERR_PATH)
226                    .build(),
227            ))
228            .inputs(inputs)
229            .outputs(outputs)
230            .resources(
231                Resources::builder()
232                    .cpu(self.cpu)
233                    .maybe_cpu_limit(self.max_cpu)
234                    .ram(self.memory as f64 / ONE_GIBIBYTE)
235                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
236                    .maybe_gpu(self.gpu)
237                    .build(),
238            )
239            .build();
240
241        let statuses = self.backend.run(task, self.token.clone())?.await?;
242
243        assert_eq!(statuses.len(), 1, "there should only be one exit status");
244        let status = statuses.first();
245
246        Ok(TaskExecutionResult {
247            exit_code: status.code().expect("should have exit code"),
248            work_dir: EvaluationPath::Local(work_dir),
249            stdout: PrimitiveValue::new_file(
250                stdout_path
251                    .into_os_string()
252                    .into_string()
253                    .expect("path should be UTF-8"),
254            )
255            .into(),
256            stderr: PrimitiveValue::new_file(
257                stderr_path
258                    .into_os_string()
259                    .into_string()
260                    .expect("path should be UTF-8"),
261            )
262            .into(),
263        })
264    }
265}
266
/// Represents the Docker backend.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft backend.
    inner: Arc<docker::Backend>,
    /// The maximum amount of concurrency supported.
    max_concurrency: u64,
    /// The maximum CPUs for any one node.
    max_cpu: u64,
    /// The maximum memory for any one node.
    max_memory: u64,
    /// The task manager for the backend.
    manager: TaskManager<DockerTaskRequest>,
    /// The name generator for tasks.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
284
285impl DockerBackend {
286    /// Constructs a new Docker task execution backend with the given
287    /// configuration.
288    ///
289    /// The provided configuration is expected to have already been validated.
290    pub async fn new(
291        config: Arc<Config>,
292        backend_config: &DockerBackendConfig,
293        events: Option<broadcast::Sender<Event>>,
294    ) -> Result<Self> {
295        info!("initializing Docker backend");
296
297        let names = Arc::new(Mutex::new(GeneratorIterator::new(
298            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
299            INITIAL_EXPECTED_NAMES,
300        )));
301
302        let backend = docker::Backend::initialize_default_with(
303            backend::docker::Config::builder()
304                .cleanup(backend_config.cleanup)
305                .build(),
306            names.clone(),
307            events,
308        )
309        .await
310        .context("failed to initialize Docker backend")?;
311
312        let resources = *backend.resources();
313        let cpu = resources.cpu();
314        let max_cpu = resources.max_cpu();
315        let memory = resources.memory();
316        let max_memory = resources.max_memory();
317
318        // If a service is being used, then we're going to be spawning into a cluster
319        // For the purposes of resource tracking, treat it as unlimited resources and
320        // let Docker handle resource allocation
321        let manager = if resources.use_service() {
322            TaskManager::new_unlimited(max_cpu, max_memory)
323        } else {
324            TaskManager::new(cpu, max_cpu, memory, max_memory)
325        };
326
327        Ok(Self {
328            config,
329            inner: Arc::new(backend),
330            max_concurrency: cpu,
331            max_cpu,
332            max_memory,
333            manager,
334            names,
335        })
336    }
337}
338
impl TaskExecutionBackend for DockerBackend {
    fn max_concurrency(&self) -> u64 {
        // Set at initialization to the total CPU count reported by the
        // Docker daemon's resources
        self.max_concurrency
    }

    /// Computes the execution constraints for a task from its requirements
    /// and hints.
    ///
    /// Requests exceeding the backend's maximum CPU or memory are either
    /// clamped (with a warning) or denied, per the configured limit
    /// behaviors.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container = container(requirements, self.config.task.container.as_deref());

        let mut cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            // Omit the backend maximum from messages when environment-specific
            // output is suppressed in the configuration
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_cpu}",
                    max_cpu = self.max_cpu,
                )
            };
            match self.config.task.cpu_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                    // clamp the reported constraint to what's available
                    cpu = self.max_cpu as f64;
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                }
            }
        }

        let mut memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_memory} GiB",
                    max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
                )
            };
            match self.config.task.memory_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        // Display the error in GiB, as it is the most common unit for memory
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                    // clamp the reported constraint to what's available
                    memory = self.max_memory.try_into().unwrap_or(i64::MAX);
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        // Display the error in GiB, as it is the most common unit for memory
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                }
            }
        }

        // Generate GPU specification strings in the format "<type>-gpu-<index>".
        // Each string represents one allocated GPU, indexed from 0. The type prefix
        // (e.g., "nvidia", "amd", "intel") identifies the GPU vendor/driver.
        // This is the first backend to populate the gpu field; other backends should
        // follow this format for consistency.
        // NOTE(review): the prefix is currently hard-coded to "nvidia" — confirm
        // whether other GPU vendors need to be detected here.
        let gpu = gpu(requirements, hints)
            .map(|count| (0..count).map(|i| format!("nvidia-gpu-{i}")).collect())
            .unwrap_or_default();

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu,
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    fn guest_inputs_dir(&self) -> Option<&'static str> {
        Some(GUEST_INPUTS_DIR)
    }

    fn needs_local_inputs(&self) -> bool {
        // Inputs are bind-mounted, so they must exist on the local filesystem
        true
    }

    /// Queues the task with the task manager and returns a receiver for its
    /// eventual execution result.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<Receiver<Result<TaskExecutionResult>>> {
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
        let mut cpu = cpu(requirements);
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
            // NOTE(review): this rounds a fractional CPU request up to a whole
            // CPU before clamping to the backend maximum — confirm intended
            cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
        }
        let mut memory = memory(requirements)? as u64;
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
            memory = std::cmp::min(memory, self.max_memory);
        }
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);
        let gpu = gpu(requirements, hints);

        // Suffix the task id with a generated unique component so that
        // repeated attempts of the same task get distinct names
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .names
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                config: self.config.clone(),
                inner: request,
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                gpu,
                token,
            },
            completed_tx,
        );

        Ok(completed_rx)
    }

    /// Restores ownership of the task work directory to the current user by
    /// running a short-lived `chown -R` container over a bind mount of it.
    ///
    /// Cleanup is best-effort: failures are logged, not propagated.
    #[cfg(unix)]
    fn cleanup<'a>(
        &'a self,
        work_dir: &'a EvaluationPath,
        token: CancellationToken,
    ) -> Option<futures::future::BoxFuture<'a, ()>> {
        use futures::FutureExt;
        use tracing::debug;

        /// The guest path for the work directory.
        const GUEST_WORK_DIR: &str = "/mnt/work";
        /// Amount of CPU to reserve for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;
        /// Amount of memory to reserve for the cleanup task.
        const CLEANUP_MEMORY: f64 = 0.05;

        // SAFETY: the work directory is always local for the Docker backend
        let work_dir = work_dir.as_local().expect("path should be local");
        assert!(work_dir.is_absolute(), "work directory should be absolute");

        let backend = self.inner.clone();
        let names = self.names.clone();

        Some(
            async move {
                let result = async {
                    // SAFETY: `geteuid` and `getegid` have no preconditions
                    // and cannot fail
                    let (uid, gid) = unsafe { (libc::geteuid(), libc::getegid()) };
                    let ownership = format!("{uid}:{gid}");

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = names
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_WORK_DIR.to_string(),
                                ])
                                .build(),
                        ))
                        .inputs([Input::builder()
                            .path(GUEST_WORK_DIR)
                            .contents(Contents::Path(work_dir.to_path_buf()))
                            .ty(InputType::Directory)
                            // need write access to chown
                            .read_only(false)
                            .build()])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    debug!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = work_dir.display(),
                    );

                    let statuses = backend
                        .run(task, token)
                        .context("failed to submit cleanup task")?
                        .await
                        .context("failed to run cleanup task")?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown task work directory `{path}`",
                            path = work_dir.display()
                        );
                    }
                }
                .await;

                // Best-effort: log the failure rather than propagating it
                if let Err(e) = result {
                    tracing::error!("Docker backend cleanup failed: {e:#}");
                }
            }
            .boxed(),
        )
    }
}