wdl_engine/
backend.rs

1//! Implementation of task execution backends.
2
3use std::collections::HashMap;
4use std::collections::VecDeque;
5use std::fmt;
6use std::future::Future;
7use std::ops::Add;
8use std::ops::Range;
9use std::ops::Sub;
10use std::path::Path;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use anyhow::Result;
15use anyhow::anyhow;
16use futures::future::BoxFuture;
17use indexmap::IndexMap;
18use ordered_float::OrderedFloat;
19use tokio::sync::mpsc;
20use tokio::sync::oneshot;
21use tokio::sync::oneshot::Receiver;
22use tokio::task::JoinSet;
23use tokio_util::sync::CancellationToken;
24use tracing::debug;
25
26use crate::Input;
27use crate::Value;
28use crate::http::Transferer;
29use crate::path::EvaluationPath;
30
31mod docker;
32mod local;
33mod tes;
34
35pub use docker::*;
36pub use local::*;
37pub use tes::*;
38
// NOTE(review): these file/directory names appear to be created under a
// task's attempt directory — confirm against the backend implementations.

/// The default work directory name.
pub(crate) const WORK_DIR_NAME: &str = "work";

/// The default command file name.
pub(crate) const COMMAND_FILE_NAME: &str = "command";

/// The default stdout file name.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";

/// The default stderr file name.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";

/// The number of initial expected task names.
///
/// This controls the initial size of the bloom filter and how many names are
/// prepopulated into a name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;
56
/// Represents constraints applied to a task's execution.
pub struct TaskExecutionConstraints {
    /// The container the task will run in.
    ///
    /// A value of `None` indicates the task will run on the host.
    pub container: Option<String>,
    /// The allocated number of CPUs; must be greater than 0.
    ///
    /// Fractional CPU allocations are representable (the field is an `f64`).
    pub cpu: f64,
    /// The allocated memory in bytes; must be greater than 0.
    pub memory: i64,
    /// A list with one specification per allocated GPU.
    ///
    /// The specification is execution engine-specific.
    ///
    /// If no GPUs were allocated, then the value must be an empty list.
    pub gpu: Vec<String>,
    /// A list with one specification per allocated FPGA.
    ///
    /// The specification is execution engine-specific.
    ///
    /// If no FPGAs were allocated, then the value must be an empty list.
    pub fpga: Vec<String>,
    /// A map with one entry for each disk mount point.
    ///
    /// The key is the mount point and the value is the initial amount of disk
    /// space allocated, in bytes.
    ///
    /// The execution engine must, at a minimum, provide one entry for each disk
    /// mount point requested, but may provide more.
    ///
    /// The amount of disk space available for a given mount point may increase
    /// during the lifetime of the task (e.g., autoscaling volumes provided by
    /// some cloud services).
    pub disks: IndexMap<String, i64>,
}
92
/// Represents information for spawning a task.
pub struct TaskSpawnInfo {
    /// The command of the task.
    command: String,
    /// The inputs for the task.
    inputs: Vec<Input>,
    /// The requirements of the task.
    ///
    /// Shared via `Arc`, so cloning the handle does not copy the map.
    requirements: Arc<HashMap<String, Value>>,
    /// The hints of the task.
    hints: Arc<HashMap<String, Value>>,
    /// The environment variables of the task.
    ///
    /// An `IndexMap` is used so insertion order is preserved.
    env: Arc<IndexMap<String, String>>,
    /// The transferer to use for uploading inputs.
    transferer: Arc<dyn Transferer>,
}
108
109impl TaskSpawnInfo {
110    /// Constructs a new task spawn information.
111    pub fn new(
112        command: String,
113        inputs: Vec<Input>,
114        requirements: Arc<HashMap<String, Value>>,
115        hints: Arc<HashMap<String, Value>>,
116        env: Arc<IndexMap<String, String>>,
117        transferer: Arc<dyn Transferer>,
118    ) -> Self {
119        Self {
120            command,
121            inputs,
122            requirements,
123            hints,
124            env,
125            transferer,
126        }
127    }
128}
129
130impl fmt::Debug for TaskSpawnInfo {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        f.debug_struct("TaskSpawnInfo")
133            .field("command", &self.command)
134            .field("inputs", &self.inputs)
135            .field("requirements", &self.requirements)
136            .field("hints", &self.hints)
137            .field("env", &self.env)
138            .field("transferer", &"<transferer>")
139            .finish()
140    }
141}
142
/// Represents a request to spawn a task.
#[derive(Debug)]
pub struct TaskSpawnRequest {
    /// The id of the task being spawned.
    id: String,
    /// The information for the task to spawn.
    info: TaskSpawnInfo,
    /// The attempt number for the spawn request.
    ///
    /// Attempt numbers start at 0.
    attempt: u64,
    /// The attempt directory for the task's execution.
    attempt_dir: PathBuf,
}
155
156impl TaskSpawnRequest {
157    /// Creates a new task spawn request.
158    pub fn new(id: String, info: TaskSpawnInfo, attempt: u64, attempt_dir: PathBuf) -> Self {
159        Self {
160            id,
161            info,
162            attempt,
163            attempt_dir,
164        }
165    }
166
167    /// The identifier of the task being spawned.
168    pub fn id(&self) -> &str {
169        &self.id
170    }
171
172    /// Gets the command for the task.
173    pub fn command(&self) -> &str {
174        &self.info.command
175    }
176
177    /// Gets the inputs for the task.
178    pub fn inputs(&self) -> &[Input] {
179        &self.info.inputs
180    }
181
182    /// Gets the requirements of the task.
183    pub fn requirements(&self) -> &HashMap<String, Value> {
184        &self.info.requirements
185    }
186
187    /// Gets the hints of the task.
188    pub fn hints(&self) -> &HashMap<String, Value> {
189        &self.info.hints
190    }
191
192    /// Gets the environment variables of the task.
193    pub fn env(&self) -> &IndexMap<String, String> {
194        &self.info.env
195    }
196
197    /// Gets the transferer to use for uploading inputs.
198    pub fn transferer(&self) -> &Arc<dyn Transferer> {
199        &self.info.transferer
200    }
201
202    /// Gets the attempt number for the task's execution.
203    ///
204    /// The attempt number starts at 0.
205    pub fn attempt(&self) -> u64 {
206        self.attempt
207    }
208
209    /// Gets the attempt directory for the task's execution.
210    pub fn attempt_dir(&self) -> &Path {
211        &self.attempt_dir
212    }
213}
214
/// Represents the result of a task's execution.
///
/// Delivered by a [`TaskExecutionBackend`] on the receiver returned from
/// [`TaskExecutionBackend::spawn`].
#[derive(Debug)]
pub struct TaskExecutionResult {
    /// Stores the task process exit code.
    pub exit_code: i32,
    /// The task's working directory.
    pub work_dir: EvaluationPath,
    /// The value of the task's stdout file.
    pub stdout: Value,
    /// The value of the task's stderr file.
    pub stderr: Value,
}
227
/// Represents a task execution backend.
pub trait TaskExecutionBackend: Send + Sync {
    /// Gets the maximum concurrent tasks supported by the backend.
    fn max_concurrency(&self) -> u64;

    /// Gets the execution constraints given a task's requirements and hints.
    ///
    /// Returns an error if the task cannot be constrained for the execution
    /// environment or if the task specifies invalid requirements.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints>;

    /// Gets the guest (container) inputs directory of the backend.
    ///
    /// Returns `None` if the backend does not execute tasks in a container.
    ///
    /// The returned path is expected to be Unix style and end with a slash
    /// (`/`).
    fn guest_inputs_dir(&self) -> Option<&'static str>;

    /// Determines if the backend needs local inputs.
    ///
    /// Backends that run tasks locally or from a shared file system will return
    /// `true`.
    fn needs_local_inputs(&self) -> bool;

    /// Spawns a task with the execution backend.
    ///
    /// Returns a oneshot receiver for awaiting the completion of the task.
    ///
    /// The receiver resolves with the task's execution result or the error
    /// that caused execution to fail.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<Receiver<Result<TaskExecutionResult>>>;

    /// Performs cleanup operations after top-level workflow or task evaluation
    /// completes.
    ///
    /// Returns `None` if no cleanup is required.
    ///
    /// The lifetime bounds allow the returned boxed future to borrow from
    /// both the backend itself and the provided output directory.
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        _output_dir: &'b Path,
        _token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        // By default, backends have nothing to clean up.
        None
    }
}
282
/// A trait implemented by backend requests.
///
/// A request carries its own resource reservation (CPU and memory) and knows
/// how to run itself to completion.
trait TaskManagerRequest: Send + Sync + 'static {
    /// Gets the requested CPU allocation from the request.
    ///
    /// Fractional CPU counts are permitted.
    fn cpu(&self) -> f64;

    /// Gets the requested memory allocation from the request, in bytes.
    fn memory(&self) -> u64;

    /// Runs the request.
    ///
    /// Consumes the request and resolves with the task's execution result.
    fn run(self) -> impl Future<Output = Result<TaskExecutionResult>> + Send;
}
294
/// Represents a response internal to the task manager.
struct TaskManagerResponse {
    /// The previous CPU allocation from the request.
    ///
    /// Returned to the pool of available CPU when the task completes.
    cpu: f64,
    /// The previous memory allocation from the request, in bytes.
    ///
    /// Returned to the pool of available memory when the task completes.
    memory: u64,
    /// The result of the task's execution.
    result: Result<TaskExecutionResult>,
    /// The channel to send the task's execution result back on.
    tx: oneshot::Sender<Result<TaskExecutionResult>>,
}
306
/// Represents state used by the task manager.
struct TaskManagerState<Req> {
    /// The amount of available CPU remaining.
    ///
    /// Wrapped in `OrderedFloat` so it satisfies the `Ord` bound required by
    /// `fit_longest_range`.
    cpu: OrderedFloat<f64>,
    /// The amount of available memory remaining, in bytes.
    memory: u64,
    /// The set of spawned tasks.
    spawned: JoinSet<TaskManagerResponse>,
    /// The queue of parked spawn requests.
    ///
    /// Requests are parked when there is insufficient CPU or memory to run
    /// them; they are revisited whenever a spawned task completes.
    parked: VecDeque<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
}
318
319impl<Req> TaskManagerState<Req> {
320    /// Constructs a new task manager state with the given total CPU and memory.
321    fn new(cpu: u64, memory: u64) -> Self {
322        Self {
323            cpu: OrderedFloat(cpu as f64),
324            memory,
325            spawned: Default::default(),
326            parked: Default::default(),
327        }
328    }
329
330    /// Determines if the resources are unlimited.
331    fn unlimited(&self) -> bool {
332        self.cpu == u64::MAX as f64 && self.memory == u64::MAX
333    }
334}
335
/// Responsible for managing tasks based on available host resources.
///
/// Dropping the manager drops the sender; once no senders remain, the
/// background request queue task exits.
struct TaskManager<Req> {
    /// The sender for new spawn requests.
    ///
    /// Each request is paired with a oneshot sender used to deliver the
    /// task's execution result back to the requester.
    tx: mpsc::UnboundedSender<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
}
341
impl<Req> TaskManager<Req>
where
    Req: TaskManagerRequest,
{
    /// Constructs a new task manager with the given total CPU, maximum CPU per
    /// request, total memory, and maximum memory per request.
    ///
    /// Spawns a background Tokio task that services the request queue; the
    /// task exits once every sender for the queue has been dropped.
    fn new(cpu: u64, max_cpu: u64, memory: u64, max_memory: u64) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            Self::run_request_queue(rx, cpu, max_cpu, memory, max_memory).await;
        });

        Self { tx }
    }

    /// Constructs a new task manager that does not limit requests based on
    /// available resources.
    ///
    /// Uses `u64::MAX` for both totals, which `TaskManagerState::unlimited`
    /// treats as the "unlimited" sentinel.
    fn new_unlimited(max_cpu: u64, max_memory: u64) -> Self {
        Self::new(u64::MAX, max_cpu, u64::MAX, max_memory)
    }

    /// Sends a request to the task manager's queue.
    ///
    /// The channel is unbounded, so this never blocks; a send error (queue
    /// task gone) is deliberately ignored.
    fn send(&self, request: Req, completed: oneshot::Sender<Result<TaskExecutionResult>>) {
        self.tx.send((request, completed)).ok();
    }

    /// Runs the request queue.
    ///
    /// Loops until the request channel is closed (all senders dropped),
    /// spawning or parking incoming requests and returning resources to the
    /// pool as spawned tasks complete.
    async fn run_request_queue(
        mut rx: mpsc::UnboundedReceiver<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
        cpu: u64,
        max_cpu: u64,
        memory: u64,
        max_memory: u64,
    ) {
        let mut state = TaskManagerState::new(cpu, memory);

        loop {
            // If there aren't any spawned tasks, wait for a spawn request only
            if state.spawned.is_empty() {
                assert!(
                    state.parked.is_empty(),
                    "there can't be any parked requests if there are no spawned tasks"
                );
                match rx.recv().await {
                    Some((req, completed)) => {
                        Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, completed);
                        continue;
                    }
                    // Channel closed: no more requests can ever arrive.
                    None => break,
                }
            }

            // Otherwise, wait for a spawn request or a completed task
            tokio::select! {
                request = rx.recv() => {
                    match request {
                        Some((req, completed)) => {
                            Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, completed);
                        }
                        None => break,
                    }
                }
                // NOTE(review): a panicked task yields `Err` from `join_next`,
                // which this pattern does not match, disabling the branch for
                // that poll — confirm spawned task bodies cannot panic.
                Some(Ok(response)) = state.spawned.join_next() => {
                    // Return the completed task's reservation to the pool
                    // (unless resources are unlimited and never decremented).
                    if !state.unlimited() {
                        state.cpu += response.cpu;
                        state.memory += response.memory;
                    }

                    response.tx.send(response.result).ok();
                    Self::spawn_parked_tasks(&mut state, max_cpu, max_memory);
                }
            }
        }
    }

    /// Handles a spawn request by either parking it (not enough resources
    /// currently available) or by spawning it.
    ///
    /// A request that exceeds the per-request maximums can never run; it is
    /// rejected immediately by sending an error on the `completed` channel.
    fn handle_spawn_request(
        state: &mut TaskManagerState<Req>,
        max_cpu: u64,
        max_memory: u64,
        request: Req,
        completed: oneshot::Sender<Result<TaskExecutionResult>>,
    ) {
        // Ensure the request does not exceed the maximum CPU
        let cpu = request.cpu();
        if cpu > max_cpu as f64 {
            completed
                .send(Err(anyhow!(
                    "requested task CPU count of {cpu} exceeds the maximum CPU count of {max_cpu}",
                )))
                .ok();
            return;
        }

        // Ensure the request does not exceed the maximum memory
        let memory = request.memory();
        if memory > max_memory {
            completed
                .send(Err(anyhow!(
                    "requested task memory of {memory} byte{s} exceeds the maximum memory of \
                     {max_memory}",
                    s = if memory == 1 { "" } else { "s" }
                )))
                .ok();
            return;
        }

        if !state.unlimited() {
            // If the request can't be processed due to resource constraints, park the
            // request for now. When a task completes and resources become available,
            // we'll unpark the request
            if cpu > state.cpu.into() || memory > state.memory {
                debug!(
                    "parking task due to insufficient resources: task reserves {cpu} CPU(s) and \
                     {memory} bytes of memory but there are only {cpu_remaining} CPU(s) and \
                     {memory_remaining} bytes of memory available",
                    cpu_remaining = state.cpu,
                    memory_remaining = state.memory
                );
                state.parked.push_back((request, completed));
                return;
            }

            // Decrement the resource counts and spawn the task
            state.cpu -= cpu;
            state.memory -= memory;
            debug!(
                "spawning task with {cpu} CPUs and {memory} bytes of memory remaining",
                cpu = state.cpu,
                memory = state.memory
            );
        }

        // Run the request; the reservation is carried in the response so the
        // queue can return it to the pool when the task completes.
        state.spawned.spawn(async move {
            TaskManagerResponse {
                cpu: request.cpu(),
                memory: request.memory(),
                result: request.run().await,
                tx: completed,
            }
        });
    }

    /// Responsible for spawning parked tasks.
    ///
    /// Called whenever a spawned task completes and resources are returned to
    /// the pool.
    fn spawn_parked_tasks(state: &mut TaskManagerState<Req>, max_cpu: u64, max_memory: u64) {
        if state.parked.is_empty() {
            return;
        }

        debug!(
            "attempting to unpark tasks with {cpu} CPUs and {memory} bytes of memory available",
            cpu = state.cpu,
            memory = state.memory,
        );

        // This algorithm is intended to unpark the greatest number of tasks.
        //
        // It first finds the greatest subset of tasks that are constrained by CPU and
        // then by memory.
        //
        // Next it finds the greatest subset of tasks that are constrained by memory and
        // then by CPU.
        //
        // It then unparks whichever subset is greater.
        //
        // The process is repeated until both subsets reach zero length.
        loop {
            let cpu_by_memory_len = {
                // Start by finding the longest range in the parked set that could run based on
                // CPU reservation
                // (`make_contiguous` is needed to get a single mutable slice
                // out of the `VecDeque` for `fit_longest_range` to rearrange.)
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                // Next, find the longest subset of that subset that could run based on memory
                // reservation
                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
                .len()
            };

            // Next, find the longest range in the parked set that could run based on memory
            // reservation
            let memory_by_cpu =
                fit_longest_range(state.parked.make_contiguous(), state.memory, |(r, ..)| {
                    r.memory()
                });

            // Next, find the longest subset of that subset that could run based on CPU
            // reservation
            let memory_by_cpu = fit_longest_range(
                &mut state.parked.make_contiguous()[memory_by_cpu],
                state.cpu,
                |(r, ..)| OrderedFloat(r.cpu()),
            );

            // If both subsets are empty, break out
            if cpu_by_memory_len == 0 && memory_by_cpu.is_empty() {
                break;
            }

            // Check to see which subset is greater (for equivalence, use the one we don't
            // need to refit for)
            let range = if memory_by_cpu.len() >= cpu_by_memory_len {
                memory_by_cpu
            } else {
                // We need to refit because the above calculation of `memory_by_cpu` mutated the
                // parked list
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
            };

            debug!("unparking {len} task(s)", len = range.len());

            assert_eq!(
                range.start, 0,
                "expected the fit tasks to be at the front of the queue"
            );
            // Pop each fitted request off the front of the queue and spawn it;
            // `handle_spawn_request` performs the actual resource accounting.
            for _ in range {
                let (request, completed) = state.parked.pop_front().unwrap();

                debug!(
                    "unparking task with reservation of {cpu} CPU(s) and {memory} bytes of memory",
                    cpu = request.cpu(),
                    memory = request.memory(),
                );

                Self::handle_spawn_request(state, max_cpu, max_memory, request, completed);
            }
        }
    }
}
588
/// Determines the longest range in a slice where the sum of the weights of the
/// elements in the returned range is less than or equal to the supplied total
/// weight.
///
/// The returned range always starts at zero as this algorithm will partially
/// sort the slice.
///
/// Due to the partial sorting, the provided slice will have its elements
/// rearranged. As the function modifies the slice in-place, this function does
/// not make any allocations.
///
/// # Implementation
///
/// This function is implemented using a modified quick sort algorithm as a
/// solution to the more general "0/1 knapsack" problem where each item has an
/// equal profit value; this maximizes for the number of items to put
/// into the knapsack (i.e. longest range that fits).
///
/// Using a uniform random pivot point, it partitions the input into two sides:
/// the left side where all weights are less than the pivot and the right side
/// where all weights are equal to or greater than the pivot.
///
/// It then checks to see if the total weight of the left side is less than or
/// equal to the total remaining weight; if it is, every element in
/// the left side is considered as part of the output and it recurses on the
/// right side.
///
/// If the total weight of the left side is greater than the remaining weight
/// budget, it can completely ignore the right side and instead recurse on the
/// left side.
///
/// The algorithm stops when the partition size reaches zero.
///
/// # Panics
///
/// Panics if the supplied weight is a negative value.
fn fit_longest_range<T, F, W>(slice: &mut [T], total_weight: W, mut weight_fn: F) -> Range<usize>
where
    F: FnMut(&T) -> W,
    W: Ord + Add<Output = W> + Sub<Output = W> + Default,
{
    /// Partitions the slice so that the weight of every element to the left
    /// of the pivot is less than the pivot's weight and every element to the
    /// right of the pivot is greater than or equal to the pivot's weight.
    ///
    /// Returns the pivot index, pivot weight, and the sum of the left side
    /// element's weights.
    fn partition<T, F, W>(
        slice: &mut [T],
        weight_fn: &mut F,
        mut low: usize,
        high: usize,
    ) -> (usize, W, W)
    where
        F: FnMut(&T) -> W,
        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
    {
        assert!(low < high);

        // Swap a random element (the pivot) in the remaining range with the high
        slice.swap(high, rand::random_range(low..high));

        let pivot_weight = weight_fn(&slice[high]);
        let mut sum_weight = W::default();
        let range = low..=high;
        for i in range {
            let weight = weight_fn(&slice[i]);
            // If the weight belongs on the left side of the pivot, swap
            // (`low` advances past each element placed on the left)
            if weight < pivot_weight {
                slice.swap(i, low);
                low += 1;
                sum_weight = sum_weight.add(weight);
            }
        }

        // Move the pivot (currently at `high`) into its final position
        slice.swap(low, high);
        (low, pivot_weight, sum_weight)
    }

    /// Recursively partitions `slice[low..=high]`, accumulating into `end`
    /// the count of elements that fit within `remaining_weight`.
    ///
    /// Fitting elements are kept at the front of the slice by the
    /// partitioning, so the caller's final range is `0..end`.
    fn recurse_fit_maximal_range<T, F, W>(
        slice: &mut [T],
        mut remaining_weight: W,
        weight_fn: &mut F,
        low: usize,
        high: usize,
        end: &mut usize,
    ) where
        F: FnMut(&T) -> W,
        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
    {
        // Base case: a single element — include it if it fits the budget
        if low == high {
            let weight = weight_fn(&slice[low]);
            if weight <= remaining_weight {
                *end += 1;
            }

            return;
        }

        // Note: `low > high` (an empty range from a prior recursion) falls
        // through and does nothing.
        if low < high {
            let (pivot, pivot_weight, sum) = partition(slice, weight_fn, low, high);
            if sum <= remaining_weight {
                // Everything up to the pivot can be included
                *end += pivot - low;
                remaining_weight = remaining_weight.sub(sum);

                // Check to see if the pivot itself can be included
                if pivot_weight <= remaining_weight {
                    *end += 1;
                    remaining_weight = remaining_weight.sub(pivot_weight);
                }

                // Recurse on the right side
                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, pivot + 1, high, end);
            } else if pivot > 0 {
                // Otherwise, we can completely disregard the right side (including the pivot)
                // and recurse on the left
                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, low, pivot - 1, end);
            }
        }
    }

    assert!(
        total_weight >= W::default(),
        "total weight cannot be negative"
    );

    if slice.is_empty() {
        return 0..0;
    }

    let mut end = 0;
    recurse_fit_maximal_range(
        slice,
        total_weight,
        &mut weight_fn,
        0,
        slice.len() - 1, // won't underflow due to empty check
        &mut end,
    );

    0..end
}
732
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn fit_empty_slice() {
        // An empty input always yields an empty range.
        let range = fit_longest_range(&mut [], 100, |i| *i);
        assert!(range.is_empty());
    }

    #[test]
    #[should_panic(expected = "total weight cannot be negative")]
    fn fit_negative_panic() {
        fit_longest_range(&mut [0], -1, |i| *i);
    }

    #[test]
    fn no_fit() {
        // Every element exceeds the budget, so nothing fits.
        let range = fit_longest_range(&mut [100, 101, 102], 99, |i| *i);
        assert!(range.is_empty());
    }

    #[test]
    fn fit_all() {
        // Exact fit: 1 + 2 + 3 + 4 + 5 == 15.
        assert_eq!(fit_longest_range(&mut [1, 2, 3, 4, 5], 15, |i| *i).len(), 5);

        // Loose fit: the total weight (15) is under the budget (20).
        assert_eq!(fit_longest_range(&mut [5, 4, 3, 2, 1], 20, |i| *i).len(), 5);
    }

    #[test]
    fn fit_some() {
        let items = &mut [8, 2, 2, 3, 2, 1, 2, 4, 1];
        let range = fit_longest_range(items, 10, |i| *i);
        assert_eq!(range.len(), 6);

        // The fitted prefix sums exactly to the budget...
        assert_eq!(items[range.start..range.end].iter().copied().sum::<i32>(), 10);

        // ...and the heaviest elements were left outside the range.
        assert!(items[range.end..].contains(&8));
        assert!(items[range.end..].contains(&4));
        assert!(items[range.end..].contains(&3));
    }

    #[test]
    fn unlimited_state() {
        // Both totals at u64::MAX mark the state as unlimited.
        let state = TaskManagerState::<()>::new(u64::MAX, u64::MAX);
        assert!(state.unlimited());
    }
}