wdl_engine/backend.rs
1//! Implementation of task execution backends.
2
3use std::collections::HashMap;
4use std::path::Path;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use anyhow::Result;
9use futures::future::BoxFuture;
10use indexmap::IndexMap;
11
12use crate::ContentKind;
13use crate::EvaluationPath;
14use crate::GuestPath;
15use crate::TaskInputs;
16use crate::Value;
17use crate::http::Location;
18use crate::http::Transferer;
19use crate::v1::requirements::ContainerSource;
20
21mod apptainer;
22mod docker;
23mod local;
24mod lsf_apptainer;
25pub(crate) mod manager;
26mod slurm_apptainer;
27mod tes;
28
29pub use apptainer::*;
30pub use docker::*;
31pub use local::*;
32pub use lsf_apptainer::*;
33pub use slurm_apptainer::*;
34pub use tes::*;
35
/// Default root path, on the guest, for inputs.
///
/// The value is Unix style and carries a trailing `/`.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// Default name of the work directory.
pub(crate) const WORK_DIR_NAME: &str = "work";

/// Default name of the command file.
pub(crate) const COMMAND_FILE_NAME: &str = "command";

/// Default name of the stdout file.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";

/// Default name of the stderr file.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";

/// How many task names are initially expected.
///
/// Determines the initial size of the bloom filter as well as the number of
/// names prepopulated into a name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;
56
57/// Represents a `File` or `Directory` input to a backend.
58#[derive(Debug, Clone)]
59pub(crate) struct Input {
60 /// The content kind of the input.
61 kind: ContentKind,
62 /// The path for the input.
63 path: EvaluationPath,
64 /// The guest path for the input.
65 ///
66 /// This is `None` when the backend isn't mapping input paths.
67 guest_path: Option<GuestPath>,
68 /// The download location for the input.
69 ///
70 /// This is `Some` if the input has been downloaded to a known location.
71 location: Option<Location>,
72}
73
74impl Input {
75 /// Creates a new input with the given path and guest path.
76 pub fn new(kind: ContentKind, path: EvaluationPath, guest_path: Option<GuestPath>) -> Self {
77 Self {
78 kind,
79 path,
80 guest_path,
81 location: None,
82 }
83 }
84
85 /// Gets the content kind of the input.
86 pub fn kind(&self) -> ContentKind {
87 self.kind
88 }
89
90 /// Gets the path to the input.
91 ///
92 /// The path of the input may be local or remote.
93 pub fn path(&self) -> &EvaluationPath {
94 &self.path
95 }
96
97 /// Gets the guest path for the input.
98 ///
99 /// This is `None` for inputs to backends that don't use containers.
100 pub fn guest_path(&self) -> Option<&GuestPath> {
101 self.guest_path.as_ref()
102 }
103
104 /// Gets the local path of the input.
105 ///
106 /// Returns `None` if the input is remote and has not been localized.
107 pub fn local_path(&self) -> Option<&Path> {
108 self.location.as_deref().or_else(|| self.path.as_local())
109 }
110
111 /// Sets the location of the input.
112 ///
113 /// This is used during localization to set a local path for remote inputs.
114 pub fn set_location(&mut self, location: Location) {
115 self.location = Some(location);
116 }
117}
118
119/// Represents constraints applied to a task's execution.
120#[derive(Debug)]
121pub struct TaskExecutionConstraints {
122 /// The container the task will run in.
123 ///
124 /// A value of `None` indicates the task will run on the host.
125 pub container: Option<ContainerSource>,
126 /// The allocated number of CPUs; must be greater than 0.
127 pub cpu: f64,
128 /// The allocated memory in bytes; must be greater than 0.
129 pub memory: u64,
130 /// A list with one specification per allocated GPU.
131 ///
132 /// The specification is execution engine-specific.
133 ///
134 /// If no GPUs were allocated, then the value must be an empty list.
135 pub gpu: Vec<String>,
136 /// A list with one specification per allocated FPGA.
137 ///
138 /// The specification is execution engine-specific.
139 ///
140 /// If no FPGAs were allocated, then the value must be an empty list.
141 pub fpga: Vec<String>,
142 /// A map with one entry for each disk mount point.
143 ///
144 /// The key is the mount point and the value is the initial amount of disk
145 /// space allocated, in bytes.
146 ///
147 /// The execution engine must, at a minimum, provide one entry for each disk
148 /// mount point requested, but may provide more.
149 ///
150 /// The amount of disk space available for a given mount point may increase
151 /// during the lifetime of the task (e.g., autoscaling volumes provided by
152 /// some cloud services).
153 pub disks: IndexMap<String, i64>,
154}
155
156/// Represents a request to execute a task.
157#[derive(Debug)]
158pub struct ExecuteTaskRequest<'a> {
159 /// The id of the task being executed.
160 pub id: &'a str,
161 /// The command of the task.
162 pub command: &'a str,
163 /// The original input values to the task.
164 pub inputs: &'a TaskInputs,
165 /// The backend inputs for task.
166 pub backend_inputs: &'a [Input],
167 /// The requirements of the task.
168 pub requirements: &'a HashMap<String, Value>,
169 /// The hints of the task.
170 pub hints: &'a HashMap<String, Value>,
171 /// The environment variables of the task.
172 pub env: &'a IndexMap<String, String>,
173 /// The constraints for the task's execution.
174 pub constraints: &'a TaskExecutionConstraints,
175 /// The attempt directory for the task's execution.
176 pub attempt_dir: &'a Path,
177 /// The temp directory for the evaluation.
178 pub temp_dir: &'a Path,
179}
180
181impl<'a> ExecuteTaskRequest<'a> {
182 /// The host path for the command to store the task's evaluated command.
183 pub fn command_path(&self) -> PathBuf {
184 self.attempt_dir.join(COMMAND_FILE_NAME)
185 }
186
187 /// The default work directory host path.
188 ///
189 /// This is used by backends that support local or shared file systems.
190 pub fn work_dir(&self) -> PathBuf {
191 self.attempt_dir.join(WORK_DIR_NAME)
192 }
193
194 /// The default stdout file host path.
195 ///
196 /// This is used by backends that support local or shared file systems.
197 pub fn stdout_path(&self) -> PathBuf {
198 self.attempt_dir.join(STDOUT_FILE_NAME)
199 }
200
201 /// The default stderr file host path.
202 ///
203 /// This is used by backends that support local or shared file systems.
204 pub fn stderr_path(&self) -> PathBuf {
205 self.attempt_dir.join(STDERR_FILE_NAME)
206 }
207}
208
209/// Represents the result of a task's execution.
210#[derive(Debug)]
211pub struct TaskExecutionResult {
212 /// Stores the task process exit code.
213 pub exit_code: i32,
214 /// The task's working directory.
215 pub work_dir: EvaluationPath,
216 /// The value of the task's stdout file.
217 pub stdout: Value,
218 /// The value of the task's stderr file.
219 pub stderr: Value,
220}
221
222/// Represents a task execution backend.
223pub(crate) trait TaskExecutionBackend: Send + Sync {
224 /// Gets the execution constraints given a task's inputs, requirements, and
225 /// hints.
226 ///
227 /// The returned constraints are used to populate the `task` variable in WDL
228 /// 1.2+.
229 ///
230 /// Returns an error if the task cannot be constrained for the execution
231 /// environment or if the task specifies invalid requirements.
232 fn constraints(
233 &self,
234 inputs: &TaskInputs,
235 requirements: &HashMap<String, Value>,
236 hints: &HashMap<String, Value>,
237 ) -> Result<TaskExecutionConstraints>;
238
239 /// Gets the guest (container) inputs directory of the backend.
240 ///
241 /// Returns `None` if the backend does not execute tasks in a container.
242 ///
243 /// The returned path is expected to be Unix style and end with a backslash.
244 fn guest_inputs_dir(&self) -> Option<&'static str> {
245 Some(GUEST_INPUTS_DIR)
246 }
247
248 /// Determines if the backend needs local inputs.
249 ///
250 /// Backends that run tasks remotely should return `false`.
251 fn needs_local_inputs(&self) -> bool {
252 true
253 }
254
255 /// Execute a task with the execution backend using the provided file
256 /// transferer.
257 ///
258 /// Returns the result of the task's execution or `None` if the task was
259 /// canceled.
260 fn execute<'a>(
261 &'a self,
262 transferer: &'a Arc<dyn Transferer>,
263 request: ExecuteTaskRequest<'a>,
264 ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>>;
265}