wdl_engine/backend/
tes.rs

1//! Implementation of the TES backend.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::path::Path;
7use std::sync::Arc;
8use std::sync::Mutex;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use anyhow::bail;
14use crankshaft::config::backend;
15use crankshaft::config::backend::tes::http::HttpAuthConfig;
16use crankshaft::engine::Task;
17use crankshaft::engine::service::name::GeneratorIterator;
18use crankshaft::engine::service::name::UniqueAlphanumeric;
19use crankshaft::engine::service::runner::Backend;
20use crankshaft::engine::service::runner::backend::TaskRunError;
21use crankshaft::engine::service::runner::backend::tes;
22use crankshaft::engine::task::Execution;
23use crankshaft::engine::task::Input;
24use crankshaft::engine::task::Output;
25use crankshaft::engine::task::Resources;
26use crankshaft::engine::task::input::Contents;
27use crankshaft::engine::task::input::Type as InputType;
28use crankshaft::engine::task::output::Type as OutputType;
29use futures::FutureExt;
30use futures::future::BoxFuture;
31use nonempty::NonEmpty;
32use tokio::sync::oneshot;
33use tokio_util::sync::CancellationToken;
34use tracing::info;
35use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
36
37use super::TaskExecutionBackend;
38use super::TaskExecutionConstraints;
39use super::TaskExecutionEvents;
40use super::TaskExecutionResult;
41use super::TaskManager;
42use super::TaskManagerRequest;
43use super::TaskSpawnRequest;
44use crate::COMMAND_FILE_NAME;
45use crate::InputKind;
46use crate::InputTrie;
47use crate::ONE_GIBIBYTE;
48use crate::PrimitiveValue;
49use crate::STDERR_FILE_NAME;
50use crate::STDOUT_FILE_NAME;
51use crate::Value;
52use crate::WORK_DIR_NAME;
53use crate::config::Config;
54use crate::config::DEFAULT_TASK_SHELL;
55use crate::config::TesBackendAuthConfig;
56use crate::config::TesBackendConfig;
57use crate::http::HttpDownloader;
58use crate::http::rewrite_url;
59use crate::path::EvaluationPath;
60use crate::v1::DEFAULT_TASK_REQUIREMENT_DISKS;
61use crate::v1::container;
62use crate::v1::cpu;
63use crate::v1::disks;
64use crate::v1::max_cpu;
65use crate::v1::max_memory;
66use crate::v1::memory;
67use crate::v1::preemptible;
68
/// How many task names the backend expects to hand out initially.
///
/// The value sizes the name generator's bloom filter and determines how many
/// names the generator prepopulates.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// Guest directory beneath which task inputs are placed.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs";

/// Working directory of the task inside the guest.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// Guest location of the evaluated command file.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// Guest file that receives the container's standard output.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// Guest file that receives the container's standard error.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// Fallback poll interval for the TES backend, in seconds, used when the
/// backend configuration does not specify one.
const DEFAULT_TES_INTERVAL: u64 = 60;
92
93/// Represents a TES task request.
94///
95/// This request contains the requested cpu and memory reservations for the task
96/// as well as the result receiver channel.
97#[derive(Debug)]
98struct TesTaskRequest {
99    /// The engine configuration.
100    config: Arc<Config>,
101    /// The backend configuration.
102    backend_config: Arc<TesBackendConfig>,
103    /// The inner task spawn request.
104    inner: TaskSpawnRequest,
105    /// The Crankshaft TES backend to use.
106    backend: Arc<tes::Backend>,
107    /// The name of the task.
108    name: String,
109    /// The requested container for the task.
110    container: String,
111    /// The requested CPU reservation for the task.
112    cpu: f64,
113    /// The requested memory reservation for the task, in bytes.
114    memory: u64,
115    /// The requested maximum CPU limit for the task.
116    max_cpu: Option<f64>,
117    /// The requested maximum memory limit for the task, in bytes.
118    max_memory: Option<u64>,
119    /// The number of preemptible task retries to do before using a
120    /// non-preemptible task.
121    ///
122    /// If this value is 0, no preemptible tasks are requested from the TES
123    /// server.
124    preemptible: i64,
125    /// The cancellation token for the request.
126    token: CancellationToken,
127}
128
129impl TesTaskRequest {
130    /// Gets the TES disk resource for the request.
131    fn disk_resource(&self) -> Result<f64> {
132        let disks = disks(self.inner.requirements(), self.inner.hints())?;
133        if disks.len() > 1 {
134            bail!(
135                "TES backend does not support more than one disk specification for the \
136                 `{TASK_REQUIREMENT_DISKS}` task requirement"
137            );
138        }
139
140        if let Some(mount_point) = disks.keys().next() {
141            if *mount_point != "/" {
142                bail!(
143                    "TES backend does not support a disk mount point other than `/` for the \
144                     `{TASK_REQUIREMENT_DISKS}` task requirement"
145                );
146            }
147        }
148
149        Ok(disks
150            .values()
151            .next()
152            .map(|d| d.size as f64)
153            .unwrap_or(DEFAULT_TASK_REQUIREMENT_DISKS))
154    }
155}
156
157impl TaskManagerRequest for TesTaskRequest {
158    fn cpu(&self) -> f64 {
159        self.cpu
160    }
161
162    fn memory(&self) -> u64 {
163        self.memory
164    }
165
166    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
167        // Create the attempt directory
168        let attempt_dir = self.inner.attempt_dir();
169        fs::create_dir_all(attempt_dir).with_context(|| {
170            format!(
171                "failed to create directory `{path}`",
172                path = attempt_dir.display()
173            )
174        })?;
175
176        // Write the evaluated command to disk
177        // This is done even for remote execution so that a copy exists locally
178        let command_path = attempt_dir.join(COMMAND_FILE_NAME);
179        fs::write(&command_path, self.inner.command()).with_context(|| {
180            format!(
181                "failed to write command contents to `{path}`",
182                path = command_path.display()
183            )
184        })?;
185
186        let task_dir = format!(
187            "{name}-{timestamp}/",
188            name = self.name,
189            timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S")
190        );
191
192        // Start with the command file as an input
193        let mut inputs = vec![
194            Input::builder()
195                .path(GUEST_COMMAND_PATH)
196                .contents(Contents::Path(command_path.to_path_buf()))
197                .ty(InputType::File)
198                .read_only(true)
199                .build(),
200        ];
201
202        for input in self.inner.inputs() {
203            // Currently, if the input is a directory with contents, we'll error evaluation
204            // TODO: in the future, we should be uploading the entire contents to cloud
205            // storage
206            if input.kind() == InputKind::Directory {
207                if let EvaluationPath::Local(path) = input.path() {
208                    if let Ok(mut entries) = path.read_dir() {
209                        if entries.next().is_some() {
210                            bail!(
211                                "cannot upload contents of directory `{path}`: operation is not \
212                                 yet supported",
213                                path = path.display()
214                            );
215                        }
216                    }
217                }
218                continue;
219            }
220
221            // TODO: for local files, upload to cloud storage rather than specifying the
222            // input contents directly
223            inputs.push(
224                Input::builder()
225                    .path(input.guest_path().expect("should have guest path"))
226                    .contents(match input.path() {
227                        EvaluationPath::Local(path) => Contents::Path(path.clone()),
228                        EvaluationPath::Remote(url) => Contents::Url(
229                            rewrite_url(&self.config, Cow::Borrowed(url))?.into_owned(),
230                        ),
231                    })
232                    .ty(input.kind())
233                    .read_only(true)
234                    .build(),
235            );
236        }
237
238        // SAFETY: currently `outputs` is required by configuration validation, so it
239        // should always unwrap
240        let outputs_url = self
241            .backend_config
242            .outputs
243            .as_ref()
244            .expect("should have outputs URL")
245            .join(&task_dir)
246            .expect("should join");
247
248        let mut work_dir_url = rewrite_url(
249            &self.config,
250            Cow::Owned(outputs_url.join(WORK_DIR_NAME).expect("should join")),
251        )?
252        .into_owned();
253        let stdout_url = rewrite_url(
254            &self.config,
255            Cow::Owned(outputs_url.join(STDOUT_FILE_NAME).expect("should join")),
256        )?
257        .into_owned();
258        let stderr_url = rewrite_url(
259            &self.config,
260            Cow::Owned(outputs_url.join(STDERR_FILE_NAME).expect("should join")),
261        )?
262        .into_owned();
263
264        // The TES backend will output three things: the working directory contents,
265        // stdout, and stderr.
266        let outputs = vec![
267            Output::builder()
268                .path(GUEST_WORK_DIR)
269                .url(work_dir_url.clone())
270                .ty(OutputType::Directory)
271                .build(),
272            Output::builder()
273                .path(GUEST_STDOUT_PATH)
274                .url(stdout_url.clone())
275                .ty(OutputType::File)
276                .build(),
277            Output::builder()
278                .path(GUEST_STDERR_PATH)
279                .url(stderr_url.clone())
280                .ty(OutputType::File)
281                .build(),
282        ];
283
284        let mut preemptible = self.preemptible;
285        let mut spawned = Some(spawned);
286        loop {
287            let task = Task::builder()
288                .name(&self.name)
289                .executions(NonEmpty::new(
290                    Execution::builder()
291                        .image(&self.container)
292                        .program(
293                            self.config
294                                .task
295                                .shell
296                                .as_deref()
297                                .unwrap_or(DEFAULT_TASK_SHELL),
298                        )
299                        .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
300                        .work_dir(GUEST_WORK_DIR)
301                        .env(self.inner.env().clone())
302                        .stdout(GUEST_STDOUT_PATH)
303                        .stderr(GUEST_STDERR_PATH)
304                        .build(),
305                ))
306                .inputs(inputs.clone())
307                .outputs(outputs.clone())
308                .resources(
309                    Resources::builder()
310                        .cpu(self.cpu)
311                        .maybe_cpu_limit(self.max_cpu)
312                        .ram(self.memory as f64 / ONE_GIBIBYTE)
313                        .disk(self.disk_resource()?)
314                        .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
315                        .preemptible(preemptible > 0)
316                        .build(),
317                )
318                .build();
319
320            let statuses = match self
321                .backend
322                .run(task, spawned.take(), self.token.clone())
323                .map_err(|e| anyhow!("{e:#}"))?
324                .await
325            {
326                Ok(statuses) => statuses,
327                Err(TaskRunError::Preempted) if preemptible > 0 => {
328                    // Decrement the preemptible count and retry
329                    preemptible -= 1;
330                    continue;
331                }
332                Err(e) => {
333                    return Err(e.into());
334                }
335            };
336
337            assert_eq!(statuses.len(), 1, "there should only be one output");
338            let status = statuses.first();
339
340            // Push an empty path segment so that future joins of the work directory URL
341            // treat it as a directory
342            work_dir_url.path_segments_mut().unwrap().push("");
343
344            return Ok(TaskExecutionResult {
345                inputs: self.inner.info.inputs,
346                exit_code: status.code().expect("should have exit code"),
347                work_dir: EvaluationPath::Remote(work_dir_url),
348                stdout: PrimitiveValue::new_file(stdout_url).into(),
349                stderr: PrimitiveValue::new_file(stderr_url).into(),
350            });
351        }
352    }
353}
354
355/// Represents the Task Execution Service (TES) backend.
356pub struct TesBackend {
357    /// The engine configuration.
358    config: Arc<Config>,
359    /// The backend configuration.
360    backend_config: Arc<TesBackendConfig>,
361    /// The underlying Crankshaft backend.
362    inner: Arc<tes::Backend>,
363    /// The maximum amount of concurrency supported.
364    max_concurrency: u64,
365    /// The maximum CPUs for any of one node.
366    max_cpu: u64,
367    /// The maximum memory for any of one node.
368    max_memory: u64,
369    /// The task manager for the backend.
370    manager: TaskManager<TesTaskRequest>,
371    /// The name generator for tasks.
372    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
373}
374
375impl TesBackend {
376    /// Constructs a new TES task execution backend with the given
377    /// configuration.
378    ///
379    /// The provided configuration is expected to have already been validated.
380    pub async fn new(config: Arc<Config>, backend_config: &TesBackendConfig) -> Result<Self> {
381        info!("initializing TES backend");
382
383        // There's no way to ask the TES service for its limits, so use the maximums
384        // allowed
385        let max_cpu = u64::MAX;
386        let max_memory = u64::MAX;
387        let manager = TaskManager::new_unlimited(max_cpu, max_memory);
388
389        let mut http = backend::tes::http::Config::default();
390        match &backend_config.auth {
391            Some(TesBackendAuthConfig::Basic(config)) => {
392                http.auth = Some(HttpAuthConfig::Basic {
393                    username: config
394                        .username
395                        .clone()
396                        .ok_or_else(|| anyhow!("missing `username` in basic auth"))?,
397                    password: config
398                        .password
399                        .clone()
400                        .ok_or_else(|| anyhow!("missing `password` in basic auth"))?,
401                });
402            }
403            Some(TesBackendAuthConfig::Bearer(config)) => {
404                http.auth = Some(HttpAuthConfig::Bearer {
405                    token: config
406                        .token
407                        .clone()
408                        .ok_or_else(|| anyhow!("missing `token` in bearer auth"))?,
409                });
410            }
411            None => {}
412        }
413
414        let backend = tes::Backend::initialize(
415            backend::tes::Config::builder()
416                .url(backend_config.url.clone().expect("should have URL"))
417                .http(http)
418                .interval(backend_config.interval.unwrap_or(DEFAULT_TES_INTERVAL))
419                .build(),
420        );
421
422        let max_concurrency = backend_config.max_concurrency.unwrap_or(u64::MAX);
423
424        Ok(Self {
425            config,
426            backend_config: Arc::new(backend_config.clone()),
427            inner: Arc::new(backend),
428            max_concurrency,
429            max_cpu,
430            max_memory,
431            manager,
432            generator: Arc::new(Mutex::new(GeneratorIterator::new(
433                UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
434                INITIAL_EXPECTED_NAMES,
435            ))),
436        })
437    }
438}
439
440impl TaskExecutionBackend for TesBackend {
441    fn max_concurrency(&self) -> u64 {
442        self.max_concurrency
443    }
444
445    fn constraints(
446        &self,
447        requirements: &HashMap<String, Value>,
448        hints: &HashMap<String, Value>,
449    ) -> Result<TaskExecutionConstraints> {
450        let container = container(requirements, self.config.task.container.as_deref());
451
452        let cpu = cpu(requirements);
453        if (self.max_cpu as f64) < cpu {
454            bail!(
455                "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
456                 {max_cpu}",
457                s = if cpu == 1.0 { "" } else { "s" },
458                max_cpu = self.max_cpu,
459            );
460        }
461
462        let memory = memory(requirements)?;
463        if self.max_memory < memory as u64 {
464            // Display the error in GiB, as it is the most common unit for memory
465            let memory = memory as f64 / ONE_GIBIBYTE;
466            let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;
467
468            bail!(
469                "task requires at least {memory} GiB of memory, but the execution backend has a \
470                 maximum of {max_memory} GiB",
471            );
472        }
473
474        // TODO: only parse the disks requirement once
475        let disks = disks(requirements, hints)?
476            .into_iter()
477            .map(|(mp, disk)| (mp.to_string(), disk.size))
478            .collect();
479
480        Ok(TaskExecutionConstraints {
481            container: Some(container.into_owned()),
482            cpu,
483            memory,
484            gpu: Default::default(),
485            fpga: Default::default(),
486            disks,
487        })
488    }
489
490    fn guest_work_dir(&self) -> Option<&Path> {
491        Some(Path::new(GUEST_WORK_DIR))
492    }
493
494    fn localize_inputs<'a, 'b, 'c, 'd>(
495        &'a self,
496        _: &'b HttpDownloader,
497        inputs: &'c mut [crate::eval::Input],
498    ) -> BoxFuture<'d, Result<()>>
499    where
500        'a: 'd,
501        'b: 'd,
502        'c: 'd,
503        Self: 'd,
504    {
505        async {
506            // Construct a trie for mapping input guest paths
507            let mut trie = InputTrie::default();
508            for input in inputs.iter() {
509                trie.insert(input)?;
510            }
511
512            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
513                inputs[index].set_guest_path(guest_path);
514            }
515
516            Ok(())
517        }
518        .boxed()
519    }
520
521    fn spawn(
522        &self,
523        request: TaskSpawnRequest,
524        token: CancellationToken,
525    ) -> Result<TaskExecutionEvents> {
526        let (spawned_tx, spawned_rx) = oneshot::channel();
527        let (completed_tx, completed_rx) = oneshot::channel();
528
529        let requirements = request.requirements();
530        let hints = request.hints();
531
532        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
533        let cpu = cpu(requirements);
534        let memory = memory(requirements)? as u64;
535        let max_cpu = max_cpu(hints);
536        let max_memory = max_memory(hints)?.map(|i| i as u64);
537        let preemptible = preemptible(hints);
538
539        let name = format!(
540            "{id}-{generated}",
541            id = request.id(),
542            generated = self
543                .generator
544                .lock()
545                .expect("generator should always acquire")
546                .next()
547                .expect("generator should never be exhausted")
548        );
549        self.manager.send(
550            TesTaskRequest {
551                config: self.config.clone(),
552                backend_config: self.backend_config.clone(),
553                inner: request,
554                backend: self.inner.clone(),
555                name,
556                container,
557                cpu,
558                memory,
559                max_cpu,
560                max_memory,
561                token,
562                preemptible,
563            },
564            spawned_tx,
565            completed_tx,
566        );
567
568        Ok(TaskExecutionEvents {
569            spawned: spawned_rx,
570            completed: completed_rx,
571        })
572    }
573}