wdl_engine/backend/
docker.rs

1//! Implementation of the Docker backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use crankshaft::config::backend;
14use crankshaft::engine::Task;
15use crankshaft::engine::service::name::GeneratorIterator;
16use crankshaft::engine::service::name::UniqueAlphanumeric;
17use crankshaft::engine::service::runner::Backend;
18use crankshaft::engine::service::runner::backend::docker;
19use crankshaft::engine::task::Execution;
20use crankshaft::engine::task::Input;
21use crankshaft::engine::task::Output;
22use crankshaft::engine::task::Resources;
23use crankshaft::engine::task::input::Contents;
24use crankshaft::engine::task::input::Type as InputType;
25use crankshaft::engine::task::output::Type as OutputType;
26use futures::FutureExt;
27use futures::future::BoxFuture;
28use nonempty::NonEmpty;
29use tokio::sync::oneshot;
30use tokio::task::JoinSet;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use tracing::warn;
34use url::Url;
35
36use super::TaskExecutionBackend;
37use super::TaskExecutionConstraints;
38use super::TaskExecutionEvents;
39use super::TaskExecutionResult;
40use super::TaskManager;
41use super::TaskManagerRequest;
42use super::TaskSpawnRequest;
43use crate::COMMAND_FILE_NAME;
44use crate::InputTrie;
45use crate::ONE_GIBIBYTE;
46use crate::PrimitiveValue;
47use crate::STDERR_FILE_NAME;
48use crate::STDOUT_FILE_NAME;
49use crate::Value;
50use crate::WORK_DIR_NAME;
51use crate::config::Config;
52use crate::config::DEFAULT_TASK_SHELL;
53use crate::config::DockerBackendConfig;
54use crate::config::TaskResourceLimitBehavior;
55use crate::http::Downloader;
56use crate::http::HttpDownloader;
57use crate::http::Location;
58use crate::path::EvaluationPath;
59use crate::v1::container;
60use crate::v1::cpu;
61use crate::v1::max_cpu;
62use crate::v1::max_memory;
63use crate::v1::memory;
64
/// The number of initial expected task names.
///
/// This controls the initial size of the bloom filter and how many names are
/// prepopulated into the name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The root guest path (i.e. inside the container) under which task inputs are
/// mounted.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs";

/// The guest working directory.
///
/// The host work directory is mounted here read-write.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path for the command file.
///
/// The evaluated command is mounted here read-only.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path to which the container's stdout is captured.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path to which the container's stderr is captured.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
/// A request to run a task on the Docker backend.
///
/// This request contains the requested cpu and memory reservations for the task
/// as well as the result receiver channel.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The engine configuration.
    config: Arc<Config>,
    /// The inner task spawn request.
    inner: TaskSpawnRequest,
    /// The underlying Crankshaft backend.
    backend: Arc<docker::Backend>,
    /// The name of the task.
    name: String,
    /// The requested container image for the task.
    container: String,
    /// The requested CPU reservation for the task.
    ///
    /// May be fractional (e.g. `0.5`).
    cpu: f64,
    /// The requested memory reservation for the task, in bytes.
    memory: u64,
    /// The requested maximum CPU limit for the task.
    max_cpu: Option<f64>,
    /// The requested maximum memory limit for the task, in bytes.
    max_memory: Option<u64>,
    /// The cancellation token for the request.
    token: CancellationToken,
}
111
impl TaskManagerRequest for DockerTaskRequest {
    /// Returns the task's CPU reservation for resource accounting.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// Returns the task's memory reservation, in bytes, for resource
    /// accounting.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task on the Crankshaft Docker backend and waits for it to
    /// complete.
    ///
    /// The `spawned` sender is handed to Crankshaft; presumably it is signaled
    /// once the container has actually started — confirm against the
    /// Crankshaft `Backend::run` contract.
    ///
    /// # Errors
    ///
    /// Returns an error if the working directory or command file cannot be
    /// created, or if the backend fails to run the task.
    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
        // Create the working directory
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the evaluated command to disk
        // This is done even for remote execution so that a copy exists locally
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Allocate the inputs, which will always be, at most, the number of inputs plus
        // the working directory and command
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            if let Some(guest_path) = input.guest_path() {
                let location = input.location().expect("all inputs should have localized");

                // Only inputs that exist on the host are mounted; others are
                // silently skipped
                if location.exists() {
                    inputs.push(
                        Input::builder()
                            .path(guest_path)
                            .contents(Contents::Path(location.into()))
                            .ty(input.kind())
                            .read_only(true)
                            .build(),
                    );
                }
            }
        }

        // Add an input for the work directory (the only read-write mount)
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Add an input for the command
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        // stdout/stderr are collected into files in the host attempt directory
        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(&self.container)
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    // NOTE(review): `-C` enables noclobber in POSIX shells and
                    // the command path then becomes the shell's script
                    // operand; confirm `-C` (rather than `-c`) is intended
                    .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env({
                        // Remap environment values that reference host input
                        // paths to their corresponding guest paths; all other
                        // values pass through unchanged
                        let mut final_env = indexmap::IndexMap::new();
                        for (k, v) in self.inner.env() {
                            let guest_path = self
                                .inner
                                .inputs()
                                .iter()
                                .find(|input| input.path().to_str() == Some(v))
                                .and_then(|input| input.guest_path());

                            final_env.insert(k.clone(), guest_path.unwrap_or(v).to_string());
                        }
                        final_env
                    })
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // RAM is supplied to Crankshaft in GiB
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        // Submit the task and wait for its exit statuses; backend errors are
        // flattened into anyhow messages
        let statuses = self
            .backend
            .run(task, Some(spawned), self.token.clone())
            .map_err(|e| anyhow!("{e:#}"))?
            .await
            .map_err(|e| anyhow!("{e:#}"))?;

        // Exactly one execution was submitted, so expect exactly one status
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            inputs: self.inner.info.inputs,
            // NOTE(review): this panics when no exit code is available (e.g.
            // process terminated by a signal on Unix) — confirm acceptable
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
272
/// Represents the Docker backend.
///
/// Tasks are executed in containers via the underlying Crankshaft Docker
/// backend, with reservations tracked by a [`TaskManager`].
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft backend.
    inner: Arc<docker::Backend>,
    /// The maximum amount of concurrency supported.
    max_concurrency: u64,
    /// The maximum CPUs for any one node.
    max_cpu: u64,
    /// The maximum memory for any one node.
    max_memory: u64,
    /// The task manager for the backend.
    manager: TaskManager<DockerTaskRequest>,
    /// The name generator for tasks.
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
290
291impl DockerBackend {
292    /// Constructs a new Docker task execution backend with the given
293    /// configuration.
294    ///
295    /// The provided configuration is expected to have already been validated.
296    pub async fn new(config: Arc<Config>, backend_config: &DockerBackendConfig) -> Result<Self> {
297        info!("initializing Docker backend");
298
299        let backend = docker::Backend::initialize_default_with(
300            backend::docker::Config::builder()
301                .cleanup(backend_config.cleanup)
302                .build(),
303        )
304        .await
305        .map_err(|e| anyhow!("{e:#}"))
306        .context("failed to initialize Docker backend")?;
307
308        let resources = *backend.resources();
309        let cpu = resources.cpu();
310        let max_cpu = resources.max_cpu();
311        let memory = resources.memory();
312        let max_memory = resources.max_memory();
313
314        // If a service is being used, then we're going to be spawning into a cluster
315        // For the purposes of resource tracking, treat it as unlimited resources and
316        // let Docker handle resource allocation
317        let manager = if resources.use_service() {
318            TaskManager::new_unlimited(max_cpu, max_memory)
319        } else {
320            TaskManager::new(cpu, max_cpu, memory, max_memory)
321        };
322
323        Ok(Self {
324            config,
325            inner: Arc::new(backend),
326            max_concurrency: cpu,
327            max_cpu,
328            max_memory,
329            manager,
330            generator: Arc::new(Mutex::new(GeneratorIterator::new(
331                UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
332                INITIAL_EXPECTED_NAMES,
333            ))),
334        })
335    }
336}
337
338impl TaskExecutionBackend for DockerBackend {
339    fn max_concurrency(&self) -> u64 {
340        self.max_concurrency
341    }
342
343    fn constraints(
344        &self,
345        requirements: &HashMap<String, Value>,
346        _: &HashMap<String, Value>,
347    ) -> Result<TaskExecutionConstraints> {
348        let container = container(requirements, self.config.task.container.as_deref());
349
350        let mut cpu = cpu(requirements);
351        if (self.max_cpu as f64) < cpu {
352            match self.config.task.cpu_limit_behavior {
353                TaskResourceLimitBehavior::TryWithMax => {
354                    warn!(
355                        "task requires at least {cpu} CPU{s}, but the execution backend has a \
356                         maximum of {max_cpu}",
357                        s = if cpu == 1.0 { "" } else { "s" },
358                        max_cpu = self.max_cpu,
359                    );
360                    // clamp the reported constraint to what's available
361                    cpu = self.max_cpu as f64;
362                }
363                TaskResourceLimitBehavior::Deny => {
364                    bail!(
365                        "task requires at least {cpu} CPU{s}, but the execution backend has a \
366                         maximum of {max_cpu}",
367                        s = if cpu == 1.0 { "" } else { "s" },
368                        max_cpu = self.max_cpu,
369                    );
370                }
371            }
372        }
373
374        let mut memory = memory(requirements)?;
375        if self.max_memory < memory as u64 {
376            match self.config.task.memory_limit_behavior {
377                TaskResourceLimitBehavior::TryWithMax => {
378                    warn!(
379                        "task requires at least {memory} GiB of memory, but the execution backend \
380                         has a maximum of {max_memory} GiB",
381                        // Display the error in GiB, as it is the most common unit for memory
382                        memory = memory as f64 / ONE_GIBIBYTE,
383                        max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
384                    );
385                    // clamp the reported constraint to what's available
386                    memory = self.max_memory.try_into().unwrap_or(i64::MAX);
387                }
388                TaskResourceLimitBehavior::Deny => {
389                    bail!(
390                        "task requires at least {memory} GiB of memory, but the execution backend \
391                         has a maximum of {max_memory} GiB",
392                        // Display the error in GiB, as it is the most common unit for memory
393                        memory = memory as f64 / ONE_GIBIBYTE,
394                        max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
395                    );
396                }
397            }
398        }
399
400        Ok(TaskExecutionConstraints {
401            container: Some(container.into_owned()),
402            cpu,
403            memory,
404            gpu: Default::default(),
405            fpga: Default::default(),
406            disks: Default::default(),
407        })
408    }
409
410    fn guest_work_dir(&self) -> Option<&Path> {
411        Some(Path::new(GUEST_WORK_DIR))
412    }
413
414    fn localize_inputs<'a, 'b, 'c, 'd>(
415        &'a self,
416        downloader: &'b HttpDownloader,
417        inputs: &'c mut [crate::eval::Input],
418    ) -> BoxFuture<'d, Result<()>>
419    where
420        'a: 'd,
421        'b: 'd,
422        'c: 'd,
423        Self: 'd,
424    {
425        async move {
426            // Construct a trie for mapping input guest paths
427            let mut trie = InputTrie::default();
428            for input in inputs.iter() {
429                trie.insert(input)?;
430            }
431
432            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
433                if let Some(input) = inputs.get_mut(index) {
434                    input.set_guest_path(guest_path);
435                } else {
436                    bail!("invalid index {} returned from trie", index);
437                }
438            }
439
440            // Localize all inputs
441            let mut downloads = JoinSet::new();
442            for (idx, input) in inputs.iter_mut().enumerate() {
443                match input.path() {
444                    EvaluationPath::Local(path) => {
445                        input.set_location(Location::Path(path.clone().into()));
446                    }
447                    EvaluationPath::Remote(url) => {
448                        let downloader = downloader.clone();
449                        let url = url.clone();
450                        downloads.spawn(async move {
451                            let location_result = downloader.download(&url).await;
452
453                            match location_result {
454                                Ok(location) => Ok((idx, location.into_owned())),
455                                Err(e) => bail!("failed to localize `{url}`: {e:?}"),
456                            }
457                        });
458                    }
459                }
460            }
461
462            while let Some(result) = downloads.join_next().await {
463                match result {
464                    Ok(Ok((idx, location))) => {
465                        inputs
466                            .get_mut(idx)
467                            .expect("index from should be valid")
468                            .set_location(location);
469                    }
470                    Ok(Err(e)) => {
471                        // Futures are aborted when the `JoinSet` is dropped.
472                        bail!(e)
473                    }
474                    Err(e) => {
475                        // Futures are aborted when the `JoinSet` is dropped.
476                        bail!("download task failed: {e:?}")
477                    }
478                }
479            }
480
481            Ok(())
482        }
483        .boxed()
484    }
485
486    fn spawn(
487        &self,
488        request: TaskSpawnRequest,
489        token: CancellationToken,
490    ) -> Result<TaskExecutionEvents> {
491        let (spawned_tx, spawned_rx) = oneshot::channel();
492        let (completed_tx, completed_rx) = oneshot::channel();
493
494        let requirements = request.requirements();
495        let hints = request.hints();
496
497        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
498        let mut cpu = cpu(requirements);
499        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
500            cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
501        }
502        let mut memory = memory(requirements)? as u64;
503        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
504            memory = std::cmp::min(memory, self.max_memory);
505        }
506        let max_cpu = max_cpu(hints);
507        let max_memory = max_memory(hints)?.map(|i| i as u64);
508
509        let name = format!(
510            "{id}-{generated}",
511            id = request.id(),
512            generated = self
513                .generator
514                .lock()
515                .expect("generator should always acquire")
516                .next()
517                .expect("generator should never be exhausted")
518        );
519        self.manager.send(
520            DockerTaskRequest {
521                config: self.config.clone(),
522                inner: request,
523                backend: self.inner.clone(),
524                name,
525                container,
526                cpu,
527                memory,
528                max_cpu,
529                max_memory,
530                token,
531            },
532            spawned_tx,
533            completed_tx,
534        );
535
536        Ok(TaskExecutionEvents {
537            spawned: spawned_rx,
538            completed: completed_rx,
539        })
540    }
541
542    #[cfg(unix)]
543    fn cleanup<'a, 'b, 'c>(
544        &'a self,
545        output_dir: &'b Path,
546        token: CancellationToken,
547    ) -> Option<BoxFuture<'c, ()>>
548    where
549        'a: 'c,
550        'b: 'c,
551        Self: 'c,
552    {
553        /// The guest path for the output directory.
554        const GUEST_OUT_DIR: &str = "/workflow_output";
555
556        /// Amount of CPU to reserve for the cleanup task.
557        const CLEANUP_CPU: f64 = 0.1;
558
559        /// Amount of memory to reserve for the cleanup task.
560        const CLEANUP_MEMORY: f64 = 0.05;
561
562        let backend = self.inner.clone();
563        let generator = self.generator.clone();
564        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
565        if !output_path.is_dir() {
566            info!("output directory does not exist: skipping cleanup");
567            return None;
568        }
569
570        Some(
571            async move {
572                let result = async {
573                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
574                    let ownership = format!("{uid}:{gid}");
575                    let output_mount = Input::builder()
576                        .path(GUEST_OUT_DIR)
577                        .contents(Contents::Path(output_path.clone()))
578                        .ty(InputType::Directory)
579                        // need write access
580                        .read_only(false)
581                        .build();
582
583                    let name = format!(
584                        "docker-backend-cleanup-{id}",
585                        id = generator
586                            .lock()
587                            .expect("generator should always acquire")
588                            .next()
589                            .expect("generator should never be exhausted")
590                    );
591
592                    let task = Task::builder()
593                        .name(&name)
594                        .executions(NonEmpty::new(
595                            Execution::builder()
596                                .image("alpine:latest")
597                                .program("chown")
598                                .args([
599                                    "-R".to_string(),
600                                    ownership.clone(),
601                                    GUEST_OUT_DIR.to_string(),
602                                ])
603                                .work_dir("/")
604                                .build(),
605                        ))
606                        .inputs([output_mount])
607                        .resources(
608                            Resources::builder()
609                                .cpu(CLEANUP_CPU)
610                                .ram(CLEANUP_MEMORY)
611                                .build(),
612                        )
613                        .build();
614
615                    info!(
616                        "running cleanup task `{name}` to change ownership of `{path}` to \
617                         `{ownership}`",
618                        path = output_path.display(),
619                    );
620
621                    let (spawned_tx, _) = oneshot::channel();
622                    let output_rx = backend
623                        .run(task, Some(spawned_tx), token)
624                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;
625
626                    let statuses = output_rx
627                        .await
628                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
629                    let status = statuses.first();
630                    if status.success() {
631                        Ok(())
632                    } else {
633                        bail!(
634                            "failed to chown output directory `{path}`",
635                            path = output_path.display()
636                        );
637                    }
638                }
639                .await;
640
641                if let Err(e) = result {
642                    tracing::error!("cleanup task failed: {e:#}");
643                }
644            }
645            .boxed(),
646        )
647    }
648
649    #[cfg(not(unix))]
650    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
651    where
652        'a: 'c,
653        'b: 'c,
654        Self: 'c,
655    {
656        tracing::debug!("cleanup task is not supported on this platform");
657        None
658    }
659}