wdl_engine/backend/
local.rs

1//! Implementation of the local backend.
2
3use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs;
6use std::fs::File;
7use std::process::Stdio;
8use std::sync::Arc;
9use std::sync::Mutex;
10
11use anyhow::Context;
12use anyhow::Result;
13use anyhow::bail;
14use crankshaft::engine::service::name::GeneratorIterator;
15use crankshaft::engine::service::name::UniqueAlphanumeric;
16use crankshaft::events::Event;
17use crankshaft::events::next_task_id;
18use crankshaft::events::send_event;
19use nonempty::NonEmpty;
20use tokio::process::Command;
21use tokio::select;
22use tokio::sync::broadcast;
23use tokio::sync::oneshot;
24use tokio::sync::oneshot::Receiver;
25use tokio_util::sync::CancellationToken;
26use tracing::info;
27use tracing::warn;
28
29use super::TaskExecutionBackend;
30use super::TaskExecutionConstraints;
31use super::TaskManager;
32use super::TaskManagerRequest;
33use super::TaskSpawnRequest;
34use crate::COMMAND_FILE_NAME;
35use crate::ONE_GIBIBYTE;
36use crate::PrimitiveValue;
37use crate::STDERR_FILE_NAME;
38use crate::STDOUT_FILE_NAME;
39use crate::SYSTEM;
40use crate::TaskExecutionResult;
41use crate::Value;
42use crate::WORK_DIR_NAME;
43use crate::backend::INITIAL_EXPECTED_NAMES;
44use crate::config::Config;
45use crate::config::DEFAULT_TASK_SHELL;
46use crate::config::LocalBackendConfig;
47use crate::config::TaskResourceLimitBehavior;
48use crate::convert_unit_string;
49use crate::path::EvaluationPath;
50use crate::v1::cpu;
51use crate::v1::memory;
52
/// Represents a local task request.
///
/// This request carries what is needed to run a single task as a local
/// process: the engine configuration, the inner spawn request, the task name,
/// the requested CPU and memory reservations, the cancellation token, and the
/// optional event sender.
#[derive(Debug)]
struct LocalTaskRequest {
    /// The engine configuration.
    ///
    /// Used to look up the shell that executes the task's command.
    config: Arc<Config>,
    /// The inner task spawn request.
    ///
    /// Provides the attempt directory, the evaluated command, and the
    /// environment variables for the task process.
    inner: TaskSpawnRequest,
    /// The name of the task.
    ///
    /// Used in log messages and in the task created event.
    name: String,
    /// The requested CPU reservation for the task.
    ///
    /// Note that CPU isn't actually reserved for the task process.
    cpu: f64,
    /// The requested memory reservation for the task.
    ///
    /// Note that memory isn't actually reserved for the task process.
    memory: u64,
    /// The cancellation token for the request.
    ///
    /// When the token is cancelled, the request stops running and fails with
    /// a cancellation error.
    token: CancellationToken,
    /// The sender for events.
    ///
    /// This is `None` when no event sender was provided to the backend.
    events: Option<broadcast::Sender<Event>>,
}
78
79impl TaskManagerRequest for LocalTaskRequest {
80    fn cpu(&self) -> f64 {
81        self.cpu
82    }
83
84    fn memory(&self) -> u64 {
85        self.memory
86    }
87
88    async fn run(self) -> Result<TaskExecutionResult> {
89        let id = next_task_id();
90        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
91        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
92        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);
93
94        let run = async {
95            // Create the working directory
96            fs::create_dir_all(&work_dir).with_context(|| {
97                format!(
98                    "failed to create directory `{path}`",
99                    path = work_dir.display()
100                )
101            })?;
102
103            // Write the evaluated command to disk
104            let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
105            fs::write(&command_path, self.inner.command()).with_context(|| {
106                format!(
107                    "failed to write command contents to `{path}`",
108                    path = command_path.display()
109                )
110            })?;
111
112            // Create a file for the stdout
113            let stdout = File::create(&stdout_path).with_context(|| {
114                format!(
115                    "failed to create stdout file `{path}`",
116                    path = stdout_path.display()
117                )
118            })?;
119
120            // Create a file for the stderr
121            let stderr = File::create(&stderr_path).with_context(|| {
122                format!(
123                    "failed to create stderr file `{path}`",
124                    path = stderr_path.display()
125                )
126            })?;
127
128            let mut command = Command::new(
129                self.config
130                    .task
131                    .shell
132                    .as_deref()
133                    .unwrap_or(DEFAULT_TASK_SHELL),
134            );
135            command
136                .current_dir(&work_dir)
137                .arg(command_path)
138                .stdin(Stdio::null())
139                .stdout(stdout)
140                .stderr(stderr)
141                .envs(
142                    self.inner
143                        .env()
144                        .iter()
145                        .map(|(k, v)| (OsStr::new(k), OsStr::new(v))),
146                )
147                .kill_on_drop(true);
148
149            // Set the PATH variable for the child on Windows to get consistent PATH
150            // searching. See: https://github.com/rust-lang/rust/issues/122660
151            #[cfg(windows)]
152            if let Ok(path) = std::env::var("PATH") {
153                command.env("PATH", path);
154            }
155
156            let mut child = command.spawn().context("failed to spawn shell")?;
157
158            // Notify that the process has spawned
159            send_event!(self.events, Event::TaskStarted { id });
160
161            let id = child.id().expect("should have id");
162            info!(
163                "spawned local shell process {id} for execution of task `{name}`",
164                name = self.name
165            );
166
167            let status = child.wait().await.with_context(|| {
168                format!("failed to wait for termination of task child process {id}")
169            })?;
170
171            #[cfg(unix)]
172            {
173                use std::os::unix::process::ExitStatusExt;
174                if let Some(signal) = status.signal() {
175                    tracing::warn!("task process {id} has terminated with signal {signal}");
176
177                    bail!(
178                        "task child process {id} has terminated with signal {signal}; see stderr \
179                         file `{path}` for more details",
180                        path = stderr_path.display()
181                    );
182                }
183            }
184
185            Ok(status)
186        };
187
188        // Send the created event
189        send_event!(
190            self.events,
191            Event::TaskCreated {
192                id,
193                name: self.name.clone(),
194                tes_id: None,
195                token: self.token.clone(),
196            }
197        );
198
199        select! {
200            // Poll the cancellation token before the child future
201            biased;
202
203            _ = self.token.cancelled() => {
204                send_event!(self.events, Event::TaskCanceled { id });
205                bail!("task was cancelled");
206            }
207            result = run => {
208                match result {
209                    Ok(status) => {
210                        send_event!(self.events, Event::TaskCompleted { id, exit_statuses: NonEmpty::new(status) });
211
212                        let exit_code = status.code().expect("process should have exited");
213                        info!("process {id} for task `{name}` has terminated with status code {exit_code}", name = self.name);
214                        Ok(TaskExecutionResult {
215                            exit_code,
216                            work_dir: EvaluationPath::Local(work_dir),
217                            stdout: PrimitiveValue::new_file(stdout_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
218                            stderr: PrimitiveValue::new_file(stderr_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
219                        })
220                    }
221                    Err(e) => {
222                        send_event!(self.events, Event::TaskFailed { id, message: format!("{e:#}") });
223                        Err(e)
224                    }
225                }
226            }
227        }
228    }
229}
230
/// Represents a task execution backend that locally executes tasks.
///
/// <div class="warning">
/// Warning: the local task execution backend spawns processes on the host
/// directly without the use of a container; only use this backend on trusted
/// WDL. </div>
pub struct LocalBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The total CPU available to the backend.
    ///
    /// This is the value from the backend configuration when one is given;
    /// otherwise it is the number of CPUs reported for the host.
    cpu: u64,
    /// The total memory available to the backend.
    ///
    /// This is the value from the backend configuration when one is given;
    /// otherwise it is the total memory reported for the host.
    memory: u64,
    /// The underlying task manager.
    manager: TaskManager<LocalTaskRequest>,
    /// The name generator for tasks.
    ///
    /// Each spawned task gets a generated suffix appended to its id.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
    /// The sender for events.
    ///
    /// A clone of the sender is handed to every spawned task request.
    events: Option<broadcast::Sender<Event>>,
}
251
252impl LocalBackend {
253    /// Constructs a new local task execution backend with the given
254    /// configuration.
255    ///
256    /// The provided configuration is expected to have already been validated.
257    pub fn new(
258        config: Arc<Config>,
259        backend_config: &LocalBackendConfig,
260        events: Option<broadcast::Sender<Event>>,
261    ) -> Result<Self> {
262        info!("initializing local backend");
263
264        let names = Arc::new(Mutex::new(GeneratorIterator::new(
265            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
266            INITIAL_EXPECTED_NAMES,
267        )));
268
269        let cpu = backend_config
270            .cpu
271            .unwrap_or_else(|| SYSTEM.cpus().len() as u64);
272        let memory = backend_config
273            .memory
274            .as_ref()
275            .map(|s| convert_unit_string(s).expect("value should be valid"))
276            .unwrap_or_else(|| SYSTEM.total_memory());
277        let manager = TaskManager::new(cpu, cpu, memory, memory);
278
279        Ok(Self {
280            config,
281            cpu,
282            memory,
283            manager,
284            names,
285            events,
286        })
287    }
288}
289
290impl TaskExecutionBackend for LocalBackend {
291    fn max_concurrency(&self) -> u64 {
292        self.cpu
293    }
294
295    fn constraints(
296        &self,
297        requirements: &HashMap<String, Value>,
298        _: &HashMap<String, Value>,
299    ) -> Result<TaskExecutionConstraints> {
300        let mut cpu = cpu(requirements);
301        if (self.cpu as f64) < cpu {
302            let env_specific = if self.config.suppress_env_specific_output {
303                String::new()
304            } else {
305                format!(
306                    ", but the host only has {total_cpu} available",
307                    total_cpu = self.cpu
308                )
309            };
310            match self.config.task.cpu_limit_behavior {
311                TaskResourceLimitBehavior::TryWithMax => {
312                    warn!(
313                        "task requires at least {cpu} CPU{s}{env_specific}",
314                        s = if cpu == 1.0 { "" } else { "s" },
315                    );
316                    // clamp the reported constraint to what's available
317                    cpu = self.cpu as f64;
318                }
319                TaskResourceLimitBehavior::Deny => {
320                    bail!(
321                        "task requires at least {cpu} CPU{s}{env_specific}",
322                        s = if cpu == 1.0 { "" } else { "s" },
323                    );
324                }
325            }
326        }
327
328        let mut memory = memory(requirements)?;
329        if self.memory < memory as u64 {
330            let env_specific = if self.config.suppress_env_specific_output {
331                String::new()
332            } else {
333                format!(
334                    ", but the host only has {total_memory} GiB available",
335                    total_memory = self.memory as f64 / ONE_GIBIBYTE,
336                )
337            };
338            match self.config.task.memory_limit_behavior {
339                TaskResourceLimitBehavior::TryWithMax => {
340                    warn!(
341                        "task requires at least {memory} GiB of memory{env_specific}",
342                        // Display the error in GiB, as it is the most common unit for memory
343                        memory = memory as f64 / ONE_GIBIBYTE,
344                    );
345                    // clamp the reported constraint to what's available
346                    memory = self.memory.try_into().unwrap_or(i64::MAX);
347                }
348                TaskResourceLimitBehavior::Deny => {
349                    bail!(
350                        "task requires at least {memory} GiB of memory{env_specific}",
351                        // Display the error in GiB, as it is the most common unit for memory
352                        memory = memory as f64 / ONE_GIBIBYTE,
353                    );
354                }
355            }
356        }
357
358        Ok(TaskExecutionConstraints {
359            container: None,
360            cpu,
361            memory,
362            gpu: Default::default(),
363            fpga: Default::default(),
364            disks: Default::default(),
365        })
366    }
367
368    fn guest_inputs_dir(&self) -> Option<&'static str> {
369        // Local execution does not use a container
370        None
371    }
372
373    fn needs_local_inputs(&self) -> bool {
374        true
375    }
376
377    fn spawn(
378        &self,
379        request: TaskSpawnRequest,
380        token: CancellationToken,
381    ) -> Result<Receiver<Result<TaskExecutionResult>>> {
382        let (completed_tx, completed_rx) = oneshot::channel();
383
384        let requirements = request.requirements();
385        let mut cpu = cpu(requirements);
386        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
387            cpu = std::cmp::min(cpu.ceil() as u64, self.cpu) as f64;
388        }
389        let mut memory = memory(requirements)? as u64;
390        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
391            memory = std::cmp::min(memory, self.memory);
392        }
393
394        let name = format!(
395            "{id}-{generated}",
396            id = request.id(),
397            generated = self
398                .names
399                .lock()
400                .expect("generator should always acquire")
401                .next()
402                .expect("generator should never be exhausted")
403        );
404
405        self.manager.send(
406            LocalTaskRequest {
407                config: self.config.clone(),
408                inner: request,
409                name,
410                cpu,
411                memory,
412                token,
413                events: self.events.clone(),
414            },
415            completed_tx,
416        );
417
418        Ok(completed_rx)
419    }
420}