1use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use base64::Engine as _;
14use base64::engine::general_purpose::STANDARD;
15use crankshaft::config::backend;
16use crankshaft::engine::Task;
17use crankshaft::engine::service::name::GeneratorIterator;
18use crankshaft::engine::service::name::UniqueAlphanumeric;
19use crankshaft::engine::service::runner::Backend;
20use crankshaft::engine::service::runner::backend::TaskRunError;
21use crankshaft::engine::service::runner::backend::tes;
22use crankshaft::engine::task::Execution;
23use crankshaft::engine::task::Input;
24use crankshaft::engine::task::Output;
25use crankshaft::engine::task::Resources;
26use crankshaft::engine::task::input::Contents;
27use crankshaft::engine::task::input::Type as InputType;
28use crankshaft::engine::task::output::Type as OutputType;
29use futures::FutureExt;
30use futures::future::BoxFuture;
31use nonempty::NonEmpty;
32use tokio::sync::oneshot;
33use tokio_util::sync::CancellationToken;
34use tracing::info;
35use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
36
37use super::TaskExecutionBackend;
38use super::TaskExecutionConstraints;
39use super::TaskExecutionEvents;
40use super::TaskExecutionResult;
41use super::TaskManager;
42use super::TaskManagerRequest;
43use super::TaskSpawnRequest;
44use crate::COMMAND_FILE_NAME;
45use crate::InputKind;
46use crate::InputTrie;
47use crate::ONE_GIBIBYTE;
48use crate::PrimitiveValue;
49use crate::STDERR_FILE_NAME;
50use crate::STDOUT_FILE_NAME;
51use crate::Value;
52use crate::WORK_DIR_NAME;
53use crate::config::DEFAULT_TASK_SHELL;
54use crate::config::TaskConfig;
55use crate::config::TesBackendAuthConfig;
56use crate::config::TesBackendConfig;
57use crate::http::HttpDownloader;
58use crate::path::EvaluationPath;
59use crate::v1::DEFAULT_TASK_REQUIREMENT_DISKS;
60use crate::v1::container;
61use crate::v1::cpu;
62use crate::v1::disks;
63use crate::v1::max_cpu;
64use crate::v1::max_memory;
65use crate::v1::memory;
66use crate::v1::preemptible;
67
/// Expected number of generated names, passed both to
/// `UniqueAlphanumeric::default_with_expected_generations` and to
/// `GeneratorIterator::new` when constructing the task-name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// Guest directory under which localized input paths are calculated.
const GUEST_INPUTS_DIR: &'static str = "/mnt/inputs";

/// Guest working directory of the task execution; it is also collected as a
/// directory output of the task.
const GUEST_WORK_DIR: &'static str = "/mnt/work";

/// Guest path at which the task's command file is provided (read-only) and
/// handed to the shell.
const GUEST_COMMAND_PATH: &'static str = "/mnt/command";

/// Guest path the execution's standard output is written to.
const GUEST_STDOUT_PATH: &'static str = "/stdout";

/// Guest path the execution's standard error is written to.
const GUEST_STDERR_PATH: &'static str = "/stderr";

/// Interval given to the crankshaft TES configuration when the backend
/// configuration does not set one.
/// NOTE(review): presumably the status-polling period in seconds — confirm
/// against crankshaft's TES `Config::interval`.
const DEFAULT_TES_INTERVAL: u64 = 60;
91
/// A request to run a single task on the TES backend, queued through the
/// backend's task manager.
#[derive(Debug)]
struct TesTaskRequest {
    /// The underlying task spawn request (command, inputs, requirements, hints).
    inner: TaskSpawnRequest,
    /// The crankshaft TES backend the task is submitted to.
    backend: Arc<tes::Backend>,
    /// The task name sent to TES; also the prefix of the task's output directory.
    name: String,
    /// The configured shell program; `None` falls back to `DEFAULT_TASK_SHELL`.
    shell: Arc<Option<String>>,
    /// The container image the task executes in.
    container: String,
    /// The TES backend configuration (the outputs URL is read from here).
    config: Arc<TesBackendConfig>,
    /// The requested number of CPUs.
    cpu: f64,
    /// The requested memory; divided by `ONE_GIBIBYTE` when submitted, so
    /// presumably bytes.
    memory: u64,
    /// An optional CPU limit, submitted as the CPU limit resource.
    max_cpu: Option<f64>,
    /// An optional memory limit, divided by `ONE_GIBIBYTE` when submitted.
    max_memory: Option<u64>,
    /// How many more times the task may be submitted as preemptible; it is
    /// decremented on each preemption.
    preemptible: i64,
    /// Cancellation token handed to the backend when running the task.
    token: CancellationToken,
}
127
128impl TesTaskRequest {
129 fn disk_resource(&self) -> Result<f64> {
131 let disks = disks(self.inner.requirements(), self.inner.hints())?;
132 if disks.len() > 1 {
133 bail!(
134 "TES backend does not support more than one disk specification for the \
135 `{TASK_REQUIREMENT_DISKS}` task requirement"
136 );
137 }
138
139 if let Some(mount_point) = disks.keys().next() {
140 if *mount_point != "/" {
141 bail!(
142 "TES backend does not support a disk mount point other than `/` for the \
143 `{TASK_REQUIREMENT_DISKS}` task requirement"
144 );
145 }
146 }
147
148 Ok(disks
149 .values()
150 .next()
151 .map(|d| d.size as f64)
152 .unwrap_or(DEFAULT_TASK_REQUIREMENT_DISKS))
153 }
154}
155
156impl TaskManagerRequest for TesTaskRequest {
157 fn cpu(&self) -> f64 {
158 self.cpu
159 }
160
161 fn memory(&self) -> u64 {
162 self.memory
163 }
164
165 async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
166 let attempt_dir = self.inner.attempt_dir();
168 fs::create_dir_all(attempt_dir).with_context(|| {
169 format!(
170 "failed to create directory `{path}`",
171 path = attempt_dir.display()
172 )
173 })?;
174
175 let command_path = attempt_dir.join(COMMAND_FILE_NAME);
178 fs::write(&command_path, self.inner.command()).with_context(|| {
179 format!(
180 "failed to write command contents to `{path}`",
181 path = command_path.display()
182 )
183 })?;
184
185 let task_dir = format!(
186 "{name}-{timestamp}/",
187 name = self.name,
188 timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S")
189 );
190
191 let mut inputs = vec![
193 Input::builder()
194 .path(GUEST_COMMAND_PATH)
195 .contents(Contents::Path(command_path.to_path_buf()))
196 .ty(InputType::File)
197 .read_only(true)
198 .build(),
199 ];
200
201 for input in self.inner.inputs() {
202 if input.kind() == InputKind::Directory {
206 if let EvaluationPath::Local(path) = input.path() {
207 if let Ok(mut entries) = path.read_dir() {
208 if entries.next().is_some() {
209 bail!(
210 "cannot upload contents of directory `{path}`: operation is not \
211 yet supported",
212 path = path.display()
213 );
214 }
215 }
216 }
217 continue;
218 }
219
220 inputs.push(
223 Input::builder()
224 .path(input.guest_path().expect("should have guest path"))
225 .contents(match input.path() {
226 EvaluationPath::Local(path) => Contents::Path(path.clone()),
227 EvaluationPath::Remote(url) => Contents::Url(url.clone()),
228 })
229 .ty(input.kind())
230 .read_only(true)
231 .build(),
232 );
233 }
234
235 let outputs_url = self
238 .config
239 .outputs
240 .as_ref()
241 .expect("should have outputs URL")
242 .join(&task_dir)
243 .expect("should join");
244
245 let mut work_dir_url = outputs_url.join(WORK_DIR_NAME).expect("should join");
246 let stdout_url = outputs_url.join(STDOUT_FILE_NAME).expect("should join");
247 let stderr_url = outputs_url.join(STDERR_FILE_NAME).expect("should join");
248
249 let outputs = vec![
252 Output::builder()
253 .path(GUEST_WORK_DIR)
254 .url(work_dir_url.clone())
255 .ty(OutputType::Directory)
256 .build(),
257 Output::builder()
258 .path(GUEST_STDOUT_PATH)
259 .url(stdout_url.clone())
260 .ty(OutputType::File)
261 .build(),
262 Output::builder()
263 .path(GUEST_STDERR_PATH)
264 .url(stderr_url.clone())
265 .ty(OutputType::File)
266 .build(),
267 ];
268
269 let mut preemptible = self.preemptible;
270 let mut spawned = Some(spawned);
271 loop {
272 let task = Task::builder()
273 .name(&self.name)
274 .executions(NonEmpty::new(
275 Execution::builder()
276 .image(&self.container)
277 .program(self.shell.as_deref().unwrap_or(DEFAULT_TASK_SHELL))
278 .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
279 .work_dir(GUEST_WORK_DIR)
280 .env(self.inner.env().clone())
281 .stdout(GUEST_STDOUT_PATH)
282 .stderr(GUEST_STDERR_PATH)
283 .build(),
284 ))
285 .inputs(inputs.clone())
286 .outputs(outputs.clone())
287 .resources(
288 Resources::builder()
289 .cpu(self.cpu)
290 .maybe_cpu_limit(self.max_cpu)
291 .ram(self.memory as f64 / ONE_GIBIBYTE)
292 .disk(self.disk_resource()?)
293 .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
294 .preemptible(preemptible > 0)
295 .build(),
296 )
297 .build();
298
299 let statuses = match self
300 .backend
301 .run(task, spawned.take(), self.token.clone())
302 .map_err(|e| anyhow!("{e:#}"))?
303 .await
304 {
305 Ok(statuses) => statuses,
306 Err(TaskRunError::Preempted) if preemptible > 0 => {
307 preemptible -= 1;
309 continue;
310 }
311 Err(e) => {
312 return Err(e.into());
313 }
314 };
315
316 assert_eq!(statuses.len(), 1, "there should only be one output");
317 let status = statuses.first();
318
319 work_dir_url.path_segments_mut().unwrap().push("");
322
323 return Ok(TaskExecutionResult {
324 inputs: self.inner.info.inputs,
325 exit_code: status.code().expect("should have exit code"),
326 work_dir: EvaluationPath::Remote(work_dir_url),
327 stdout: PrimitiveValue::new_file(stdout_url).into(),
328 stderr: PrimitiveValue::new_file(stderr_url).into(),
329 });
330 }
331 }
332}
333
/// Task execution backend that submits tasks to a TES server through
/// crankshaft's TES backend.
pub struct TesBackend {
    /// The crankshaft TES backend shared with every task request.
    inner: Arc<tes::Backend>,
    /// The configured shell program (from the task configuration), if any.
    shell: Arc<Option<String>>,
    /// The configured default container, passed as the fallback to
    /// `container()` when resolving a task's image.
    container: Option<String>,
    /// The TES backend configuration.
    config: Arc<TesBackendConfig>,
    /// Maximum number of concurrent tasks (`u64::MAX` when not configured).
    max_concurrency: u64,
    /// Maximum CPUs a task may request; set to `u64::MAX` by `new`.
    max_cpu: u64,
    /// Maximum memory a task may request; set to `u64::MAX` by `new`.
    max_memory: u64,
    /// The manager that task requests are sent to.
    manager: TaskManager<TesTaskRequest>,
    /// Generator of unique suffixes for task names, guarded by a mutex.
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
355
356impl TesBackend {
357 pub async fn new(task: &TaskConfig, config: &TesBackendConfig) -> Result<Self> {
360 task.validate()?;
361 config.validate()?;
362
363 info!("initializing TES backend");
364
365 let max_cpu = u64::MAX;
368 let max_memory = u64::MAX;
369 let manager = TaskManager::new_unlimited(max_cpu, max_memory);
370
371 let mut http = backend::tes::http::Config::default();
372 if let Some(TesBackendAuthConfig::Basic(auth)) = &config.auth {
373 http.basic_auth_token = Some(STANDARD.encode(format!(
374 "{user}:{pass}",
375 user = auth.username.as_ref().expect("should have user name"),
376 pass = auth.password.as_ref().expect("should have password")
377 )));
378 }
379
380 let backend = tes::Backend::initialize(
381 backend::tes::Config::builder()
382 .url(config.url.clone().expect("should have URL"))
383 .http(http)
384 .interval(config.interval.unwrap_or(DEFAULT_TES_INTERVAL))
385 .build(),
386 );
387
388 Ok(Self {
389 inner: Arc::new(backend),
390 shell: Arc::new(task.shell.clone()),
391 container: task.container.clone(),
392 config: Arc::new(config.clone()),
393 max_concurrency: config.max_concurrency.unwrap_or(u64::MAX),
394 max_cpu,
395 max_memory,
396 manager,
397 generator: Arc::new(Mutex::new(GeneratorIterator::new(
398 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
399 INITIAL_EXPECTED_NAMES,
400 ))),
401 })
402 }
403}
404
405impl TaskExecutionBackend for TesBackend {
406 fn max_concurrency(&self) -> u64 {
407 self.max_concurrency
408 }
409
410 fn constraints(
411 &self,
412 requirements: &HashMap<String, Value>,
413 hints: &HashMap<String, Value>,
414 ) -> Result<TaskExecutionConstraints> {
415 let container = container(requirements, self.container.as_deref());
416
417 let cpu = cpu(requirements);
418 if (self.max_cpu as f64) < cpu {
419 bail!(
420 "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
421 {max_cpu}",
422 s = if cpu == 1.0 { "" } else { "s" },
423 max_cpu = self.max_cpu,
424 );
425 }
426
427 let memory = memory(requirements)?;
428 if self.max_memory < memory as u64 {
429 let memory = memory as f64 / ONE_GIBIBYTE;
431 let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;
432
433 bail!(
434 "task requires at least {memory} GiB of memory, but the execution backend has a \
435 maximum of {max_memory} GiB",
436 );
437 }
438
439 let disks = disks(requirements, hints)?
441 .into_iter()
442 .map(|(mp, disk)| (mp.to_string(), disk.size))
443 .collect();
444
445 Ok(TaskExecutionConstraints {
446 container: Some(container.into_owned()),
447 cpu,
448 memory,
449 gpu: Default::default(),
450 fpga: Default::default(),
451 disks,
452 })
453 }
454
455 fn guest_work_dir(&self) -> Option<&Path> {
456 Some(Path::new(GUEST_WORK_DIR))
457 }
458
459 fn localize_inputs<'a, 'b, 'c, 'd>(
460 &'a self,
461 _: &'b HttpDownloader,
462 inputs: &'c mut [crate::eval::Input],
463 ) -> BoxFuture<'d, Result<()>>
464 where
465 'a: 'd,
466 'b: 'd,
467 'c: 'd,
468 Self: 'd,
469 {
470 async {
471 let mut trie = InputTrie::default();
473 for input in inputs.iter() {
474 trie.insert(input)?;
475 }
476
477 for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
478 inputs[index].set_guest_path(guest_path);
479 }
480
481 Ok(())
482 }
483 .boxed()
484 }
485
486 fn spawn(
487 &self,
488 request: TaskSpawnRequest,
489 token: CancellationToken,
490 ) -> Result<TaskExecutionEvents> {
491 let (spawned_tx, spawned_rx) = oneshot::channel();
492 let (completed_tx, completed_rx) = oneshot::channel();
493
494 let requirements = request.requirements();
495 let hints = request.hints();
496
497 let container = container(requirements, self.container.as_deref()).into_owned();
498 let cpu = cpu(requirements);
499 let memory = memory(requirements)? as u64;
500 let max_cpu = max_cpu(hints);
501 let max_memory = max_memory(hints)?.map(|i| i as u64);
502 let preemptible = preemptible(hints);
503
504 let name = format!(
505 "{id}-{generated}",
506 id = request.id(),
507 generated = self
508 .generator
509 .lock()
510 .expect("generator should always acquire")
511 .next()
512 .expect("generator should never be exhausted")
513 );
514 self.manager.send(
515 TesTaskRequest {
516 inner: request,
517 backend: self.inner.clone(),
518 name,
519 shell: self.shell.clone(),
520 container,
521 config: self.config.clone(),
522 cpu,
523 memory,
524 max_cpu,
525 max_memory,
526 token,
527 preemptible,
528 },
529 spawned_tx,
530 completed_tx,
531 );
532
533 Ok(TaskExecutionEvents {
534 spawned: spawned_rx,
535 completed: completed_rx,
536 })
537 }
538}