wdl_engine/backend/
docker.rs

1//! Implementation of the Docker backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use crankshaft::config::backend;
14use crankshaft::engine::Task;
15use crankshaft::engine::service::name::GeneratorIterator;
16use crankshaft::engine::service::name::UniqueAlphanumeric;
17use crankshaft::engine::service::runner::Backend;
18use crankshaft::engine::service::runner::backend::docker;
19use crankshaft::engine::task::Execution;
20use crankshaft::engine::task::Input;
21use crankshaft::engine::task::Output;
22use crankshaft::engine::task::Resources;
23use crankshaft::engine::task::input::Contents;
24use crankshaft::engine::task::input::Type as InputType;
25use crankshaft::engine::task::output::Type as OutputType;
26use futures::FutureExt;
27use futures::future::BoxFuture;
28use nonempty::NonEmpty;
29use tokio::sync::oneshot;
30use tokio::task::JoinSet;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use url::Url;
34
35use super::TaskExecutionBackend;
36use super::TaskExecutionConstraints;
37use super::TaskExecutionEvents;
38use super::TaskExecutionResult;
39use super::TaskManager;
40use super::TaskManagerRequest;
41use super::TaskSpawnRequest;
42use crate::COMMAND_FILE_NAME;
43use crate::InputTrie;
44use crate::ONE_GIBIBYTE;
45use crate::PrimitiveValue;
46use crate::STDERR_FILE_NAME;
47use crate::STDOUT_FILE_NAME;
48use crate::Value;
49use crate::WORK_DIR_NAME;
50use crate::config::DEFAULT_TASK_SHELL;
51use crate::config::DockerBackendConfig;
52use crate::config::TaskConfig;
53use crate::http::Downloader;
54use crate::http::HttpDownloader;
55use crate::http::Location;
56use crate::path::EvaluationPath;
57use crate::v1::container;
58use crate::v1::cpu;
59use crate::v1::max_cpu;
60use crate::v1::max_memory;
61use crate::v1::memory;
62
/// The number of initial expected task names.
///
/// This controls the initial size of the bloom filter and how many names are
/// prepopulated into the name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The root guest path for inputs.
///
/// Guest paths for individual inputs are calculated beneath this directory.
const GUEST_INPUTS_DIR: &str = "/mnt/inputs";

/// The guest working directory.
///
/// Mounted writable from the host attempt directory's work dir.
const GUEST_WORK_DIR: &str = "/mnt/work";

/// The guest path for the command file.
///
/// Mounted read-only from the host command file.
const GUEST_COMMAND_PATH: &str = "/mnt/command";

/// The path to the container's stdout.
const GUEST_STDOUT_PATH: &str = "/stdout";

/// The path to the container's stderr.
const GUEST_STDERR_PATH: &str = "/stderr";
83
/// A task run request processed by the Docker backend's task manager.
///
/// This request contains the requested cpu and memory reservations for the
/// task along with everything needed to run the task on the underlying
/// Crankshaft Docker backend.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The inner task spawn request.
    inner: TaskSpawnRequest,
    /// The underlying Crankshaft backend.
    backend: Arc<docker::Backend>,
    /// The name of the task.
    name: String,
    /// The optional shell to use.
    ///
    /// When `None`, the default task shell is used.
    shell: Arc<Option<String>>,
    /// The requested container for the task.
    container: String,
    /// The requested CPU reservation for the task.
    cpu: f64,
    /// The requested memory reservation for the task, in bytes.
    memory: u64,
    /// The requested maximum CPU limit for the task.
    max_cpu: Option<f64>,
    /// The requested maximum memory limit for the task, in bytes.
    max_memory: Option<u64>,
    /// The cancellation token for the request.
    token: CancellationToken,
}
109
impl TaskManagerRequest for DockerTaskRequest {
    /// The CPU reservation the task manager accounts for this request.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// The memory reservation, in bytes, the task manager accounts for this
    /// request.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task to completion on the Crankshaft Docker backend.
    ///
    /// Creates the host-side working directory and command file, builds the
    /// container mounts (read-only inputs, writable work directory, read-only
    /// command file), maps the container's stdout/stderr to host files, and
    /// awaits the container's exit status.
    ///
    /// The `spawned` sender is passed through to the backend and is used to
    /// signal that the task has been spawned.
    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
        // Create the working directory
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the evaluated command to disk
        // This is done even for remote execution so that a copy exists locally
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Allocate the inputs, which will always be, at most, the number of inputs plus
        // the working directory and command
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            if let Some(guest_path) = input.guest_path() {
                let location = input.location().expect("all inputs should have localized");

                // Only mount inputs whose localized host path actually exists;
                // inputs without an existing location are skipped.
                if location.exists() {
                    inputs.push(
                        Input::builder()
                            .path(guest_path)
                            .contents(Contents::Path(location.into()))
                            .ty(input.kind())
                            .read_only(true)
                            .build(),
                    );
                }
            }
        }

        // Add an input for the work directory (writable so the task can
        // produce outputs)
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Add an input for the command
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        // Host files that will receive the container's stdout/stderr
        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(&self.container)
                    .program(self.shell.as_deref().unwrap_or(DEFAULT_TASK_SHELL))
                    // NOTE(review): `-C` (noclobber) is passed before the
                    // script path; confirm this is intended rather than `-c`.
                    .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env({
                        // If an environment variable's value is the host path
                        // of an input, substitute the input's guest path so
                        // the value is valid inside the container.
                        let mut final_env = indexmap::IndexMap::new();
                        for (k, v) in self.inner.env() {
                            let guest_path = self
                                .inner
                                .inputs()
                                .iter()
                                .find(|input| input.path().to_str() == Some(v))
                                .and_then(|input| input.guest_path());

                            final_env.insert(k.clone(), guest_path.unwrap_or(v).to_string());
                        }
                        final_env
                    })
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // Convert the byte counts to GiB for the resource builder
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        let statuses = self
            .backend
            .run(task, Some(spawned), self.token.clone())
            .map_err(|e| anyhow!("{e:#}"))?
            .await
            .map_err(|e| anyhow!("{e:#}"))?;

        // A single execution was submitted, so exactly one status is expected
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            inputs: self.inner.info.inputs,
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
264
/// Represents the Docker backend.
pub struct DockerBackend {
    /// The underlying Crankshaft backend.
    inner: Arc<docker::Backend>,
    /// The shell to use.
    ///
    /// When `None`, the default task shell is used.
    shell: Arc<Option<String>>,
    /// The default container to use.
    ///
    /// Used when a task does not specify a container requirement.
    container: Option<String>,
    /// The maximum amount of concurrency supported.
    max_concurrency: u64,
    /// The maximum CPUs for any one node.
    max_cpu: u64,
    /// The maximum memory for any one node.
    max_memory: u64,
    /// The task manager for the backend.
    manager: TaskManager<DockerTaskRequest>,
    /// The name generator for tasks.
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
284
285impl DockerBackend {
286    /// Constructs a new Docker task execution backend with the given
287    /// configuration.
288    pub async fn new(task: &TaskConfig, config: &DockerBackendConfig) -> Result<Self> {
289        task.validate()?;
290        config.validate()?;
291
292        info!("initializing Docker backend");
293
294        let backend = docker::Backend::initialize_default_with(
295            backend::docker::Config::builder()
296                .cleanup(config.cleanup)
297                .build(),
298        )
299        .await
300        .map_err(|e| anyhow!("{e:#}"))
301        .context("failed to initialize Docker backend")?;
302
303        let resources = *backend.resources();
304        let cpu = resources.cpu();
305        let max_cpu = resources.max_cpu();
306        let memory = resources.memory();
307        let max_memory = resources.max_memory();
308
309        // If a service is being used, then we're going to be spawning into a cluster
310        // For the purposes of resource tracking, treat it as unlimited resources and
311        // let Docker handle resource allocation
312        let manager = if resources.use_service() {
313            TaskManager::new_unlimited(max_cpu, max_memory)
314        } else {
315            TaskManager::new(cpu, max_cpu, memory, max_memory)
316        };
317
318        Ok(Self {
319            inner: Arc::new(backend),
320            shell: Arc::new(task.shell.clone()),
321            container: task.shell.clone(),
322            max_concurrency: cpu,
323            max_cpu,
324            max_memory,
325            manager,
326            generator: Arc::new(Mutex::new(GeneratorIterator::new(
327                UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
328                INITIAL_EXPECTED_NAMES,
329            ))),
330        })
331    }
332}
333
impl TaskExecutionBackend for DockerBackend {
    /// Returns the maximum number of concurrent tasks.
    ///
    /// Set to the backend's total CPU count at initialization.
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Calculates the execution constraints for a task from its requirements.
    ///
    /// # Errors
    ///
    /// Fails if the requested CPU or memory exceeds the backend's maximums.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        // Resolve the container, falling back to the backend's default
        let container = container(requirements, self.container.as_deref());

        let cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            bail!(
                "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
                 {max_cpu}",
                s = if cpu == 1.0 { "" } else { "s" },
                max_cpu = self.max_cpu,
            );
        }

        let memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            // Display the error in GiB, as it is the most common unit for memory
            let memory = memory as f64 / ONE_GIBIBYTE;
            let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;

            bail!(
                "task requires at least {memory} GiB of memory, but the execution backend has a \
                 maximum of {max_memory} GiB",
            );
        }

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// Returns the fixed guest path used as the task's working directory.
    fn guest_work_dir(&self) -> Option<&Path> {
        Some(Path::new(GUEST_WORK_DIR))
    }

    /// Localizes task inputs for container execution.
    ///
    /// Assigns each input a guest path under [`GUEST_INPUTS_DIR`], then sets
    /// each input's location: local paths are used directly, while remote
    /// URLs are downloaded concurrently via the provided downloader.
    fn localize_inputs<'a, 'b, 'c, 'd>(
        &'a self,
        downloader: &'b HttpDownloader,
        inputs: &'c mut [crate::eval::Input],
    ) -> BoxFuture<'d, Result<()>>
    where
        'a: 'd,
        'b: 'd,
        'c: 'd,
        Self: 'd,
    {
        async move {
            // Construct a trie for mapping input guest paths
            let mut trie = InputTrie::default();
            for input in inputs.iter() {
                trie.insert(input)?;
            }

            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
                if let Some(input) = inputs.get_mut(index) {
                    input.set_guest_path(guest_path);
                } else {
                    bail!("invalid index {} returned from trie", index);
                }
            }

            // Localize all inputs
            // Remote downloads run concurrently; each spawned future carries
            // its input index so the result can be written back later.
            let mut downloads = JoinSet::new();
            for (idx, input) in inputs.iter_mut().enumerate() {
                match input.path() {
                    EvaluationPath::Local(path) => {
                        input.set_location(Location::Path(path.clone().into()));
                    }
                    EvaluationPath::Remote(url) => {
                        let downloader = downloader.clone();
                        let url = url.clone();
                        downloads.spawn(async move {
                            let location_result = downloader.download(&url).await;

                            match location_result {
                                Ok(location) => Ok((idx, location.into_owned())),
                                Err(e) => bail!("failed to localize `{url}`: {e:?}"),
                            }
                        });
                    }
                }
            }

            while let Some(result) = downloads.join_next().await {
                match result {
                    Ok(Ok((idx, location))) => {
                        inputs
                            .get_mut(idx)
                            .expect("index from should be valid")
                            .set_location(location);
                    }
                    Ok(Err(e)) => {
                        // Futures are aborted when the `JoinSet` is dropped.
                        bail!(e)
                    }
                    Err(e) => {
                        // Futures are aborted when the `JoinSet` is dropped.
                        bail!("download task failed: {e:?}")
                    }
                }
            }

            Ok(())
        }
        .boxed()
    }

    /// Queues a task for execution via the task manager.
    ///
    /// Returns the spawned/completed event channels; the task manager decides
    /// when the request actually runs based on available resources.
    ///
    /// # Errors
    ///
    /// Fails if the memory requirement or memory hint cannot be evaluated.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<TaskExecutionEvents> {
        let (spawned_tx, spawned_rx) = oneshot::channel();
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.container.as_deref()).into_owned();
        let cpu = cpu(requirements);
        let memory = memory(requirements)? as u64;
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);

        // Suffix the task id with a generated unique string to keep container
        // names unique across attempts
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .generator
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                inner: request,
                shell: self.shell.clone(),
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                token,
            },
            spawned_tx,
            completed_tx,
        );

        Ok(TaskExecutionEvents {
            spawned: spawned_rx,
            completed: completed_rx,
        })
    }

    /// Runs a best-effort cleanup container that chowns the output directory
    /// back to the current user (files written by the task container may be
    /// owned by a different uid/gid).
    ///
    /// Returns `None` when the output directory does not exist; any failure
    /// inside the returned future is logged rather than propagated.
    #[cfg(unix)]
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        output_dir: &'b Path,
        token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        /// The guest path for the output directory.
        const GUEST_OUT_DIR: &str = "/workflow_output";

        /// Amount of CPU to reserve for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;

        /// Amount of memory to reserve for the cleanup task.
        const CLEANUP_MEMORY: f64 = 0.05;

        let backend = self.inner.clone();
        let generator = self.generator.clone();
        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
        if !output_path.is_dir() {
            info!("output directory does not exist: skipping cleanup");
            return None;
        }

        Some(
            async move {
                let result = async {
                    // SAFETY: `getuid` and `getgid` cannot fail and have no
                    // preconditions.
                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
                    let ownership = format!("{uid}:{gid}");
                    let output_mount = Input::builder()
                        .path(GUEST_OUT_DIR)
                        .contents(Contents::Path(output_path.clone()))
                        .ty(InputType::Directory)
                        // need write access
                        .read_only(false)
                        .build();

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = generator
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    // Run `chown -R <uid>:<gid>` over the mounted output
                    // directory in a small Alpine container
                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_OUT_DIR.to_string(),
                                ])
                                .work_dir("/")
                                .build(),
                        ))
                        .inputs([output_mount])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    info!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = output_path.display(),
                    );

                    // The spawned receiver is intentionally discarded; only
                    // completion matters here
                    let (spawned_tx, _) = oneshot::channel();
                    let output_rx = backend
                        .run(task, Some(spawned_tx), token)
                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;

                    let statuses = output_rx
                        .await
                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown output directory `{path}`",
                            path = output_path.display()
                        );
                    }
                }
                .await;

                if let Err(e) = result {
                    tracing::error!("cleanup task failed: {e:#}");
                }
            }
            .boxed(),
        )
    }

    /// Cleanup is a no-op on non-Unix platforms (no uid/gid ownership model).
    #[cfg(not(unix))]
    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        tracing::debug!("cleanup task is not supported on this platform");
        None
    }
}