Skip to main content

wdl_engine/backend/
tes.rs

1//! Implementation of the TES backend.
2
3use std::collections::HashMap;
4use std::fs;
5use std::sync::Arc;
6use std::sync::Mutex;
7
8use anyhow::Context;
9use anyhow::Result;
10use anyhow::bail;
11use cloud_copy::UrlExt;
12use crankshaft::config::backend;
13use crankshaft::config::backend::tes::http::HttpAuthConfig;
14use crankshaft::engine::Task;
15use crankshaft::engine::service::name::GeneratorIterator;
16use crankshaft::engine::service::name::UniqueAlphanumeric;
17use crankshaft::engine::service::runner::Backend;
18use crankshaft::engine::service::runner::backend::TaskRunError;
19use crankshaft::engine::service::runner::backend::tes;
20use crankshaft::engine::task::Execution;
21use crankshaft::engine::task::Input;
22use crankshaft::engine::task::Output;
23use crankshaft::engine::task::Resources;
24use crankshaft::engine::task::input::Contents;
25use crankshaft::engine::task::input::Type as InputType;
26use crankshaft::engine::task::output::Type as OutputType;
27use futures::FutureExt;
28use futures::future::BoxFuture;
29use nonempty::NonEmpty;
30use secrecy::ExposeSecret;
31use tokio::task::JoinSet;
32use tracing::debug;
33use tracing::info;
34
35use super::ExecuteTaskRequest;
36use super::TaskExecutionBackend;
37use super::TaskExecutionConstraints;
38use super::TaskExecutionResult;
39use crate::CancellationContext;
40use crate::EvaluationPath;
41use crate::EvaluationPathKind;
42use crate::Events;
43use crate::ONE_GIBIBYTE;
44use crate::PrimitiveValue;
45use crate::TaskInputs;
46use crate::Value;
47use crate::backend::INITIAL_EXPECTED_NAMES;
48use crate::backend::STDERR_FILE_NAME;
49use crate::backend::STDOUT_FILE_NAME;
50use crate::backend::WORK_DIR_NAME;
51use crate::config::Config;
52use crate::config::ContentDigestMode;
53use crate::config::DEFAULT_TASK_SHELL;
54use crate::config::TesBackendAuthConfig;
55use crate::digest::UrlDigestExt;
56use crate::digest::calculate_local_digest;
57use crate::http::Transferer;
58use crate::v1::DEFAULT_DISK_MOUNT_POINT;
59use crate::v1::DEFAULT_TASK_REQUIREMENT_DISKS;
60use crate::v1::hints;
61use crate::v1::requirements;
62use crate::v1::requirements::ContainerSource;
63
/// The guest working directory.
///
/// All `GUEST_*` paths below are paths *inside* the task container, rooted at
/// `/mnt/task`; they are mapped to host/remote locations via task inputs and
/// outputs.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path for the command file.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The path to the container's stdout.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The path to the container's stderr.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// The default poll interval, in seconds, for the TES backend.
///
/// Used when the backend configuration does not specify an interval.
const DEFAULT_TES_INTERVAL: u64 = 1;
/// Represents the Task Execution Service (TES) backend.
pub struct TesBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft TES backend that tasks are submitted to.
    inner: tes::Backend,
    /// The generator used to produce unique alphanumeric suffixes for task
    /// names; shared with the Crankshaft backend.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
    /// The evaluation cancellation context.
    cancellation: CancellationContext,
}
90
91impl TesBackend {
92    /// Constructs a new TES task execution backend with the given
93    /// configuration.
94    ///
95    /// The provided configuration is expected to have already been validated.
96    pub async fn new(
97        config: Arc<Config>,
98        events: Events,
99        cancellation: CancellationContext,
100    ) -> Result<Self> {
101        info!("initializing TES backend");
102
103        let backend_config = config.backend()?;
104        let backend_config = backend_config
105            .as_tes()
106            .context("configured backend is not TES")?;
107
108        let mut http = backend::tes::http::Config::default();
109        match &backend_config.auth {
110            Some(TesBackendAuthConfig::Basic(config)) => {
111                http.auth = Some(HttpAuthConfig::Basic {
112                    username: config.username.clone(),
113                    password: config.password.inner().expose_secret().to_string(),
114                });
115            }
116            Some(TesBackendAuthConfig::Bearer(config)) => {
117                http.auth = Some(HttpAuthConfig::Bearer {
118                    token: config.token.inner().expose_secret().to_string(),
119                });
120            }
121            None => {}
122        }
123
124        http.retries = backend_config.retries;
125        http.max_concurrency = backend_config.max_concurrency.map(|c| c as usize);
126
127        let names = Arc::new(Mutex::new(GeneratorIterator::new(
128            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
129            INITIAL_EXPECTED_NAMES,
130        )));
131
132        let inner = tes::Backend::initialize(
133            backend::tes::Config::builder()
134                .url(backend_config.url.clone().expect("should have URL"))
135                .http(http)
136                .interval(backend_config.interval.unwrap_or(DEFAULT_TES_INTERVAL))
137                .build(),
138            names.clone(),
139            events.crankshaft().clone(),
140        )
141        .await;
142
143        Ok(Self {
144            config,
145            inner,
146            names,
147            cancellation,
148        })
149    }
150}
151
152impl TaskExecutionBackend for TesBackend {
153    fn constraints(
154        &self,
155        inputs: &TaskInputs,
156        requirements: &HashMap<String, Value>,
157        hints: &HashMap<String, Value>,
158    ) -> Result<TaskExecutionConstraints> {
159        let container =
160            requirements::container(inputs, requirements, self.config.task.container.as_deref());
161        match &container {
162            ContainerSource::Docker(_) | ContainerSource::Library(_) | ContainerSource::Oras(_) => {
163            }
164            ContainerSource::SifFile(_) => {
165                bail!(
166                    "TES backend does not support local SIF file `{container:#}`; use a \
167                     registry-based container image instead"
168                )
169            }
170            ContainerSource::Unknown(_) => {
171                bail!("TES backend does not support unknown container source `{container:#}`")
172            }
173        };
174
175        let disks = requirements::disks(inputs, requirements, hints)?;
176        if disks.values().any(|d| d.ty.is_some()) {
177            debug!("disk type hints are not supported by the TES backend and will be ignored");
178        }
179
180        Ok(TaskExecutionConstraints {
181            container: Some(container),
182            cpu: requirements::cpu(inputs, requirements),
183            memory: requirements::memory(inputs, requirements)? as u64,
184            gpu: Default::default(),
185            fpga: Default::default(),
186            disks: disks
187                .into_iter()
188                .map(|(mp, disk)| (mp.to_string(), disk.size))
189                .collect(),
190        })
191    }
192
193    fn needs_local_inputs(&self) -> bool {
194        false
195    }
196
197    fn execute<'a>(
198        &'a self,
199        transferer: &'a Arc<dyn Transferer>,
200        request: ExecuteTaskRequest<'a>,
201    ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
202        async move {
203            let backend_config = self.config.backend()?;
204            let backend_config = backend_config
205                .as_tes()
206                .expect("configured backend should be TES");
207
208            let preemptible = hints::preemptible(request.inputs, request.hints)?;
209            let max_memory =
210                hints::max_memory(request.inputs, request.hints)?.map(|m| m as f64 / ONE_GIBIBYTE);
211            let name = format!(
212                "{id}-{generated}",
213                id = request.id,
214                generated = self
215                    .names
216                    .lock()
217                    .expect("generator should always acquire")
218                    .next()
219                    .expect("generator should never be exhausted")
220            );
221
222            // Write the evaluated command to disk
223            // This is done even for remote execution so that a copy exists locally
224            let command_path = request.command_path();
225            if let Some(parent) = command_path.parent() {
226                fs::create_dir_all(parent).with_context(|| {
227                    format!(
228                        "failed to create directory `{path}`",
229                        path = parent.display()
230                    )
231                })?;
232            }
233
234            fs::write(&command_path, request.command).with_context(|| {
235                format!(
236                    "failed to write command contents to `{path}`",
237                    path = command_path.display()
238                )
239            })?;
240
241            // SAFETY: currently `inputs` is required by configuration validation, so it
242            // should always unwrap
243            let inputs_url = Arc::new(
244                backend_config
245                    .inputs
246                    .clone()
247                    .expect("should have inputs URL"),
248            );
249
250            // Start with the command file as an input
251            let mut backend_inputs = vec![
252                Input::builder()
253                    .path(GUEST_COMMAND_PATH)
254                    .contents(Contents::Path(command_path.to_path_buf()))
255                    .ty(InputType::File)
256                    .read_only(true)
257                    .build(),
258            ];
259
260            // Spawn upload tasks for inputs available locally, and apply authentication to
261            // the URLs for remote inputs.
262            let mut uploads = JoinSet::new();
263            for (i, input) in request.backend_inputs.iter().enumerate() {
264                match input.path().kind() {
265                    EvaluationPathKind::Local(path) => {
266                        // Input is local, spawn an upload of it
267                        let kind = input.kind();
268                        let path = path.to_path_buf();
269                        let transferer = transferer.clone();
270                        let inputs_url = inputs_url.clone();
271                        uploads.spawn(async move {
272                            let url = inputs_url.join_digest(
273                                calculate_local_digest(&path, kind, ContentDigestMode::Strong)
274                                    .await
275                                    .with_context(|| {
276                                        format!(
277                                            "failed to calculate digest of `{path}`",
278                                            path = path.display()
279                                        )
280                                    })?,
281                            );
282                            transferer
283                                .upload(&path, &url)
284                                .await
285                                .with_context(|| {
286                                    format!(
287                                        "failed to upload `{path}` to `{url}`",
288                                        path = path.display(),
289                                        url = url.display()
290                                    )
291                                })
292                                .map(|_| (i, url))
293                        });
294                    }
295                    EvaluationPathKind::Remote(url) => {
296                        // Input is already remote, add it to the Crankshaft inputs list
297                        backend_inputs.push(
298                            Input::builder()
299                                .path(
300                                    input
301                                        .guest_path()
302                                        .expect("input should have guest path")
303                                        .as_str(),
304                                )
305                                .contents(Contents::Url(url.clone()))
306                                .ty(input.kind())
307                                .read_only(true)
308                                .build(),
309                        );
310                    }
311                }
312            }
313
314            // Wait for any uploads to complete
315            while let Some(result) = uploads.join_next().await {
316                let (i, url) = result.context("upload task")??;
317                let input = &request.backend_inputs[i];
318                backend_inputs.push(
319                    Input::builder()
320                        .path(
321                            input
322                                .guest_path()
323                                .expect("input should have guest path")
324                                .as_str(),
325                        )
326                        .contents(Contents::Url(url))
327                        .ty(input.kind())
328                        .read_only(true)
329                        .build(),
330                );
331            }
332
333            let output_dir = format!(
334                "{name}-{timestamp}/",
335                timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S")
336            );
337
338            // SAFETY: currently `outputs` is required by configuration validation, so it
339            // should always unwrap
340            let outputs_url = backend_config
341                .outputs
342                .as_ref()
343                .expect("should have outputs URL")
344                .join(&output_dir)
345                .expect("should join");
346
347            let mut work_dir_url = outputs_url.join(WORK_DIR_NAME).expect("should join");
348            let stdout_url = outputs_url.join(STDOUT_FILE_NAME).expect("should join");
349            let stderr_url = outputs_url.join(STDERR_FILE_NAME).expect("should join");
350
351            // The TES backend will output three things: the working directory contents,
352            // stdout, and stderr.
353            let outputs = vec![
354                Output::builder()
355                    .path(GUEST_WORK_DIR)
356                    .url(work_dir_url.clone())
357                    .ty(OutputType::Directory)
358                    .build(),
359                Output::builder()
360                    .path(GUEST_STDOUT_PATH)
361                    .url(stdout_url.clone())
362                    .ty(OutputType::File)
363                    .build(),
364                Output::builder()
365                    .path(GUEST_STDERR_PATH)
366                    .url(stderr_url.clone())
367                    .ty(OutputType::File)
368                    .build(),
369            ];
370
371            // Calculate the total size required for all disks as TES does not have a way of
372            // specifying volume sizes; a single disk will be created from which all volumes
373            // will be mounted
374            let disks = &request.constraints.disks;
375            let disk: f64 = if disks.is_empty() {
376                DEFAULT_TASK_REQUIREMENT_DISKS
377            } else {
378                let sum: f64 = disks.values().map(|size| *size as f64).sum();
379                if disks.contains_key(DEFAULT_DISK_MOUNT_POINT) {
380                    sum
381                } else {
382                    sum + DEFAULT_TASK_REQUIREMENT_DISKS
383                }
384            };
385
386            let volumes = request
387                .constraints
388                .disks
389                .keys()
390                .filter_map(|mp| {
391                    // NOTE: the root mount point is already handled by the work
392                    // directory mount, so we filter it here to avoid duplicate volume
393                    // mapping.
394                    if mp == DEFAULT_DISK_MOUNT_POINT {
395                        None
396                    } else {
397                        Some(mp.clone())
398                    }
399                })
400                .collect::<Vec<_>>();
401
402            if !volumes.is_empty() {
403                debug!(
404                    "disk size constraints cannot be enforced by the Docker backend; mount points \
405                     will be created but sizes will not be limited"
406                );
407            }
408
409            let mut preemptible = preemptible;
410            loop {
411                let task = Task::builder()
412                    .name(&name)
413                    .executions(NonEmpty::new(
414                        Execution::builder()
415                            .image(
416                                match request
417                                    .constraints
418                                    .container
419                                    .as_ref()
420                                    .expect("constraints should have a container")
421                                {
422                                    // For Docker container image sources, omit the protocol
423                                    ContainerSource::Docker(s) => s.clone(),
424                                    c => format!("{c:#}"),
425                                },
426                            )
427                            .program(
428                                self.config
429                                    .task
430                                    .shell
431                                    .as_deref()
432                                    .unwrap_or(DEFAULT_TASK_SHELL),
433                            )
434                            .args([GUEST_COMMAND_PATH.to_string()])
435                            .work_dir(GUEST_WORK_DIR)
436                            .env(request.env.clone())
437                            .stdout(GUEST_STDOUT_PATH)
438                            .stderr(GUEST_STDERR_PATH)
439                            .build(),
440                    ))
441                    .inputs(backend_inputs.clone())
442                    .outputs(outputs.clone())
443                    .resources(
444                        Resources::builder()
445                            .cpu(request.constraints.cpu)
446                            .maybe_cpu_limit(hints::max_cpu(request.inputs, request.hints))
447                            .ram(request.constraints.memory as f64 / ONE_GIBIBYTE)
448                            .disk(disk)
449                            .maybe_ram_limit(max_memory)
450                            .preemptible(preemptible > 0)
451                            .build(),
452                    )
453                    .volumes(volumes.clone())
454                    .build();
455
456                let statuses = match self.inner.run(task, self.cancellation.second())?.await {
457                    Ok(statuses) => statuses,
458                    Err(TaskRunError::Preempted) if preemptible > 0 => {
459                        // Decrement the preemptible count and retry
460                        preemptible -= 1;
461                        continue;
462                    }
463                    Err(TaskRunError::Canceled) => return Ok(None),
464                    Err(e) => return Err(e.into()),
465                };
466
467                assert_eq!(statuses.len(), 1, "there should only be one output");
468                let status = statuses.first();
469
470                // Push an empty path segment so that future joins of the work directory URL
471                // treat it as a directory
472                work_dir_url.path_segments_mut().unwrap().push("");
473
474                return Ok(Some(TaskExecutionResult {
475                    exit_code: status.code().expect("should have exit code"),
476                    work_dir: EvaluationPath::try_from(work_dir_url)?,
477                    stdout: PrimitiveValue::new_file(stdout_url).into(),
478                    stderr: PrimitiveValue::new_file(stderr_url).into(),
479                }));
480            }
481        }
482        .boxed()
483    }
484}