1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::path::Path;
7use std::sync::Arc;
8use std::sync::Mutex;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use anyhow::bail;
14use crankshaft::config::backend;
15use crankshaft::config::backend::tes::http::HttpAuthConfig;
16use crankshaft::engine::Task;
17use crankshaft::engine::service::name::GeneratorIterator;
18use crankshaft::engine::service::name::UniqueAlphanumeric;
19use crankshaft::engine::service::runner::Backend;
20use crankshaft::engine::service::runner::backend::TaskRunError;
21use crankshaft::engine::service::runner::backend::tes;
22use crankshaft::engine::task::Execution;
23use crankshaft::engine::task::Input;
24use crankshaft::engine::task::Output;
25use crankshaft::engine::task::Resources;
26use crankshaft::engine::task::input::Contents;
27use crankshaft::engine::task::input::Type as InputType;
28use crankshaft::engine::task::output::Type as OutputType;
29use futures::FutureExt;
30use futures::future::BoxFuture;
31use nonempty::NonEmpty;
32use tokio::sync::oneshot;
33use tokio_util::sync::CancellationToken;
34use tracing::info;
35use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
36
37use super::TaskExecutionBackend;
38use super::TaskExecutionConstraints;
39use super::TaskExecutionEvents;
40use super::TaskExecutionResult;
41use super::TaskManager;
42use super::TaskManagerRequest;
43use super::TaskSpawnRequest;
44use crate::COMMAND_FILE_NAME;
45use crate::InputKind;
46use crate::InputTrie;
47use crate::ONE_GIBIBYTE;
48use crate::PrimitiveValue;
49use crate::STDERR_FILE_NAME;
50use crate::STDOUT_FILE_NAME;
51use crate::Value;
52use crate::WORK_DIR_NAME;
53use crate::config::Config;
54use crate::config::DEFAULT_TASK_SHELL;
55use crate::config::TesBackendAuthConfig;
56use crate::config::TesBackendConfig;
57use crate::http::HttpDownloader;
58use crate::http::rewrite_url;
59use crate::path::EvaluationPath;
60use crate::v1::DEFAULT_TASK_REQUIREMENT_DISKS;
61use crate::v1::container;
62use crate::v1::cpu;
63use crate::v1::disks;
64use crate::v1::max_cpu;
65use crate::v1::max_memory;
66use crate::v1::memory;
67use crate::v1::preemptible;
68
/// The number of task names the unique name generator is initially sized for.
///
/// Used both as the expected generation count for [`UniqueAlphanumeric`] and as
/// the second argument to [`GeneratorIterator::new`] when the backend is created.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The guest directory beneath which task inputs are mapped.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs";

/// The working directory of the task inside the container.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The path inside the container at which the task's command file is placed.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The path inside the container to which the task's stdout is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The path inside the container to which the task's stderr is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// The default value for the crankshaft TES `interval` setting, used when the
/// backend configuration does not specify one.
///
/// NOTE(review): presumably the task status polling interval in seconds —
/// confirm against crankshaft's TES configuration.
const DEFAULT_TES_INTERVAL: u64 = 60;
92
/// A request to run a single task on a TES server.
///
/// Requests are queued with the backend's task manager, which later calls
/// `run` on them.
#[derive(Debug)]
struct TesTaskRequest {
    /// The engine configuration.
    config: Arc<Config>,
    /// The TES backend configuration.
    backend_config: Arc<TesBackendConfig>,
    /// The original task spawn request.
    inner: TaskSpawnRequest,
    /// The crankshaft TES backend used to submit the task.
    backend: Arc<tes::Backend>,
    /// The unique name of the task: the task id followed by a generated suffix.
    name: String,
    /// The container image the task runs in.
    container: String,
    /// The number of CPUs requested by the task.
    cpu: f64,
    /// The amount of memory requested by the task, in bytes.
    memory: u64,
    /// The maximum number of CPUs, from the task's hints (if any).
    max_cpu: Option<f64>,
    /// The maximum amount of memory in bytes, from the task's hints (if any).
    max_memory: Option<u64>,
    /// The number of times the task may be resubmitted after being preempted.
    ///
    /// A positive value also requests a preemptible instance from TES.
    preemptible: i64,
    /// The cancellation token passed to the TES backend when running the task.
    token: CancellationToken,
}
128
129impl TesTaskRequest {
130 fn disk_resource(&self) -> Result<f64> {
132 let disks = disks(self.inner.requirements(), self.inner.hints())?;
133 if disks.len() > 1 {
134 bail!(
135 "TES backend does not support more than one disk specification for the \
136 `{TASK_REQUIREMENT_DISKS}` task requirement"
137 );
138 }
139
140 if let Some(mount_point) = disks.keys().next() {
141 if *mount_point != "/" {
142 bail!(
143 "TES backend does not support a disk mount point other than `/` for the \
144 `{TASK_REQUIREMENT_DISKS}` task requirement"
145 );
146 }
147 }
148
149 Ok(disks
150 .values()
151 .next()
152 .map(|d| d.size as f64)
153 .unwrap_or(DEFAULT_TASK_REQUIREMENT_DISKS))
154 }
155}
156
157impl TaskManagerRequest for TesTaskRequest {
158 fn cpu(&self) -> f64 {
159 self.cpu
160 }
161
162 fn memory(&self) -> u64 {
163 self.memory
164 }
165
166 async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
167 let attempt_dir = self.inner.attempt_dir();
169 fs::create_dir_all(attempt_dir).with_context(|| {
170 format!(
171 "failed to create directory `{path}`",
172 path = attempt_dir.display()
173 )
174 })?;
175
176 let command_path = attempt_dir.join(COMMAND_FILE_NAME);
179 fs::write(&command_path, self.inner.command()).with_context(|| {
180 format!(
181 "failed to write command contents to `{path}`",
182 path = command_path.display()
183 )
184 })?;
185
186 let task_dir = format!(
187 "{name}-{timestamp}/",
188 name = self.name,
189 timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S")
190 );
191
192 let mut inputs = vec![
194 Input::builder()
195 .path(GUEST_COMMAND_PATH)
196 .contents(Contents::Path(command_path.to_path_buf()))
197 .ty(InputType::File)
198 .read_only(true)
199 .build(),
200 ];
201
202 for input in self.inner.inputs() {
203 if input.kind() == InputKind::Directory {
207 if let EvaluationPath::Local(path) = input.path() {
208 if let Ok(mut entries) = path.read_dir() {
209 if entries.next().is_some() {
210 bail!(
211 "cannot upload contents of directory `{path}`: operation is not \
212 yet supported",
213 path = path.display()
214 );
215 }
216 }
217 }
218 continue;
219 }
220
221 inputs.push(
224 Input::builder()
225 .path(input.guest_path().expect("should have guest path"))
226 .contents(match input.path() {
227 EvaluationPath::Local(path) => Contents::Path(path.clone()),
228 EvaluationPath::Remote(url) => Contents::Url(
229 rewrite_url(&self.config, Cow::Borrowed(url))?.into_owned(),
230 ),
231 })
232 .ty(input.kind())
233 .read_only(true)
234 .build(),
235 );
236 }
237
238 let outputs_url = self
241 .backend_config
242 .outputs
243 .as_ref()
244 .expect("should have outputs URL")
245 .join(&task_dir)
246 .expect("should join");
247
248 let mut work_dir_url = rewrite_url(
249 &self.config,
250 Cow::Owned(outputs_url.join(WORK_DIR_NAME).expect("should join")),
251 )?
252 .into_owned();
253 let stdout_url = rewrite_url(
254 &self.config,
255 Cow::Owned(outputs_url.join(STDOUT_FILE_NAME).expect("should join")),
256 )?
257 .into_owned();
258 let stderr_url = rewrite_url(
259 &self.config,
260 Cow::Owned(outputs_url.join(STDERR_FILE_NAME).expect("should join")),
261 )?
262 .into_owned();
263
264 let outputs = vec![
267 Output::builder()
268 .path(GUEST_WORK_DIR)
269 .url(work_dir_url.clone())
270 .ty(OutputType::Directory)
271 .build(),
272 Output::builder()
273 .path(GUEST_STDOUT_PATH)
274 .url(stdout_url.clone())
275 .ty(OutputType::File)
276 .build(),
277 Output::builder()
278 .path(GUEST_STDERR_PATH)
279 .url(stderr_url.clone())
280 .ty(OutputType::File)
281 .build(),
282 ];
283
284 let mut preemptible = self.preemptible;
285 let mut spawned = Some(spawned);
286 loop {
287 let task = Task::builder()
288 .name(&self.name)
289 .executions(NonEmpty::new(
290 Execution::builder()
291 .image(&self.container)
292 .program(
293 self.config
294 .task
295 .shell
296 .as_deref()
297 .unwrap_or(DEFAULT_TASK_SHELL),
298 )
299 .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
300 .work_dir(GUEST_WORK_DIR)
301 .env(self.inner.env().clone())
302 .stdout(GUEST_STDOUT_PATH)
303 .stderr(GUEST_STDERR_PATH)
304 .build(),
305 ))
306 .inputs(inputs.clone())
307 .outputs(outputs.clone())
308 .resources(
309 Resources::builder()
310 .cpu(self.cpu)
311 .maybe_cpu_limit(self.max_cpu)
312 .ram(self.memory as f64 / ONE_GIBIBYTE)
313 .disk(self.disk_resource()?)
314 .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
315 .preemptible(preemptible > 0)
316 .build(),
317 )
318 .build();
319
320 let statuses = match self
321 .backend
322 .run(task, spawned.take(), self.token.clone())
323 .map_err(|e| anyhow!("{e:#}"))?
324 .await
325 {
326 Ok(statuses) => statuses,
327 Err(TaskRunError::Preempted) if preemptible > 0 => {
328 preemptible -= 1;
330 continue;
331 }
332 Err(e) => {
333 return Err(e.into());
334 }
335 };
336
337 assert_eq!(statuses.len(), 1, "there should only be one output");
338 let status = statuses.first();
339
340 work_dir_url.path_segments_mut().unwrap().push("");
343
344 return Ok(TaskExecutionResult {
345 inputs: self.inner.info.inputs,
346 exit_code: status.code().expect("should have exit code"),
347 work_dir: EvaluationPath::Remote(work_dir_url),
348 stdout: PrimitiveValue::new_file(stdout_url).into(),
349 stderr: PrimitiveValue::new_file(stderr_url).into(),
350 });
351 }
352 }
353}
354
/// A task execution backend that runs tasks on a Task Execution Service (TES)
/// server.
pub struct TesBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The TES backend configuration.
    backend_config: Arc<TesBackendConfig>,
    /// The underlying crankshaft TES backend.
    inner: Arc<tes::Backend>,
    /// The maximum number of concurrent tasks (`u64::MAX` when not configured).
    max_concurrency: u64,
    /// The maximum number of CPUs a task may request (set to `u64::MAX` by `new`).
    max_cpu: u64,
    /// The maximum memory, in bytes, a task may request (set to `u64::MAX` by `new`).
    max_memory: u64,
    /// The task manager that queues and runs task requests.
    manager: TaskManager<TesTaskRequest>,
    /// Generator of unique suffixes for task names; guarded by a mutex.
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
374
375impl TesBackend {
376 pub async fn new(config: Arc<Config>, backend_config: &TesBackendConfig) -> Result<Self> {
381 info!("initializing TES backend");
382
383 let max_cpu = u64::MAX;
386 let max_memory = u64::MAX;
387 let manager = TaskManager::new_unlimited(max_cpu, max_memory);
388
389 let mut http = backend::tes::http::Config::default();
390 match &backend_config.auth {
391 Some(TesBackendAuthConfig::Basic(config)) => {
392 http.auth = Some(HttpAuthConfig::Basic {
393 username: config
394 .username
395 .clone()
396 .ok_or_else(|| anyhow!("missing `username` in basic auth"))?,
397 password: config
398 .password
399 .clone()
400 .ok_or_else(|| anyhow!("missing `password` in basic auth"))?,
401 });
402 }
403 Some(TesBackendAuthConfig::Bearer(config)) => {
404 http.auth = Some(HttpAuthConfig::Bearer {
405 token: config
406 .token
407 .clone()
408 .ok_or_else(|| anyhow!("missing `token` in bearer auth"))?,
409 });
410 }
411 None => {}
412 }
413
414 let backend = tes::Backend::initialize(
415 backend::tes::Config::builder()
416 .url(backend_config.url.clone().expect("should have URL"))
417 .http(http)
418 .interval(backend_config.interval.unwrap_or(DEFAULT_TES_INTERVAL))
419 .build(),
420 );
421
422 let max_concurrency = backend_config.max_concurrency.unwrap_or(u64::MAX);
423
424 Ok(Self {
425 config,
426 backend_config: Arc::new(backend_config.clone()),
427 inner: Arc::new(backend),
428 max_concurrency,
429 max_cpu,
430 max_memory,
431 manager,
432 generator: Arc::new(Mutex::new(GeneratorIterator::new(
433 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
434 INITIAL_EXPECTED_NAMES,
435 ))),
436 })
437 }
438}
439
440impl TaskExecutionBackend for TesBackend {
441 fn max_concurrency(&self) -> u64 {
442 self.max_concurrency
443 }
444
445 fn constraints(
446 &self,
447 requirements: &HashMap<String, Value>,
448 hints: &HashMap<String, Value>,
449 ) -> Result<TaskExecutionConstraints> {
450 let container = container(requirements, self.config.task.container.as_deref());
451
452 let cpu = cpu(requirements);
453 if (self.max_cpu as f64) < cpu {
454 bail!(
455 "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
456 {max_cpu}",
457 s = if cpu == 1.0 { "" } else { "s" },
458 max_cpu = self.max_cpu,
459 );
460 }
461
462 let memory = memory(requirements)?;
463 if self.max_memory < memory as u64 {
464 let memory = memory as f64 / ONE_GIBIBYTE;
466 let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;
467
468 bail!(
469 "task requires at least {memory} GiB of memory, but the execution backend has a \
470 maximum of {max_memory} GiB",
471 );
472 }
473
474 let disks = disks(requirements, hints)?
476 .into_iter()
477 .map(|(mp, disk)| (mp.to_string(), disk.size))
478 .collect();
479
480 Ok(TaskExecutionConstraints {
481 container: Some(container.into_owned()),
482 cpu,
483 memory,
484 gpu: Default::default(),
485 fpga: Default::default(),
486 disks,
487 })
488 }
489
490 fn guest_work_dir(&self) -> Option<&Path> {
491 Some(Path::new(GUEST_WORK_DIR))
492 }
493
494 fn localize_inputs<'a, 'b, 'c, 'd>(
495 &'a self,
496 _: &'b HttpDownloader,
497 inputs: &'c mut [crate::eval::Input],
498 ) -> BoxFuture<'d, Result<()>>
499 where
500 'a: 'd,
501 'b: 'd,
502 'c: 'd,
503 Self: 'd,
504 {
505 async {
506 let mut trie = InputTrie::default();
508 for input in inputs.iter() {
509 trie.insert(input)?;
510 }
511
512 for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
513 inputs[index].set_guest_path(guest_path);
514 }
515
516 Ok(())
517 }
518 .boxed()
519 }
520
521 fn spawn(
522 &self,
523 request: TaskSpawnRequest,
524 token: CancellationToken,
525 ) -> Result<TaskExecutionEvents> {
526 let (spawned_tx, spawned_rx) = oneshot::channel();
527 let (completed_tx, completed_rx) = oneshot::channel();
528
529 let requirements = request.requirements();
530 let hints = request.hints();
531
532 let container = container(requirements, self.config.task.container.as_deref()).into_owned();
533 let cpu = cpu(requirements);
534 let memory = memory(requirements)? as u64;
535 let max_cpu = max_cpu(hints);
536 let max_memory = max_memory(hints)?.map(|i| i as u64);
537 let preemptible = preemptible(hints);
538
539 let name = format!(
540 "{id}-{generated}",
541 id = request.id(),
542 generated = self
543 .generator
544 .lock()
545 .expect("generator should always acquire")
546 .next()
547 .expect("generator should never be exhausted")
548 );
549 self.manager.send(
550 TesTaskRequest {
551 config: self.config.clone(),
552 backend_config: self.backend_config.clone(),
553 inner: request,
554 backend: self.inner.clone(),
555 name,
556 container,
557 cpu,
558 memory,
559 max_cpu,
560 max_memory,
561 token,
562 preemptible,
563 },
564 spawned_tx,
565 completed_tx,
566 );
567
568 Ok(TaskExecutionEvents {
569 spawned: spawned_rx,
570 completed: completed_rx,
571 })
572 }
573}