wdl_engine/backend/
docker.rs

1//! Implementation of the Docker backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::sync::Arc;
6use std::sync::Mutex;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::bail;
11use crankshaft::config::backend;
12use crankshaft::engine::Task;
13use crankshaft::engine::service::name::GeneratorIterator;
14use crankshaft::engine::service::name::UniqueAlphanumeric;
15use crankshaft::engine::service::runner::Backend;
16use crankshaft::engine::service::runner::backend::docker;
17use crankshaft::engine::task::Execution;
18use crankshaft::engine::task::Input;
19use crankshaft::engine::task::Output;
20use crankshaft::engine::task::Resources;
21use crankshaft::engine::task::input::Contents;
22use crankshaft::engine::task::input::Type as InputType;
23use crankshaft::engine::task::output::Type as OutputType;
24use crankshaft::events::Event;
25use nonempty::NonEmpty;
26use tokio::sync::broadcast;
27use tokio::sync::oneshot;
28use tokio::sync::oneshot::Receiver;
29use tokio_util::sync::CancellationToken;
30use tracing::info;
31use tracing::warn;
32use url::Url;
33
34use super::TaskExecutionBackend;
35use super::TaskExecutionConstraints;
36use super::TaskExecutionResult;
37use super::TaskManager;
38use super::TaskManagerRequest;
39use super::TaskSpawnRequest;
40use crate::COMMAND_FILE_NAME;
41use crate::ONE_GIBIBYTE;
42use crate::PrimitiveValue;
43use crate::STDERR_FILE_NAME;
44use crate::STDOUT_FILE_NAME;
45use crate::Value;
46use crate::WORK_DIR_NAME;
47use crate::backend::INITIAL_EXPECTED_NAMES;
48use crate::config::Config;
49use crate::config::DEFAULT_TASK_SHELL;
50use crate::config::DockerBackendConfig;
51use crate::config::TaskResourceLimitBehavior;
52use crate::path::EvaluationPath;
53use crate::v1::container;
54use crate::v1::cpu;
55use crate::v1::max_cpu;
56use crate::v1::max_memory;
57use crate::v1::memory;
58
// Paths as seen from inside a task container; everything the task needs is
// placed beneath `/mnt/task`.

/// Directory inside the container beneath which task inputs are mounted.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// Directory inside the container that is used as the task's working directory.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// Location inside the container where the evaluated command file is mounted.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// File inside the container that captures the task's standard output.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// File inside the container that captures the task's standard error.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
73
74/// This request contains the requested cpu and memory reservations for the task
75/// as well as the result receiver channel.
76#[derive(Debug)]
77struct DockerTaskRequest {
78    /// The engine configuration.
79    config: Arc<Config>,
80    /// The inner task spawn request.
81    inner: TaskSpawnRequest,
82    /// The underlying Crankshaft backend.
83    backend: Arc<docker::Backend>,
84    /// The name of the task.
85    name: String,
86    /// The requested container for the task.
87    container: String,
88    /// The requested CPU reservation for the task.
89    cpu: f64,
90    /// The requested memory reservation for the task, in bytes.
91    memory: u64,
92    /// The requested maximum CPU limit for the task.
93    max_cpu: Option<f64>,
94    /// The requested maximum memory limit for the task, in bytes.
95    max_memory: Option<u64>,
96    /// The cancellation token for the request.
97    token: CancellationToken,
98}
99
100impl TaskManagerRequest for DockerTaskRequest {
101    fn cpu(&self) -> f64 {
102        self.cpu
103    }
104
105    fn memory(&self) -> u64 {
106        self.memory
107    }
108
109    async fn run(self) -> Result<TaskExecutionResult> {
110        // Create the working directory
111        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
112        fs::create_dir_all(&work_dir).with_context(|| {
113            format!(
114                "failed to create directory `{path}`",
115                path = work_dir.display()
116            )
117        })?;
118
119        // On Unix, the work directory must be group writable in case the container uses
120        // a different user/group; the Crankshaft docker backend will automatically add
121        // the current user's egid to the container
122        #[cfg(unix)]
123        {
124            use std::fs::Permissions;
125            use std::fs::set_permissions;
126            use std::os::unix::fs::PermissionsExt;
127            set_permissions(&work_dir, Permissions::from_mode(0o770)).with_context(|| {
128                format!(
129                    "failed to set permissions for work directory `{path}`",
130                    path = work_dir.display()
131                )
132            })?;
133        }
134
135        // Write the evaluated command to disk
136        // This is done even for remote execution so that a copy exists locally
137        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
138        fs::write(&command_path, self.inner.command()).with_context(|| {
139            format!(
140                "failed to write command contents to `{path}`",
141                path = command_path.display()
142            )
143        })?;
144
145        // Allocate the inputs, which will always be, at most, the number of inputs plus
146        // the working directory and command
147        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
148        for input in self.inner.inputs().iter() {
149            let guest_path = input.guest_path().expect("input should have guest path");
150            let local_path = input.local_path().expect("input should be localized");
151
152            // The local path must exist for Docker to mount
153            if !local_path.exists() {
154                bail!(
155                    "cannot mount input `{path}` as it does not exist",
156                    path = local_path.display()
157                );
158            }
159
160            inputs.push(
161                Input::builder()
162                    .path(guest_path.as_str())
163                    .contents(Contents::Path(local_path.into()))
164                    .ty(input.kind())
165                    .read_only(true)
166                    .build(),
167            );
168        }
169
170        // Add an input for the work directory
171        inputs.push(
172            Input::builder()
173                .path(GUEST_WORK_DIR)
174                .contents(Contents::Path(work_dir.to_path_buf()))
175                .ty(InputType::Directory)
176                .read_only(false)
177                .build(),
178        );
179
180        // Add an input for the command
181        inputs.push(
182            Input::builder()
183                .path(GUEST_COMMAND_PATH)
184                .contents(Contents::Path(command_path.to_path_buf()))
185                .ty(InputType::File)
186                .read_only(true)
187                .build(),
188        );
189
190        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
191        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);
192
193        let outputs = vec![
194            Output::builder()
195                .path(GUEST_STDOUT_PATH)
196                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
197                .ty(OutputType::File)
198                .build(),
199            Output::builder()
200                .path(GUEST_STDERR_PATH)
201                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
202                .ty(OutputType::File)
203                .build(),
204        ];
205
206        let task = Task::builder()
207            .name(self.name)
208            .executions(NonEmpty::new(
209                Execution::builder()
210                    .image(self.container)
211                    .program(
212                        self.config
213                            .task
214                            .shell
215                            .as_deref()
216                            .unwrap_or(DEFAULT_TASK_SHELL),
217                    )
218                    .args([GUEST_COMMAND_PATH.to_string()])
219                    .work_dir(GUEST_WORK_DIR)
220                    .env(self.inner.env().clone())
221                    .stdout(GUEST_STDOUT_PATH)
222                    .stderr(GUEST_STDERR_PATH)
223                    .build(),
224            ))
225            .inputs(inputs)
226            .outputs(outputs)
227            .resources(
228                Resources::builder()
229                    .cpu(self.cpu)
230                    .maybe_cpu_limit(self.max_cpu)
231                    .ram(self.memory as f64 / ONE_GIBIBYTE)
232                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
233                    .build(),
234            )
235            .build();
236
237        let statuses = self.backend.run(task, self.token.clone())?.await?;
238
239        assert_eq!(statuses.len(), 1, "there should only be one exit status");
240        let status = statuses.first();
241
242        Ok(TaskExecutionResult {
243            exit_code: status.code().expect("should have exit code"),
244            work_dir: EvaluationPath::Local(work_dir),
245            stdout: PrimitiveValue::new_file(
246                stdout_path
247                    .into_os_string()
248                    .into_string()
249                    .expect("path should be UTF-8"),
250            )
251            .into(),
252            stderr: PrimitiveValue::new_file(
253                stderr_path
254                    .into_os_string()
255                    .into_string()
256                    .expect("path should be UTF-8"),
257            )
258            .into(),
259        })
260    }
261}
262
263/// Represents the Docker backend.
264pub struct DockerBackend {
265    /// The engine configuration.
266    config: Arc<Config>,
267    /// The underlying Crankshaft backend.
268    inner: Arc<docker::Backend>,
269    /// The maximum amount of concurrency supported.
270    max_concurrency: u64,
271    /// The maximum CPUs for any of one node.
272    max_cpu: u64,
273    /// The maximum memory for any of one node.
274    max_memory: u64,
275    /// The task manager for the backend.
276    manager: TaskManager<DockerTaskRequest>,
277    /// The name generator for tasks.
278    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
279}
280
281impl DockerBackend {
282    /// Constructs a new Docker task execution backend with the given
283    /// configuration.
284    ///
285    /// The provided configuration is expected to have already been validated.
286    pub async fn new(
287        config: Arc<Config>,
288        backend_config: &DockerBackendConfig,
289        events: Option<broadcast::Sender<Event>>,
290    ) -> Result<Self> {
291        info!("initializing Docker backend");
292
293        let names = Arc::new(Mutex::new(GeneratorIterator::new(
294            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
295            INITIAL_EXPECTED_NAMES,
296        )));
297
298        let backend = docker::Backend::initialize_default_with(
299            backend::docker::Config::builder()
300                .cleanup(backend_config.cleanup)
301                .build(),
302            names.clone(),
303            events,
304        )
305        .await
306        .context("failed to initialize Docker backend")?;
307
308        let resources = *backend.resources();
309        let cpu = resources.cpu();
310        let max_cpu = resources.max_cpu();
311        let memory = resources.memory();
312        let max_memory = resources.max_memory();
313
314        // If a service is being used, then we're going to be spawning into a cluster
315        // For the purposes of resource tracking, treat it as unlimited resources and
316        // let Docker handle resource allocation
317        let manager = if resources.use_service() {
318            TaskManager::new_unlimited(max_cpu, max_memory)
319        } else {
320            TaskManager::new(cpu, max_cpu, memory, max_memory)
321        };
322
323        Ok(Self {
324            config,
325            inner: Arc::new(backend),
326            max_concurrency: cpu,
327            max_cpu,
328            max_memory,
329            manager,
330            names,
331        })
332    }
333}
334
335impl TaskExecutionBackend for DockerBackend {
336    fn max_concurrency(&self) -> u64 {
337        self.max_concurrency
338    }
339
340    fn constraints(
341        &self,
342        requirements: &HashMap<String, Value>,
343        _: &HashMap<String, Value>,
344    ) -> Result<TaskExecutionConstraints> {
345        let container = container(requirements, self.config.task.container.as_deref());
346
347        let mut cpu = cpu(requirements);
348        if (self.max_cpu as f64) < cpu {
349            let env_specific = if self.config.suppress_env_specific_output {
350                String::new()
351            } else {
352                format!(
353                    ", but the execution backend has a maximum of {max_cpu}",
354                    max_cpu = self.max_cpu,
355                )
356            };
357            match self.config.task.cpu_limit_behavior {
358                TaskResourceLimitBehavior::TryWithMax => {
359                    warn!(
360                        "task requires at least {cpu} CPU{s}{env_specific}",
361                        s = if cpu == 1.0 { "" } else { "s" },
362                    );
363                    // clamp the reported constraint to what's available
364                    cpu = self.max_cpu as f64;
365                }
366                TaskResourceLimitBehavior::Deny => {
367                    bail!(
368                        "task requires at least {cpu} CPU{s}{env_specific}",
369                        s = if cpu == 1.0 { "" } else { "s" },
370                    );
371                }
372            }
373        }
374
375        let mut memory = memory(requirements)?;
376        if self.max_memory < memory as u64 {
377            let env_specific = if self.config.suppress_env_specific_output {
378                String::new()
379            } else {
380                format!(
381                    ", but the execution backend has a maximum of {max_memory} GiB",
382                    max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
383                )
384            };
385            match self.config.task.memory_limit_behavior {
386                TaskResourceLimitBehavior::TryWithMax => {
387                    warn!(
388                        "task requires at least {memory} GiB of memory{env_specific}",
389                        // Display the error in GiB, as it is the most common unit for memory
390                        memory = memory as f64 / ONE_GIBIBYTE,
391                    );
392                    // clamp the reported constraint to what's available
393                    memory = self.max_memory.try_into().unwrap_or(i64::MAX);
394                }
395                TaskResourceLimitBehavior::Deny => {
396                    bail!(
397                        "task requires at least {memory} GiB of memory{env_specific}",
398                        // Display the error in GiB, as it is the most common unit for memory
399                        memory = memory as f64 / ONE_GIBIBYTE,
400                    );
401                }
402            }
403        }
404
405        Ok(TaskExecutionConstraints {
406            container: Some(container.into_owned()),
407            cpu,
408            memory,
409            gpu: Default::default(),
410            fpga: Default::default(),
411            disks: Default::default(),
412        })
413    }
414
415    fn guest_inputs_dir(&self) -> Option<&'static str> {
416        Some(GUEST_INPUTS_DIR)
417    }
418
419    fn needs_local_inputs(&self) -> bool {
420        true
421    }
422
423    fn spawn(
424        &self,
425        request: TaskSpawnRequest,
426        token: CancellationToken,
427    ) -> Result<Receiver<Result<TaskExecutionResult>>> {
428        let (completed_tx, completed_rx) = oneshot::channel();
429
430        let requirements = request.requirements();
431        let hints = request.hints();
432
433        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
434        let mut cpu = cpu(requirements);
435        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
436            cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
437        }
438        let mut memory = memory(requirements)? as u64;
439        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
440            memory = std::cmp::min(memory, self.max_memory);
441        }
442        let max_cpu = max_cpu(hints);
443        let max_memory = max_memory(hints)?.map(|i| i as u64);
444
445        let name = format!(
446            "{id}-{generated}",
447            id = request.id(),
448            generated = self
449                .names
450                .lock()
451                .expect("generator should always acquire")
452                .next()
453                .expect("generator should never be exhausted")
454        );
455        self.manager.send(
456            DockerTaskRequest {
457                config: self.config.clone(),
458                inner: request,
459                backend: self.inner.clone(),
460                name,
461                container,
462                cpu,
463                memory,
464                max_cpu,
465                max_memory,
466                token,
467            },
468            completed_tx,
469        );
470
471        Ok(completed_rx)
472    }
473
474    #[cfg(unix)]
475    fn cleanup<'a>(
476        &'a self,
477        work_dir: &'a EvaluationPath,
478        token: CancellationToken,
479    ) -> Option<futures::future::BoxFuture<'a, ()>> {
480        use futures::FutureExt;
481        use tracing::debug;
482
483        /// The guest path for the work directory.
484        const GUEST_WORK_DIR: &str = "/mnt/work";
485        /// Amount of CPU to reserve for the cleanup task.
486        const CLEANUP_CPU: f64 = 0.1;
487        /// Amount of memory to reserve for the cleanup task.
488        const CLEANUP_MEMORY: f64 = 0.05;
489
490        // SAFETY: the work directory is always local for the Docker backend
491        let work_dir = work_dir.as_local().expect("path should be local");
492        assert!(work_dir.is_absolute(), "work directory should be absolute");
493
494        let backend = self.inner.clone();
495        let names = self.names.clone();
496
497        Some(
498            async move {
499                let result = async {
500                    let (uid, gid) = unsafe { (libc::geteuid(), libc::getegid()) };
501                    let ownership = format!("{uid}:{gid}");
502
503                    let name = format!(
504                        "docker-backend-cleanup-{id}",
505                        id = names
506                            .lock()
507                            .expect("generator should always acquire")
508                            .next()
509                            .expect("generator should never be exhausted")
510                    );
511
512                    let task = Task::builder()
513                        .name(&name)
514                        .executions(NonEmpty::new(
515                            Execution::builder()
516                                .image("alpine:latest")
517                                .program("chown")
518                                .args([
519                                    "-R".to_string(),
520                                    ownership.clone(),
521                                    GUEST_WORK_DIR.to_string(),
522                                ])
523                                .build(),
524                        ))
525                        .inputs([Input::builder()
526                            .path(GUEST_WORK_DIR)
527                            .contents(Contents::Path(work_dir.to_path_buf()))
528                            .ty(InputType::Directory)
529                            // need write access to chown
530                            .read_only(false)
531                            .build()])
532                        .resources(
533                            Resources::builder()
534                                .cpu(CLEANUP_CPU)
535                                .ram(CLEANUP_MEMORY)
536                                .build(),
537                        )
538                        .build();
539
540                    debug!(
541                        "running cleanup task `{name}` to change ownership of `{path}` to \
542                         `{ownership}`",
543                        path = work_dir.display(),
544                    );
545
546                    let statuses = backend
547                        .run(task, token)
548                        .context("failed to submit cleanup task")?
549                        .await
550                        .context("failed to run cleanup task")?;
551                    let status = statuses.first();
552                    if status.success() {
553                        Ok(())
554                    } else {
555                        bail!(
556                            "failed to chown task work directory `{path}`",
557                            path = work_dir.display()
558                        );
559                    }
560                }
561                .await;
562
563                if let Err(e) = result {
564                    tracing::error!("Docker backend cleanup failed: {e:#}");
565                }
566            }
567            .boxed(),
568        )
569    }
570}