wdl_engine/backend/
tes.rs

1//! Implementation of the TES backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use base64::Engine as _;
14use base64::engine::general_purpose::STANDARD;
15use crankshaft::config::backend;
16use crankshaft::engine::Task;
17use crankshaft::engine::service::name::GeneratorIterator;
18use crankshaft::engine::service::name::UniqueAlphanumeric;
19use crankshaft::engine::service::runner::Backend;
20use crankshaft::engine::service::runner::backend::TaskRunError;
21use crankshaft::engine::service::runner::backend::tes;
22use crankshaft::engine::task::Execution;
23use crankshaft::engine::task::Input;
24use crankshaft::engine::task::Output;
25use crankshaft::engine::task::Resources;
26use crankshaft::engine::task::input::Contents;
27use crankshaft::engine::task::input::Type as InputType;
28use crankshaft::engine::task::output::Type as OutputType;
29use futures::FutureExt;
30use futures::future::BoxFuture;
31use nonempty::NonEmpty;
32use tokio::sync::oneshot;
33use tokio_util::sync::CancellationToken;
34use tracing::info;
35use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
36
37use super::TaskExecutionBackend;
38use super::TaskExecutionConstraints;
39use super::TaskExecutionEvents;
40use super::TaskExecutionResult;
41use super::TaskManager;
42use super::TaskManagerRequest;
43use super::TaskSpawnRequest;
44use crate::COMMAND_FILE_NAME;
45use crate::InputKind;
46use crate::InputTrie;
47use crate::ONE_GIBIBYTE;
48use crate::PrimitiveValue;
49use crate::STDERR_FILE_NAME;
50use crate::STDOUT_FILE_NAME;
51use crate::Value;
52use crate::WORK_DIR_NAME;
53use crate::config::DEFAULT_TASK_SHELL;
54use crate::config::TaskConfig;
55use crate::config::TesBackendAuthConfig;
56use crate::config::TesBackendConfig;
57use crate::http::HttpDownloader;
58use crate::path::EvaluationPath;
59use crate::v1::DEFAULT_TASK_REQUIREMENT_DISKS;
60use crate::v1::container;
61use crate::v1::cpu;
62use crate::v1::disks;
63use crate::v1::max_cpu;
64use crate::v1::max_memory;
65use crate::v1::memory;
66use crate::v1::preemptible;
67
/// Guest directory under which all task inputs are mapped.
const GUEST_INPUTS_DIR: &str = "/mnt/inputs";

/// Guest directory used as the working directory of the task's execution.
const GUEST_WORK_DIR: &str = "/mnt/work";

/// Guest location of the evaluated command script.
const GUEST_COMMAND_PATH: &str = "/mnt/command";

/// Guest file that receives the container's standard output.
const GUEST_STDOUT_PATH: &str = "/stdout";

/// Guest file that receives the container's standard error.
const GUEST_STDERR_PATH: &str = "/stderr";

/// Expected number of task names to be generated.
///
/// Used both as the expected generation count of the unique name generator
/// (sizing its bloom filter) and as the number of names it prepopulates.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// Poll interval, in seconds, used when the TES configuration does not
/// specify one.
const DEFAULT_TES_INTERVAL: u64 = 60;
91
92/// Represents a TES task request.
93///
94/// This request contains the requested cpu and memory reservations for the task
95/// as well as the result receiver channel.
96#[derive(Debug)]
97struct TesTaskRequest {
98    /// The inner task spawn request.
99    inner: TaskSpawnRequest,
100    /// The Crankshaft TES backend to use.
101    backend: Arc<tes::Backend>,
102    /// The name of the task.
103    name: String,
104    /// The optional shell to use.
105    shell: Arc<Option<String>>,
106    /// The requested container for the task.
107    container: String,
108    /// The associated engine configuration.
109    config: Arc<TesBackendConfig>,
110    /// The requested CPU reservation for the task.
111    cpu: f64,
112    /// The requested memory reservation for the task, in bytes.
113    memory: u64,
114    /// The requested maximum CPU limit for the task.
115    max_cpu: Option<f64>,
116    /// The requested maximum memory limit for the task, in bytes.
117    max_memory: Option<u64>,
118    /// The number of preemptible task retries to do before using a
119    /// non-preemptible task.
120    ///
121    /// If this value is 0, no preemptible tasks are requested from the TES
122    /// server.
123    preemptible: i64,
124    /// The cancellation token for the request.
125    token: CancellationToken,
126}
127
128impl TesTaskRequest {
129    /// Gets the TES disk resource for the request.
130    fn disk_resource(&self) -> Result<f64> {
131        let disks = disks(self.inner.requirements(), self.inner.hints())?;
132        if disks.len() > 1 {
133            bail!(
134                "TES backend does not support more than one disk specification for the \
135                 `{TASK_REQUIREMENT_DISKS}` task requirement"
136            );
137        }
138
139        if let Some(mount_point) = disks.keys().next() {
140            if *mount_point != "/" {
141                bail!(
142                    "TES backend does not support a disk mount point other than `/` for the \
143                     `{TASK_REQUIREMENT_DISKS}` task requirement"
144                );
145            }
146        }
147
148        Ok(disks
149            .values()
150            .next()
151            .map(|d| d.size as f64)
152            .unwrap_or(DEFAULT_TASK_REQUIREMENT_DISKS))
153    }
154}
155
156impl TaskManagerRequest for TesTaskRequest {
157    fn cpu(&self) -> f64 {
158        self.cpu
159    }
160
161    fn memory(&self) -> u64 {
162        self.memory
163    }
164
165    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
166        // Create the attempt directory
167        let attempt_dir = self.inner.attempt_dir();
168        fs::create_dir_all(attempt_dir).with_context(|| {
169            format!(
170                "failed to create directory `{path}`",
171                path = attempt_dir.display()
172            )
173        })?;
174
175        // Write the evaluated command to disk
176        // This is done even for remote execution so that a copy exists locally
177        let command_path = attempt_dir.join(COMMAND_FILE_NAME);
178        fs::write(&command_path, self.inner.command()).with_context(|| {
179            format!(
180                "failed to write command contents to `{path}`",
181                path = command_path.display()
182            )
183        })?;
184
185        let task_dir = format!(
186            "{name}-{timestamp}/",
187            name = self.name,
188            timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S")
189        );
190
191        // Start with the command file as an input
192        let mut inputs = vec![
193            Input::builder()
194                .path(GUEST_COMMAND_PATH)
195                .contents(Contents::Path(command_path.to_path_buf()))
196                .ty(InputType::File)
197                .read_only(true)
198                .build(),
199        ];
200
201        for input in self.inner.inputs() {
202            // Currently, if the input is a directory with contents, we'll error evaluation
203            // TODO: in the future, we should be uploading the entire contents to cloud
204            // storage
205            if input.kind() == InputKind::Directory {
206                if let EvaluationPath::Local(path) = input.path() {
207                    if let Ok(mut entries) = path.read_dir() {
208                        if entries.next().is_some() {
209                            bail!(
210                                "cannot upload contents of directory `{path}`: operation is not \
211                                 yet supported",
212                                path = path.display()
213                            );
214                        }
215                    }
216                }
217                continue;
218            }
219
220            // TODO: for local files, upload to cloud storage rather than specifying the
221            // input contents directly
222            inputs.push(
223                Input::builder()
224                    .path(input.guest_path().expect("should have guest path"))
225                    .contents(match input.path() {
226                        EvaluationPath::Local(path) => Contents::Path(path.clone()),
227                        EvaluationPath::Remote(url) => Contents::Url(url.clone()),
228                    })
229                    .ty(input.kind())
230                    .read_only(true)
231                    .build(),
232            );
233        }
234
235        // SAFETY: currently `outputs` is required by configuration validation, so it
236        // should always unwrap
237        let outputs_url = self
238            .config
239            .outputs
240            .as_ref()
241            .expect("should have outputs URL")
242            .join(&task_dir)
243            .expect("should join");
244
245        let mut work_dir_url = outputs_url.join(WORK_DIR_NAME).expect("should join");
246        let stdout_url = outputs_url.join(STDOUT_FILE_NAME).expect("should join");
247        let stderr_url = outputs_url.join(STDERR_FILE_NAME).expect("should join");
248
249        // The TES backend will output three things: the working directory contents,
250        // stdout, and stderr.
251        let outputs = vec![
252            Output::builder()
253                .path(GUEST_WORK_DIR)
254                .url(work_dir_url.clone())
255                .ty(OutputType::Directory)
256                .build(),
257            Output::builder()
258                .path(GUEST_STDOUT_PATH)
259                .url(stdout_url.clone())
260                .ty(OutputType::File)
261                .build(),
262            Output::builder()
263                .path(GUEST_STDERR_PATH)
264                .url(stderr_url.clone())
265                .ty(OutputType::File)
266                .build(),
267        ];
268
269        let mut preemptible = self.preemptible;
270        let mut spawned = Some(spawned);
271        loop {
272            let task = Task::builder()
273                .name(&self.name)
274                .executions(NonEmpty::new(
275                    Execution::builder()
276                        .image(&self.container)
277                        .program(self.shell.as_deref().unwrap_or(DEFAULT_TASK_SHELL))
278                        .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
279                        .work_dir(GUEST_WORK_DIR)
280                        .env(self.inner.env().clone())
281                        .stdout(GUEST_STDOUT_PATH)
282                        .stderr(GUEST_STDERR_PATH)
283                        .build(),
284                ))
285                .inputs(inputs.clone())
286                .outputs(outputs.clone())
287                .resources(
288                    Resources::builder()
289                        .cpu(self.cpu)
290                        .maybe_cpu_limit(self.max_cpu)
291                        .ram(self.memory as f64 / ONE_GIBIBYTE)
292                        .disk(self.disk_resource()?)
293                        .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
294                        .preemptible(preemptible > 0)
295                        .build(),
296                )
297                .build();
298
299            let statuses = match self
300                .backend
301                .run(task, spawned.take(), self.token.clone())
302                .map_err(|e| anyhow!("{e:#}"))?
303                .await
304            {
305                Ok(statuses) => statuses,
306                Err(TaskRunError::Preempted) if preemptible > 0 => {
307                    // Decrement the preemptible count and retry
308                    preemptible -= 1;
309                    continue;
310                }
311                Err(e) => {
312                    return Err(e.into());
313                }
314            };
315
316            assert_eq!(statuses.len(), 1, "there should only be one output");
317            let status = statuses.first();
318
319            // Push an empty path segment so that future joins of the work directory URL
320            // treat it as a directory
321            work_dir_url.path_segments_mut().unwrap().push("");
322
323            return Ok(TaskExecutionResult {
324                inputs: self.inner.info.inputs,
325                exit_code: status.code().expect("should have exit code"),
326                work_dir: EvaluationPath::Remote(work_dir_url),
327                stdout: PrimitiveValue::new_file(stdout_url).into(),
328                stderr: PrimitiveValue::new_file(stderr_url).into(),
329            });
330        }
331    }
332}
333
334/// Represents the Task Execution Service (TES) backend.
335pub struct TesBackend {
336    /// The underlying Crankshaft backend.
337    inner: Arc<tes::Backend>,
338    /// The shell to use.
339    shell: Arc<Option<String>>,
340    /// The default container to use.
341    container: Option<String>,
342    /// The associated backend configuration.
343    config: Arc<TesBackendConfig>,
344    /// The maximum amount of concurrency supported.
345    max_concurrency: u64,
346    /// The maximum CPUs for any of one node.
347    max_cpu: u64,
348    /// The maximum memory for any of one node.
349    max_memory: u64,
350    /// The task manager for the backend.
351    manager: TaskManager<TesTaskRequest>,
352    /// The name generator for tasks.
353    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
354}
355
356impl TesBackend {
357    /// Constructs a new TES task execution backend with the given
358    /// configuration.
359    pub async fn new(task: &TaskConfig, config: &TesBackendConfig) -> Result<Self> {
360        task.validate()?;
361        config.validate()?;
362
363        info!("initializing TES backend");
364
365        // There's no way to ask the TES service for its limits, so use the maximums
366        // allowed
367        let max_cpu = u64::MAX;
368        let max_memory = u64::MAX;
369        let manager = TaskManager::new_unlimited(max_cpu, max_memory);
370
371        let mut http = backend::tes::http::Config::default();
372        if let Some(TesBackendAuthConfig::Basic(auth)) = &config.auth {
373            http.basic_auth_token = Some(STANDARD.encode(format!(
374                "{user}:{pass}",
375                user = auth.username.as_ref().expect("should have user name"),
376                pass = auth.password.as_ref().expect("should have password")
377            )));
378        }
379
380        let backend = tes::Backend::initialize(
381            backend::tes::Config::builder()
382                .url(config.url.clone().expect("should have URL"))
383                .http(http)
384                .interval(config.interval.unwrap_or(DEFAULT_TES_INTERVAL))
385                .build(),
386        );
387
388        Ok(Self {
389            inner: Arc::new(backend),
390            shell: Arc::new(task.shell.clone()),
391            container: task.container.clone(),
392            config: Arc::new(config.clone()),
393            max_concurrency: config.max_concurrency.unwrap_or(u64::MAX),
394            max_cpu,
395            max_memory,
396            manager,
397            generator: Arc::new(Mutex::new(GeneratorIterator::new(
398                UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
399                INITIAL_EXPECTED_NAMES,
400            ))),
401        })
402    }
403}
404
405impl TaskExecutionBackend for TesBackend {
406    fn max_concurrency(&self) -> u64 {
407        self.max_concurrency
408    }
409
410    fn constraints(
411        &self,
412        requirements: &HashMap<String, Value>,
413        hints: &HashMap<String, Value>,
414    ) -> Result<TaskExecutionConstraints> {
415        let container = container(requirements, self.container.as_deref());
416
417        let cpu = cpu(requirements);
418        if (self.max_cpu as f64) < cpu {
419            bail!(
420                "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
421                 {max_cpu}",
422                s = if cpu == 1.0 { "" } else { "s" },
423                max_cpu = self.max_cpu,
424            );
425        }
426
427        let memory = memory(requirements)?;
428        if self.max_memory < memory as u64 {
429            // Display the error in GiB, as it is the most common unit for memory
430            let memory = memory as f64 / ONE_GIBIBYTE;
431            let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;
432
433            bail!(
434                "task requires at least {memory} GiB of memory, but the execution backend has a \
435                 maximum of {max_memory} GiB",
436            );
437        }
438
439        // TODO: only parse the disks requirement once
440        let disks = disks(requirements, hints)?
441            .into_iter()
442            .map(|(mp, disk)| (mp.to_string(), disk.size))
443            .collect();
444
445        Ok(TaskExecutionConstraints {
446            container: Some(container.into_owned()),
447            cpu,
448            memory,
449            gpu: Default::default(),
450            fpga: Default::default(),
451            disks,
452        })
453    }
454
455    fn guest_work_dir(&self) -> Option<&Path> {
456        Some(Path::new(GUEST_WORK_DIR))
457    }
458
459    fn localize_inputs<'a, 'b, 'c, 'd>(
460        &'a self,
461        _: &'b HttpDownloader,
462        inputs: &'c mut [crate::eval::Input],
463    ) -> BoxFuture<'d, Result<()>>
464    where
465        'a: 'd,
466        'b: 'd,
467        'c: 'd,
468        Self: 'd,
469    {
470        async {
471            // Construct a trie for mapping input guest paths
472            let mut trie = InputTrie::default();
473            for input in inputs.iter() {
474                trie.insert(input)?;
475            }
476
477            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
478                inputs[index].set_guest_path(guest_path);
479            }
480
481            Ok(())
482        }
483        .boxed()
484    }
485
486    fn spawn(
487        &self,
488        request: TaskSpawnRequest,
489        token: CancellationToken,
490    ) -> Result<TaskExecutionEvents> {
491        let (spawned_tx, spawned_rx) = oneshot::channel();
492        let (completed_tx, completed_rx) = oneshot::channel();
493
494        let requirements = request.requirements();
495        let hints = request.hints();
496
497        let container = container(requirements, self.container.as_deref()).into_owned();
498        let cpu = cpu(requirements);
499        let memory = memory(requirements)? as u64;
500        let max_cpu = max_cpu(hints);
501        let max_memory = max_memory(hints)?.map(|i| i as u64);
502        let preemptible = preemptible(hints);
503
504        let name = format!(
505            "{id}-{generated}",
506            id = request.id(),
507            generated = self
508                .generator
509                .lock()
510                .expect("generator should always acquire")
511                .next()
512                .expect("generator should never be exhausted")
513        );
514        self.manager.send(
515            TesTaskRequest {
516                inner: request,
517                backend: self.inner.clone(),
518                name,
519                shell: self.shell.clone(),
520                container,
521                config: self.config.clone(),
522                cpu,
523                memory,
524                max_cpu,
525                max_memory,
526                token,
527                preemptible,
528            },
529            spawned_tx,
530            completed_tx,
531        );
532
533        Ok(TaskExecutionEvents {
534            spawned: spawned_rx,
535            completed: completed_rx,
536        })
537    }
538}