Skip to main content

wdl_engine/backend/
local.rs

1//! Implementation of the local backend.
2
3use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs;
6use std::fs::File;
7use std::process::Stdio;
8use std::sync::Arc;
9use std::sync::Mutex;
10
11use anyhow::Context;
12use anyhow::Result;
13use anyhow::bail;
14use crankshaft::engine::service::name::GeneratorIterator;
15use crankshaft::engine::service::name::UniqueAlphanumeric;
16use crankshaft::events::Event;
17use crankshaft::events::next_task_id;
18use crankshaft::events::send_event;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use nonempty::NonEmpty;
22use tokio::process::Command;
23use tokio::select;
24use tokio::sync::broadcast;
25use tokio_util::sync::CancellationToken;
26use tracing::info;
27use tracing::warn;
28
29use super::TaskExecutionBackend;
30use super::TaskExecutionConstraints;
31use crate::CancellationContext;
32use crate::EvaluationPath;
33use crate::Events;
34use crate::ONE_GIBIBYTE;
35use crate::PrimitiveValue;
36use crate::SYSTEM;
37use crate::TaskInputs;
38use crate::Value;
39use crate::backend::ExecuteTaskRequest;
40use crate::backend::INITIAL_EXPECTED_NAMES;
41use crate::backend::TaskExecutionResult;
42use crate::backend::manager::TaskManager;
43use crate::config::Config;
44use crate::config::DEFAULT_TASK_SHELL;
45use crate::config::TaskResourceLimitBehavior;
46use crate::convert_unit_string;
47use crate::http::Transferer;
48use crate::v1::requirements;
49
50/// Represents a local task request.
51///
52/// This request contains the requested cpu and memory reservations for the task
53/// as well as the result receiver channel.
54struct LocalTask<'a> {
55    /// The engine configuration.
56    config: Arc<Config>,
57    /// The task execution request.
58    request: ExecuteTaskRequest<'a>,
59    /// The name of the task.
60    name: String,
61    /// The sender for events.
62    events: Option<broadcast::Sender<Event>>,
63    /// The evaluation cancellation context.
64    cancellation: CancellationContext,
65}
66
67impl<'a> LocalTask<'a> {
68    /// Runs the local task.
69    ///
70    /// Returns `Ok(None)` if the task was canceled.
71    async fn run(self) -> Result<Option<TaskExecutionResult>> {
72        let id = next_task_id();
73        let work_dir = self.request.work_dir();
74        let stdout_path = self.request.stdout_path();
75        let stderr_path = self.request.stderr_path();
76
77        let run = async {
78            // Create the working directory
79            fs::create_dir_all(&work_dir).with_context(|| {
80                format!(
81                    "failed to create directory `{path}`",
82                    path = work_dir.display()
83                )
84            })?;
85
86            // Write the evaluated command to disk
87            let command_path = self.request.command_path();
88            fs::write(&command_path, self.request.command).with_context(|| {
89                format!(
90                    "failed to write command contents to `{path}`",
91                    path = command_path.display()
92                )
93            })?;
94
95            // Create a file for the stdout
96            let stdout = File::create(&stdout_path).with_context(|| {
97                format!(
98                    "failed to create stdout file `{path}`",
99                    path = stdout_path.display()
100                )
101            })?;
102
103            // Create a file for the stderr
104            let stderr = File::create(&stderr_path).with_context(|| {
105                format!(
106                    "failed to create stderr file `{path}`",
107                    path = stderr_path.display()
108                )
109            })?;
110
111            let mut command = Command::new(
112                self.config
113                    .task
114                    .shell
115                    .as_deref()
116                    .unwrap_or(DEFAULT_TASK_SHELL),
117            );
118            command
119                .current_dir(&work_dir)
120                .arg(command_path)
121                .stdin(Stdio::null())
122                .stdout(stdout)
123                .stderr(stderr)
124                .envs(
125                    self.request
126                        .env
127                        .iter()
128                        .map(|(k, v)| (OsStr::new(k), OsStr::new(v))),
129                )
130                .kill_on_drop(true);
131
132            // Set the PATH variable for the child on Windows to get consistent PATH
133            // searching. See: https://github.com/rust-lang/rust/issues/122660
134            #[cfg(windows)]
135            if let Ok(path) = std::env::var("PATH") {
136                command.env("PATH", path);
137            }
138
139            let mut child = command.spawn().context("failed to spawn shell")?;
140
141            // Notify that the process has spawned
142            send_event!(self.events, Event::TaskStarted { id });
143
144            let id = child.id().expect("should have id");
145            info!(
146                "spawned local shell process {id} for execution of task `{name}`",
147                name = self.name
148            );
149
150            let status = child.wait().await.with_context(|| {
151                format!("failed to wait for termination of task child process {id}")
152            })?;
153
154            #[cfg(unix)]
155            {
156                use std::os::unix::process::ExitStatusExt;
157                if let Some(signal) = status.signal() {
158                    tracing::warn!("task process {id} has terminated with signal {signal}");
159
160                    bail!(
161                        "task child process {id} has terminated with signal {signal}; see stderr \
162                         file `{path}` for more details",
163                        path = stderr_path.display()
164                    );
165                }
166            }
167
168            Ok(status)
169        };
170
171        // Send the created event
172        let task_token = CancellationToken::new();
173        send_event!(
174            self.events,
175            Event::TaskCreated {
176                id,
177                name: self.name.clone(),
178                tes_id: None,
179                token: task_token.clone(),
180            }
181        );
182
183        let token = self.cancellation.second();
184
185        select! {
186            // Poll the cancellation tokens before the child future
187            biased;
188            _ = task_token.cancelled() => {
189                send_event!(self.events, Event::TaskCanceled { id });
190                Ok(None)
191            }
192            _ = token.cancelled() => {
193                send_event!(self.events, Event::TaskCanceled { id });
194                Ok(None)
195            }
196            result = run => {
197                match result {
198                    Ok(status) => {
199                        send_event!(self.events, Event::TaskCompleted { id, exit_statuses: NonEmpty::new(status) });
200
201                        let exit_code = status.code().expect("process should have exited");
202                        info!("process {id} for task `{name}` has terminated with status code {exit_code}", name = self.name);
203                        Ok(Some(TaskExecutionResult {
204                            exit_code,
205                            work_dir: EvaluationPath::from_local_path(work_dir),
206                            stdout: PrimitiveValue::new_file(stdout_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
207                            stderr: PrimitiveValue::new_file(stderr_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
208                        }))
209                    }
210                    Err(e) => {
211                        send_event!(self.events, Event::TaskFailed { id, message: format!("{e:#}") });
212                        Err(e)
213                    }
214                }
215            }
216        }
217    }
218}
219
220/// Represents a task execution backend that locally executes tasks.
221///
222/// <div class="warning">
223/// Warning: the local task execution backend spawns processes on the host
224/// directly without the use of a container; only use this backend on trusted
225/// WDL. </div>
226pub struct LocalBackend {
227    /// The engine configuration.
228    config: Arc<Config>,
229    /// The evaluation cancellation context.
230    cancellation: CancellationContext,
231    /// The total CPU of the host.
232    cpu: f64,
233    /// The total memory of the host.
234    memory: u64,
235    /// The underlying task manager.
236    manager: TaskManager,
237    /// The name generator for tasks.
238    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
239    /// The sender for events.
240    events: Events,
241}
242
243impl LocalBackend {
244    /// Constructs a new local task execution backend with the given
245    /// configuration.
246    ///
247    /// The provided configuration is expected to have already been validated.
248    pub fn new(
249        config: Arc<Config>,
250        events: Events,
251        cancellation: CancellationContext,
252    ) -> Result<Self> {
253        info!("initializing local backend");
254
255        let names = Arc::new(Mutex::new(GeneratorIterator::new(
256            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
257            INITIAL_EXPECTED_NAMES,
258        )));
259
260        let backend_config = config.backend()?;
261        let backend_config = backend_config
262            .as_local()
263            .context("configured backend is not local")?;
264        let cpu = backend_config
265            .cpu
266            .map(|v| v as f64)
267            .unwrap_or_else(|| SYSTEM.cpus().len() as f64);
268        let memory = backend_config
269            .memory
270            .as_ref()
271            .map(|s| convert_unit_string(s).expect("value should be valid"))
272            .unwrap_or_else(|| SYSTEM.total_memory());
273        let manager = TaskManager::new(
274            cpu,
275            cpu,
276            memory,
277            memory,
278            events.clone(),
279            cancellation.clone(),
280        );
281
282        Ok(Self {
283            config,
284            cancellation,
285            cpu,
286            memory,
287            manager,
288            names,
289            events,
290        })
291    }
292}
293
294impl TaskExecutionBackend for LocalBackend {
295    fn constraints(
296        &self,
297        inputs: &TaskInputs,
298        requirements: &HashMap<String, Value>,
299        _: &HashMap<String, Value>,
300    ) -> Result<TaskExecutionConstraints> {
301        let mut cpu = requirements::cpu(inputs, requirements);
302        if self.cpu < cpu {
303            let env_specific = if self.config.suppress_env_specific_output {
304                String::new()
305            } else {
306                format!(
307                    ", but the host only has {total_cpu} available",
308                    total_cpu = self.cpu
309                )
310            };
311            match self.config.task.cpu_limit_behavior {
312                TaskResourceLimitBehavior::TryWithMax => {
313                    warn!(
314                        "task requires at least {cpu} CPU{s}{env_specific}",
315                        s = if cpu == 1.0 { "" } else { "s" },
316                    );
317                    // clamp the reported constraint to what's available
318                    cpu = self.cpu;
319                }
320                TaskResourceLimitBehavior::Deny => {
321                    bail!(
322                        "task requires at least {cpu} CPU{s}{env_specific}",
323                        s = if cpu == 1.0 { "" } else { "s" },
324                    );
325                }
326            }
327        }
328
329        let mut memory = requirements::memory(inputs, requirements)? as u64;
330        if self.memory < memory as u64 {
331            let env_specific = if self.config.suppress_env_specific_output {
332                String::new()
333            } else {
334                format!(
335                    ", but the host only has {total_memory} GiB available",
336                    total_memory = self.memory as f64 / ONE_GIBIBYTE,
337                )
338            };
339            match self.config.task.memory_limit_behavior {
340                TaskResourceLimitBehavior::TryWithMax => {
341                    warn!(
342                        "task requires at least {memory} GiB of memory{env_specific}",
343                        // Display the error in GiB, as it is the most common unit for memory
344                        memory = memory as f64 / ONE_GIBIBYTE,
345                    );
346                    // clamp the reported constraint to what's available
347                    memory = self.memory;
348                }
349                TaskResourceLimitBehavior::Deny => {
350                    bail!(
351                        "task requires at least {memory} GiB of memory{env_specific}",
352                        // Display the error in GiB, as it is the most common unit for memory
353                        memory = memory as f64 / ONE_GIBIBYTE,
354                    );
355                }
356            }
357        }
358
359        Ok(TaskExecutionConstraints {
360            container: None,
361            cpu,
362            memory,
363            gpu: Default::default(),
364            fpga: Default::default(),
365            disks: Default::default(),
366        })
367    }
368
369    fn guest_inputs_dir(&self) -> Option<&'static str> {
370        // Local execution does not use a container
371        None
372    }
373
374    fn execute<'a>(
375        &'a self,
376        _: &'a Arc<dyn Transferer>,
377        request: ExecuteTaskRequest<'a>,
378    ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
379        async move {
380            let name = format!(
381                "{id}-{generated}",
382                id = request.id,
383                generated = self
384                    .names
385                    .lock()
386                    .expect("generator should always acquire")
387                    .next()
388                    .expect("generator should never be exhausted")
389            );
390
391            let cpu = request.constraints.cpu;
392            let memory = request.constraints.memory;
393
394            let task = LocalTask {
395                config: self.config.clone(),
396                request,
397                name,
398                events: self.events.crankshaft().clone(),
399                cancellation: self.cancellation.clone(),
400            };
401
402            self.manager.run(cpu, memory, task.run()).await
403        }
404        .boxed()
405    }
406}