wdl_engine/backend/
local.rs1use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs;
6use std::fs::File;
7use std::path::Path;
8use std::process::Stdio;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::bail;
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use tokio::process::Command;
16use tokio::select;
17use tokio::sync::oneshot;
18use tokio::task::JoinSet;
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use super::TaskExecutionBackend;
23use super::TaskExecutionConstraints;
24use super::TaskExecutionEvents;
25use super::TaskManager;
26use super::TaskManagerRequest;
27use super::TaskSpawnRequest;
28use crate::COMMAND_FILE_NAME;
29use crate::Input;
30use crate::ONE_GIBIBYTE;
31use crate::PrimitiveValue;
32use crate::STDERR_FILE_NAME;
33use crate::STDOUT_FILE_NAME;
34use crate::SYSTEM;
35use crate::TaskExecutionResult;
36use crate::Value;
37use crate::WORK_DIR_NAME;
38use crate::config::DEFAULT_TASK_SHELL;
39use crate::config::LocalBackendConfig;
40use crate::config::TaskConfig;
41use crate::convert_unit_string;
42use crate::http::Downloader;
43use crate::http::HttpDownloader;
44use crate::http::Location;
45use crate::path::EvaluationPath;
46use crate::v1::cpu;
47use crate::v1::memory;
48
49#[derive(Debug)]
54struct LocalTaskRequest {
55 inner: TaskSpawnRequest,
57 cpu: f64,
61 memory: u64,
65 shell: Option<String>,
67 token: CancellationToken,
69}
70
71impl TaskManagerRequest for LocalTaskRequest {
72 fn cpu(&self) -> f64 {
73 self.cpu
74 }
75
76 fn memory(&self) -> u64 {
77 self.memory
78 }
79
80 async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
81 let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
83 fs::create_dir_all(&work_dir).with_context(|| {
84 format!(
85 "failed to create directory `{path}`",
86 path = work_dir.display()
87 )
88 })?;
89
90 let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
92 fs::write(&command_path, self.inner.command()).with_context(|| {
93 format!(
94 "failed to write command contents to `{path}`",
95 path = command_path.display()
96 )
97 })?;
98
99 let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
101 let stdout = File::create(&stdout_path).with_context(|| {
102 format!(
103 "failed to create stdout file `{path}`",
104 path = stdout_path.display()
105 )
106 })?;
107
108 let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);
110 let stderr = File::create(&stderr_path).with_context(|| {
111 format!(
112 "failed to create stderr file `{path}`",
113 path = stderr_path.display()
114 )
115 })?;
116
117 let mut command = Command::new(self.shell.as_deref().unwrap_or(DEFAULT_TASK_SHELL));
118 command
119 .current_dir(&work_dir)
120 .arg("-C")
121 .arg(command_path)
122 .stdin(Stdio::null())
123 .stdout(stdout)
124 .stderr(stderr)
125 .envs(
126 self.inner
127 .env()
128 .iter()
129 .map(|(k, v)| (OsStr::new(k), OsStr::new(v))),
130 )
131 .kill_on_drop(true);
132
133 #[cfg(windows)]
136 if let Ok(path) = std::env::var("PATH") {
137 command.env("PATH", path);
138 }
139
140 let mut child = command.spawn().context("failed to spawn `bash`")?;
141
142 spawned.send(()).ok();
144
145 let id = child.id().expect("should have id");
146 info!("spawned local `bash` process {id} for task execution");
147
148 select! {
149 biased;
151
152 _ = self.token.cancelled() => {
153 bail!("task was cancelled");
154 }
155 status = child.wait() => {
156 let status = status.with_context(|| {
157 format!("failed to wait for termination of task child process {id}")
158 })?;
159
160 #[cfg(unix)]
161 {
162 use std::os::unix::process::ExitStatusExt;
163 if let Some(signal) = status.signal() {
164 tracing::warn!("task process {id} has terminated with signal {signal}");
165
166 bail!(
167 "task child process {id} has terminated with signal {signal}; see stderr file \
168 `{path}` for more details",
169 path = stderr_path.display()
170 );
171 }
172 }
173
174 let exit_code = status.code().expect("process should have exited");
175 info!("task process {id} has terminated with status code {exit_code}");
176 Ok(TaskExecutionResult {
177 inputs: self.inner.info.inputs,
178 exit_code,
179 work_dir: EvaluationPath::Local(work_dir),
180 stdout: PrimitiveValue::new_file(stdout_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
181 stderr: PrimitiveValue::new_file(stderr_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
182 })
183 }
184 }
185 }
186}
187
188pub struct LocalBackend {
195 cpu: u64,
197 memory: u64,
199 shell: Option<String>,
201 manager: TaskManager<LocalTaskRequest>,
203}
204
205impl LocalBackend {
206 pub fn new(task: &TaskConfig, config: &LocalBackendConfig) -> Result<Self> {
209 task.validate()?;
210 config.validate()?;
211
212 let cpu = config.cpu.unwrap_or_else(|| SYSTEM.cpus().len() as u64);
213 let memory = config
214 .memory
215 .as_ref()
216 .map(|s| convert_unit_string(s).expect("value should be valid"))
217 .unwrap_or_else(|| SYSTEM.total_memory());
218 let manager = TaskManager::new(cpu, cpu, memory, memory);
219
220 Ok(Self {
221 cpu,
222 memory,
223 shell: task.shell.clone(),
224 manager,
225 })
226 }
227}
228
229impl TaskExecutionBackend for LocalBackend {
230 fn max_concurrency(&self) -> u64 {
231 self.cpu
232 }
233
234 fn constraints(
235 &self,
236 requirements: &HashMap<String, Value>,
237 _: &HashMap<String, Value>,
238 ) -> Result<TaskExecutionConstraints> {
239 let cpu = cpu(requirements);
240 if (self.cpu as f64) < cpu {
241 bail!(
242 "task requires at least {cpu} CPU{s}, but the host only has {total_cpu} available",
243 s = if cpu == 1.0 { "" } else { "s" },
244 total_cpu = self.cpu,
245 );
246 }
247
248 let memory = memory(requirements)?;
249 if self.memory < memory as u64 {
250 let memory = memory as f64 / ONE_GIBIBYTE;
252 let total_memory = self.memory as f64 / ONE_GIBIBYTE;
253
254 bail!(
255 "task requires at least {memory} GiB of memory, but the host only has \
256 {total_memory} GiB available",
257 );
258 }
259
260 Ok(TaskExecutionConstraints {
261 container: None,
262 cpu,
263 memory,
264 gpu: Default::default(),
265 fpga: Default::default(),
266 disks: Default::default(),
267 })
268 }
269
270 fn guest_work_dir(&self) -> Option<&Path> {
271 None
273 }
274
275 fn localize_inputs<'a, 'b, 'c, 'd>(
276 &'a self,
277 downloader: &'b HttpDownloader,
278 inputs: &'c mut [Input],
279 ) -> BoxFuture<'d, Result<()>>
280 where
281 'a: 'd,
282 'b: 'd,
283 'c: 'd,
284 Self: 'd,
285 {
286 async move {
287 let mut download_futs = JoinSet::new();
288
289 for (idx, input) in inputs.iter_mut().enumerate() {
290 match input.path() {
291 EvaluationPath::Local(path) => {
292 let location = Location::Path(path.clone().into());
293 let guest_path = location
294 .to_str()
295 .with_context(|| {
296 format!("path `{path}` is not UTF-8", path = path.display())
297 })?
298 .to_string();
299 input.set_location(location.into_owned());
300 input.set_guest_path(guest_path);
301 }
302 EvaluationPath::Remote(url) => {
303 let downloader = downloader.clone();
304 let url = url.clone();
305 download_futs.spawn(async move {
306 let location_result = downloader.download(&url).await;
307
308 match location_result {
309 Ok(location) => Ok((idx, location.into_owned())),
310 Err(e) => bail!("failed to localize `{url}`: {e:?}"),
311 }
312 });
313 }
314 }
315 }
316
317 while let Some(result) = download_futs.join_next().await {
318 match result {
319 Ok(Ok((idx, location))) => {
320 let guest_path = location
321 .to_str()
322 .with_context(|| {
323 format!(
324 "downloaded path `{path}` is not UTF-8",
325 path = location.display()
326 )
327 })?
328 .to_string();
329
330 let input = inputs.get_mut(idx).expect("index should be valid");
331 input.set_location(location);
332 input.set_guest_path(guest_path);
333 }
334 Ok(Err(e)) => {
335 bail!(e);
337 }
338 Err(e) => {
339 bail!("download task failed: {e}");
341 }
342 }
343 }
344
345 Ok(())
346 }
347 .boxed()
348 }
349
350 fn spawn(
351 &self,
352 request: TaskSpawnRequest,
353 token: CancellationToken,
354 ) -> Result<TaskExecutionEvents> {
355 let (spawned_tx, spawned_rx) = oneshot::channel();
356 let (completed_tx, completed_rx) = oneshot::channel();
357
358 let requirements = request.requirements();
359 let cpu = cpu(requirements);
360 let memory = memory(requirements)? as u64;
361
362 self.manager.send(
363 LocalTaskRequest {
364 inner: request,
365 cpu,
366 memory,
367 shell: self.shell.clone(),
368 token,
369 },
370 spawned_tx,
371 completed_tx,
372 );
373
374 Ok(TaskExecutionEvents {
375 spawned: spawned_rx,
376 completed: completed_rx,
377 })
378 }
379}