wdl_engine/backend.rs
1//! Implementation of task execution backends.
2
3use std::collections::HashMap;
4use std::collections::VecDeque;
5use std::future::Future;
6use std::ops::Add;
7use std::ops::Range;
8use std::ops::Sub;
9use std::path::Path;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13use anyhow::Result;
14use anyhow::anyhow;
15use futures::future::BoxFuture;
16use indexmap::IndexMap;
17use ordered_float::OrderedFloat;
18use tokio::sync::mpsc;
19use tokio::sync::oneshot;
20use tokio::sync::oneshot::Receiver;
21use tokio::task::JoinSet;
22use tokio_util::sync::CancellationToken;
23use tracing::debug;
24
25use crate::Input;
26use crate::Value;
27use crate::http::HttpDownloader;
28use crate::path::EvaluationPath;
29
30mod docker;
31mod local;
32mod tes;
33
34pub use docker::*;
35pub use local::*;
36pub use tes::*;
37
/// The default name of the work directory.
///
/// Visible within this crate only.
// NOTE(review): presumably joined onto a task's attempt directory by the
// backend submodules — confirm against `local`, `docker`, and `tes`.
pub(crate) const WORK_DIR_NAME: &str = "work";

/// The default name of the file holding a command.
pub(crate) const COMMAND_FILE_NAME: &str = "command";

/// The default name of the file capturing stdout.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";

/// The default name of the file capturing stderr.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";
49
50/// Represents constraints applied to a task's execution.
51pub struct TaskExecutionConstraints {
52 /// The container the task will run in.
53 ///
54 /// A value of `None` indicates the task will run on the host.
55 pub container: Option<String>,
56 /// The allocated number of CPUs; must be greater than 0.
57 pub cpu: f64,
58 /// The allocated memory in bytes; must be greater than 0.
59 pub memory: i64,
60 /// A list with one specification per allocated GPU.
61 ///
62 /// The specification is execution engine-specific.
63 ///
64 /// If no GPUs were allocated, then the value must be an empty list.
65 pub gpu: Vec<String>,
66 /// A list with one specification per allocated FPGA.
67 ///
68 /// The specification is execution engine-specific.
69 ///
70 /// If no FPGAs were allocated, then the value must be an empty list.
71 pub fpga: Vec<String>,
72 /// A map with one entry for each disk mount point.
73 ///
74 /// The key is the mount point and the value is the initial amount of disk
75 /// space allocated, in bytes.
76 ///
77 /// The execution engine must, at a minimum, provide one entry for each disk
78 /// mount point requested, but may provide more.
79 ///
80 /// The amount of disk space available for a given mount point may increase
81 /// during the lifetime of the task (e.g., autoscaling volumes provided by
82 /// some cloud services).
83 pub disks: IndexMap<String, i64>,
84}
85
/// Represents information for spawning a task.
///
/// The requirements, hints, and environment are held behind [`Arc`], so the
/// maps themselves are shared rather than copied when the `Arc` is cloned.
#[derive(Debug)]
pub struct TaskSpawnInfo {
    /// The command of the task.
    command: String,
    /// The inputs for the task.
    inputs: Vec<Input>,
    /// The requirements of the task, keyed by requirement name.
    requirements: Arc<HashMap<String, Value>>,
    /// The hints of the task, keyed by hint name.
    hints: Arc<HashMap<String, Value>>,
    /// The environment variables of the task, as name/value pairs.
    env: Arc<IndexMap<String, String>>,
}
100
101impl TaskSpawnInfo {
102 /// Constructs a new task spawn information.
103 pub fn new(
104 command: String,
105 inputs: Vec<Input>,
106 requirements: Arc<HashMap<String, Value>>,
107 hints: Arc<HashMap<String, Value>>,
108 env: Arc<IndexMap<String, String>>,
109 ) -> Self {
110 Self {
111 command,
112 inputs,
113 requirements,
114 hints,
115 env,
116 }
117 }
118}
119
/// Represents a request to spawn a task.
///
/// A request pairs the task's [`TaskSpawnInfo`] with the bookkeeping for one
/// particular execution attempt (its number and directory).
#[derive(Debug)]
pub struct TaskSpawnRequest {
    /// The id of the task being spawned.
    id: String,
    /// The information for the task to spawn.
    info: TaskSpawnInfo,
    /// The attempt number for the spawn request.
    ///
    /// The attempt number starts at 0.
    attempt: u64,
    /// The attempt directory for the task's execution.
    attempt_dir: PathBuf,
}
132
133impl TaskSpawnRequest {
134 /// Creates a new task spawn request.
135 pub fn new(id: String, info: TaskSpawnInfo, attempt: u64, attempt_dir: PathBuf) -> Self {
136 Self {
137 id,
138 info,
139 attempt,
140 attempt_dir,
141 }
142 }
143
144 /// The identifier of the task being spawned.
145 pub fn id(&self) -> &str {
146 &self.id
147 }
148
149 /// Gets the command for the task.
150 pub fn command(&self) -> &str {
151 &self.info.command
152 }
153
154 /// Gets the inputs for the task.
155 pub fn inputs(&self) -> &[Input] {
156 &self.info.inputs
157 }
158
159 /// Gets the requirements of the task.
160 pub fn requirements(&self) -> &HashMap<String, Value> {
161 &self.info.requirements
162 }
163
164 /// Gets the hints of the task.
165 pub fn hints(&self) -> &HashMap<String, Value> {
166 &self.info.hints
167 }
168
169 /// Gets the environment variables of the task.
170 pub fn env(&self) -> &IndexMap<String, String> {
171 &self.info.env
172 }
173
174 /// Gets the attempt number for the task's execution.
175 ///
176 /// The attempt number starts at 0.
177 pub fn attempt(&self) -> u64 {
178 self.attempt
179 }
180
181 /// Gets the attempt directory for the task's execution.
182 pub fn attempt_dir(&self) -> &Path {
183 &self.attempt_dir
184 }
185}
186
/// Represents the result of a task's execution.
///
/// This is the success payload delivered through
/// [`TaskExecutionEvents::completed`].
#[derive(Debug)]
pub struct TaskExecutionResult {
    /// The inputs that were given to the task.
    pub inputs: Vec<Input>,
    /// Stores the task process exit code.
    pub exit_code: i32,
    /// The task's working directory.
    pub work_dir: EvaluationPath,
    /// The value of the task's stdout file.
    pub stdout: Value,
    /// The value of the task's stderr file.
    pub stderr: Value,
}
201
202/// Represents events that can be awaited on during task execution.
203pub struct TaskExecutionEvents {
204 /// The event for when the task has spawned and is currently executing.
205 pub spawned: Receiver<()>,
206 /// The event for when the task has completed.
207 ///
208 /// Returns the execution result.
209 pub completed: Receiver<Result<TaskExecutionResult>>,
210}
211
/// Represents a task execution backend.
///
/// Implementations must be `Send + Sync`.
pub trait TaskExecutionBackend: Send + Sync {
    /// Gets the maximum concurrent tasks supported by the backend.
    fn max_concurrency(&self) -> u64;

    /// Gets the execution constraints given a task's requirements and hints.
    ///
    /// Returns an error if the task cannot be constrained for the execution
    /// environment or if the task specifies invalid requirements.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints>;

    /// Gets the guest path of the task working directory (e.g. `/mnt/work`).
    ///
    /// Returns `None` if the task execution does not use a container.
    fn guest_work_dir(&self) -> Option<&Path>;

    /// Localizes the given set of inputs for the backend.
    ///
    /// This may involve downloading remote inputs to the host and updating the
    /// input's guest paths.
    ///
    /// The returned future borrows the backend, the downloader, and the
    /// inputs, so it cannot outlive any of them.
    fn localize_inputs<'a, 'b, 'c, 'd>(
        &'a self,
        downloader: &'b HttpDownloader,
        inputs: &'c mut [Input],
    ) -> BoxFuture<'d, Result<()>>
    where
        'a: 'd,
        'b: 'd,
        'c: 'd,
        Self: 'd;

    /// Spawns a task with the execution backend.
    ///
    /// Returns the task execution event receivers upon success.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<TaskExecutionEvents>;

    /// Performs cleanup operations after top-level workflow or task evaluation
    /// completes.
    ///
    /// Returns `None` if no cleanup is required.
    ///
    /// The default implementation ignores its arguments and returns `None`.
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        _output_dir: &'b Path,
        _token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        None
    }
}
273
/// A trait implemented by backend requests.
///
/// The task manager uses the CPU and memory figures to decide whether a
/// request may run now, must be parked, or must be rejected, and calls `run`
/// once the request has been admitted.
trait TaskManagerRequest: Send + Sync + 'static {
    /// Gets the requested CPU allocation from the request.
    fn cpu(&self) -> f64;

    /// Gets the requested memory allocation from the request, in bytes.
    fn memory(&self) -> u64;

    /// Runs the request.
    ///
    /// Consumes the request and resolves to the task's execution result.
    // NOTE(review): `spawned` is presumably signaled by the implementation once
    // the task has actually started executing — confirm against the backends.
    fn run(
        self,
        spawned: oneshot::Sender<()>,
    ) -> impl Future<Output = Result<TaskExecutionResult>> + Send;
}
288
/// Represents a response internal to the task manager.
///
/// Produced by each spawned task when it finishes; it carries the
/// reservation that the queue adds back to the available resources, together
/// with the result and the channel used to deliver it.
struct TaskManagerResponse {
    /// The previous CPU allocation from the request.
    cpu: f64,
    /// The previous memory allocation from the request, in bytes.
    memory: u64,
    /// The result of the task's execution.
    result: Result<TaskExecutionResult>,
    /// The channel to send the task's execution result back on.
    tx: oneshot::Sender<Result<TaskExecutionResult>>,
}
300
/// Represents state used by the task manager.
///
/// `Req` is the backend-specific request type.
struct TaskManagerState<Req> {
    /// The amount of available CPU remaining.
    cpu: OrderedFloat<f64>,
    /// The amount of available memory remaining, in bytes.
    memory: u64,
    /// The set of spawned tasks.
    spawned: JoinSet<TaskManagerResponse>,
    /// The queue of parked spawn requests.
    ///
    /// Each entry holds the request, the sender for its "spawned" event, and
    /// the sender for its completion result.
    parked: VecDeque<(
        Req,
        oneshot::Sender<()>,
        oneshot::Sender<Result<TaskExecutionResult>>,
    )>,
}
316
317impl<Req> TaskManagerState<Req> {
318 /// Constructs a new task manager state with the given total CPU and memory.
319 fn new(cpu: u64, memory: u64) -> Self {
320 Self {
321 cpu: OrderedFloat(cpu as f64),
322 memory,
323 spawned: Default::default(),
324 parked: Default::default(),
325 }
326 }
327
328 /// Determines if the resources are unlimited.
329 fn unlimited(&self) -> bool {
330 self.cpu == u64::MAX as f64 && self.memory == u64::MAX
331 }
332}
333
/// Responsible for managing tasks based on available host resources.
///
/// The manager itself is only a handle: it holds the sending half of the
/// channel feeding the request queue, which runs in a separately spawned
/// task.
struct TaskManager<Req> {
    /// The sender for new spawn requests.
    ///
    /// Each message holds the request, the sender for its "spawned" event,
    /// and the sender for its completion result.
    tx: mpsc::UnboundedSender<(
        Req,
        oneshot::Sender<()>,
        oneshot::Sender<Result<TaskExecutionResult>>,
    )>,
}
343
344impl<Req> TaskManager<Req>
345where
346 Req: TaskManagerRequest,
347{
348 /// Constructs a new task manager with the given total CPU, maximum CPU per
349 /// request, total memory, and maximum memory per request.
350 fn new(cpu: u64, max_cpu: u64, memory: u64, max_memory: u64) -> Self {
351 let (tx, rx) = mpsc::unbounded_channel();
352
353 tokio::spawn(async move {
354 Self::run_request_queue(rx, cpu, max_cpu, memory, max_memory).await;
355 });
356
357 Self { tx }
358 }
359
360 /// Constructs a new task manager that does not limit requests based on
361 /// available resources.
362 fn new_unlimited(max_cpu: u64, max_memory: u64) -> Self {
363 Self::new(u64::MAX, max_cpu, u64::MAX, max_memory)
364 }
365
366 /// Sends a request to the task manager's queue.
367 fn send(
368 &self,
369 request: Req,
370 spawned: oneshot::Sender<()>,
371 completed: oneshot::Sender<Result<TaskExecutionResult>>,
372 ) {
373 self.tx.send((request, spawned, completed)).ok();
374 }
375
    /// Runs the request queue.
    ///
    /// Receives spawn requests from `rx` and either rejects, parks, or spawns
    /// them, and adds the reserved resources back as spawned tasks complete.
    ///
    /// `cpu` and `memory` are the totals available to the manager, while
    /// `max_cpu` and `max_memory` bound a single request.
    ///
    /// The loop ends when `rx` yields `None`; the local state, including the
    /// set of spawned tasks and any parked requests, is dropped at that point.
    async fn run_request_queue(
        mut rx: mpsc::UnboundedReceiver<(
            Req,
            oneshot::Sender<()>,
            oneshot::Sender<Result<TaskExecutionResult>>,
        )>,
        cpu: u64,
        max_cpu: u64,
        memory: u64,
        max_memory: u64,
    ) {
        let mut state = TaskManagerState::new(cpu, memory);

        loop {
            // If there aren't any spawned tasks, wait for a spawn request only
            if state.spawned.is_empty() {
                // NOTE(review): this holds only if every admitted request fits
                // within the total resources (i.e. `max_cpu <= cpu` and
                // `max_memory <= memory`) and every finished task gives its
                // reservation back — confirm the callers guarantee the former.
                assert!(
                    state.parked.is_empty(),
                    "there can't be any parked requests if there are no spawned tasks"
                );
                match rx.recv().await {
                    Some((req, spawned, completed)) => {
                        Self::handle_spawn_request(
                            &mut state, max_cpu, max_memory, req, spawned, completed,
                        );
                        continue;
                    }
                    None => break,
                }
            }

            // Otherwise, wait for a spawn request or a completed task
            tokio::select! {
                request = rx.recv() => {
                    match request {
                        Some((req, spawned, completed)) => {
                            Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, spawned, completed);
                        }
                        None => break,
                    }
                }
                // NOTE(review): a join result that is not `Ok` (e.g. the task
                // panicked) does not match this pattern, so the reservation held
                // by that task is not added back here.
                Some(Ok(response)) = state.spawned.join_next() => {
                    // Unlimited managers never decrement the counters, so there
                    // is nothing to give back
                    if !state.unlimited() {
                        state.cpu += response.cpu;
                        state.memory += response.memory;
                    }

                    // The receiver may already be gone; that is not an error here
                    response.tx.send(response.result).ok();
                    Self::spawn_parked_tasks(&mut state, max_cpu, max_memory);
                }
            }
        }
    }
430
431 /// Handles a spawn request by either parking it (not enough resources
432 /// currently available) or by spawning it.
433 fn handle_spawn_request(
434 state: &mut TaskManagerState<Req>,
435 max_cpu: u64,
436 max_memory: u64,
437 request: Req,
438 spawned: oneshot::Sender<()>,
439 completed: oneshot::Sender<Result<TaskExecutionResult>>,
440 ) {
441 // Ensure the request does not exceed the maximum CPU
442 let cpu = request.cpu();
443 if cpu > max_cpu as f64 {
444 completed
445 .send(Err(anyhow!(
446 "requested task CPU count of {cpu} exceeds the maximum CPU count of {max_cpu}",
447 )))
448 .ok();
449 return;
450 }
451
452 // Ensure the request does not exceed the maximum memory
453 let memory = request.memory();
454 if memory > max_memory {
455 completed
456 .send(Err(anyhow!(
457 "requested task memory of {memory} byte{s} exceeds the maximum memory of \
458 {max_memory}",
459 s = if memory == 1 { "" } else { "s" }
460 )))
461 .ok();
462 return;
463 }
464
465 if !state.unlimited() {
466 // If the request can't be processed due to resource constraints, park the
467 // request for now. When a task completes and resources become available,
468 // we'll unpark the request
469 if cpu > state.cpu.into() || memory > state.memory {
470 debug!(
471 "parking task due to insufficient resources: task reserves {cpu} CPU(s) and \
472 {memory} bytes of memory but there are only {cpu_remaining} CPU(s) and \
473 {memory_remaining} bytes of memory available",
474 cpu_remaining = state.cpu,
475 memory_remaining = state.memory
476 );
477 state.parked.push_back((request, spawned, completed));
478 return;
479 }
480
481 // Decrement the resource counts and spawn the task
482 state.cpu -= cpu;
483 state.memory -= memory;
484 debug!(
485 "spawning task with {cpu} CPUs and {memory} bytes of memory remaining",
486 cpu = state.cpu,
487 memory = state.memory
488 );
489 }
490
491 state.spawned.spawn(async move {
492 TaskManagerResponse {
493 cpu: request.cpu(),
494 memory: request.memory(),
495 result: request.run(spawned).await,
496 tx: completed,
497 }
498 });
499 }
500
    /// Responsible for spawning parked tasks.
    ///
    /// Called after a task completes and its reservation has been added back.
    /// Repeatedly selects a group of parked requests that fits the available
    /// CPU and memory, moves them to the front of the parked queue, and hands
    /// each one back to `handle_spawn_request`, until no parked request fits.
    ///
    /// The parked queue is reordered in place, so parked requests are not
    /// necessarily unparked in arrival order.
    fn spawn_parked_tasks(state: &mut TaskManagerState<Req>, max_cpu: u64, max_memory: u64) {
        if state.parked.is_empty() {
            return;
        }

        debug!(
            "attempting to unpark tasks with {cpu} CPUs and {memory} bytes of memory available",
            cpu = state.cpu,
            memory = state.memory,
        );

        // This algorithm is intended to unpark the greatest number of tasks.
        //
        // It first finds the greatest subset of tasks that are constrained by CPU and
        // then by memory.
        //
        // Next it finds the greatest subset of tasks that are constrained by memory and
        // then by CPU.
        //
        // It then unparks whichever subset is greater.
        //
        // The process is repeated until both subsets reach zero length.
        loop {
            let cpu_by_memory_len = {
                // Start by finding the longest range in the parked set that could run based on
                // CPU reservation
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                // Next, find the longest subset of that subset that could run based on memory
                // reservation
                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
                .len()
            };

            // Next, find the longest range in the parked set that could run based on memory
            // reservation
            let memory_by_cpu =
                fit_longest_range(state.parked.make_contiguous(), state.memory, |(r, ..)| {
                    r.memory()
                });

            // Next, find the longest subset of that subset that could run based on CPU
            // reservation
            let memory_by_cpu = fit_longest_range(
                &mut state.parked.make_contiguous()[memory_by_cpu],
                state.cpu,
                |(r, ..)| OrderedFloat(r.cpu()),
            );

            // If both subsets are empty, break out
            if cpu_by_memory_len == 0 && memory_by_cpu.is_empty() {
                break;
            }

            // Check to see which subset is greater (for equivalence, use the one we don't
            // need to refit for)
            let range = if memory_by_cpu.len() >= cpu_by_memory_len {
                memory_by_cpu
            } else {
                // We need to refit because the above calculation of `memory_by_cpu` mutated the
                // parked list
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
            };

            debug!("unparking {len} task(s)", len = range.len());

            // The fit always returns a range starting at zero, so the selected
            // requests can simply be popped off the front of the queue
            assert_eq!(
                range.start, 0,
                "expected the fit tasks to be at the front of the queue"
            );
            for _ in range {
                let (request, spawned, completed) = state.parked.pop_front().unwrap();

                debug!(
                    "unparking task with reservation of {cpu} CPU(s) and {memory} bytes of memory",
                    cpu = request.cpu(),
                    memory = request.memory(),
                );

                // Re-runs the admission checks and decrements the available
                // resources before spawning
                Self::handle_spawn_request(state, max_cpu, max_memory, request, spawned, completed);
            }
        }
    }
601}
602
603/// Determines the longest range in a slice where the sum of the weights of the
604/// elements in the returned range is less than or equal to the supplied total
605/// weight.
606///
607/// The returned range always starts at zero as this algorithm will partially
608/// sort the slice.
609///
610/// Due to the partial sorting, the provided slice will have its elements
611/// rearranged. As the function modifies the slice in-place, this function does
612/// not make any allocations.
613///
614/// # Implementation
615///
616/// This function is implemented using a modified quick sort algorithm as a
617/// solution to the more general "0/1 knapsack" problem where each item has an
618/// equal profit value; this maximizes for the number of items to put
619/// into the knapsack (i.e. longest range that fits).
620///
621/// Using a uniform random pivot point, it partitions the input into two sides:
622/// the left side where all weights are less than the pivot and the right side
623/// where all weights are equal to or greater than the pivot.
624///
625/// It then checks to see if the total weight of the left side is less than or
626/// equal to the total remaining weight; if it is, every element in
627/// the left side is considered as part of the output and it recurses on the
628/// right side.
629///
630/// If the total weight of the left side is greater than the remaining weight
631/// budget, it can completely ignore the right side and instead recurse on the
632/// left side.
633///
634/// The algorithm stops when the partition size reaches zero.
635///
636/// # Panics
637///
638/// Panics if the supplied weight is a negative value.
639fn fit_longest_range<T, F, W>(slice: &mut [T], total_weight: W, mut weight_fn: F) -> Range<usize>
640where
641 F: FnMut(&T) -> W,
642 W: Ord + Add<Output = W> + Sub<Output = W> + Default,
643{
644 /// Partitions the slice so that the weight of every element to the left
645 /// of the pivot is less than the pivot's weight and every element to the
646 /// right of the pivot is greater than or equal to the pivot's weight.
647 ///
648 /// Returns the pivot index, pivot weight, and the sum of the left side
649 /// element's weights.
650 fn partition<T, F, W>(
651 slice: &mut [T],
652 weight_fn: &mut F,
653 mut low: usize,
654 high: usize,
655 ) -> (usize, W, W)
656 where
657 F: FnMut(&T) -> W,
658 W: Ord + Add<Output = W> + Sub<Output = W> + Default,
659 {
660 assert!(low < high);
661
662 // Swap a random element (the pivot) in the remaining range with the high
663 slice.swap(high, rand::random_range(low..high));
664
665 let pivot_weight = weight_fn(&slice[high]);
666 let mut sum_weight = W::default();
667 let range = low..=high;
668 for i in range {
669 let weight = weight_fn(&slice[i]);
670 // If the weight belongs on the left side of the pivot, swap
671 if weight < pivot_weight {
672 slice.swap(i, low);
673 low += 1;
674 sum_weight = sum_weight.add(weight);
675 }
676 }
677
678 slice.swap(low, high);
679 (low, pivot_weight, sum_weight)
680 }
681
682 fn recurse_fit_maximal_range<T, F, W>(
683 slice: &mut [T],
684 mut remaining_weight: W,
685 weight_fn: &mut F,
686 low: usize,
687 high: usize,
688 end: &mut usize,
689 ) where
690 F: FnMut(&T) -> W,
691 W: Ord + Add<Output = W> + Sub<Output = W> + Default,
692 {
693 if low == high {
694 let weight = weight_fn(&slice[low]);
695 if weight <= remaining_weight {
696 *end += 1;
697 }
698
699 return;
700 }
701
702 if low < high {
703 let (pivot, pivot_weight, sum) = partition(slice, weight_fn, low, high);
704 if sum <= remaining_weight {
705 // Everything up to the pivot can be included
706 *end += pivot - low;
707 remaining_weight = remaining_weight.sub(sum);
708
709 // Check to see if the pivot itself can be included
710 if pivot_weight <= remaining_weight {
711 *end += 1;
712 remaining_weight = remaining_weight.sub(pivot_weight);
713 }
714
715 // Recurse on the right side
716 recurse_fit_maximal_range(slice, remaining_weight, weight_fn, pivot + 1, high, end);
717 } else if pivot > 0 {
718 // Otherwise, we can completely disregard the right side (including the pivot)
719 // and recurse on the left
720 recurse_fit_maximal_range(slice, remaining_weight, weight_fn, low, pivot - 1, end);
721 }
722 }
723 }
724
725 assert!(
726 total_weight >= W::default(),
727 "total weight cannot be negative"
728 );
729
730 if slice.is_empty() {
731 return 0..0;
732 }
733
734 let mut end = 0;
735 recurse_fit_maximal_range(
736 slice,
737 total_weight,
738 &mut weight_fn,
739 0,
740 slice.len() - 1, // won't underflow due to empty check
741 &mut end,
742 );
743
744 0..end
745}
746
/// Unit tests for `fit_longest_range` and `TaskManagerState`.
#[cfg(test)]
mod test {
    use super::*;

    /// An empty slice yields an empty range.
    #[test]
    fn fit_empty_slice() {
        let r = fit_longest_range(&mut [], 100, |i| *i);
        assert!(r.is_empty());
    }

    /// A negative total weight is rejected with a panic.
    #[test]
    #[should_panic(expected = "total weight cannot be negative")]
    fn fit_negative_panic() {
        fit_longest_range(&mut [0], -1, |i| *i);
    }

    /// No element fits when every weight exceeds the total weight.
    #[test]
    fn no_fit() {
        let r = fit_longest_range(&mut [100, 101, 102], 99, |i| *i);
        assert!(r.is_empty());
    }

    /// Every element fits when the total weight covers the sum of all weights.
    #[test]
    fn fit_all() {
        // The weights sum to exactly the total weight
        let r = fit_longest_range(&mut [1, 2, 3, 4, 5], 15, |i| *i);
        assert_eq!(r.len(), 5);

        // The weights sum to less than the total weight
        let r = fit_longest_range(&mut [5, 4, 3, 2, 1], 20, |i| *i);
        assert_eq!(r.len(), 5);
    }

    /// Only the lightest elements are selected when not everything fits.
    #[test]
    fn fit_some() {
        let s = &mut [8, 2, 2, 3, 2, 1, 2, 4, 1];
        let r = fit_longest_range(s, 10, |i| *i);
        // The six lightest elements (1, 1, 2, 2, 2, 2) sum to exactly 10
        assert_eq!(r.len(), 6);
        assert_eq!(s[r.start..r.end].iter().copied().sum::<i32>(), 10);
        // The heavier elements must have been moved past the fitted range
        assert!(s[r.end..].contains(&8));
        assert!(s[r.end..].contains(&4));
        assert!(s[r.end..].contains(&3));
    }

    /// A state created with `u64::MAX` CPU and memory reports as unlimited.
    #[test]
    fn unlimited_state() {
        let manager_state = TaskManagerState::<()>::new(u64::MAX, u64::MAX);
        assert!(manager_state.unlimited());
    }
}