wdl_engine/backend/
docker.rs

1//! Implementation of the Docker backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use crankshaft::config::backend;
14use crankshaft::engine::Task;
15use crankshaft::engine::service::name::GeneratorIterator;
16use crankshaft::engine::service::name::UniqueAlphanumeric;
17use crankshaft::engine::service::runner::Backend;
18use crankshaft::engine::service::runner::backend::docker;
19use crankshaft::engine::task::Execution;
20use crankshaft::engine::task::Input;
21use crankshaft::engine::task::Output;
22use crankshaft::engine::task::Resources;
23use crankshaft::engine::task::input::Contents;
24use crankshaft::engine::task::input::Type as InputType;
25use crankshaft::engine::task::output::Type as OutputType;
26use futures::FutureExt;
27use futures::future::BoxFuture;
28use nonempty::NonEmpty;
29use tokio::sync::oneshot;
30use tokio::task::JoinSet;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use tracing::warn;
34use url::Url;
35
36use super::TaskExecutionBackend;
37use super::TaskExecutionConstraints;
38use super::TaskExecutionEvents;
39use super::TaskExecutionResult;
40use super::TaskManager;
41use super::TaskManagerRequest;
42use super::TaskSpawnRequest;
43use crate::COMMAND_FILE_NAME;
44use crate::InputTrie;
45use crate::ONE_GIBIBYTE;
46use crate::PrimitiveValue;
47use crate::STDERR_FILE_NAME;
48use crate::STDOUT_FILE_NAME;
49use crate::Value;
50use crate::WORK_DIR_NAME;
51use crate::config::Config;
52use crate::config::DEFAULT_TASK_SHELL;
53use crate::config::DockerBackendConfig;
54use crate::config::TaskResourceLimitBehavior;
55use crate::http::Downloader;
56use crate::http::HttpDownloader;
57use crate::http::Location;
58use crate::path::EvaluationPath;
59use crate::v1::container;
60use crate::v1::cpu;
61use crate::v1::max_cpu;
62use crate::v1::max_memory;
63use crate::v1::memory;
64
/// The number of initial expected task names.
///
/// This controls the initial size of the bloom filter and how many names are
/// prepopulated into the name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The root guest path under which task inputs are mounted in the container.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs";

/// The guest working directory; mounted read-write from the host attempt
/// directory.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path for the evaluated command file; mounted read-only.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path to which the container's stdout is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path to which the container's stderr is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
/// Represents a task request submitted to the Docker backend's task manager.
///
/// This request contains the requested CPU and memory reservations for the
/// task along with everything needed to run it: the engine configuration, the
/// underlying Crankshaft backend, the container image, and a cancellation
/// token.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The engine configuration.
    config: Arc<Config>,
    /// The inner task spawn request.
    inner: TaskSpawnRequest,
    /// The underlying Crankshaft backend.
    backend: Arc<docker::Backend>,
    /// The name of the task.
    name: String,
    /// The requested container for the task.
    container: String,
    /// The requested CPU reservation for the task.
    cpu: f64,
    /// The requested memory reservation for the task, in bytes.
    memory: u64,
    /// The requested maximum CPU limit for the task.
    max_cpu: Option<f64>,
    /// The requested maximum memory limit for the task, in bytes.
    max_memory: Option<u64>,
    /// The cancellation token for the request.
    token: CancellationToken,
}
111
impl TaskManagerRequest for DockerTaskRequest {
    /// Gets the requested CPU reservation for the task.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// Gets the requested memory reservation for the task, in bytes.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task to completion via the Crankshaft Docker backend.
    ///
    /// Creates the host-side working directory and command file, mounts the
    /// localized inputs read-only into the container, runs the evaluated
    /// command under the configured shell, and returns the exit code along
    /// with the work directory and stdout/stderr file paths.
    ///
    /// The `spawned` sender is handed to Crankshaft, which signals it once the
    /// task has actually been spawned.
    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
        // Create the working directory
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the evaluated command to disk
        // This is done even for remote execution so that a copy exists locally
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Allocate the inputs, which will always be, at most, the number of inputs plus
        // the working directory and command
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            if let Some(guest_path) = input.guest_path() {
                let location = input.location().expect("all inputs should have localized");

                // Only mount inputs that actually exist on the host; anything
                // missing is silently skipped rather than failing the mount
                if location.exists() {
                    inputs.push(
                        Input::builder()
                            .path(guest_path)
                            .contents(Contents::Path(location.into()))
                            .ty(input.kind())
                            // task inputs must not be modified by the command
                            .read_only(true)
                            .build(),
                    );
                }
            }
        }

        // Add an input for the work directory
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                // the command needs write access to its working directory
                .read_only(false)
                .build(),
        );

        // Add an input for the command
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        // Map the container's stdout/stderr guest paths to files in the host
        // attempt directory
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        // The task runs the configured shell (or the default) against the
        // mounted command file, from inside the guest work directory
        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(&self.container)
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    .args([GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env({
                        // Rewrite environment variable values that name host
                        // input paths to the corresponding guest paths; other
                        // values are passed through unchanged
                        let mut final_env = indexmap::IndexMap::new();
                        for (k, v) in self.inner.env() {
                            let guest_path = self
                                .inner
                                .inputs()
                                .iter()
                                .find(|input| input.path().to_str() == Some(v))
                                .and_then(|input| input.guest_path());

                            final_env.insert(k.clone(), guest_path.unwrap_or(v).to_string());
                        }
                        final_env
                    })
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // Crankshaft expects RAM in GiB; reservations are in bytes
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        // Run the task; Crankshaft errors are flattened into anyhow errors
        let statuses = self
            .backend
            .run(task, Some(spawned), self.token.clone())
            .map_err(|e| anyhow!("{e:#}"))?
            .await
            .map_err(|e| anyhow!("{e:#}"))?;

        // Exactly one execution was submitted, so exactly one status comes back
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            inputs: self.inner.info.inputs,
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
272
/// Represents the Docker backend.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft backend.
    inner: Arc<docker::Backend>,
    /// The maximum amount of concurrency supported.
    max_concurrency: u64,
    /// The maximum CPUs for any one node.
    max_cpu: u64,
    /// The maximum memory, in bytes, for any one node.
    max_memory: u64,
    /// The task manager for the backend.
    manager: TaskManager<DockerTaskRequest>,
    /// The name generator for tasks.
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
290
291impl DockerBackend {
292    /// Constructs a new Docker task execution backend with the given
293    /// configuration.
294    ///
295    /// The provided configuration is expected to have already been validated.
296    pub async fn new(config: Arc<Config>, backend_config: &DockerBackendConfig) -> Result<Self> {
297        info!("initializing Docker backend");
298
299        let backend = docker::Backend::initialize_default_with(
300            backend::docker::Config::builder()
301                .cleanup(backend_config.cleanup)
302                .build(),
303        )
304        .await
305        .map_err(|e| anyhow!("{e:#}"))
306        .context("failed to initialize Docker backend")?;
307
308        let resources = *backend.resources();
309        let cpu = resources.cpu();
310        let max_cpu = resources.max_cpu();
311        let memory = resources.memory();
312        let max_memory = resources.max_memory();
313
314        // If a service is being used, then we're going to be spawning into a cluster
315        // For the purposes of resource tracking, treat it as unlimited resources and
316        // let Docker handle resource allocation
317        let manager = if resources.use_service() {
318            TaskManager::new_unlimited(max_cpu, max_memory)
319        } else {
320            TaskManager::new(cpu, max_cpu, memory, max_memory)
321        };
322
323        Ok(Self {
324            config,
325            inner: Arc::new(backend),
326            max_concurrency: cpu,
327            max_cpu,
328            max_memory,
329            manager,
330            generator: Arc::new(Mutex::new(GeneratorIterator::new(
331                UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
332                INITIAL_EXPECTED_NAMES,
333            ))),
334        })
335    }
336}
337
impl TaskExecutionBackend for DockerBackend {
    /// Gets the maximum task concurrency supported by the backend.
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Calculates the execution constraints for a task from its requirements.
    ///
    /// CPU and memory requests that exceed the backend's maximums are either
    /// clamped to the maximum (with a warning) or rejected with an error,
    /// depending on the configured resource limit behavior.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container = container(requirements, self.config.task.container.as_deref());

        let mut cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            // The backend-specific detail is omitted when env-specific output
            // is suppressed in the configuration
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_cpu}",
                    max_cpu = self.max_cpu,
                )
            };
            match self.config.task.cpu_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                    // clamp the reported constraint to what's available
                    cpu = self.max_cpu as f64;
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                }
            }
        }

        let mut memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_memory} GiB",
                    max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
                )
            };
            match self.config.task.memory_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        // Display the error in GiB, as it is the most common unit for memory
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                    // clamp the reported constraint to what's available
                    memory = self.max_memory.try_into().unwrap_or(i64::MAX);
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        // Display the error in GiB, as it is the most common unit for memory
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                }
            }
        }

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// Gets the guest working directory used inside task containers.
    fn guest_work_dir(&self) -> Option<&Path> {
        Some(Path::new(GUEST_WORK_DIR))
    }

    /// Localizes task inputs, assigning each a guest path and a host location.
    ///
    /// Guest paths are calculated via an input trie rooted at the guest
    /// inputs directory; local inputs are used in place while remote inputs
    /// are downloaded concurrently with the provided downloader.
    fn localize_inputs<'a, 'b, 'c, 'd>(
        &'a self,
        downloader: &'b HttpDownloader,
        inputs: &'c mut [crate::eval::Input],
    ) -> BoxFuture<'d, Result<()>>
    where
        'a: 'd,
        'b: 'd,
        'c: 'd,
        Self: 'd,
    {
        async move {
            // Construct a trie for mapping input guest paths
            let mut trie = InputTrie::default();
            for input in inputs.iter() {
                trie.insert(input)?;
            }

            // Assign each input the guest path calculated by the trie; the
            // trie reports inputs by index into the original slice
            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
                if let Some(input) = inputs.get_mut(index) {
                    input.set_guest_path(guest_path);
                } else {
                    bail!("invalid index {} returned from trie", index);
                }
            }

            // Localize all inputs
            let mut downloads = JoinSet::new();
            for (idx, input) in inputs.iter_mut().enumerate() {
                match input.path() {
                    EvaluationPath::Local(path) => {
                        // Local paths are used directly; no download needed
                        input.set_location(Location::Path(path.clone().into()));
                    }
                    EvaluationPath::Remote(url) => {
                        // Remote URLs are downloaded concurrently; each task
                        // reports back the input index it localized
                        let downloader = downloader.clone();
                        let url = url.clone();
                        downloads.spawn(async move {
                            let location_result = downloader.download(&url).await;

                            match location_result {
                                Ok(location) => Ok((idx, location.into_owned())),
                                Err(e) => bail!("failed to localize `{url}`: {e:?}"),
                            }
                        });
                    }
                }
            }

            // Record each completed download's location back onto its input
            while let Some(result) = downloads.join_next().await {
                match result {
                    Ok(Ok((idx, location))) => {
                        inputs
                            .get_mut(idx)
                            .expect("index from should be valid")
                            .set_location(location);
                    }
                    Ok(Err(e)) => {
                        // Futures are aborted when the `JoinSet` is dropped.
                        bail!(e)
                    }
                    Err(e) => {
                        // Futures are aborted when the `JoinSet` is dropped.
                        bail!("download task failed: {e:?}")
                    }
                }
            }

            Ok(())
        }
        .boxed()
    }

    /// Spawns a task by queueing a request with the task manager.
    ///
    /// Returns a pair of receivers that are signaled when the task has been
    /// spawned and when it has completed, respectively.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<TaskExecutionEvents> {
        let (spawned_tx, spawned_rx) = oneshot::channel();
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
        let mut cpu = cpu(requirements);
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
            // NOTE(review): this rounds fractional CPU requests up to whole
            // cores before clamping to the backend maximum — confirm that the
            // ceiling (rather than a plain clamp) is intended
            cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
        }
        let mut memory = memory(requirements)? as u64;
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
            memory = std::cmp::min(memory, self.max_memory);
        }
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);

        // Task names combine the request id with a generated unique suffix
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .generator
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                config: self.config.clone(),
                inner: request,
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                token,
            },
            spawned_tx,
            completed_tx,
        );

        Ok(TaskExecutionEvents {
            spawned: spawned_rx,
            completed: completed_rx,
        })
    }

    /// Returns a cleanup future that recursively chowns the output directory
    /// to the invoking user's uid:gid via a small `alpine` container —
    /// presumably because files written by task containers may be owned by a
    /// different user than the one running the engine.
    ///
    /// Returns `None` when the output directory does not exist. Any failure
    /// of the cleanup task itself is logged rather than propagated.
    #[cfg(unix)]
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        output_dir: &'b Path,
        token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        /// The guest path for the output directory.
        const GUEST_OUT_DIR: &str = "/workflow_output";

        /// Amount of CPU to reserve for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;

        /// Amount of memory to reserve for the cleanup task.
        const CLEANUP_MEMORY: f64 = 0.05;

        let backend = self.inner.clone();
        let generator = self.generator.clone();
        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
        if !output_path.is_dir() {
            info!("output directory does not exist: skipping cleanup");
            return None;
        }

        Some(
            async move {
                // Run the whole cleanup inside an inner async block so a
                // single error path can be logged below
                let result = async {
                    // SAFETY: `getuid` and `getgid` take no arguments, cannot
                    // fail, and have no safety preconditions
                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
                    let ownership = format!("{uid}:{gid}");
                    let output_mount = Input::builder()
                        .path(GUEST_OUT_DIR)
                        .contents(Contents::Path(output_path.clone()))
                        .ty(InputType::Directory)
                        // need write access
                        .read_only(false)
                        .build();

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = generator
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    // A minimal task that runs `chown -R uid:gid` over the
                    // mounted output directory
                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_OUT_DIR.to_string(),
                                ])
                                .work_dir("/")
                                .build(),
                        ))
                        .inputs([output_mount])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    info!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = output_path.display(),
                    );

                    // The spawned notification is not needed here, so its
                    // receiver is dropped immediately
                    let (spawned_tx, _) = oneshot::channel();
                    let output_rx = backend
                        .run(task, Some(spawned_tx), token)
                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;

                    let statuses = output_rx
                        .await
                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown output directory `{path}`",
                            path = output_path.display()
                        );
                    }
                }
                .await;

                // Cleanup is best-effort: failures are logged, not propagated
                if let Err(e) = result {
                    tracing::error!("cleanup task failed: {e:#}");
                }
            }
            .boxed(),
        )
    }

    /// Cleanup is not supported on non-Unix platforms (there is no uid/gid to
    /// chown to); always returns `None`.
    #[cfg(not(unix))]
    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        tracing::debug!("cleanup task is not supported on this platform");
        None
    }
}