wdl_engine/backend/
local.rs

1//! Implementation of the local backend.
2
3use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs;
6use std::fs::File;
7use std::path::Path;
8use std::process::Stdio;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::bail;
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use tokio::process::Command;
16use tokio::select;
17use tokio::sync::oneshot;
18use tokio::task::JoinSet;
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use super::TaskExecutionBackend;
23use super::TaskExecutionConstraints;
24use super::TaskExecutionEvents;
25use super::TaskManager;
26use super::TaskManagerRequest;
27use super::TaskSpawnRequest;
28use crate::COMMAND_FILE_NAME;
29use crate::Input;
30use crate::ONE_GIBIBYTE;
31use crate::PrimitiveValue;
32use crate::STDERR_FILE_NAME;
33use crate::STDOUT_FILE_NAME;
34use crate::SYSTEM;
35use crate::TaskExecutionResult;
36use crate::Value;
37use crate::WORK_DIR_NAME;
38use crate::config::DEFAULT_TASK_SHELL;
39use crate::config::LocalBackendConfig;
40use crate::config::TaskConfig;
41use crate::convert_unit_string;
42use crate::http::Downloader;
43use crate::http::HttpDownloader;
44use crate::http::Location;
45use crate::path::EvaluationPath;
46use crate::v1::cpu;
47use crate::v1::memory;
48
49/// Represents a local task request.
50///
51/// This request contains the requested cpu and memory reservations for the task
52/// as well as the result receiver channel.
53#[derive(Debug)]
54struct LocalTaskRequest {
55    /// The inner task spawn request.
56    inner: TaskSpawnRequest,
57    /// The requested CPU reservation for the task.
58    ///
59    /// Note that CPU isn't actually reserved for the task process.
60    cpu: f64,
61    /// The requested memory reservation for the task.
62    ///
63    /// Note that memory isn't actually reserved for the task process.
64    memory: u64,
65    /// The optional shell to use.
66    shell: Option<String>,
67    /// The cancellation token for the request.
68    token: CancellationToken,
69}
70
71impl TaskManagerRequest for LocalTaskRequest {
72    fn cpu(&self) -> f64 {
73        self.cpu
74    }
75
76    fn memory(&self) -> u64 {
77        self.memory
78    }
79
80    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
81        // Create the working directory
82        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
83        fs::create_dir_all(&work_dir).with_context(|| {
84            format!(
85                "failed to create directory `{path}`",
86                path = work_dir.display()
87            )
88        })?;
89
90        // Write the evaluated command to disk
91        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
92        fs::write(&command_path, self.inner.command()).with_context(|| {
93            format!(
94                "failed to write command contents to `{path}`",
95                path = command_path.display()
96            )
97        })?;
98
99        // Create a file for the stdout
100        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
101        let stdout = File::create(&stdout_path).with_context(|| {
102            format!(
103                "failed to create stdout file `{path}`",
104                path = stdout_path.display()
105            )
106        })?;
107
108        // Create a file for the stderr
109        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);
110        let stderr = File::create(&stderr_path).with_context(|| {
111            format!(
112                "failed to create stderr file `{path}`",
113                path = stderr_path.display()
114            )
115        })?;
116
117        let mut command = Command::new(self.shell.as_deref().unwrap_or(DEFAULT_TASK_SHELL));
118        command
119            .current_dir(&work_dir)
120            .arg("-C")
121            .arg(command_path)
122            .stdin(Stdio::null())
123            .stdout(stdout)
124            .stderr(stderr)
125            .envs(
126                self.inner
127                    .env()
128                    .iter()
129                    .map(|(k, v)| (OsStr::new(k), OsStr::new(v))),
130            )
131            .kill_on_drop(true);
132
133        // Set the PATH variable for the child on Windows to get consistent PATH
134        // searching. See: https://github.com/rust-lang/rust/issues/122660
135        #[cfg(windows)]
136        if let Ok(path) = std::env::var("PATH") {
137            command.env("PATH", path);
138        }
139
140        let mut child = command.spawn().context("failed to spawn `bash`")?;
141
142        // Notify that the process has spawned
143        spawned.send(()).ok();
144
145        let id = child.id().expect("should have id");
146        info!("spawned local `bash` process {id} for task execution");
147
148        select! {
149            // Poll the cancellation token before the child future
150            biased;
151
152            _ = self.token.cancelled() => {
153                bail!("task was cancelled");
154            }
155            status = child.wait() => {
156                let status = status.with_context(|| {
157                    format!("failed to wait for termination of task child process {id}")
158                })?;
159
160                #[cfg(unix)]
161                {
162                    use std::os::unix::process::ExitStatusExt;
163                    if let Some(signal) = status.signal() {
164                        tracing::warn!("task process {id} has terminated with signal {signal}");
165
166                        bail!(
167                            "task child process {id} has terminated with signal {signal}; see stderr file \
168                            `{path}` for more details",
169                            path = stderr_path.display()
170                        );
171                    }
172                }
173
174                let exit_code = status.code().expect("process should have exited");
175                info!("task process {id} has terminated with status code {exit_code}");
176                Ok(TaskExecutionResult {
177                    inputs: self.inner.info.inputs,
178                    exit_code,
179                    work_dir: EvaluationPath::Local(work_dir),
180                    stdout: PrimitiveValue::new_file(stdout_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
181                    stderr: PrimitiveValue::new_file(stderr_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
182                })
183            }
184        }
185    }
186}
187
188/// Represents a task execution backend that locally executes tasks.
189///
190/// <div class="warning">
191/// Warning: the local task execution backend spawns processes on the host
192/// directly without the use of a container; only use this backend on trusted
193/// WDL. </div>
194pub struct LocalBackend {
195    /// The total CPU of the host.
196    cpu: u64,
197    /// The total memory of the host.
198    memory: u64,
199    /// The optional shell to use.
200    shell: Option<String>,
201    /// The underlying task manager.
202    manager: TaskManager<LocalTaskRequest>,
203}
204
205impl LocalBackend {
206    /// Constructs a new local task execution backend with the given
207    /// configuration.
208    pub fn new(task: &TaskConfig, config: &LocalBackendConfig) -> Result<Self> {
209        task.validate()?;
210        config.validate()?;
211
212        let cpu = config.cpu.unwrap_or_else(|| SYSTEM.cpus().len() as u64);
213        let memory = config
214            .memory
215            .as_ref()
216            .map(|s| convert_unit_string(s).expect("value should be valid"))
217            .unwrap_or_else(|| SYSTEM.total_memory());
218        let manager = TaskManager::new(cpu, cpu, memory, memory);
219
220        Ok(Self {
221            cpu,
222            memory,
223            shell: task.shell.clone(),
224            manager,
225        })
226    }
227}
228
229impl TaskExecutionBackend for LocalBackend {
230    fn max_concurrency(&self) -> u64 {
231        self.cpu
232    }
233
234    fn constraints(
235        &self,
236        requirements: &HashMap<String, Value>,
237        _: &HashMap<String, Value>,
238    ) -> Result<TaskExecutionConstraints> {
239        let cpu = cpu(requirements);
240        if (self.cpu as f64) < cpu {
241            bail!(
242                "task requires at least {cpu} CPU{s}, but the host only has {total_cpu} available",
243                s = if cpu == 1.0 { "" } else { "s" },
244                total_cpu = self.cpu,
245            );
246        }
247
248        let memory = memory(requirements)?;
249        if self.memory < memory as u64 {
250            // Display the error in GiB, as it is the most common unit for memory
251            let memory = memory as f64 / ONE_GIBIBYTE;
252            let total_memory = self.memory as f64 / ONE_GIBIBYTE;
253
254            bail!(
255                "task requires at least {memory} GiB of memory, but the host only has \
256                 {total_memory} GiB available",
257            );
258        }
259
260        Ok(TaskExecutionConstraints {
261            container: None,
262            cpu,
263            memory,
264            gpu: Default::default(),
265            fpga: Default::default(),
266            disks: Default::default(),
267        })
268    }
269
270    fn guest_work_dir(&self) -> Option<&Path> {
271        // Local execution does not use a container
272        None
273    }
274
275    fn localize_inputs<'a, 'b, 'c, 'd>(
276        &'a self,
277        downloader: &'b HttpDownloader,
278        inputs: &'c mut [Input],
279    ) -> BoxFuture<'d, Result<()>>
280    where
281        'a: 'd,
282        'b: 'd,
283        'c: 'd,
284        Self: 'd,
285    {
286        async move {
287            let mut download_futs = JoinSet::new();
288
289            for (idx, input) in inputs.iter_mut().enumerate() {
290                match input.path() {
291                    EvaluationPath::Local(path) => {
292                        let location = Location::Path(path.clone().into());
293                        let guest_path = location
294                            .to_str()
295                            .with_context(|| {
296                                format!("path `{path}` is not UTF-8", path = path.display())
297                            })?
298                            .to_string();
299                        input.set_location(location.into_owned());
300                        input.set_guest_path(guest_path);
301                    }
302                    EvaluationPath::Remote(url) => {
303                        let downloader = downloader.clone();
304                        let url = url.clone();
305                        download_futs.spawn(async move {
306                            let location_result = downloader.download(&url).await;
307
308                            match location_result {
309                                Ok(location) => Ok((idx, location.into_owned())),
310                                Err(e) => bail!("failed to localize `{url}`: {e:?}"),
311                            }
312                        });
313                    }
314                }
315            }
316
317            while let Some(result) = download_futs.join_next().await {
318                match result {
319                    Ok(Ok((idx, location))) => {
320                        let guest_path = location
321                            .to_str()
322                            .with_context(|| {
323                                format!(
324                                    "downloaded path `{path}` is not UTF-8",
325                                    path = location.display()
326                                )
327                            })?
328                            .to_string();
329
330                        let input = inputs.get_mut(idx).expect("index should be valid");
331                        input.set_location(location);
332                        input.set_guest_path(guest_path);
333                    }
334                    Ok(Err(e)) => {
335                        // Futures are aborted when the `JoinSet` is dropped.
336                        bail!(e);
337                    }
338                    Err(e) => {
339                        // Futures are aborted when the `JoinSet` is dropped.
340                        bail!("download task failed: {e}");
341                    }
342                }
343            }
344
345            Ok(())
346        }
347        .boxed()
348    }
349
350    fn spawn(
351        &self,
352        request: TaskSpawnRequest,
353        token: CancellationToken,
354    ) -> Result<TaskExecutionEvents> {
355        let (spawned_tx, spawned_rx) = oneshot::channel();
356        let (completed_tx, completed_rx) = oneshot::channel();
357
358        let requirements = request.requirements();
359        let cpu = cpu(requirements);
360        let memory = memory(requirements)? as u64;
361
362        self.manager.send(
363            LocalTaskRequest {
364                inner: request,
365                cpu,
366                memory,
367                shell: self.shell.clone(),
368                token,
369            },
370            spawned_tx,
371            completed_tx,
372        );
373
374        Ok(TaskExecutionEvents {
375            spawned: spawned_rx,
376            completed: completed_rx,
377        })
378    }
379}