wdl_engine/backend/
docker.rs

1//! Implementation of the Docker backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use crankshaft::config::backend;
13use crankshaft::engine::Task;
14use crankshaft::engine::service::name::GeneratorIterator;
15use crankshaft::engine::service::name::UniqueAlphanumeric;
16use crankshaft::engine::service::runner::Backend;
17use crankshaft::engine::service::runner::backend::docker;
18use crankshaft::engine::task::Execution;
19use crankshaft::engine::task::Input;
20use crankshaft::engine::task::Output;
21use crankshaft::engine::task::Resources;
22use crankshaft::engine::task::input::Contents;
23use crankshaft::engine::task::input::Type as InputType;
24use crankshaft::engine::task::output::Type as OutputType;
25use crankshaft::events::Event;
26use futures::future::BoxFuture;
27use nonempty::NonEmpty;
28use tokio::sync::broadcast;
29use tokio::sync::oneshot;
30use tokio::sync::oneshot::Receiver;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use tracing::warn;
34use url::Url;
35
36use super::TaskExecutionBackend;
37use super::TaskExecutionConstraints;
38use super::TaskExecutionResult;
39use super::TaskManager;
40use super::TaskManagerRequest;
41use super::TaskSpawnRequest;
42use crate::COMMAND_FILE_NAME;
43use crate::ONE_GIBIBYTE;
44use crate::PrimitiveValue;
45use crate::STDERR_FILE_NAME;
46use crate::STDOUT_FILE_NAME;
47use crate::Value;
48use crate::WORK_DIR_NAME;
49use crate::backend::INITIAL_EXPECTED_NAMES;
50use crate::config::Config;
51use crate::config::DEFAULT_TASK_SHELL;
52use crate::config::DockerBackendConfig;
53use crate::config::TaskResourceLimitBehavior;
54use crate::path::EvaluationPath;
55use crate::v1::container;
56use crate::v1::cpu;
57use crate::v1::max_cpu;
58use crate::v1::max_memory;
59use crate::v1::memory;
60
/// The root guest path for inputs.
///
/// Localized host inputs are mounted (read-only) under this directory inside
/// the container.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The guest working directory.
///
/// Bind-mounted (read-write) to the attempt's local work directory.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path for the command file.
///
/// Bind-mounted (read-only) to the locally written command script.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path to the container's stdout.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path to the container's stderr.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
75
/// A task request destined for the Docker backend.
///
/// This request contains the requested cpu and memory reservations for the
/// task as well as everything needed to run the task's container once the
/// task manager grants the reservation.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The engine configuration.
    config: Arc<Config>,
    /// The inner task spawn request.
    inner: TaskSpawnRequest,
    /// The underlying Crankshaft backend.
    backend: Arc<docker::Backend>,
    /// The name of the task.
    name: String,
    /// The requested container (image) for the task.
    container: String,
    /// The requested CPU reservation for the task.
    cpu: f64,
    /// The requested memory reservation for the task, in bytes.
    memory: u64,
    /// The requested maximum CPU limit for the task.
    max_cpu: Option<f64>,
    /// The requested maximum memory limit for the task, in bytes.
    max_memory: Option<u64>,
    /// The cancellation token for the request; cancelling it aborts the
    /// container run.
    token: CancellationToken,
}
101
impl TaskManagerRequest for DockerTaskRequest {
    /// The CPU reservation the task manager should account for this task.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// The memory reservation (in bytes) the task manager should account for
    /// this task.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task in a Docker container via the Crankshaft backend.
    ///
    /// Sets up the attempt directory (work dir + command file), mounts the
    /// localized inputs read-only, mounts the work dir read-write, captures
    /// stdout/stderr to local files, and returns the execution result once
    /// the container exits.
    ///
    /// # Errors
    ///
    /// Returns an error if the work directory or command file cannot be
    /// created, if an input's local path does not exist, or if the container
    /// run itself fails.
    async fn run(self) -> Result<TaskExecutionResult> {
        // Create the working directory
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the evaluated command to disk
        // This is done even for remote execution so that a copy exists locally
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Allocate the inputs, which will always be, at most, the number of inputs plus
        // the working directory and command
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            let guest_path = input.guest_path().expect("input should have guest path");
            let local_path = input.local_path().expect("input should be localized");

            // The local path must exist for Docker to mount
            if !local_path.exists() {
                bail!(
                    "cannot mount input `{path}` as it does not exist",
                    path = local_path.display()
                );
            }

            // Task inputs are mounted read-only so the command cannot mutate
            // the host's localized copies
            inputs.push(
                Input::builder()
                    .path(guest_path.as_str())
                    .contents(Contents::Path(local_path.into()))
                    .ty(input.kind())
                    .read_only(true)
                    .build(),
            );
        }

        // Add an input for the work directory; this mount is writable so the
        // command's outputs land in the host-side work directory
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Add an input for the command
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        // NOTE(review): `Url::from_file_path` errors for relative paths —
        // assumes `attempt_dir()` is always absolute; confirm with callers.
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        // A single execution: run the configured shell (or the default) with
        // the mounted command file as its sole argument
        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(self.container)
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    .args([GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env(self.inner.env().clone())
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // Crankshaft expects RAM in GiB; the request stores bytes
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        // First `?` handles submission failure; second handles run failure
        let statuses = self.backend.run(task, self.token.clone())?.await?;

        // Only one execution was submitted, so exactly one status is expected
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        // NOTE(review): on Unix, a process killed by a signal has no exit
        // code, which would panic here — confirm Crankshaft normalizes this.
        Ok(TaskExecutionResult {
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
248
/// Represents the Docker backend.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft backend.
    inner: Arc<docker::Backend>,
    /// The maximum amount of concurrency supported (the node's total CPU
    /// count).
    max_concurrency: u64,
    /// The maximum CPUs for any one node.
    max_cpu: u64,
    /// The maximum memory (in bytes) for any one node.
    max_memory: u64,
    /// The task manager for the backend.
    manager: TaskManager<DockerTaskRequest>,
    /// The name generator for tasks; shared with the Crankshaft backend.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
266
267impl DockerBackend {
268    /// Constructs a new Docker task execution backend with the given
269    /// configuration.
270    ///
271    /// The provided configuration is expected to have already been validated.
272    pub async fn new(
273        config: Arc<Config>,
274        backend_config: &DockerBackendConfig,
275        events: Option<broadcast::Sender<Event>>,
276    ) -> Result<Self> {
277        info!("initializing Docker backend");
278
279        let names = Arc::new(Mutex::new(GeneratorIterator::new(
280            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
281            INITIAL_EXPECTED_NAMES,
282        )));
283
284        let backend = docker::Backend::initialize_default_with(
285            backend::docker::Config::builder()
286                .cleanup(backend_config.cleanup)
287                .build(),
288            names.clone(),
289            events,
290        )
291        .await
292        .context("failed to initialize Docker backend")?;
293
294        let resources = *backend.resources();
295        let cpu = resources.cpu();
296        let max_cpu = resources.max_cpu();
297        let memory = resources.memory();
298        let max_memory = resources.max_memory();
299
300        // If a service is being used, then we're going to be spawning into a cluster
301        // For the purposes of resource tracking, treat it as unlimited resources and
302        // let Docker handle resource allocation
303        let manager = if resources.use_service() {
304            TaskManager::new_unlimited(max_cpu, max_memory)
305        } else {
306            TaskManager::new(cpu, max_cpu, memory, max_memory)
307        };
308
309        Ok(Self {
310            config,
311            inner: Arc::new(backend),
312            max_concurrency: cpu,
313            max_cpu,
314            max_memory,
315            manager,
316            names,
317        })
318    }
319}
320
impl TaskExecutionBackend for DockerBackend {
    /// Returns the maximum number of concurrent tasks (the node's CPU count).
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Computes the effective execution constraints for a task.
    ///
    /// Requested CPU and memory that exceed the node's maximums are either
    /// clamped (with a warning) or rejected, depending on the configured
    /// `TaskResourceLimitBehavior`.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container = container(requirements, self.config.task.container.as_deref());

        let mut cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            // Backend maximums may reveal host details; suppress on request
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_cpu}",
                    max_cpu = self.max_cpu,
                )
            };
            match self.config.task.cpu_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                    // clamp the reported constraint to what's available
                    cpu = self.max_cpu as f64;
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                }
            }
        }

        let mut memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_memory} GiB",
                    max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
                )
            };
            match self.config.task.memory_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        // Display the error in GiB, as it is the most common unit for memory
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                    // clamp the reported constraint to what's available
                    memory = self.max_memory.try_into().unwrap_or(i64::MAX);
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        // Display the error in GiB, as it is the most common unit for memory
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                }
            }
        }

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// Returns the guest directory under which inputs are mounted.
    fn guest_inputs_dir(&self) -> Option<&'static str> {
        Some(GUEST_INPUTS_DIR)
    }

    /// Inputs must be local so that Docker can bind-mount them.
    fn needs_local_inputs(&self) -> bool {
        true
    }

    /// Queues the task with the task manager and returns a receiver for the
    /// eventual execution result.
    ///
    /// Resource requests are clamped to the node's maximums when the
    /// corresponding limit behavior is `TryWithMax`.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<Receiver<Result<TaskExecutionResult>>> {
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
        let mut cpu = cpu(requirements);
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
            // NOTE(review): `ceil()` rounds fractional requests (e.g. 0.5) up
            // to a whole CPU before clamping — confirm this inflation of the
            // reservation is intended.
            cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
        }
        // NOTE(review): assumes `memory` never returns a negative value; an
        // `i64 as u64` cast would wrap — confirm upstream validation.
        let mut memory = memory(requirements)? as u64;
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
            memory = std::cmp::min(memory, self.max_memory);
        }
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);

        // Suffix the task id with a generated unique string so container
        // names never collide
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .names
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                config: self.config.clone(),
                inner: request,
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                token,
            },
            completed_tx,
        );

        Ok(completed_rx)
    }

    /// Runs a best-effort cleanup container that `chown`s the output
    /// directory back to the invoking user (root-run containers may leave
    /// root-owned files behind).
    ///
    /// Returns `None` when the output directory does not exist; failures are
    /// logged, not propagated.
    #[cfg(unix)]
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        output_dir: &'b Path,
        token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        use anyhow::anyhow;
        use futures::FutureExt;

        /// The guest path for the output directory.
        const GUEST_OUT_DIR: &str = "/workflow_output";

        /// Amount of CPU to reserve for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;

        /// Amount of memory (in GiB) to reserve for the cleanup task.
        const CLEANUP_MEMORY: f64 = 0.05;

        let backend = self.inner.clone();
        let names = self.names.clone();
        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
        if !output_path.is_dir() {
            info!("output directory does not exist: skipping cleanup");
            return None;
        }

        Some(
            async move {
                // Inner async block collects errors so they can be logged
                // uniformly below
                let result = async {
                    // SAFETY: `getuid` and `getgid` are trivial FFI calls that
                    // read process credentials; they take no pointers and
                    // cannot fail.
                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
                    let ownership = format!("{uid}:{gid}");
                    let output_mount = Input::builder()
                        .path(GUEST_OUT_DIR)
                        .contents(Contents::Path(output_path.clone()))
                        .ty(InputType::Directory)
                        // need write access
                        .read_only(false)
                        .build();

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = names
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    // Run `chown -R uid:gid /workflow_output` in a minimal image
                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_OUT_DIR.to_string(),
                                ])
                                .work_dir("/")
                                .build(),
                        ))
                        .inputs([output_mount])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    info!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = output_path.display(),
                    );

                    let output_rx = backend
                        .run(task, token)
                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;

                    let statuses = output_rx
                        .await
                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown output directory `{path}`",
                            path = output_path.display()
                        );
                    }
                }
                .await;

                // Cleanup is best-effort: log and swallow any failure
                if let Err(e) = result {
                    tracing::error!("cleanup task failed: {e:#}");
                }
            }
            .boxed(),
        )
    }

    /// Cleanup is Unix-only (it relies on `chown` semantics); no-op elsewhere.
    #[cfg(not(unix))]
    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        tracing::debug!("cleanup task is not supported on this platform");
        None
    }
}
579}