// wdl_engine/backend.rs
1//! Implementation of task execution backends.
2
3use std::collections::HashMap;
4use std::collections::VecDeque;
5use std::fmt;
6use std::future::Future;
7use std::ops::Add;
8use std::ops::Range;
9use std::ops::Sub;
10use std::path::Path;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use anyhow::Result;
15use anyhow::anyhow;
16use futures::future::BoxFuture;
17use indexmap::IndexMap;
18use ordered_float::OrderedFloat;
19use tokio::sync::mpsc;
20use tokio::sync::oneshot;
21use tokio::sync::oneshot::Receiver;
22use tokio::task::JoinSet;
23use tokio_util::sync::CancellationToken;
24use tracing::debug;
25
26use crate::Input;
27use crate::Value;
28use crate::http::Transferer;
29use crate::path::EvaluationPath;
30
31mod docker;
32mod local;
33mod tes;
34
35pub use docker::*;
36pub use local::*;
37pub use tes::*;
38
/// The default name of a task's working directory.
pub(crate) const WORK_DIR_NAME: &str = "work";

/// The default name of the file containing a task's evaluated command.
pub(crate) const COMMAND_FILE_NAME: &str = "command";

/// The default name of the file receiving a task's standard output.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";

/// The default name of the file receiving a task's standard error.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";

/// The number of initial expected task names.
///
/// This controls the initial size of the bloom filter and how many names are
/// prepopulated into a name generator.
///
/// NOTE(review): neither the bloom filter nor the name generator is visible in
/// this file; presumably they live in a sibling module — confirm this constant
/// is still referenced.
const INITIAL_EXPECTED_NAMES: usize = 1000;
56
57/// Represents constraints applied to a task's execution.
58pub struct TaskExecutionConstraints {
59 /// The container the task will run in.
60 ///
61 /// A value of `None` indicates the task will run on the host.
62 pub container: Option<String>,
63 /// The allocated number of CPUs; must be greater than 0.
64 pub cpu: f64,
65 /// The allocated memory in bytes; must be greater than 0.
66 pub memory: i64,
67 /// A list with one specification per allocated GPU.
68 ///
69 /// The specification is execution engine-specific.
70 ///
71 /// If no GPUs were allocated, then the value must be an empty list.
72 pub gpu: Vec<String>,
73 /// A list with one specification per allocated FPGA.
74 ///
75 /// The specification is execution engine-specific.
76 ///
77 /// If no FPGAs were allocated, then the value must be an empty list.
78 pub fpga: Vec<String>,
79 /// A map with one entry for each disk mount point.
80 ///
81 /// The key is the mount point and the value is the initial amount of disk
82 /// space allocated, in bytes.
83 ///
84 /// The execution engine must, at a minimum, provide one entry for each disk
85 /// mount point requested, but may provide more.
86 ///
87 /// The amount of disk space available for a given mount point may increase
88 /// during the lifetime of the task (e.g., autoscaling volumes provided by
89 /// some cloud services).
90 pub disks: IndexMap<String, i64>,
91}
92
/// Represents information for spawning a task.
///
/// The requirements, hints, and environment maps are reference counted
/// (`Arc`) so this information can be shared without cloning the underlying
/// maps.
pub struct TaskSpawnInfo {
    /// The command of the task.
    command: String,
    /// The inputs for the task.
    inputs: Vec<Input>,
    /// The requirements of the task.
    requirements: Arc<HashMap<String, Value>>,
    /// The hints of the task.
    hints: Arc<HashMap<String, Value>>,
    /// The environment variables of the task.
    env: Arc<IndexMap<String, String>>,
    /// The transferer to use for uploading inputs.
    transferer: Arc<dyn Transferer>,
}
108
109impl TaskSpawnInfo {
110 /// Constructs a new task spawn information.
111 pub fn new(
112 command: String,
113 inputs: Vec<Input>,
114 requirements: Arc<HashMap<String, Value>>,
115 hints: Arc<HashMap<String, Value>>,
116 env: Arc<IndexMap<String, String>>,
117 transferer: Arc<dyn Transferer>,
118 ) -> Self {
119 Self {
120 command,
121 inputs,
122 requirements,
123 hints,
124 env,
125 transferer,
126 }
127 }
128}
129
130impl fmt::Debug for TaskSpawnInfo {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 f.debug_struct("TaskSpawnInfo")
133 .field("command", &self.command)
134 .field("inputs", &self.inputs)
135 .field("requirements", &self.requirements)
136 .field("hints", &self.hints)
137 .field("env", &self.env)
138 .field("transferer", &"<transferer>")
139 .finish()
140 }
141}
142
/// Represents a request to spawn a task.
#[derive(Debug)]
pub struct TaskSpawnRequest {
    /// The id of the task being spawned.
    id: String,
    /// The information for the task to spawn.
    info: TaskSpawnInfo,
    /// The attempt number for the spawn request.
    ///
    /// Attempt numbers start at 0.
    attempt: u64,
    /// The attempt directory for the task's execution.
    attempt_dir: PathBuf,
}
155
156impl TaskSpawnRequest {
157 /// Creates a new task spawn request.
158 pub fn new(id: String, info: TaskSpawnInfo, attempt: u64, attempt_dir: PathBuf) -> Self {
159 Self {
160 id,
161 info,
162 attempt,
163 attempt_dir,
164 }
165 }
166
167 /// The identifier of the task being spawned.
168 pub fn id(&self) -> &str {
169 &self.id
170 }
171
172 /// Gets the command for the task.
173 pub fn command(&self) -> &str {
174 &self.info.command
175 }
176
177 /// Gets the inputs for the task.
178 pub fn inputs(&self) -> &[Input] {
179 &self.info.inputs
180 }
181
182 /// Gets the requirements of the task.
183 pub fn requirements(&self) -> &HashMap<String, Value> {
184 &self.info.requirements
185 }
186
187 /// Gets the hints of the task.
188 pub fn hints(&self) -> &HashMap<String, Value> {
189 &self.info.hints
190 }
191
192 /// Gets the environment variables of the task.
193 pub fn env(&self) -> &IndexMap<String, String> {
194 &self.info.env
195 }
196
197 /// Gets the transferer to use for uploading inputs.
198 pub fn transferer(&self) -> &Arc<dyn Transferer> {
199 &self.info.transferer
200 }
201
202 /// Gets the attempt number for the task's execution.
203 ///
204 /// The attempt number starts at 0.
205 pub fn attempt(&self) -> u64 {
206 self.attempt
207 }
208
209 /// Gets the attempt directory for the task's execution.
210 pub fn attempt_dir(&self) -> &Path {
211 &self.attempt_dir
212 }
213}
214
/// Represents the result of a task's execution.
///
/// Delivered to callers through the oneshot receiver returned by
/// [`TaskExecutionBackend::spawn`].
#[derive(Debug)]
pub struct TaskExecutionResult {
    /// Stores the task process exit code.
    pub exit_code: i32,
    /// The task's working directory.
    pub work_dir: EvaluationPath,
    /// The value of the task's stdout file.
    pub stdout: Value,
    /// The value of the task's stderr file.
    pub stderr: Value,
}
227
/// Represents a task execution backend.
pub trait TaskExecutionBackend: Send + Sync {
    /// Gets the maximum concurrent tasks supported by the backend.
    fn max_concurrency(&self) -> u64;

    /// Gets the execution constraints given a task's requirements and hints.
    ///
    /// # Errors
    ///
    /// Returns an error if the task cannot be constrained for the execution
    /// environment or if the task specifies invalid requirements.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints>;

    /// Gets the guest (container) inputs directory of the backend.
    ///
    /// Returns `None` if the backend does not execute tasks in a container.
    ///
    /// The returned path is expected to be Unix style and end with a forward
    /// slash (`/`).
    fn guest_inputs_dir(&self) -> Option<&'static str>;

    /// Determines if the backend needs local inputs.
    ///
    /// Backends that run tasks locally or from a shared file system will return
    /// `true`.
    fn needs_local_inputs(&self) -> bool;

    /// Spawns a task with the execution backend.
    ///
    /// Returns a oneshot receiver for awaiting the completion of the task.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<Receiver<Result<TaskExecutionResult>>>;

    /// Performs cleanup operations after task execution completes.
    ///
    /// Returns `None` if no cleanup is required; the default implementation
    /// does nothing and returns `None`.
    fn cleanup<'a>(
        &'a self,
        work_dir: &'a EvaluationPath,
        token: CancellationToken,
    ) -> Option<BoxFuture<'a, ()>> {
        // Explicitly discard the arguments to avoid unused-parameter warnings
        // in the default implementation.
        let _ = work_dir;
        let _ = token;
        None
    }
}
278
/// A trait implemented by backend requests.
///
/// Implementations report their resource reservations so the task manager can
/// decide when the request may run.
trait TaskManagerRequest: Send + Sync + 'static {
    /// Gets the requested CPU allocation from the request.
    fn cpu(&self) -> f64;

    /// Gets the requested memory allocation from the request, in bytes.
    fn memory(&self) -> u64;

    /// Runs the request, consuming it and producing the task's execution
    /// result.
    fn run(self) -> impl Future<Output = Result<TaskExecutionResult>> + Send;
}
290
/// Represents a response internal to the task manager.
///
/// Carries the completed task's resource reservations so the manager can
/// return them to the available pool, along with the channel on which to
/// notify the original requester.
struct TaskManagerResponse {
    /// The previous CPU allocation from the request.
    cpu: f64,
    /// The previous memory allocation from the request, in bytes.
    memory: u64,
    /// The result of the task's execution.
    result: Result<TaskExecutionResult>,
    /// The channel to send the task's execution result back on.
    tx: oneshot::Sender<Result<TaskExecutionResult>>,
}
302
/// Represents state used by the task manager.
struct TaskManagerState<Req> {
    /// The amount of available CPU remaining.
    ///
    /// Wrapped in `OrderedFloat` so CPU reservations satisfy the `Ord` bound
    /// required by `fit_longest_range`.
    cpu: OrderedFloat<f64>,
    /// The amount of available memory remaining, in bytes.
    memory: u64,
    /// The set of spawned tasks.
    spawned: JoinSet<TaskManagerResponse>,
    /// The queue of parked spawn requests awaiting resources, each paired with
    /// the completion channel of its requester.
    parked: VecDeque<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
}
314
315impl<Req> TaskManagerState<Req> {
316 /// Constructs a new task manager state with the given total CPU and memory.
317 fn new(cpu: u64, memory: u64) -> Self {
318 Self {
319 cpu: OrderedFloat(cpu as f64),
320 memory,
321 spawned: Default::default(),
322 parked: Default::default(),
323 }
324 }
325
326 /// Determines if the resources are unlimited.
327 fn unlimited(&self) -> bool {
328 self.cpu == u64::MAX as f64 && self.memory == u64::MAX
329 }
330}
331
/// Responsible for managing tasks based on available host resources.
///
/// Holds only the sending half of the request channel; the queue itself runs
/// on a background task spawned by [`TaskManager::new`].
struct TaskManager<Req> {
    /// The sender for new spawn requests.
    tx: mpsc::UnboundedSender<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
}
337
impl<Req> TaskManager<Req>
where
    Req: TaskManagerRequest,
{
    /// Constructs a new task manager with the given total CPU, maximum CPU per
    /// request, total memory, and maximum memory per request.
    ///
    /// Spawns a background task that services the request queue; the task
    /// exits when every sender (i.e. every `TaskManager` clone of `tx`) has
    /// been dropped.
    fn new(cpu: u64, max_cpu: u64, memory: u64, max_memory: u64) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            Self::run_request_queue(rx, cpu, max_cpu, memory, max_memory).await;
        });

        Self { tx }
    }

    /// Constructs a new task manager that does not limit requests based on
    /// available resources.
    ///
    /// `u64::MAX` totals are the sentinel recognized by
    /// `TaskManagerState::unlimited`.
    fn new_unlimited(max_cpu: u64, max_memory: u64) -> Self {
        Self::new(u64::MAX, max_cpu, u64::MAX, max_memory)
    }

    /// Sends a request to the task manager's queue.
    ///
    /// A send error (queue task has exited) is intentionally ignored; the
    /// caller's `completed` receiver will observe the drop.
    fn send(&self, request: Req, completed: oneshot::Sender<Result<TaskExecutionResult>>) {
        self.tx.send((request, completed)).ok();
    }

    /// Runs the request queue.
    ///
    /// Loops until the request channel closes, alternating between accepting
    /// new spawn requests and reclaiming resources from completed tasks.
    async fn run_request_queue(
        mut rx: mpsc::UnboundedReceiver<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
        cpu: u64,
        max_cpu: u64,
        memory: u64,
        max_memory: u64,
    ) {
        let mut state = TaskManagerState::new(cpu, memory);

        loop {
            // If there aren't any spawned tasks, wait for a spawn request only
            if state.spawned.is_empty() {
                assert!(
                    state.parked.is_empty(),
                    "there can't be any parked requests if there are no spawned tasks"
                );
                match rx.recv().await {
                    Some((req, completed)) => {
                        Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, completed);
                        continue;
                    }
                    None => break,
                }
            }

            // Otherwise, wait for a spawn request or a completed task
            tokio::select! {
                request = rx.recv() => {
                    match request {
                        Some((req, completed)) => {
                            Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, completed);
                        }
                        None => break,
                    }
                }
                // NOTE(review): the `Some(Ok(response))` pattern silently
                // discards `Some(Err(_))` (a task that panicked or was
                // aborted); in that case the task's reserved CPU/memory are
                // never returned to the pool and the requester's oneshot is
                // never completed — confirm this is intended.
                Some(Ok(response)) = state.spawned.join_next() => {
                    if !state.unlimited() {
                        // Return the completed task's reservations to the pool.
                        state.cpu += response.cpu;
                        state.memory += response.memory;
                    }

                    // The requester may have gone away; ignore a send failure.
                    response.tx.send(response.result).ok();
                    Self::spawn_parked_tasks(&mut state, max_cpu, max_memory);
                }
            }
        }
    }

    /// Handles a spawn request by either parking it (not enough resources
    /// currently available) or by spawning it.
    ///
    /// Requests exceeding the per-request CPU or memory maximums are rejected
    /// immediately by sending an error on the completion channel.
    fn handle_spawn_request(
        state: &mut TaskManagerState<Req>,
        max_cpu: u64,
        max_memory: u64,
        request: Req,
        completed: oneshot::Sender<Result<TaskExecutionResult>>,
    ) {
        // Ensure the request does not exceed the maximum CPU
        let cpu = request.cpu();
        if cpu > max_cpu as f64 {
            completed
                .send(Err(anyhow!(
                    "requested task CPU count of {cpu} exceeds the maximum CPU count of {max_cpu}",
                )))
                .ok();
            return;
        }

        // Ensure the request does not exceed the maximum memory
        let memory = request.memory();
        if memory > max_memory {
            completed
                .send(Err(anyhow!(
                    "requested task memory of {memory} byte{s} exceeds the maximum memory of \
                     {max_memory}",
                    s = if memory == 1 { "" } else { "s" }
                )))
                .ok();
            return;
        }

        if !state.unlimited() {
            // If the request can't be processed due to resource constraints, park the
            // request for now. When a task completes and resources become available,
            // we'll unpark the request
            if cpu > state.cpu.into() || memory > state.memory {
                debug!(
                    "parking task due to insufficient resources: task reserves {cpu} CPU(s) and \
                     {memory} bytes of memory but there are only {cpu_remaining} CPU(s) and \
                     {memory_remaining} bytes of memory available",
                    cpu_remaining = state.cpu,
                    memory_remaining = state.memory
                );
                state.parked.push_back((request, completed));
                return;
            }

            // Decrement the resource counts and spawn the task
            state.cpu -= cpu;
            state.memory -= memory;
            debug!(
                "spawning task with {cpu} CPUs and {memory} bytes of memory remaining",
                cpu = state.cpu,
                memory = state.memory
            );
        }

        // The response records the reservations so they can be returned to the
        // pool when the task completes.
        state.spawned.spawn(async move {
            TaskManagerResponse {
                cpu: request.cpu(),
                memory: request.memory(),
                result: request.run().await,
                tx: completed,
            }
        });
    }

    /// Responsible for spawning parked tasks.
    fn spawn_parked_tasks(state: &mut TaskManagerState<Req>, max_cpu: u64, max_memory: u64) {
        if state.parked.is_empty() {
            return;
        }

        debug!(
            "attempting to unpark tasks with {cpu} CPUs and {memory} bytes of memory available",
            cpu = state.cpu,
            memory = state.memory,
        );

        // This algorithm is intended to unpark the greatest number of tasks.
        //
        // It first finds the greatest subset of tasks that are constrained by CPU and
        // then by memory.
        //
        // Next it finds the greatest subset of tasks that are constrained by memory and
        // then by CPU.
        //
        // It then unparks whichever subset is greater.
        //
        // The process is repeated until both subsets reach zero length.
        loop {
            let cpu_by_memory_len = {
                // Start by finding the longest range in the parked set that could run based on
                // CPU reservation
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                // Next, find the longest subset of that subset that could run based on memory
                // reservation
                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
                .len()
            };

            // Next, find the longest range in the parked set that could run based on memory
            // reservation
            let memory_by_cpu =
                fit_longest_range(state.parked.make_contiguous(), state.memory, |(r, ..)| {
                    r.memory()
                });

            // Next, find the longest subset of that subset that could run based on CPU
            // reservation
            let memory_by_cpu = fit_longest_range(
                &mut state.parked.make_contiguous()[memory_by_cpu],
                state.cpu,
                |(r, ..)| OrderedFloat(r.cpu()),
            );

            // If both subsets are empty, break out
            if cpu_by_memory_len == 0 && memory_by_cpu.is_empty() {
                break;
            }

            // Check to see which subset is greater (for equivalence, use the one we don't
            // need to refit for)
            let range = if memory_by_cpu.len() >= cpu_by_memory_len {
                memory_by_cpu
            } else {
                // We need to refit because the above calculation of `memory_by_cpu` mutated the
                // parked list
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
            };

            debug!("unparking {len} task(s)", len = range.len());

            // `fit_longest_range` partially sorts the fitted elements to the
            // front, so the unparked requests are popped from the front.
            assert_eq!(
                range.start, 0,
                "expected the fit tasks to be at the front of the queue"
            );
            for _ in range {
                let (request, completed) = state.parked.pop_front().unwrap();

                debug!(
                    "unparking task with reservation of {cpu} CPU(s) and {memory} bytes of memory",
                    cpu = request.cpu(),
                    memory = request.memory(),
                );

                Self::handle_spawn_request(state, max_cpu, max_memory, request, completed);
            }
        }
    }
}
584
/// Determines the longest range in a slice where the sum of the weights of the
/// elements in the returned range is less than or equal to the supplied total
/// weight.
///
/// The returned range always starts at zero as this algorithm will partially
/// sort the slice.
///
/// Due to the partial sorting, the provided slice will have its elements
/// rearranged. As the function modifies the slice in-place, this function does
/// not make any allocations.
///
/// # Implementation
///
/// This function is implemented using a modified quick sort algorithm as a
/// solution to the more general "0/1 knapsack" problem where each item has an
/// equal profit value; this maximizes for the number of items to put
/// into the knapsack (i.e. longest range that fits).
///
/// Using a uniform random pivot point, it partitions the input into two sides:
/// the left side where all weights are less than the pivot and the right side
/// where all weights are equal to or greater than the pivot.
///
/// It then checks to see if the total weight of the left side is less than or
/// equal to the total remaining weight; if it is, every element in
/// the left side is considered as part of the output and it recurses on the
/// right side.
///
/// If the total weight of the left side is greater than the remaining weight
/// budget, it can completely ignore the right side and instead recurse on the
/// left side.
///
/// The algorithm stops when the partition size reaches zero.
///
/// # Panics
///
/// Panics if the supplied weight is a negative value.
fn fit_longest_range<T, F, W>(slice: &mut [T], total_weight: W, mut weight_fn: F) -> Range<usize>
where
    F: FnMut(&T) -> W,
    W: Ord + Add<Output = W> + Sub<Output = W> + Default,
{
    /// Partitions the slice so that the weight of every element to the left
    /// of the pivot is less than the pivot's weight and every element to the
    /// right of the pivot is greater than or equal to the pivot's weight.
    ///
    /// Returns the pivot index, pivot weight, and the sum of the left side
    /// element's weights.
    fn partition<T, F, W>(
        slice: &mut [T],
        weight_fn: &mut F,
        mut low: usize,
        high: usize,
    ) -> (usize, W, W)
    where
        F: FnMut(&T) -> W,
        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
    {
        assert!(low < high);

        // Swap a random element (the pivot) in the remaining range with the high
        slice.swap(high, rand::random_range(low..high));

        let pivot_weight = weight_fn(&slice[high]);
        let mut sum_weight = W::default();
        // Lomuto-style sweep: `low` tracks the insertion point for the next
        // element lighter than the pivot. The pivot itself (at `high`) is never
        // swapped in the loop because `weight < pivot_weight` is false for it.
        let range = low..=high;
        for i in range {
            let weight = weight_fn(&slice[i]);
            // If the weight belongs on the left side of the pivot, swap
            if weight < pivot_weight {
                slice.swap(i, low);
                low += 1;
                sum_weight = sum_weight.add(weight);
            }
        }

        // Place the pivot between the two partitions.
        slice.swap(low, high);
        (low, pivot_weight, sum_weight)
    }

    /// Recursively fits elements into the remaining weight budget, growing
    /// `end` (the length of the fitted prefix) as elements are accepted.
    fn recurse_fit_maximal_range<T, F, W>(
        slice: &mut [T],
        mut remaining_weight: W,
        weight_fn: &mut F,
        low: usize,
        high: usize,
        end: &mut usize,
    ) where
        F: FnMut(&T) -> W,
        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
    {
        // Base case: a single element either fits or it doesn't.
        if low == high {
            let weight = weight_fn(&slice[low]);
            if weight <= remaining_weight {
                *end += 1;
            }

            return;
        }

        if low < high {
            let (pivot, pivot_weight, sum) = partition(slice, weight_fn, low, high);
            if sum <= remaining_weight {
                // Everything up to the pivot can be included
                *end += pivot - low;
                remaining_weight = remaining_weight.sub(sum);

                // Check to see if the pivot itself can be included
                if pivot_weight <= remaining_weight {
                    *end += 1;
                    remaining_weight = remaining_weight.sub(pivot_weight);
                }

                // Recurse on the right side
                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, pivot + 1, high, end);
            } else if pivot > 0 {
                // Otherwise, we can completely disregard the right side (including the pivot)
                // and recurse on the left
                // (when `pivot == 0` there is no left side to recurse on)
                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, low, pivot - 1, end);
            }
        }
    }

    assert!(
        total_weight >= W::default(),
        "total weight cannot be negative"
    );

    if slice.is_empty() {
        return 0..0;
    }

    let mut end = 0;
    recurse_fit_maximal_range(
        slice,
        total_weight,
        &mut weight_fn,
        0,
        slice.len() - 1, // won't underflow due to empty check
        &mut end,
    );

    0..end
}
728
#[cfg(test)]
mod test {
    use super::*;

    /// An empty slice always yields an empty range.
    #[test]
    fn fit_empty_slice() {
        let range = fit_longest_range(&mut [], 100, |i| *i);
        assert!(range.is_empty());
    }

    /// A negative weight budget is a precondition violation.
    #[test]
    #[should_panic(expected = "total weight cannot be negative")]
    fn fit_negative_panic() {
        fit_longest_range(&mut [0], -1, |i| *i);
    }

    /// When every element exceeds the budget, nothing fits.
    #[test]
    fn no_fit() {
        let range = fit_longest_range(&mut [100, 101, 102], 99, |i| *i);
        assert!(range.is_empty());
    }

    /// When the budget covers the whole slice, everything fits regardless of
    /// the initial ordering.
    #[test]
    fn fit_all() {
        let range = fit_longest_range(&mut [1, 2, 3, 4, 5], 15, |i| *i);
        assert_eq!(range.len(), 5);

        let range = fit_longest_range(&mut [5, 4, 3, 2, 1], 20, |i| *i);
        assert_eq!(range.len(), 5);
    }

    /// A partial fit moves the fitted elements to the front and the rejected
    /// heavy elements past the end of the range.
    #[test]
    fn fit_some() {
        let items = &mut [8, 2, 2, 3, 2, 1, 2, 4, 1];
        let range = fit_longest_range(items, 10, |i| *i);
        assert_eq!(range.len(), 6);
        assert_eq!(items[range.start..range.end].iter().copied().sum::<i32>(), 10);
        assert!(items[range.end..].contains(&8));
        assert!(items[range.end..].contains(&4));
        assert!(items[range.end..].contains(&3));
    }

    /// `u64::MAX` CPU and memory is the "unlimited" sentinel.
    #[test]
    fn unlimited_state() {
        let state = TaskManagerState::<()>::new(u64::MAX, u64::MAX);
        assert!(state.unlimited());
    }
}