wdl_engine/
backend.rs

1//! Implementation of task execution backends.
2
3use std::collections::HashMap;
4use std::collections::VecDeque;
5use std::future::Future;
6use std::ops::Add;
7use std::ops::Range;
8use std::ops::Sub;
9use std::path::Path;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13use anyhow::Result;
14use anyhow::anyhow;
15use futures::future::BoxFuture;
16use indexmap::IndexMap;
17use ordered_float::OrderedFloat;
18use tokio::sync::mpsc;
19use tokio::sync::oneshot;
20use tokio::sync::oneshot::Receiver;
21use tokio::task::JoinSet;
22use tokio_util::sync::CancellationToken;
23use tracing::debug;
24
25use crate::Input;
26use crate::Value;
27use crate::http::HttpDownloader;
28use crate::path::EvaluationPath;
29
30mod docker;
31mod local;
32mod tes;
33
34pub use docker::*;
35pub use local::*;
36pub use tes::*;
37
/// The default work directory name.
///
/// NOTE(review): these names are presumably joined under a task's attempt
/// directory by the backend implementations — confirm in `local`/`docker`/`tes`.
pub(crate) const WORK_DIR_NAME: &str = "work";

/// The default command file name.
pub(crate) const COMMAND_FILE_NAME: &str = "command";

/// The default stdout file name.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";

/// The default stderr file name.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";
49
/// Represents constraints applied to a task's execution.
///
/// Produced by [`TaskExecutionBackend::constraints`] from a task's
/// requirements and hints.
pub struct TaskExecutionConstraints {
    /// The container the task will run in.
    ///
    /// A value of `None` indicates the task will run on the host.
    pub container: Option<String>,
    /// The allocated number of CPUs; must be greater than 0.
    ///
    /// The value may be fractional.
    pub cpu: f64,
    /// The allocated memory in bytes; must be greater than 0.
    pub memory: i64,
    /// A list with one specification per allocated GPU.
    ///
    /// The specification is execution engine-specific.
    ///
    /// If no GPUs were allocated, then the value must be an empty list.
    pub gpu: Vec<String>,
    /// A list with one specification per allocated FPGA.
    ///
    /// The specification is execution engine-specific.
    ///
    /// If no FPGAs were allocated, then the value must be an empty list.
    pub fpga: Vec<String>,
    /// A map with one entry for each disk mount point.
    ///
    /// The key is the mount point and the value is the initial amount of disk
    /// space allocated, in bytes.
    ///
    /// The execution engine must, at a minimum, provide one entry for each disk
    /// mount point requested, but may provide more.
    ///
    /// The amount of disk space available for a given mount point may increase
    /// during the lifetime of the task (e.g., autoscaling volumes provided by
    /// some cloud services).
    pub disks: IndexMap<String, i64>,
}
85
/// Represents information for spawning a task.
///
/// The requirements, hints, and environment are reference-counted so the same
/// data can be shared across multiple spawn attempts without cloning.
#[derive(Debug)]
pub struct TaskSpawnInfo {
    /// The command of the task.
    command: String,
    /// The inputs for the task.
    inputs: Vec<Input>,
    /// The requirements of the task.
    requirements: Arc<HashMap<String, Value>>,
    /// The hints of the task.
    hints: Arc<HashMap<String, Value>>,
    /// The environment variables of the task.
    env: Arc<IndexMap<String, String>>,
}
100
101impl TaskSpawnInfo {
102    /// Constructs a new task spawn information.
103    pub fn new(
104        command: String,
105        inputs: Vec<Input>,
106        requirements: Arc<HashMap<String, Value>>,
107        hints: Arc<HashMap<String, Value>>,
108        env: Arc<IndexMap<String, String>>,
109    ) -> Self {
110        Self {
111            command,
112            inputs,
113            requirements,
114            hints,
115            env,
116        }
117    }
118}
119
/// Represents a request to spawn a task.
#[derive(Debug)]
pub struct TaskSpawnRequest {
    /// The id of the task being spawned.
    id: String,
    /// The information for the task to spawn.
    info: TaskSpawnInfo,
    /// The attempt number for the spawn request (starts at 0).
    attempt: u64,
    /// The attempt directory for the task's execution.
    attempt_dir: PathBuf,
}
132
133impl TaskSpawnRequest {
134    /// Creates a new task spawn request.
135    pub fn new(id: String, info: TaskSpawnInfo, attempt: u64, attempt_dir: PathBuf) -> Self {
136        Self {
137            id,
138            info,
139            attempt,
140            attempt_dir,
141        }
142    }
143
144    /// The identifier of the task being spawned.
145    pub fn id(&self) -> &str {
146        &self.id
147    }
148
149    /// Gets the command for the task.
150    pub fn command(&self) -> &str {
151        &self.info.command
152    }
153
154    /// Gets the inputs for the task.
155    pub fn inputs(&self) -> &[Input] {
156        &self.info.inputs
157    }
158
159    /// Gets the requirements of the task.
160    pub fn requirements(&self) -> &HashMap<String, Value> {
161        &self.info.requirements
162    }
163
164    /// Gets the hints of the task.
165    pub fn hints(&self) -> &HashMap<String, Value> {
166        &self.info.hints
167    }
168
169    /// Gets the environment variables of the task.
170    pub fn env(&self) -> &IndexMap<String, String> {
171        &self.info.env
172    }
173
174    /// Gets the attempt number for the task's execution.
175    ///
176    /// The attempt number starts at 0.
177    pub fn attempt(&self) -> u64 {
178        self.attempt
179    }
180
181    /// Gets the attempt directory for the task's execution.
182    pub fn attempt_dir(&self) -> &Path {
183        &self.attempt_dir
184    }
185}
186
/// Represents the result of a task's execution.
#[derive(Debug)]
pub struct TaskExecutionResult {
    /// The inputs that were given to the task.
    pub inputs: Vec<Input>,
    /// Stores the task process exit code.
    pub exit_code: i32,
    /// The task's working directory.
    pub work_dir: EvaluationPath,
    /// The value of the task's stdout file.
    pub stdout: Value,
    /// The value of the task's stderr file.
    pub stderr: Value,
}
201
/// Represents events that can be awaited on during task execution.
///
/// Both fields are oneshot receivers: each event fires at most once.
pub struct TaskExecutionEvents {
    /// The event for when the task has spawned and is currently executing.
    pub spawned: Receiver<()>,
    /// The event for when the task has completed.
    ///
    /// Returns the execution result.
    pub completed: Receiver<Result<TaskExecutionResult>>,
}
211
/// Represents a task execution backend.
pub trait TaskExecutionBackend: Send + Sync {
    /// Gets the maximum concurrent tasks supported by the backend.
    fn max_concurrency(&self) -> u64;

    /// Gets the execution constraints given a task's requirements and hints.
    ///
    /// Returns an error if the task cannot be constrained for the execution
    /// environment or if the task specifies invalid requirements.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints>;

    /// Gets the guest path of the task working directory (e.g. `/mnt/work`).
    ///
    /// Returns `None` if the task execution does not use a container.
    fn guest_work_dir(&self) -> Option<&Path>;

    /// Localizes the given set of inputs for the backend.
    ///
    /// This may involve downloading remote inputs to the host and updating the
    /// input's guest paths.
    ///
    /// The lifetime bounds tie the returned future to the borrows of `self`,
    /// the downloader, and the inputs slice.
    fn localize_inputs<'a, 'b, 'c, 'd>(
        &'a self,
        downloader: &'b HttpDownloader,
        inputs: &'c mut [Input],
    ) -> BoxFuture<'d, Result<()>>
    where
        'a: 'd,
        'b: 'd,
        'c: 'd,
        Self: 'd;

    /// Spawns a task with the execution backend.
    ///
    /// Returns the task execution event receivers upon success.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<TaskExecutionEvents>;

    /// Performs cleanup operations after top-level workflow or task evaluation
    /// completes.
    ///
    /// Returns `None` if no cleanup is required.
    ///
    /// The default implementation performs no cleanup.
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        _output_dir: &'b Path,
        _token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        None
    }
}
273
/// A trait implemented by backend requests.
///
/// A request describes the resources a task reserves and how to execute it;
/// the [`TaskManager`] uses the reservations to schedule or park requests.
trait TaskManagerRequest: Send + Sync + 'static {
    /// Gets the requested CPU allocation from the request.
    fn cpu(&self) -> f64;

    /// Gets the requested memory allocation from the request, in bytes.
    fn memory(&self) -> u64;

    /// Runs the request, consuming it.
    ///
    /// NOTE(review): implementations presumably signal `spawned` once the task
    /// has actually started executing — confirm against
    /// `TaskExecutionEvents::spawned`.
    fn run(
        self,
        spawned: oneshot::Sender<()>,
    ) -> impl Future<Output = Result<TaskExecutionResult>> + Send;
}
288
/// Represents a response internal to the task manager.
///
/// Carries the completed request's reservations so they can be returned to
/// the pool of available resources.
struct TaskManagerResponse {
    /// The previous CPU allocation from the request.
    cpu: f64,
    /// The previous memory allocation from the request, in bytes.
    memory: u64,
    /// The result of the task's execution.
    result: Result<TaskExecutionResult>,
    /// The channel to send the task's execution result back on.
    tx: oneshot::Sender<Result<TaskExecutionResult>>,
}
300
/// Represents state used by the task manager.
struct TaskManagerState<Req> {
    /// The amount of available CPU remaining.
    cpu: OrderedFloat<f64>,
    /// The amount of available memory remaining, in bytes.
    memory: u64,
    /// The set of spawned tasks.
    spawned: JoinSet<TaskManagerResponse>,
    /// The queue of parked spawn requests.
    ///
    /// Each entry is the request, the sender signaled when the task spawns,
    /// and the sender for the task's execution result.
    parked: VecDeque<(
        Req,
        oneshot::Sender<()>,
        oneshot::Sender<Result<TaskExecutionResult>>,
    )>,
}
316
317impl<Req> TaskManagerState<Req> {
318    /// Constructs a new task manager state with the given total CPU and memory.
319    fn new(cpu: u64, memory: u64) -> Self {
320        Self {
321            cpu: OrderedFloat(cpu as f64),
322            memory,
323            spawned: Default::default(),
324            parked: Default::default(),
325        }
326    }
327
328    /// Determines if the resources are unlimited.
329    fn unlimited(&self) -> bool {
330        self.cpu == u64::MAX as f64 && self.memory == u64::MAX
331    }
332}
333
/// Responsible for managing tasks based on available host resources.
///
/// The manager itself only holds the sending half of the request channel; the
/// receiving half is processed by a background queue task.
struct TaskManager<Req> {
    /// The sender for new spawn requests.
    ///
    /// Each message is the request, the sender signaled when the task spawns,
    /// and the sender for the task's execution result.
    tx: mpsc::UnboundedSender<(
        Req,
        oneshot::Sender<()>,
        oneshot::Sender<Result<TaskExecutionResult>>,
    )>,
}
343
impl<Req> TaskManager<Req>
where
    Req: TaskManagerRequest,
{
    /// Constructs a new task manager with the given total CPU, maximum CPU per
    /// request, total memory, and maximum memory per request.
    ///
    /// Spawns a background task that owns and drains the request queue; the
    /// queue task exits once every sender has been dropped.
    fn new(cpu: u64, max_cpu: u64, memory: u64, max_memory: u64) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            Self::run_request_queue(rx, cpu, max_cpu, memory, max_memory).await;
        });

        Self { tx }
    }

    /// Constructs a new task manager that does not limit requests based on
    /// available resources.
    ///
    /// Per-request maximums still apply; only the totals are unlimited.
    fn new_unlimited(max_cpu: u64, max_memory: u64) -> Self {
        Self::new(u64::MAX, max_cpu, u64::MAX, max_memory)
    }

    /// Sends a request to the task manager's queue.
    fn send(
        &self,
        request: Req,
        spawned: oneshot::Sender<()>,
        completed: oneshot::Sender<Result<TaskExecutionResult>>,
    ) {
        // A send error means the queue task has exited; nothing to do then.
        self.tx.send((request, spawned, completed)).ok();
    }

    /// Runs the request queue.
    ///
    /// Loops until the request channel closes, alternating between accepting
    /// new spawn requests and reclaiming resources from completed tasks.
    async fn run_request_queue(
        mut rx: mpsc::UnboundedReceiver<(
            Req,
            oneshot::Sender<()>,
            oneshot::Sender<Result<TaskExecutionResult>>,
        )>,
        cpu: u64,
        max_cpu: u64,
        memory: u64,
        max_memory: u64,
    ) {
        let mut state = TaskManagerState::new(cpu, memory);

        loop {
            // If there aren't any spawned tasks, wait for a spawn request only
            if state.spawned.is_empty() {
                assert!(
                    state.parked.is_empty(),
                    "there can't be any parked requests if there are no spawned tasks"
                );
                match rx.recv().await {
                    Some((req, spawned, completed)) => {
                        Self::handle_spawn_request(
                            &mut state, max_cpu, max_memory, req, spawned, completed,
                        );
                        continue;
                    }
                    // Channel closed: all senders dropped, so shut down
                    None => break,
                }
            }

            // Otherwise, wait for a spawn request or a completed task
            tokio::select! {
                request = rx.recv() => {
                    match request {
                        Some((req, spawned, completed)) => {
                            Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, spawned, completed);
                        }
                        None => break,
                    }
                }
                Some(Ok(response)) = state.spawned.join_next() => {
                    // Return the completed task's reservations to the pool
                    if !state.unlimited() {
                        state.cpu += response.cpu;
                        state.memory += response.memory;
                    }

                    // The receiver may have been dropped; ignore send failures
                    response.tx.send(response.result).ok();
                    Self::spawn_parked_tasks(&mut state, max_cpu, max_memory);
                }
            }
        }
    }

    /// Handles a spawn request by either parking it (not enough resources
    /// currently available) or by spawning it.
    ///
    /// Requests exceeding the per-request maximums are rejected immediately by
    /// sending an error on the `completed` channel.
    fn handle_spawn_request(
        state: &mut TaskManagerState<Req>,
        max_cpu: u64,
        max_memory: u64,
        request: Req,
        spawned: oneshot::Sender<()>,
        completed: oneshot::Sender<Result<TaskExecutionResult>>,
    ) {
        // Ensure the request does not exceed the maximum CPU
        let cpu = request.cpu();
        if cpu > max_cpu as f64 {
            completed
                .send(Err(anyhow!(
                    "requested task CPU count of {cpu} exceeds the maximum CPU count of {max_cpu}",
                )))
                .ok();
            return;
        }

        // Ensure the request does not exceed the maximum memory
        let memory = request.memory();
        if memory > max_memory {
            completed
                .send(Err(anyhow!(
                    "requested task memory of {memory} byte{s} exceeds the maximum memory of \
                     {max_memory}",
                    s = if memory == 1 { "" } else { "s" }
                )))
                .ok();
            return;
        }

        if !state.unlimited() {
            // If the request can't be processed due to resource constraints, park the
            // request for now. When a task completes and resources become available,
            // we'll unpark the request
            if cpu > state.cpu.into() || memory > state.memory {
                debug!(
                    "parking task due to insufficient resources: task reserves {cpu} CPU(s) and \
                     {memory} bytes of memory but there are only {cpu_remaining} CPU(s) and \
                     {memory_remaining} bytes of memory available",
                    cpu_remaining = state.cpu,
                    memory_remaining = state.memory
                );
                state.parked.push_back((request, spawned, completed));
                return;
            }

            // Decrement the resource counts and spawn the task
            state.cpu -= cpu;
            state.memory -= memory;
            debug!(
                "spawning task with {cpu} CPUs and {memory} bytes of memory remaining",
                cpu = state.cpu,
                memory = state.memory
            );
        }

        // The reservations are captured in the response so they can be
        // returned to the pool when the task completes
        state.spawned.spawn(async move {
            TaskManagerResponse {
                cpu: request.cpu(),
                memory: request.memory(),
                result: request.run(spawned).await,
                tx: completed,
            }
        });
    }

    /// Responsible for spawning parked tasks.
    fn spawn_parked_tasks(state: &mut TaskManagerState<Req>, max_cpu: u64, max_memory: u64) {
        if state.parked.is_empty() {
            return;
        }

        debug!(
            "attempting to unpark tasks with {cpu} CPUs and {memory} bytes of memory available",
            cpu = state.cpu,
            memory = state.memory,
        );

        // This algorithm is intended to unpark the greatest number of tasks.
        //
        // It first finds the greatest subset of tasks that are constrained by CPU and
        // then by memory.
        //
        // Next it finds the greatest subset of tasks that are constrained by memory and
        // then by CPU.
        //
        // It then unparks whichever subset is greater.
        //
        // The process is repeated until both subsets reach zero length.
        loop {
            let cpu_by_memory_len = {
                // Start by finding the longest range in the parked set that could run based on
                // CPU reservation
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                // Next, find the longest subset of that subset that could run based on memory
                // reservation
                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
                .len()
            };

            // Next, find the longest range in the parked set that could run based on memory
            // reservation
            let memory_by_cpu =
                fit_longest_range(state.parked.make_contiguous(), state.memory, |(r, ..)| {
                    r.memory()
                });

            // Next, find the longest subset of that subset that could run based on CPU
            // reservation
            let memory_by_cpu = fit_longest_range(
                &mut state.parked.make_contiguous()[memory_by_cpu],
                state.cpu,
                |(r, ..)| OrderedFloat(r.cpu()),
            );

            // If both subsets are empty, break out
            if cpu_by_memory_len == 0 && memory_by_cpu.is_empty() {
                break;
            }

            // Check to see which subset is greater (for equivalence, use the one we don't
            // need to refit for)
            let range = if memory_by_cpu.len() >= cpu_by_memory_len {
                memory_by_cpu
            } else {
                // We need to refit because the above calculation of `memory_by_cpu` mutated the
                // parked list
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
            };

            debug!("unparking {len} task(s)", len = range.len());

            // `fit_longest_range` partially sorts in place and always returns
            // a range starting at zero, so the fit tasks are at the front
            assert_eq!(
                range.start, 0,
                "expected the fit tasks to be at the front of the queue"
            );
            for _ in range {
                let (request, spawned, completed) = state.parked.pop_front().unwrap();

                debug!(
                    "unparking task with reservation of {cpu} CPU(s) and {memory} bytes of memory",
                    cpu = request.cpu(),
                    memory = request.memory(),
                );

                Self::handle_spawn_request(state, max_cpu, max_memory, request, spawned, completed);
            }
        }
    }
}
602
/// Determines the longest range in a slice where the sum of the weights of the
/// elements in the returned range is less than or equal to the supplied total
/// weight.
///
/// The returned range always starts at zero as this algorithm will partially
/// sort the slice.
///
/// Due to the partial sorting, the provided slice will have its elements
/// rearranged. As the function modifies the slice in-place, this function does
/// not make any allocations.
///
/// # Implementation
///
/// This function is implemented using a modified quick sort algorithm as a
/// solution to the more general "0/1 knapsack" problem where each item has an
/// equal profit value; this maximizes for the number of items to put
/// into the knapsack (i.e. longest range that fits).
///
/// Using a uniform random pivot point, it partitions the input into two sides:
/// the left side where all weights are less than the pivot and the right side
/// where all weights are equal to or greater than the pivot.
///
/// It then checks to see if the total weight of the left side is less than or
/// equal to the total remaining weight; if it is, every element in
/// the left side is considered as part of the output and it recurses on the
/// right side.
///
/// If the total weight of the left side is greater than the remaining weight
/// budget, it can completely ignore the right side and instead recurse on the
/// left side.
///
/// The algorithm stops when the partition size reaches zero.
///
/// # Panics
///
/// Panics if the supplied weight is a negative value.
fn fit_longest_range<T, F, W>(slice: &mut [T], total_weight: W, mut weight_fn: F) -> Range<usize>
where
    F: FnMut(&T) -> W,
    W: Ord + Add<Output = W> + Sub<Output = W> + Default,
{
    /// Partitions the slice so that the weight of every element to the left
    /// of the pivot is less than the pivot's weight and every element to the
    /// right of the pivot is greater than or equal to the pivot's weight.
    ///
    /// Returns the pivot index, pivot weight, and the sum of the left side
    /// element's weights.
    fn partition<T, F, W>(
        slice: &mut [T],
        weight_fn: &mut F,
        mut low: usize,
        high: usize,
    ) -> (usize, W, W)
    where
        F: FnMut(&T) -> W,
        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
    {
        assert!(low < high);

        // Swap a random element (the pivot) in the remaining range with the high
        slice.swap(high, rand::random_range(low..high));

        let pivot_weight = weight_fn(&slice[high]);
        let mut sum_weight = W::default();
        // Note: the range includes `high` (the pivot itself); the strict `<`
        // comparison below means the pivot is never swapped to the left side
        let range = low..=high;
        for i in range {
            let weight = weight_fn(&slice[i]);
            // If the weight belongs on the left side of the pivot, swap
            if weight < pivot_weight {
                slice.swap(i, low);
                low += 1;
                sum_weight = sum_weight.add(weight);
            }
        }

        // Move the pivot into its final position between the two sides
        slice.swap(low, high);
        (low, pivot_weight, sum_weight)
    }

    /// Recursively partitions `slice[low..=high]`, growing `end` by the number
    /// of elements known to fit within the remaining weight budget.
    fn recurse_fit_maximal_range<T, F, W>(
        slice: &mut [T],
        mut remaining_weight: W,
        weight_fn: &mut F,
        low: usize,
        high: usize,
        end: &mut usize,
    ) where
        F: FnMut(&T) -> W,
        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
    {
        // Base case: a single-element partition
        if low == high {
            let weight = weight_fn(&slice[low]);
            if weight <= remaining_weight {
                *end += 1;
            }

            return;
        }

        // A `low > high` range (possible after recursing left) simply returns
        if low < high {
            let (pivot, pivot_weight, sum) = partition(slice, weight_fn, low, high);
            if sum <= remaining_weight {
                // Everything up to the pivot can be included
                *end += pivot - low;
                remaining_weight = remaining_weight.sub(sum);

                // Check to see if the pivot itself can be included
                if pivot_weight <= remaining_weight {
                    *end += 1;
                    remaining_weight = remaining_weight.sub(pivot_weight);
                }

                // Recurse on the right side
                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, pivot + 1, high, end);
            } else if pivot > 0 {
                // Otherwise, we can completely disregard the right side (including the pivot)
                // and recurse on the left
                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, low, pivot - 1, end);
            }
        }
    }

    assert!(
        total_weight >= W::default(),
        "total weight cannot be negative"
    );

    if slice.is_empty() {
        return 0..0;
    }

    let mut end = 0;
    recurse_fit_maximal_range(
        slice,
        total_weight,
        &mut weight_fn,
        0,
        slice.len() - 1, // won't underflow due to empty check
        &mut end,
    );

    0..end
}
746
#[cfg(test)]
mod test {
    use super::*;

    /// Fitting into an empty slice yields an empty range.
    #[test]
    fn fit_empty_slice() {
        let mut items: [i32; 0] = [];
        let range = fit_longest_range(&mut items, 100, |i| *i);
        assert!(range.is_empty());
    }

    /// A negative total weight is a precondition violation and must panic.
    #[test]
    #[should_panic(expected = "total weight cannot be negative")]
    fn fit_negative_panic() {
        let mut items = [0];
        fit_longest_range(&mut items, -1, |i| *i);
    }

    /// When no element fits within the budget, the range is empty.
    #[test]
    fn no_fit() {
        let mut items = [100, 101, 102];
        let range = fit_longest_range(&mut items, 99, |i| *i);
        assert!(range.is_empty());
    }

    /// When the budget covers every element, the range spans the whole slice.
    #[test]
    fn fit_all() {
        for (mut items, budget) in [([1, 2, 3, 4, 5], 15), ([5, 4, 3, 2, 1], 20)] {
            let range = fit_longest_range(&mut items, budget, |i| *i);
            assert_eq!(range.len(), 5);
        }
    }

    /// When only a subset fits, the fit elements are moved to the front and
    /// the heavy elements are left past the end of the range.
    #[test]
    fn fit_some() {
        let items = &mut [8, 2, 2, 3, 2, 1, 2, 4, 1];
        let range = fit_longest_range(items, 10, |i| *i);
        assert_eq!(range.len(), 6);
        assert_eq!(items[range.start..range.end].iter().copied().sum::<i32>(), 10);
        for heavy in [8, 4, 3] {
            assert!(items[range.end..].contains(&heavy));
        }
    }

    /// State created with `u64::MAX` resources is considered unlimited.
    #[test]
    fn unlimited_state() {
        let state = TaskManagerState::<()>::new(u64::MAX, u64::MAX);
        assert!(state.unlimited());
    }
}