1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fs;
6use std::path::Path;
7use std::sync::Arc;
8use std::sync::Mutex;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use anyhow::bail;
14use crankshaft::config::backend;
15use crankshaft::config::backend::tes::http::HttpAuthConfig;
16use crankshaft::engine::Task;
17use crankshaft::engine::service::name::GeneratorIterator;
18use crankshaft::engine::service::name::UniqueAlphanumeric;
19use crankshaft::engine::service::runner::Backend;
20use crankshaft::engine::service::runner::backend::TaskRunError;
21use crankshaft::engine::service::runner::backend::tes;
22use crankshaft::engine::task::Execution;
23use crankshaft::engine::task::Input;
24use crankshaft::engine::task::Output;
25use crankshaft::engine::task::Resources;
26use crankshaft::engine::task::input::Contents;
27use crankshaft::engine::task::input::Type as InputType;
28use crankshaft::engine::task::output::Type as OutputType;
29use futures::FutureExt;
30use futures::future::BoxFuture;
31use nonempty::NonEmpty;
32use tokio::sync::oneshot;
33use tokio_util::sync::CancellationToken;
34use tracing::info;
35use wdl_ast::v1::TASK_REQUIREMENT_DISKS;
36
37use super::TaskExecutionBackend;
38use super::TaskExecutionConstraints;
39use super::TaskExecutionEvents;
40use super::TaskExecutionResult;
41use super::TaskManager;
42use super::TaskManagerRequest;
43use super::TaskSpawnRequest;
44use crate::COMMAND_FILE_NAME;
45use crate::InputKind;
46use crate::InputTrie;
47use crate::ONE_GIBIBYTE;
48use crate::PrimitiveValue;
49use crate::STDERR_FILE_NAME;
50use crate::STDOUT_FILE_NAME;
51use crate::Value;
52use crate::WORK_DIR_NAME;
53use crate::config::Config;
54use crate::config::DEFAULT_TASK_SHELL;
55use crate::config::TesBackendAuthConfig;
56use crate::config::TesBackendConfig;
57use crate::http::HttpDownloader;
58use crate::http::rewrite_url;
59use crate::path::EvaluationPath;
60use crate::v1::DEFAULT_TASK_REQUIREMENT_DISKS;
61use crate::v1::container;
62use crate::v1::cpu;
63use crate::v1::disks;
64use crate::v1::max_cpu;
65use crate::v1::max_memory;
66use crate::v1::memory;
67use crate::v1::preemptible;
68
/// The number of task names the name generator is initially sized for.
///
/// Passed both to `UniqueAlphanumeric::default_with_expected_generations`
/// and to `GeneratorIterator::new` when the backend is constructed.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The interval handed to the TES backend configuration when the user's
/// configuration does not specify one.
///
/// NOTE(review): presumably a polling interval in seconds — confirm against
/// the `crankshaft` TES configuration documentation.
const DEFAULT_TES_INTERVAL: u64 = 60;

/// The guest directory under which task inputs are placed.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs";

/// The guest directory used as the task's working directory.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path of the command file that the task shell executes.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path that the task's standard output is written to.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path that the task's standard error is written to.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
92
93#[derive(Debug)]
98struct TesTaskRequest {
99 config: Arc<Config>,
101 backend_config: Arc<TesBackendConfig>,
103 inner: TaskSpawnRequest,
105 backend: Arc<tes::Backend>,
107 name: String,
109 container: String,
111 cpu: f64,
113 memory: u64,
115 max_cpu: Option<f64>,
117 max_memory: Option<u64>,
119 preemptible: i64,
125 token: CancellationToken,
127}
128
129impl TesTaskRequest {
130 fn disk_resource(&self) -> Result<f64> {
132 let disks = disks(self.inner.requirements(), self.inner.hints())?;
133 if disks.len() > 1 {
134 bail!(
135 "TES backend does not support more than one disk specification for the \
136 `{TASK_REQUIREMENT_DISKS}` task requirement"
137 );
138 }
139
140 if let Some(mount_point) = disks.keys().next()
141 && *mount_point != "/"
142 {
143 bail!(
144 "TES backend does not support a disk mount point other than `/` for the \
145 `{TASK_REQUIREMENT_DISKS}` task requirement"
146 );
147 }
148
149 Ok(disks
150 .values()
151 .next()
152 .map(|d| d.size as f64)
153 .unwrap_or(DEFAULT_TASK_REQUIREMENT_DISKS))
154 }
155}
156
157impl TaskManagerRequest for TesTaskRequest {
158 fn cpu(&self) -> f64 {
159 self.cpu
160 }
161
162 fn memory(&self) -> u64 {
163 self.memory
164 }
165
166 async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
167 let attempt_dir = self.inner.attempt_dir();
169 fs::create_dir_all(attempt_dir).with_context(|| {
170 format!(
171 "failed to create directory `{path}`",
172 path = attempt_dir.display()
173 )
174 })?;
175
176 let command_path = attempt_dir.join(COMMAND_FILE_NAME);
179 fs::write(&command_path, self.inner.command()).with_context(|| {
180 format!(
181 "failed to write command contents to `{path}`",
182 path = command_path.display()
183 )
184 })?;
185
186 let task_dir = format!(
187 "{name}-{timestamp}/",
188 name = self.name,
189 timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S")
190 );
191
192 let mut inputs = vec![
194 Input::builder()
195 .path(GUEST_COMMAND_PATH)
196 .contents(Contents::Path(command_path.to_path_buf()))
197 .ty(InputType::File)
198 .read_only(true)
199 .build(),
200 ];
201
202 for input in self.inner.inputs() {
203 if input.kind() == InputKind::Directory {
207 if let EvaluationPath::Local(path) = input.path()
208 && let Ok(mut entries) = path.read_dir()
209 && entries.next().is_some()
210 {
211 bail!(
212 "cannot upload contents of directory `{path}`: operation is not yet \
213 supported",
214 path = path.display()
215 );
216 }
217 continue;
218 }
219
220 inputs.push(
223 Input::builder()
224 .path(input.guest_path().expect("should have guest path"))
225 .contents(match input.path() {
226 EvaluationPath::Local(path) => Contents::Path(path.clone()),
227 EvaluationPath::Remote(url) => Contents::Url(
228 rewrite_url(&self.config, Cow::Borrowed(url))?.into_owned(),
229 ),
230 })
231 .ty(input.kind())
232 .read_only(true)
233 .build(),
234 );
235 }
236
237 let outputs_url = self
240 .backend_config
241 .outputs
242 .as_ref()
243 .expect("should have outputs URL")
244 .join(&task_dir)
245 .expect("should join");
246
247 let mut work_dir_url = rewrite_url(
248 &self.config,
249 Cow::Owned(outputs_url.join(WORK_DIR_NAME).expect("should join")),
250 )?
251 .into_owned();
252 let stdout_url = rewrite_url(
253 &self.config,
254 Cow::Owned(outputs_url.join(STDOUT_FILE_NAME).expect("should join")),
255 )?
256 .into_owned();
257 let stderr_url = rewrite_url(
258 &self.config,
259 Cow::Owned(outputs_url.join(STDERR_FILE_NAME).expect("should join")),
260 )?
261 .into_owned();
262
263 let outputs = vec![
266 Output::builder()
267 .path(GUEST_WORK_DIR)
268 .url(work_dir_url.clone())
269 .ty(OutputType::Directory)
270 .build(),
271 Output::builder()
272 .path(GUEST_STDOUT_PATH)
273 .url(stdout_url.clone())
274 .ty(OutputType::File)
275 .build(),
276 Output::builder()
277 .path(GUEST_STDERR_PATH)
278 .url(stderr_url.clone())
279 .ty(OutputType::File)
280 .build(),
281 ];
282
283 let mut preemptible = self.preemptible;
284 let mut spawned = Some(spawned);
285 loop {
286 let task = Task::builder()
287 .name(&self.name)
288 .executions(NonEmpty::new(
289 Execution::builder()
290 .image(&self.container)
291 .program(
292 self.config
293 .task
294 .shell
295 .as_deref()
296 .unwrap_or(DEFAULT_TASK_SHELL),
297 )
298 .args([GUEST_COMMAND_PATH.to_string()])
299 .work_dir(GUEST_WORK_DIR)
300 .env(self.inner.env().clone())
301 .stdout(GUEST_STDOUT_PATH)
302 .stderr(GUEST_STDERR_PATH)
303 .build(),
304 ))
305 .inputs(inputs.clone())
306 .outputs(outputs.clone())
307 .resources(
308 Resources::builder()
309 .cpu(self.cpu)
310 .maybe_cpu_limit(self.max_cpu)
311 .ram(self.memory as f64 / ONE_GIBIBYTE)
312 .disk(self.disk_resource()?)
313 .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
314 .preemptible(preemptible > 0)
315 .build(),
316 )
317 .build();
318
319 let statuses = match self
320 .backend
321 .run(task, spawned.take(), self.token.clone())
322 .map_err(|e| anyhow!("{e:#}"))?
323 .await
324 {
325 Ok(statuses) => statuses,
326 Err(TaskRunError::Preempted) if preemptible > 0 => {
327 preemptible -= 1;
329 continue;
330 }
331 Err(e) => {
332 return Err(e.into());
333 }
334 };
335
336 assert_eq!(statuses.len(), 1, "there should only be one output");
337 let status = statuses.first();
338
339 work_dir_url.path_segments_mut().unwrap().push("");
342
343 return Ok(TaskExecutionResult {
344 inputs: self.inner.info.inputs,
345 exit_code: status.code().expect("should have exit code"),
346 work_dir: EvaluationPath::Remote(work_dir_url),
347 stdout: PrimitiveValue::new_file(stdout_url).into(),
348 stderr: PrimitiveValue::new_file(stderr_url).into(),
349 });
350 }
351 }
352}
353
354pub struct TesBackend {
356 config: Arc<Config>,
358 backend_config: Arc<TesBackendConfig>,
360 inner: Arc<tes::Backend>,
362 max_concurrency: u64,
364 max_cpu: u64,
366 max_memory: u64,
368 manager: TaskManager<TesTaskRequest>,
370 generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
372}
373
374impl TesBackend {
375 pub async fn new(config: Arc<Config>, backend_config: &TesBackendConfig) -> Result<Self> {
380 info!("initializing TES backend");
381
382 let max_cpu = u64::MAX;
385 let max_memory = u64::MAX;
386 let manager = TaskManager::new_unlimited(max_cpu, max_memory);
387
388 let mut http = backend::tes::http::Config::default();
389 match &backend_config.auth {
390 Some(TesBackendAuthConfig::Basic(config)) => {
391 http.auth = Some(HttpAuthConfig::Basic {
392 username: config
393 .username
394 .clone()
395 .ok_or_else(|| anyhow!("missing `username` in basic auth"))?,
396 password: config
397 .password
398 .clone()
399 .ok_or_else(|| anyhow!("missing `password` in basic auth"))?,
400 });
401 }
402 Some(TesBackendAuthConfig::Bearer(config)) => {
403 http.auth = Some(HttpAuthConfig::Bearer {
404 token: config
405 .token
406 .clone()
407 .ok_or_else(|| anyhow!("missing `token` in bearer auth"))?,
408 });
409 }
410 None => {}
411 }
412
413 let backend = tes::Backend::initialize(
414 backend::tes::Config::builder()
415 .url(backend_config.url.clone().expect("should have URL"))
416 .http(http)
417 .interval(backend_config.interval.unwrap_or(DEFAULT_TES_INTERVAL))
418 .build(),
419 );
420
421 let max_concurrency = backend_config.max_concurrency.unwrap_or(u64::MAX);
422
423 Ok(Self {
424 config,
425 backend_config: Arc::new(backend_config.clone()),
426 inner: Arc::new(backend),
427 max_concurrency,
428 max_cpu,
429 max_memory,
430 manager,
431 generator: Arc::new(Mutex::new(GeneratorIterator::new(
432 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
433 INITIAL_EXPECTED_NAMES,
434 ))),
435 })
436 }
437}
438
439impl TaskExecutionBackend for TesBackend {
440 fn max_concurrency(&self) -> u64 {
441 self.max_concurrency
442 }
443
444 fn constraints(
445 &self,
446 requirements: &HashMap<String, Value>,
447 hints: &HashMap<String, Value>,
448 ) -> Result<TaskExecutionConstraints> {
449 let container = container(requirements, self.config.task.container.as_deref());
450
451 let cpu = cpu(requirements);
452 if (self.max_cpu as f64) < cpu {
453 bail!(
454 "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
455 {max_cpu}",
456 s = if cpu == 1.0 { "" } else { "s" },
457 max_cpu = self.max_cpu,
458 );
459 }
460
461 let memory = memory(requirements)?;
462 if self.max_memory < memory as u64 {
463 let memory = memory as f64 / ONE_GIBIBYTE;
465 let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;
466
467 bail!(
468 "task requires at least {memory} GiB of memory, but the execution backend has a \
469 maximum of {max_memory} GiB",
470 );
471 }
472
473 let disks = disks(requirements, hints)?
475 .into_iter()
476 .map(|(mp, disk)| (mp.to_string(), disk.size))
477 .collect();
478
479 Ok(TaskExecutionConstraints {
480 container: Some(container.into_owned()),
481 cpu,
482 memory,
483 gpu: Default::default(),
484 fpga: Default::default(),
485 disks,
486 })
487 }
488
489 fn guest_work_dir(&self) -> Option<&Path> {
490 Some(Path::new(GUEST_WORK_DIR))
491 }
492
493 fn localize_inputs<'a, 'b, 'c, 'd>(
494 &'a self,
495 _: &'b HttpDownloader,
496 inputs: &'c mut [crate::eval::Input],
497 ) -> BoxFuture<'d, Result<()>>
498 where
499 'a: 'd,
500 'b: 'd,
501 'c: 'd,
502 Self: 'd,
503 {
504 async {
505 let mut trie = InputTrie::default();
507 for input in inputs.iter() {
508 trie.insert(input)?;
509 }
510
511 for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
512 inputs[index].set_guest_path(guest_path);
513 }
514
515 Ok(())
516 }
517 .boxed()
518 }
519
520 fn spawn(
521 &self,
522 request: TaskSpawnRequest,
523 token: CancellationToken,
524 ) -> Result<TaskExecutionEvents> {
525 let (spawned_tx, spawned_rx) = oneshot::channel();
526 let (completed_tx, completed_rx) = oneshot::channel();
527
528 let requirements = request.requirements();
529 let hints = request.hints();
530
531 let container = container(requirements, self.config.task.container.as_deref()).into_owned();
532 let cpu = cpu(requirements);
533 let memory = memory(requirements)? as u64;
534 let max_cpu = max_cpu(hints);
535 let max_memory = max_memory(hints)?.map(|i| i as u64);
536 let preemptible = preemptible(hints);
537
538 let name = format!(
539 "{id}-{generated}",
540 id = request.id(),
541 generated = self
542 .generator
543 .lock()
544 .expect("generator should always acquire")
545 .next()
546 .expect("generator should never be exhausted")
547 );
548 self.manager.send(
549 TesTaskRequest {
550 config: self.config.clone(),
551 backend_config: self.backend_config.clone(),
552 inner: request,
553 backend: self.inner.clone(),
554 name,
555 container,
556 cpu,
557 memory,
558 max_cpu,
559 max_memory,
560 token,
561 preemptible,
562 },
563 spawned_tx,
564 completed_tx,
565 );
566
567 Ok(TaskExecutionEvents {
568 spawned: spawned_rx,
569 completed: completed_rx,
570 })
571 }
572}