wdl_engine/backend/
local.rs1use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs;
6use std::fs::File;
7use std::path::Path;
8use std::process::Stdio;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::bail;
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use tokio::process::Command;
16use tokio::select;
17use tokio::sync::oneshot;
18use tokio::task::JoinSet;
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use super::TaskExecutionBackend;
23use super::TaskExecutionConstraints;
24use super::TaskExecutionEvents;
25use super::TaskManager;
26use super::TaskManagerRequest;
27use super::TaskSpawnRequest;
28use crate::COMMAND_FILE_NAME;
29use crate::Input;
30use crate::ONE_GIBIBYTE;
31use crate::PrimitiveValue;
32use crate::STDERR_FILE_NAME;
33use crate::STDOUT_FILE_NAME;
34use crate::SYSTEM;
35use crate::TaskExecutionResult;
36use crate::Value;
37use crate::WORK_DIR_NAME;
38use crate::config::DEFAULT_TASK_SHELL;
39use crate::config::LocalBackendConfig;
40use crate::config::TaskConfig;
41use crate::convert_unit_string;
42use crate::http::Downloader;
43use crate::http::HttpDownloader;
44use crate::http::Location;
45use crate::path::EvaluationPath;
46use crate::v1::cpu;
47use crate::v1::memory;
48
49#[derive(Debug)]
54struct LocalTaskRequest {
55 inner: TaskSpawnRequest,
57 cpu: f64,
61 memory: u64,
65 shell: Option<String>,
67 token: CancellationToken,
69}
70
71impl TaskManagerRequest for LocalTaskRequest {
72 fn cpu(&self) -> f64 {
73 self.cpu
74 }
75
76 fn memory(&self) -> u64 {
77 self.memory
78 }
79
80 async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
81 let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
83 fs::create_dir_all(&work_dir).with_context(|| {
84 format!(
85 "failed to create directory `{path}`",
86 path = work_dir.display()
87 )
88 })?;
89
90 let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
92 fs::write(&command_path, self.inner.command()).with_context(|| {
93 format!(
94 "failed to write command contents to `{path}`",
95 path = command_path.display()
96 )
97 })?;
98
99 let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
101 let stdout = File::create(&stdout_path).with_context(|| {
102 format!(
103 "failed to create stdout file `{path}`",
104 path = stdout_path.display()
105 )
106 })?;
107
108 let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);
110 let stderr = File::create(&stderr_path).with_context(|| {
111 format!(
112 "failed to create stderr file `{path}`",
113 path = stderr_path.display()
114 )
115 })?;
116
117 let mut command = Command::new(self.shell.as_deref().unwrap_or(DEFAULT_TASK_SHELL));
118 command
119 .current_dir(&work_dir)
120 .arg("-C")
121 .arg(command_path)
122 .stdin(Stdio::null())
123 .stdout(stdout)
124 .stderr(stderr)
125 .envs(
126 self.inner
127 .env()
128 .iter()
129 .map(|(k, v)| (OsStr::new(k), OsStr::new(v))),
130 )
131 .kill_on_drop(true);
132
133 #[cfg(windows)]
136 command.env("WDL_TASK_EVALUATION", "1");
137
138 let mut child = command.spawn().context("failed to spawn `bash`")?;
139
140 spawned.send(()).ok();
142
143 let id = child.id().expect("should have id");
144 info!("spawned local `bash` process {id} for task execution");
145
146 select! {
147 biased;
149
150 _ = self.token.cancelled() => {
151 bail!("task was cancelled");
152 }
153 status = child.wait() => {
154 let status = status.with_context(|| {
155 format!("failed to wait for termination of task child process {id}")
156 })?;
157
158 #[cfg(unix)]
159 {
160 use std::os::unix::process::ExitStatusExt;
161 if let Some(signal) = status.signal() {
162 tracing::warn!("task process {id} has terminated with signal {signal}");
163
164 bail!(
165 "task child process {id} has terminated with signal {signal}; see stderr file \
166 `{path}` for more details",
167 path = stderr_path.display()
168 );
169 }
170 }
171
172 let exit_code = status.code().expect("process should have exited");
173 info!("task process {id} has terminated with status code {exit_code}");
174 Ok(TaskExecutionResult {
175 inputs: self.inner.info.inputs,
176 exit_code,
177 work_dir: EvaluationPath::Local(work_dir),
178 stdout: PrimitiveValue::new_file(stdout_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
179 stderr: PrimitiveValue::new_file(stderr_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
180 })
181 }
182 }
183 }
184}
185
186pub struct LocalBackend {
193 cpu: u64,
195 memory: u64,
197 shell: Option<String>,
199 manager: TaskManager<LocalTaskRequest>,
201}
202
203impl LocalBackend {
204 pub fn new(task: &TaskConfig, config: &LocalBackendConfig) -> Result<Self> {
207 task.validate()?;
208 config.validate()?;
209
210 let cpu = config.cpu.unwrap_or_else(|| SYSTEM.cpus().len() as u64);
211 let memory = config
212 .memory
213 .as_ref()
214 .map(|s| convert_unit_string(s).expect("value should be valid"))
215 .unwrap_or_else(|| SYSTEM.total_memory());
216 let manager = TaskManager::new(cpu, cpu, memory, memory);
217
218 Ok(Self {
219 cpu,
220 memory,
221 shell: task.shell.clone(),
222 manager,
223 })
224 }
225}
226
227impl TaskExecutionBackend for LocalBackend {
228 fn max_concurrency(&self) -> u64 {
229 self.cpu
230 }
231
232 fn constraints(
233 &self,
234 requirements: &HashMap<String, Value>,
235 _: &HashMap<String, Value>,
236 ) -> Result<TaskExecutionConstraints> {
237 let cpu = cpu(requirements);
238 if (self.cpu as f64) < cpu {
239 bail!(
240 "task requires at least {cpu} CPU{s}, but the host only has {total_cpu} available",
241 s = if cpu == 1.0 { "" } else { "s" },
242 total_cpu = self.cpu,
243 );
244 }
245
246 let memory = memory(requirements)?;
247 if self.memory < memory as u64 {
248 let memory = memory as f64 / ONE_GIBIBYTE;
250 let total_memory = self.memory as f64 / ONE_GIBIBYTE;
251
252 bail!(
253 "task requires at least {memory} GiB of memory, but the host only has \
254 {total_memory} GiB available",
255 );
256 }
257
258 Ok(TaskExecutionConstraints {
259 container: None,
260 cpu,
261 memory,
262 gpu: Default::default(),
263 fpga: Default::default(),
264 disks: Default::default(),
265 })
266 }
267
268 fn guest_work_dir(&self) -> Option<&Path> {
269 None
271 }
272
273 fn localize_inputs<'a, 'b, 'c, 'd>(
274 &'a self,
275 downloader: &'b HttpDownloader,
276 inputs: &'c mut [Input],
277 ) -> BoxFuture<'d, Result<()>>
278 where
279 'a: 'd,
280 'b: 'd,
281 'c: 'd,
282 Self: 'd,
283 {
284 async move {
285 let mut download_futs = JoinSet::new();
286
287 for (idx, input) in inputs.iter_mut().enumerate() {
288 match input.path() {
289 EvaluationPath::Local(path) => {
290 let location = Location::Path(path.clone().into());
291 let guest_path = location
292 .to_str()
293 .with_context(|| {
294 format!("path `{path}` is not UTF-8", path = path.display())
295 })?
296 .to_string();
297 input.set_location(location.into_owned());
298 input.set_guest_path(guest_path);
299 }
300 EvaluationPath::Remote(url) => {
301 let downloader = downloader.clone();
302 let url = url.clone();
303 download_futs.spawn(async move {
304 let location_result = downloader.download(&url).await;
305
306 match location_result {
307 Ok(location) => Ok((idx, location.into_owned())),
308 Err(e) => bail!("failed to localize `{url}`: {e:?}"),
309 }
310 });
311 }
312 }
313 }
314
315 while let Some(result) = download_futs.join_next().await {
316 match result {
317 Ok(Ok((idx, location))) => {
318 let guest_path = location
319 .to_str()
320 .with_context(|| {
321 format!(
322 "downloaded path `{path}` is not UTF-8",
323 path = location.display()
324 )
325 })?
326 .to_string();
327
328 let input = inputs.get_mut(idx).expect("index should be valid");
329 input.set_location(location);
330 input.set_guest_path(guest_path);
331 }
332 Ok(Err(e)) => {
333 bail!(e);
335 }
336 Err(e) => {
337 bail!("download task failed: {e}");
339 }
340 }
341 }
342
343 Ok(())
344 }
345 .boxed()
346 }
347
348 fn spawn(
349 &self,
350 request: TaskSpawnRequest,
351 token: CancellationToken,
352 ) -> Result<TaskExecutionEvents> {
353 let (spawned_tx, spawned_rx) = oneshot::channel();
354 let (completed_tx, completed_rx) = oneshot::channel();
355
356 let requirements = request.requirements();
357 let cpu = cpu(requirements);
358 let memory = memory(requirements)? as u64;
359
360 self.manager.send(
361 LocalTaskRequest {
362 inner: request,
363 cpu,
364 memory,
365 shell: self.shell.clone(),
366 token,
367 },
368 spawned_tx,
369 completed_tx,
370 );
371
372 Ok(TaskExecutionEvents {
373 spawned: spawned_rx,
374 completed: completed_rx,
375 })
376 }
377}