Skip to main content

wdl_engine/
backend.rs

1//! Implementation of task execution backends.
2
3use std::collections::HashMap;
4use std::path::Path;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use anyhow::Result;
9use futures::future::BoxFuture;
10use indexmap::IndexMap;
11
12use crate::ContentKind;
13use crate::EvaluationPath;
14use crate::GuestPath;
15use crate::TaskInputs;
16use crate::Value;
17use crate::http::Location;
18use crate::http::Transferer;
19use crate::v1::requirements::ContainerSource;
20
21mod apptainer;
22mod docker;
23mod local;
24mod lsf_apptainer;
25pub(crate) mod manager;
26mod slurm_apptainer;
27mod tes;
28
29pub use apptainer::*;
30pub use docker::*;
31pub use local::*;
32pub use lsf_apptainer::*;
33pub use slurm_apptainer::*;
34pub use tes::*;
35
/// Default root path, inside the guest, under which inputs are located.
///
/// The value is a Unix-style path with a trailing `/`.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// Default name of the work directory.
pub(crate) const WORK_DIR_NAME: &str = "work";

/// Default name of the command file.
pub(crate) const COMMAND_FILE_NAME: &str = "command";

/// Default name of the stdout file.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";

/// Default name of the stderr file.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";

/// How many task names are initially expected.
///
/// This value determines the initial size of the bloom filter as well as the
/// number of names prepopulated into a name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;
56
57/// Represents a `File` or `Directory` input to a backend.
58#[derive(Debug, Clone)]
59pub(crate) struct Input {
60    /// The content kind of the input.
61    kind: ContentKind,
62    /// The path for the input.
63    path: EvaluationPath,
64    /// The guest path for the input.
65    ///
66    /// This is `None` when the backend isn't mapping input paths.
67    guest_path: Option<GuestPath>,
68    /// The download location for the input.
69    ///
70    /// This is `Some` if the input has been downloaded to a known location.
71    location: Option<Location>,
72}
73
74impl Input {
75    /// Creates a new input with the given path and guest path.
76    pub fn new(kind: ContentKind, path: EvaluationPath, guest_path: Option<GuestPath>) -> Self {
77        Self {
78            kind,
79            path,
80            guest_path,
81            location: None,
82        }
83    }
84
85    /// Gets the content kind of the input.
86    pub fn kind(&self) -> ContentKind {
87        self.kind
88    }
89
90    /// Gets the path to the input.
91    ///
92    /// The path of the input may be local or remote.
93    pub fn path(&self) -> &EvaluationPath {
94        &self.path
95    }
96
97    /// Gets the guest path for the input.
98    ///
99    /// This is `None` for inputs to backends that don't use containers.
100    pub fn guest_path(&self) -> Option<&GuestPath> {
101        self.guest_path.as_ref()
102    }
103
104    /// Gets the local path of the input.
105    ///
106    /// Returns `None` if the input is remote and has not been localized.
107    pub fn local_path(&self) -> Option<&Path> {
108        self.location.as_deref().or_else(|| self.path.as_local())
109    }
110
111    /// Sets the location of the input.
112    ///
113    /// This is used during localization to set a local path for remote inputs.
114    pub fn set_location(&mut self, location: Location) {
115        self.location = Some(location);
116    }
117}
118
119/// Represents constraints applied to a task's execution.
120#[derive(Debug)]
121pub struct TaskExecutionConstraints {
122    /// The container the task will run in.
123    ///
124    /// A value of `None` indicates the task will run on the host.
125    pub container: Option<ContainerSource>,
126    /// The allocated number of CPUs; must be greater than 0.
127    pub cpu: f64,
128    /// The allocated memory in bytes; must be greater than 0.
129    pub memory: u64,
130    /// A list with one specification per allocated GPU.
131    ///
132    /// The specification is execution engine-specific.
133    ///
134    /// If no GPUs were allocated, then the value must be an empty list.
135    pub gpu: Vec<String>,
136    /// A list with one specification per allocated FPGA.
137    ///
138    /// The specification is execution engine-specific.
139    ///
140    /// If no FPGAs were allocated, then the value must be an empty list.
141    pub fpga: Vec<String>,
142    /// A map with one entry for each disk mount point.
143    ///
144    /// The key is the mount point and the value is the initial amount of disk
145    /// space allocated, in bytes.
146    ///
147    /// The execution engine must, at a minimum, provide one entry for each disk
148    /// mount point requested, but may provide more.
149    ///
150    /// The amount of disk space available for a given mount point may increase
151    /// during the lifetime of the task (e.g., autoscaling volumes provided by
152    /// some cloud services).
153    pub disks: IndexMap<String, i64>,
154}
155
156/// Represents a request to execute a task.
157#[derive(Debug)]
158pub struct ExecuteTaskRequest<'a> {
159    /// The id of the task being executed.
160    pub id: &'a str,
161    /// The command of the task.
162    pub command: &'a str,
163    /// The original input values to the task.
164    pub inputs: &'a TaskInputs,
165    /// The backend inputs for task.
166    pub backend_inputs: &'a [Input],
167    /// The requirements of the task.
168    pub requirements: &'a HashMap<String, Value>,
169    /// The hints of the task.
170    pub hints: &'a HashMap<String, Value>,
171    /// The environment variables of the task.
172    pub env: &'a IndexMap<String, String>,
173    /// The constraints for the task's execution.
174    pub constraints: &'a TaskExecutionConstraints,
175    /// The attempt directory for the task's execution.
176    pub attempt_dir: &'a Path,
177    /// The temp directory for the evaluation.
178    pub temp_dir: &'a Path,
179}
180
181impl<'a> ExecuteTaskRequest<'a> {
182    /// The host path for the command to store the task's evaluated command.
183    pub fn command_path(&self) -> PathBuf {
184        self.attempt_dir.join(COMMAND_FILE_NAME)
185    }
186
187    /// The default work directory host path.
188    ///
189    /// This is used by backends that support local or shared file systems.
190    pub fn work_dir(&self) -> PathBuf {
191        self.attempt_dir.join(WORK_DIR_NAME)
192    }
193
194    /// The default stdout file host path.
195    ///
196    /// This is used by backends that support local or shared file systems.
197    pub fn stdout_path(&self) -> PathBuf {
198        self.attempt_dir.join(STDOUT_FILE_NAME)
199    }
200
201    /// The default stderr file host path.
202    ///
203    /// This is used by backends that support local or shared file systems.
204    pub fn stderr_path(&self) -> PathBuf {
205        self.attempt_dir.join(STDERR_FILE_NAME)
206    }
207}
208
209/// Represents the result of a task's execution.
210#[derive(Debug)]
211pub struct TaskExecutionResult {
212    /// Stores the task process exit code.
213    pub exit_code: i32,
214    /// The task's working directory.
215    pub work_dir: EvaluationPath,
216    /// The value of the task's stdout file.
217    pub stdout: Value,
218    /// The value of the task's stderr file.
219    pub stderr: Value,
220}
221
222/// Represents a task execution backend.
223pub(crate) trait TaskExecutionBackend: Send + Sync {
224    /// Gets the execution constraints given a task's inputs, requirements, and
225    /// hints.
226    ///
227    /// The returned constraints are used to populate the `task` variable in WDL
228    /// 1.2+.
229    ///
230    /// Returns an error if the task cannot be constrained for the execution
231    /// environment or if the task specifies invalid requirements.
232    fn constraints(
233        &self,
234        inputs: &TaskInputs,
235        requirements: &HashMap<String, Value>,
236        hints: &HashMap<String, Value>,
237    ) -> Result<TaskExecutionConstraints>;
238
239    /// Gets the guest (container) inputs directory of the backend.
240    ///
241    /// Returns `None` if the backend does not execute tasks in a container.
242    ///
243    /// The returned path is expected to be Unix style and end with a backslash.
244    fn guest_inputs_dir(&self) -> Option<&'static str> {
245        Some(GUEST_INPUTS_DIR)
246    }
247
248    /// Determines if the backend needs local inputs.
249    ///
250    /// Backends that run tasks remotely should return `false`.
251    fn needs_local_inputs(&self) -> bool {
252        true
253    }
254
255    /// Execute a task with the execution backend using the provided file
256    /// transferer.
257    ///
258    /// Returns the result of the task's execution or `None` if the task was
259    /// canceled.
260    fn execute<'a>(
261        &'a self,
262        transferer: &'a Arc<dyn Transferer>,
263        request: ExecuteTaskRequest<'a>,
264    ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>>;
265}