// wdl_engine/backend/docker.rs

1//! Implementation of the Docker backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use crankshaft::config::backend;
14use crankshaft::engine::Task;
15use crankshaft::engine::service::name::GeneratorIterator;
16use crankshaft::engine::service::name::UniqueAlphanumeric;
17use crankshaft::engine::service::runner::Backend;
18use crankshaft::engine::service::runner::backend::docker;
19use crankshaft::engine::task::Execution;
20use crankshaft::engine::task::Input;
21use crankshaft::engine::task::Output;
22use crankshaft::engine::task::Resources;
23use crankshaft::engine::task::input::Contents;
24use crankshaft::engine::task::input::Type as InputType;
25use crankshaft::engine::task::output::Type as OutputType;
26use futures::FutureExt;
27use futures::future::BoxFuture;
28use nonempty::NonEmpty;
29use tokio::sync::oneshot;
30use tokio::task::JoinSet;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use url::Url;
34
35use super::TaskExecutionBackend;
36use super::TaskExecutionConstraints;
37use super::TaskExecutionEvents;
38use super::TaskExecutionResult;
39use super::TaskManager;
40use super::TaskManagerRequest;
41use super::TaskSpawnRequest;
42use crate::COMMAND_FILE_NAME;
43use crate::InputTrie;
44use crate::ONE_GIBIBYTE;
45use crate::PrimitiveValue;
46use crate::STDERR_FILE_NAME;
47use crate::STDOUT_FILE_NAME;
48use crate::Value;
49use crate::WORK_DIR_NAME;
50use crate::config::Config;
51use crate::config::DEFAULT_TASK_SHELL;
52use crate::config::DockerBackendConfig;
53use crate::http::Downloader;
54use crate::http::HttpDownloader;
55use crate::http::Location;
56use crate::path::EvaluationPath;
57use crate::v1::container;
58use crate::v1::cpu;
59use crate::v1::max_cpu;
60use crate::v1::max_memory;
61use crate::v1::memory;
62
/// The number of initial expected task names.
///
/// This controls the initial size of the bloom filter and how many names are
/// prepopulated into the name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The root guest path under which task inputs are mounted in the container.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs";

/// The guest working directory (mounted read-write from the host attempt dir).
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path at which the evaluated command file is mounted (read-only).
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path to which the container's stdout is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path to which the container's stderr is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
83
/// A request to run a single task on the Docker backend.
///
/// This request contains the requested cpu and memory reservations for the task
/// as well as the result receiver channel.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The engine configuration.
    config: Arc<Config>,
    /// The inner task spawn request.
    inner: TaskSpawnRequest,
    /// The underlying Crankshaft backend.
    backend: Arc<docker::Backend>,
    /// The name of the task.
    name: String,
    /// The requested container for the task.
    container: String,
    /// The requested CPU reservation for the task.
    cpu: f64,
    /// The requested memory reservation for the task, in bytes.
    memory: u64,
    /// The requested maximum CPU limit for the task.
    max_cpu: Option<f64>,
    /// The requested maximum memory limit for the task, in bytes.
    max_memory: Option<u64>,
    /// The cancellation token for the request.
    token: CancellationToken,
}
109
impl TaskManagerRequest for DockerTaskRequest {
    /// The CPU reservation used by the task manager for scheduling.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// The memory reservation (in bytes) used by the task manager for
    /// scheduling.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task to completion on the Docker backend.
    ///
    /// Prepares the host-side working directory and command file, builds the
    /// container mounts for the inputs, submits the task to Crankshaft, and
    /// translates the exit status into a `TaskExecutionResult`. The `spawned`
    /// sender is passed through to the backend so the caller is notified once
    /// the container has actually started.
    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
        // Create the working directory
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the evaluated command to disk
        // This is done even for remote execution so that a copy exists locally
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Allocate the inputs, which will always be, at most, the number of inputs plus
        // the working directory and command
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            if let Some(guest_path) = input.guest_path() {
                let location = input.location().expect("all inputs should have localized");

                // Only mount locations that actually exist on the host; inputs
                // with missing locations are silently skipped here.
                if location.exists() {
                    inputs.push(
                        Input::builder()
                            .path(guest_path)
                            .contents(Contents::Path(location.into()))
                            .ty(input.kind())
                            .read_only(true)
                            .build(),
                    );
                }
            }
        }

        // Add an input for the work directory; mounted read-write so the task
        // can create its outputs
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Add an input for the command
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        // Declare the container's stdout/stderr as outputs copied back to host
        // files in the attempt directory
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(&self.container)
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    // NOTE(review): `-C` sets the shell's noclobber option and
                    // the command path is then taken as the script to execute —
                    // confirm this is intentional and not a typo for `-c`.
                    .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env({
                        // Remap any env var whose value is exactly a host input
                        // path to the corresponding guest path inside the
                        // container; all other values pass through unchanged.
                        let mut final_env = indexmap::IndexMap::new();
                        for (k, v) in self.inner.env() {
                            let guest_path = self
                                .inner
                                .inputs()
                                .iter()
                                .find(|input| input.path().to_str() == Some(v))
                                .and_then(|input| input.guest_path());

                            final_env.insert(k.clone(), guest_path.unwrap_or(v).to_string());
                        }
                        final_env
                    })
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // Crankshaft expects RAM in GiB; the request stores bytes
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        // Submit the task and await completion; `spawned` fires once the
        // container starts and `token` allows cancellation of the run.
        let statuses = self
            .backend
            .run(task, Some(spawned), self.token.clone())
            .map_err(|e| anyhow!("{e:#}"))?
            .await
            .map_err(|e| anyhow!("{e:#}"))?;

        // A single execution was submitted, so exactly one status is expected
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            inputs: self.inner.info.inputs,
            // NOTE(review): on Unix, `code()` is `None` when the process was
            // terminated by a signal — confirm the backend guarantees an exit
            // code here, otherwise this `expect` can panic.
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
270
/// Represents the Docker backend.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft backend.
    inner: Arc<docker::Backend>,
    /// The maximum amount of concurrency supported.
    max_concurrency: u64,
    /// The maximum CPUs available on any one node.
    max_cpu: u64,
    /// The maximum memory available on any one node.
    max_memory: u64,
    /// The task manager for the backend.
    manager: TaskManager<DockerTaskRequest>,
    /// The name generator for tasks.
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
288
289impl DockerBackend {
290    /// Constructs a new Docker task execution backend with the given
291    /// configuration.
292    ///
293    /// The provided configuration is expected to have already been validated.
294    pub async fn new(config: Arc<Config>, backend_config: &DockerBackendConfig) -> Result<Self> {
295        info!("initializing Docker backend");
296
297        let backend = docker::Backend::initialize_default_with(
298            backend::docker::Config::builder()
299                .cleanup(backend_config.cleanup)
300                .build(),
301        )
302        .await
303        .map_err(|e| anyhow!("{e:#}"))
304        .context("failed to initialize Docker backend")?;
305
306        let resources = *backend.resources();
307        let cpu = resources.cpu();
308        let max_cpu = resources.max_cpu();
309        let memory = resources.memory();
310        let max_memory = resources.max_memory();
311
312        // If a service is being used, then we're going to be spawning into a cluster
313        // For the purposes of resource tracking, treat it as unlimited resources and
314        // let Docker handle resource allocation
315        let manager = if resources.use_service() {
316            TaskManager::new_unlimited(max_cpu, max_memory)
317        } else {
318            TaskManager::new(cpu, max_cpu, memory, max_memory)
319        };
320
321        Ok(Self {
322            config,
323            inner: Arc::new(backend),
324            max_concurrency: cpu,
325            max_cpu,
326            max_memory,
327            manager,
328            generator: Arc::new(Mutex::new(GeneratorIterator::new(
329                UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
330                INITIAL_EXPECTED_NAMES,
331            ))),
332        })
333    }
334}
335
impl TaskExecutionBackend for DockerBackend {
    /// The maximum number of tasks this backend will run concurrently.
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Validates the task requirements against this backend's resources and
    /// returns the constraints the task will run under.
    ///
    /// Fails if the requested CPU or memory exceeds what the backend reports
    /// as available on a node.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container = container(requirements, self.config.task.container.as_deref());

        let cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            bail!(
                "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
                 {max_cpu}",
                s = if cpu == 1.0 { "" } else { "s" },
                max_cpu = self.max_cpu,
            );
        }

        let memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            // Display the error in GiB, as it is the most common unit for memory
            let memory = memory as f64 / ONE_GIBIBYTE;
            let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;

            bail!(
                "task requires at least {memory} GiB of memory, but the execution backend has a \
                 maximum of {max_memory} GiB",
            );
        }

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// The guest path used as the task working directory inside containers.
    fn guest_work_dir(&self) -> Option<&Path> {
        Some(Path::new(GUEST_WORK_DIR))
    }

    /// Localizes all task inputs for container execution.
    ///
    /// Assigns each input a guest path under `GUEST_INPUTS_DIR`, then sets the
    /// host location for each: local paths are used directly, while remote
    /// URLs are downloaded concurrently via the provided downloader.
    fn localize_inputs<'a, 'b, 'c, 'd>(
        &'a self,
        downloader: &'b HttpDownloader,
        inputs: &'c mut [crate::eval::Input],
    ) -> BoxFuture<'d, Result<()>>
    where
        'a: 'd,
        'b: 'd,
        'c: 'd,
        Self: 'd,
    {
        async move {
            // Construct a trie for mapping input guest paths
            let mut trie = InputTrie::default();
            for input in inputs.iter() {
                trie.insert(input)?;
            }

            // Assign each input its calculated guest path; the trie returns
            // indices into `inputs`
            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
                if let Some(input) = inputs.get_mut(index) {
                    input.set_guest_path(guest_path);
                } else {
                    bail!("invalid index {} returned from trie", index);
                }
            }

            // Localize all inputs; remote inputs are downloaded concurrently
            let mut downloads = JoinSet::new();
            for (idx, input) in inputs.iter_mut().enumerate() {
                match input.path() {
                    EvaluationPath::Local(path) => {
                        input.set_location(Location::Path(path.clone().into()));
                    }
                    EvaluationPath::Remote(url) => {
                        let downloader = downloader.clone();
                        let url = url.clone();
                        downloads.spawn(async move {
                            let location_result = downloader.download(&url).await;

                            match location_result {
                                Ok(location) => Ok((idx, location.into_owned())),
                                Err(e) => bail!("failed to localize `{url}`: {e:?}"),
                            }
                        });
                    }
                }
            }

            // Record each completed download's location on its input
            while let Some(result) = downloads.join_next().await {
                match result {
                    Ok(Ok((idx, location))) => {
                        // NOTE(review): the expect message reads "index from
                        // should be valid" — looks like a typo for "index
                        // should be valid" (runtime string, left unchanged).
                        inputs
                            .get_mut(idx)
                            .expect("index from should be valid")
                            .set_location(location);
                    }
                    Ok(Err(e)) => {
                        // Futures are aborted when the `JoinSet` is dropped.
                        bail!(e)
                    }
                    Err(e) => {
                        // Futures are aborted when the `JoinSet` is dropped.
                        bail!("download task failed: {e:?}")
                    }
                }
            }

            Ok(())
        }
        .boxed()
    }

    /// Queues a task for execution on the Docker backend.
    ///
    /// Resolves the container image and resource requests from the task's
    /// requirements/hints, generates a unique task name, and hands the request
    /// to the task manager. Returns channels that fire when the task is
    /// spawned and when it completes.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<TaskExecutionEvents> {
        let (spawned_tx, spawned_rx) = oneshot::channel();
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
        let cpu = cpu(requirements);
        let memory = memory(requirements)? as u64;
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);

        // Task names are `<task id>-<unique suffix>` to avoid collisions
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .generator
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                config: self.config.clone(),
                inner: request,
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                token,
            },
            spawned_tx,
            completed_tx,
        );

        Ok(TaskExecutionEvents {
            spawned: spawned_rx,
            completed: completed_rx,
        })
    }

    /// Runs a best-effort cleanup task that chowns the output directory back
    /// to the invoking user (containers may have written files as root).
    ///
    /// Returns `None` when the output directory does not exist; errors from
    /// the cleanup task itself are logged rather than propagated.
    #[cfg(unix)]
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        output_dir: &'b Path,
        token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        /// The guest path for the output directory.
        const GUEST_OUT_DIR: &str = "/workflow_output";

        /// Amount of CPU to reserve for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;

        /// Amount of memory to reserve for the cleanup task.
        const CLEANUP_MEMORY: f64 = 0.05;

        let backend = self.inner.clone();
        let generator = self.generator.clone();
        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
        if !output_path.is_dir() {
            info!("output directory does not exist: skipping cleanup");
            return None;
        }

        Some(
            async move {
                let result = async {
                    // SAFETY: `getuid` and `getgid` have no preconditions and
                    // cannot fail; they simply return the calling process's ids.
                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
                    let ownership = format!("{uid}:{gid}");
                    let output_mount = Input::builder()
                        .path(GUEST_OUT_DIR)
                        .contents(Contents::Path(output_path.clone()))
                        .ty(InputType::Directory)
                        // need write access
                        .read_only(false)
                        .build();

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = generator
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    // Run `chown -R <uid>:<gid> /workflow_output` in a minimal
                    // Alpine container with the output dir mounted read-write
                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_OUT_DIR.to_string(),
                                ])
                                .work_dir("/")
                                .build(),
                        ))
                        .inputs([output_mount])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    info!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = output_path.display(),
                    );

                    // The spawned receiver is intentionally dropped: only the
                    // final status matters for cleanup
                    let (spawned_tx, _) = oneshot::channel();
                    let output_rx = backend
                        .run(task, Some(spawned_tx), token)
                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;

                    let statuses = output_rx
                        .await
                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown output directory `{path}`",
                            path = output_path.display()
                        );
                    }
                }
                .await;

                // Cleanup is best-effort: log failures instead of propagating
                if let Err(e) = result {
                    tracing::error!("cleanup task failed: {e:#}");
                }
            }
            .boxed(),
        )
    }

    /// Cleanup is a no-op on non-Unix platforms (no uid/gid to restore).
    #[cfg(not(unix))]
    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        tracing::debug!("cleanup task is not supported on this platform");
        None
    }
}