wdl_engine/
backend.rs

1//! Implementation of task execution backends.
2
3use std::collections::HashMap;
4use std::collections::VecDeque;
5use std::future::Future;
6use std::ops::Add;
7use std::ops::Range;
8use std::ops::Sub;
9use std::path::Path;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13use anyhow::Result;
14use anyhow::anyhow;
15use futures::future::BoxFuture;
16use indexmap::IndexMap;
17use ordered_float::OrderedFloat;
18use tokio::sync::mpsc;
19use tokio::sync::oneshot;
20use tokio::sync::oneshot::Receiver;
21use tokio::task::JoinSet;
22use tokio_util::sync::CancellationToken;
23use tracing::debug;
24
25use crate::Input;
26use crate::Value;
27use crate::http::HttpDownloader;
28use crate::path::EvaluationPath;
29
30mod docker;
31mod local;
32
33pub use docker::*;
34pub use local::*;
35
/// Default name used for a task's work directory.
pub(crate) const WORK_DIR_NAME: &str = "work";

/// Default name used for the file holding a task's command.
pub(crate) const COMMAND_FILE_NAME: &str = "command";

/// Default name used for the file capturing a task's standard output.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";

/// Default name used for the file capturing a task's standard error.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";
47
48/// Represents constraints applied to a task's execution.
49pub struct TaskExecutionConstraints {
50    /// The container the task will run in.
51    ///
52    /// A value of `None` indicates the task will run on the host.
53    pub container: Option<String>,
54    /// The allocated number of CPUs; must be greater than 0.
55    pub cpu: f64,
56    /// The allocated memory in bytes; must be greater than 0.
57    pub memory: i64,
58    /// A list with one specification per allocated GPU.
59    ///
60    /// The specification is execution engine-specific.
61    ///
62    /// If no GPUs were allocated, then the value must be an empty list.
63    pub gpu: Vec<String>,
64    /// A list with one specification per allocated FPGA.
65    ///
66    /// The specification is execution engine-specific.
67    ///
68    /// If no FPGAs were allocated, then the value must be an empty list.
69    pub fpga: Vec<String>,
70    /// A map with one entry for each disk mount point.
71    ///
72    /// The key is the mount point and the value is the initial amount of disk
73    /// space allocated, in bytes.
74    ///
75    /// The execution engine must, at a minimum, provide one entry for each disk
76    /// mount point requested, but may provide more.
77    ///
78    /// The amount of disk space available for a given mount point may increase
79    /// during the lifetime of the task (e.g., autoscaling volumes provided by
80    /// some cloud services).
81    pub disks: IndexMap<String, i64>,
82}
83
84/// Represents information for spawning a task.
85#[derive(Debug)]
86pub struct TaskSpawnInfo {
87    /// The command of the task.
88    command: String,
89    /// The inputs for task.
90    inputs: Vec<Input>,
91    /// The requirements of the task.
92    requirements: Arc<HashMap<String, Value>>,
93    /// The hints of the task.
94    hints: Arc<HashMap<String, Value>>,
95    /// The environment variables of the task.
96    env: Arc<IndexMap<String, String>>,
97}
98
99impl TaskSpawnInfo {
100    /// Constructs a new task spawn information.
101    pub fn new(
102        command: String,
103        inputs: Vec<Input>,
104        requirements: Arc<HashMap<String, Value>>,
105        hints: Arc<HashMap<String, Value>>,
106        env: Arc<IndexMap<String, String>>,
107    ) -> Self {
108        Self {
109            command,
110            inputs,
111            requirements,
112            hints,
113            env,
114        }
115    }
116}
117
118/// Represents a request to spawn a task.
119#[derive(Debug)]
120pub struct TaskSpawnRequest {
121    /// The id of the task being spawned.
122    id: String,
123    /// The information for the task to spawn.
124    info: TaskSpawnInfo,
125    /// The attempt number for the spawn request.
126    attempt: u64,
127    /// The attempt directory for the task's execution.
128    attempt_dir: PathBuf,
129}
130
131impl TaskSpawnRequest {
132    /// Creates a new task spawn request.
133    pub fn new(id: String, info: TaskSpawnInfo, attempt: u64, attempt_dir: PathBuf) -> Self {
134        Self {
135            id,
136            info,
137            attempt,
138            attempt_dir,
139        }
140    }
141
142    /// The identifier of the task being spawned.
143    pub fn id(&self) -> &str {
144        &self.id
145    }
146
147    /// Gets the command for the task.
148    pub fn command(&self) -> &str {
149        &self.info.command
150    }
151
152    /// Gets the inputs for the task.
153    pub fn inputs(&self) -> &[Input] {
154        &self.info.inputs
155    }
156
157    /// Gets the requirements of the task.
158    pub fn requirements(&self) -> &HashMap<String, Value> {
159        &self.info.requirements
160    }
161
162    /// Gets the hints of the task.
163    pub fn hints(&self) -> &HashMap<String, Value> {
164        &self.info.hints
165    }
166
167    /// Gets the environment variables of the task.
168    pub fn env(&self) -> &IndexMap<String, String> {
169        &self.info.env
170    }
171
172    /// Gets the attempt number for the task's execution.
173    ///
174    /// The attempt number starts at 0.
175    pub fn attempt(&self) -> u64 {
176        self.attempt
177    }
178
179    /// Gets the attempt directory for the task's execution.
180    pub fn attempt_dir(&self) -> &Path {
181        &self.attempt_dir
182    }
183}
184
185/// Represents the result of a task's execution.
186#[derive(Debug)]
187pub struct TaskExecutionResult {
188    /// The inputs that were given to the task.
189    pub inputs: Vec<Input>,
190    /// Stores the task process exit code.
191    pub exit_code: i32,
192    /// The task's working directory.
193    pub work_dir: EvaluationPath,
194    /// The value of the task's stdout file.
195    pub stdout: Value,
196    /// The value of the task's stderr file.
197    pub stderr: Value,
198}
199
200/// Represents events that can be awaited on during task execution.
201pub struct TaskExecutionEvents {
202    /// The event for when the task has spawned and is currently executing.
203    pub spawned: Receiver<()>,
204    /// The event for when the task has completed.
205    ///
206    /// Returns the execution result.
207    pub completed: Receiver<Result<TaskExecutionResult>>,
208}
209
210/// Represents a task execution backend.
211pub trait TaskExecutionBackend: Send + Sync {
212    /// Gets the maximum concurrent tasks supported by the backend.
213    fn max_concurrency(&self) -> u64;
214
215    /// Gets the execution constraints given a task's requirements and hints.
216    ///
217    /// Returns an error if the task cannot be constrained for the execution
218    /// environment or if the task specifies invalid requirements.
219    fn constraints(
220        &self,
221        requirements: &HashMap<String, Value>,
222        hints: &HashMap<String, Value>,
223    ) -> Result<TaskExecutionConstraints>;
224
225    /// Gets the guest path the task working directory (e.g. `/mnt/work`).
226    ///
227    /// Returns `None` if the task execution does not use a container.
228    fn guest_work_dir(&self) -> Option<&Path>;
229
230    /// Localizes the given set of inputs for the backend.
231    ///
232    /// This may involve downloading remote inputs to the host and updating the
233    /// input's guest paths.
234    fn localize_inputs<'a, 'b, 'c, 'd>(
235        &'a self,
236        downloader: &'b HttpDownloader,
237        inputs: &'c mut [Input],
238    ) -> BoxFuture<'d, Result<()>>
239    where
240        'a: 'd,
241        'b: 'd,
242        'c: 'd,
243        Self: 'd;
244
245    /// Spawns a task with the execution backend.
246    ///
247    /// Returns the task execution event receives upon success.
248    fn spawn(
249        &self,
250        request: TaskSpawnRequest,
251        token: CancellationToken,
252    ) -> Result<TaskExecutionEvents>;
253
254    /// Performs cleanup operations after top-level workflow or task evaluation
255    /// completes.
256    ///
257    /// Returns `None` if no cleanup is required.
258    fn cleanup<'a, 'b, 'c>(
259        &'a self,
260        _output_dir: &'b Path,
261        _token: CancellationToken,
262    ) -> Option<BoxFuture<'c, ()>>
263    where
264        'a: 'c,
265        'b: 'c,
266        Self: 'c,
267    {
268        None
269    }
270}
271
272/// A trait implemented by backend requests.
273trait TaskManagerRequest: Send + Sync + 'static {
274    /// Gets the requested CPU allocation from the request.
275    fn cpu(&self) -> f64;
276
277    /// Gets the requested memory allocation from the request, in bytes.
278    fn memory(&self) -> u64;
279
280    /// Runs the request.
281    fn run(
282        self,
283        spawned: oneshot::Sender<()>,
284    ) -> impl Future<Output = Result<TaskExecutionResult>> + Send;
285}
286
287/// Represents a response internal to the task manager.
288struct TaskManagerResponse {
289    /// The previous CPU allocation from the request.
290    cpu: f64,
291    /// The previous memory allocation from the request.
292    memory: u64,
293    /// The result of the task's execution.
294    result: Result<TaskExecutionResult>,
295    /// The channel to send the task's execution result back on.
296    tx: oneshot::Sender<Result<TaskExecutionResult>>,
297}
298
299/// Represents state used by the task manager.
300struct TaskManagerState<Req> {
301    /// The amount of available CPU remaining.
302    cpu: OrderedFloat<f64>,
303    /// The amount of available memory remaining, in bytes.
304    memory: u64,
305    /// The set of spawned tasks.
306    spawned: JoinSet<TaskManagerResponse>,
307    /// The queue of parked spawn requests.
308    parked: VecDeque<(
309        Req,
310        oneshot::Sender<()>,
311        oneshot::Sender<Result<TaskExecutionResult>>,
312    )>,
313}
314
315impl<Req> TaskManagerState<Req> {
316    /// Constructs a new task manager state with the given total CPU and memory.
317    fn new(cpu: u64, memory: u64) -> Self {
318        Self {
319            cpu: OrderedFloat(cpu as f64),
320            memory,
321            spawned: Default::default(),
322            parked: Default::default(),
323        }
324    }
325
326    /// Determines if the resources are unlimited.
327    fn unlimited(&self) -> bool {
328        self.cpu == u64::MAX as f64 && self.memory == u64::MAX
329    }
330}
331
332/// Responsible for managing tasks based on available host resources.
333struct TaskManager<Req> {
334    /// The sender for new spawn requests.
335    tx: mpsc::UnboundedSender<(
336        Req,
337        oneshot::Sender<()>,
338        oneshot::Sender<Result<TaskExecutionResult>>,
339    )>,
340}
341
342impl<Req> TaskManager<Req>
343where
344    Req: TaskManagerRequest,
345{
346    /// Constructs a new task manager with the given total CPU, maximum CPU per
347    /// request, total memory, and maximum memory per request.
348    fn new(cpu: u64, max_cpu: u64, memory: u64, max_memory: u64) -> Self {
349        let (tx, rx) = mpsc::unbounded_channel();
350
351        tokio::spawn(async move {
352            Self::run_request_queue(rx, cpu, max_cpu, memory, max_memory).await;
353        });
354
355        Self { tx }
356    }
357
358    /// Constructs a new task manager that does not limit requests based on
359    /// available resources.
360    fn new_unlimited(max_cpu: u64, max_memory: u64) -> Self {
361        Self::new(u64::MAX, max_cpu, u64::MAX, max_memory)
362    }
363
364    /// Sends a request to the task manager's queue.
365    fn send(
366        &self,
367        request: Req,
368        spawned: oneshot::Sender<()>,
369        completed: oneshot::Sender<Result<TaskExecutionResult>>,
370    ) {
371        self.tx.send((request, spawned, completed)).ok();
372    }
373
374    /// Runs the request queue.
375    async fn run_request_queue(
376        mut rx: mpsc::UnboundedReceiver<(
377            Req,
378            oneshot::Sender<()>,
379            oneshot::Sender<Result<TaskExecutionResult>>,
380        )>,
381        cpu: u64,
382        max_cpu: u64,
383        memory: u64,
384        max_memory: u64,
385    ) {
386        let mut state = TaskManagerState::new(cpu, memory);
387
388        loop {
389            // If there aren't any spawned tasks, wait for a spawn request only
390            if state.spawned.is_empty() {
391                assert!(
392                    state.parked.is_empty(),
393                    "there can't be any parked requests if there are no spawned tasks"
394                );
395                match rx.recv().await {
396                    Some((req, spawned, completed)) => {
397                        Self::handle_spawn_request(
398                            &mut state, max_cpu, max_memory, req, spawned, completed,
399                        );
400                        continue;
401                    }
402                    None => break,
403                }
404            }
405
406            // Otherwise, wait for a spawn request or a completed task
407            tokio::select! {
408                request = rx.recv() => {
409                    match request {
410                        Some((req, spawned, completed)) => {
411                            Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, spawned, completed);
412                        }
413                        None => break,
414                    }
415                }
416                Some(Ok(response)) = state.spawned.join_next() => {
417                    if !state.unlimited() {
418                        state.cpu += response.cpu;
419                        state.memory += response.memory;
420                    }
421
422                    response.tx.send(response.result).ok();
423                    Self::spawn_parked_tasks(&mut state, max_cpu, max_memory);
424                }
425            }
426        }
427    }
428
429    /// Handles a spawn request by either parking it (not enough resources
430    /// currently available) or by spawning it.
431    fn handle_spawn_request(
432        state: &mut TaskManagerState<Req>,
433        max_cpu: u64,
434        max_memory: u64,
435        request: Req,
436        spawned: oneshot::Sender<()>,
437        completed: oneshot::Sender<Result<TaskExecutionResult>>,
438    ) {
439        // Ensure the request does not exceed the maximum CPU
440        let cpu = request.cpu();
441        if cpu > max_cpu as f64 {
442            completed
443                .send(Err(anyhow!(
444                    "requested task CPU count of {cpu} exceeds the maximum CPU count of {max_cpu}",
445                )))
446                .ok();
447            return;
448        }
449
450        // Ensure the request does not exceed the maximum memory
451        let memory = request.memory();
452        if memory > max_memory {
453            completed
454                .send(Err(anyhow!(
455                    "requested task memory of {memory} byte{s} exceeds the maximum memory of \
456                     {max_memory}",
457                    s = if memory == 1 { "" } else { "s" }
458                )))
459                .ok();
460            return;
461        }
462
463        if !state.unlimited() {
464            // If the request can't be processed due to resource constraints, park the
465            // request for now. When a task completes and resources become available,
466            // we'll unpark the request
467            if cpu > state.cpu.into() || memory > state.memory {
468                debug!(
469                    "parking task due to insufficient resources: task reserves {cpu} CPU(s) and \
470                     {memory} bytes of memory but there are only {cpu_remaining} CPU(s) and \
471                     {memory_remaining} bytes of memory available",
472                    cpu_remaining = state.cpu,
473                    memory_remaining = state.memory
474                );
475                state.parked.push_back((request, spawned, completed));
476                return;
477            }
478
479            // Decrement the resource counts and spawn the task
480            state.cpu -= cpu;
481            state.memory -= memory;
482            debug!(
483                "spawning task with {cpu} CPUs and {memory} bytes of memory remaining",
484                cpu = state.cpu,
485                memory = state.memory
486            );
487        }
488
489        state.spawned.spawn(async move {
490            TaskManagerResponse {
491                cpu: request.cpu(),
492                memory: request.memory(),
493                result: request.run(spawned).await,
494                tx: completed,
495            }
496        });
497    }
498
499    /// Responsible for spawning parked tasks.
500    fn spawn_parked_tasks(state: &mut TaskManagerState<Req>, max_cpu: u64, max_memory: u64) {
501        if state.parked.is_empty() {
502            return;
503        }
504
505        debug!(
506            "attempting to unpark tasks with {cpu} CPUs and {memory} bytes of memory available",
507            cpu = state.cpu,
508            memory = state.memory,
509        );
510
511        // This algorithm is intended to unpark the greatest number of tasks.
512        //
513        // It first finds the greatest subset of tasks that are constrained by CPU and
514        // then by memory.
515        //
516        // Next it finds the greatest subset of tasks that are constrained by memory and
517        // then by CPU.
518        //
519        // It then unparks whichever subset is greater.
520        //
521        // The process is repeated until both subsets reach zero length.
522        loop {
523            let cpu_by_memory_len = {
524                // Start by finding the longest range in the parked set that could run based on
525                // CPU reservation
526                let range =
527                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
528                        OrderedFloat(r.cpu())
529                    });
530
531                // Next, find the longest subset of that subset that could run based on memory
532                // reservation
533                fit_longest_range(
534                    &mut state.parked.make_contiguous()[range],
535                    state.memory,
536                    |(r, ..)| r.memory(),
537                )
538                .len()
539            };
540
541            // Next, find the longest range in the parked set that could run based on memory
542            // reservation
543            let memory_by_cpu =
544                fit_longest_range(state.parked.make_contiguous(), state.memory, |(r, ..)| {
545                    r.memory()
546                });
547
548            // Next, find the longest subset of that subset that could run based on CPU
549            // reservation
550            let memory_by_cpu = fit_longest_range(
551                &mut state.parked.make_contiguous()[memory_by_cpu],
552                state.cpu,
553                |(r, ..)| OrderedFloat(r.cpu()),
554            );
555
556            // If both subsets are empty, break out
557            if cpu_by_memory_len == 0 && memory_by_cpu.is_empty() {
558                break;
559            }
560
561            // Check to see which subset is greater (for equivalence, use the one we don't
562            // need to refit for)
563            let range = if memory_by_cpu.len() >= cpu_by_memory_len {
564                memory_by_cpu
565            } else {
566                // We need to refit because the above calculation of `memory_by_cpu` mutated the
567                // parked list
568                let range =
569                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
570                        OrderedFloat(r.cpu())
571                    });
572
573                fit_longest_range(
574                    &mut state.parked.make_contiguous()[range],
575                    state.memory,
576                    |(r, ..)| r.memory(),
577                )
578            };
579
580            debug!("unparking {len} task(s)", len = range.len());
581
582            assert_eq!(
583                range.start, 0,
584                "expected the fit tasks to be at the front of the queue"
585            );
586            for _ in range {
587                let (request, spawned, completed) = state.parked.pop_front().unwrap();
588
589                debug!(
590                    "unparking task with reservation of {cpu} CPU(s) and {memory} bytes of memory",
591                    cpu = request.cpu(),
592                    memory = request.memory(),
593                );
594
595                Self::handle_spawn_request(state, max_cpu, max_memory, request, spawned, completed);
596            }
597        }
598    }
599}
600
601/// Determines the longest range in a slice where the sum of the weights of the
602/// elements in the returned range is less than or equal to the supplied total
603/// weight.
604///
605/// The returned range always starts at zero as this algorithm will partially
606/// sort the slice.
607///
608/// Due to the partial sorting, the provided slice will have its elements
609/// rearranged. As the function modifies the slice in-place, this function does
610/// not make any allocations.
611///
612/// # Implementation
613///
614/// This function is implemented using a modified quick sort algorithm as a
615/// solution to the more general "0/1 knapsack" problem where each item has an
616/// equal profit value; this maximizes for the number of items to put
617/// into the knapsack (i.e. longest range that fits).
618///
619/// Using a uniform random pivot point, it partitions the input into two sides:
620/// the left side where all weights are less than the pivot and the right side
621/// where all weights are equal to or greater than the pivot.
622///
623/// It then checks to see if the total weight of the left side is less than or
624/// equal to the total remaining weight; if it is, every element in
625/// the left side is considered as part of the output and it recurses on the
626/// right side.
627///
628/// If the total weight of the left side is greater than the remaining weight
629/// budget, it can completely ignore the right side and instead recurse on the
630/// left side.
631///
632/// The algorithm stops when the partition size reaches zero.
633///
634/// # Panics
635///
636/// Panics if the supplied weight is a negative value.
637fn fit_longest_range<T, F, W>(slice: &mut [T], total_weight: W, mut weight_fn: F) -> Range<usize>
638where
639    F: FnMut(&T) -> W,
640    W: Ord + Add<Output = W> + Sub<Output = W> + Default,
641{
642    /// Partitions the slice so that the weight of every element to the left
643    /// of the pivot is less than the pivot's weight and every element to the
644    /// right of the pivot is greater than or equal to the pivot's weight.
645    ///
646    /// Returns the pivot index, pivot weight, and the sum of the left side
647    /// element's weights.
648    fn partition<T, F, W>(
649        slice: &mut [T],
650        weight_fn: &mut F,
651        mut low: usize,
652        high: usize,
653    ) -> (usize, W, W)
654    where
655        F: FnMut(&T) -> W,
656        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
657    {
658        assert!(low < high);
659
660        // Swap a random element (the pivot) in the remaining range with the high
661        slice.swap(high, rand::random_range(low..high));
662
663        let pivot_weight = weight_fn(&slice[high]);
664        let mut sum_weight = W::default();
665        let range = low..=high;
666        for i in range {
667            let weight = weight_fn(&slice[i]);
668            // If the weight belongs on the left side of the pivot, swap
669            if weight < pivot_weight {
670                slice.swap(i, low);
671                low += 1;
672                sum_weight = sum_weight.add(weight);
673            }
674        }
675
676        slice.swap(low, high);
677        (low, pivot_weight, sum_weight)
678    }
679
680    fn recurse_fit_maximal_range<T, F, W>(
681        slice: &mut [T],
682        mut remaining_weight: W,
683        weight_fn: &mut F,
684        low: usize,
685        high: usize,
686        end: &mut usize,
687    ) where
688        F: FnMut(&T) -> W,
689        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
690    {
691        if low == high {
692            let weight = weight_fn(&slice[low]);
693            if weight <= remaining_weight {
694                *end += 1;
695            }
696
697            return;
698        }
699
700        if low < high {
701            let (pivot, pivot_weight, sum) = partition(slice, weight_fn, low, high);
702            if sum <= remaining_weight {
703                // Everything up to the pivot can be included
704                *end += pivot - low;
705                remaining_weight = remaining_weight.sub(sum);
706
707                // Check to see if the pivot itself can be included
708                if pivot_weight <= remaining_weight {
709                    *end += 1;
710                    remaining_weight = remaining_weight.sub(pivot_weight);
711                }
712
713                // Recurse on the right side
714                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, pivot + 1, high, end);
715            } else if pivot > 0 {
716                // Otherwise, we can completely disregard the right side (including the pivot)
717                // and recurse on the left
718                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, low, pivot - 1, end);
719            }
720        }
721    }
722
723    assert!(
724        total_weight >= W::default(),
725        "total weight cannot be negative"
726    );
727
728    if slice.is_empty() {
729        return 0..0;
730    }
731
732    let mut end = 0;
733    recurse_fit_maximal_range(
734        slice,
735        total_weight,
736        &mut weight_fn,
737        0,
738        slice.len() - 1, // won't underflow due to empty check
739        &mut end,
740    );
741
742    0..end
743}
744
#[cfg(test)]
mod test {
    use super::*;

    /// Fitting an empty slice yields an empty range.
    #[test]
    fn fit_empty_slice() {
        let mut weights: [i32; 0] = [];
        let fitted = fit_longest_range(&mut weights, 100, |w| *w);
        assert!(fitted.is_empty());
    }

    /// A negative total weight is rejected with a panic.
    #[test]
    #[should_panic(expected = "total weight cannot be negative")]
    fn fit_negative_panic() {
        let mut weights = [0];
        fit_longest_range(&mut weights, -1, |w| *w);
    }

    /// Nothing is selected when every element outweighs the total.
    #[test]
    fn no_fit() {
        let mut weights = [100, 101, 102];
        let fitted = fit_longest_range(&mut weights, 99, |w| *w);
        assert!(fitted.is_empty());
    }

    /// Everything is selected when the total covers all of the elements.
    #[test]
    fn fit_all() {
        let mut ascending = [1, 2, 3, 4, 5];
        let fitted = fit_longest_range(&mut ascending, 15, |w| *w);
        assert_eq!(fitted.len(), 5);

        let mut descending = [5, 4, 3, 2, 1];
        let fitted = fit_longest_range(&mut descending, 20, |w| *w);
        assert_eq!(fitted.len(), 5);
    }

    /// Only the elements that fit are selected; the rest stay outside of the
    /// returned range.
    #[test]
    fn fit_some() {
        let mut weights = [8, 2, 2, 3, 2, 1, 2, 4, 1];
        let fitted = fit_longest_range(&mut weights, 10, |w| *w);
        assert_eq!(fitted.len(), 6);

        let selected = &weights[fitted.start..fitted.end];
        assert_eq!(selected.iter().copied().sum::<i32>(), 10);

        let excluded = &weights[fitted.end..];
        assert!(excluded.contains(&8));
        assert!(excluded.contains(&4));
        assert!(excluded.contains(&3));
    }

    /// A state created with maximal totals reports unlimited resources.
    #[test]
    fn unlimited_state() {
        let state = TaskManagerState::<()>::new(u64::MAX, u64::MAX);
        assert!(state.unlimited());
    }
}