wdl_engine/
backend.rs

1//! Implementation of task execution backends.
2
3use std::collections::HashMap;
4use std::collections::VecDeque;
5use std::fmt;
6use std::future::Future;
7use std::ops::Add;
8use std::ops::Range;
9use std::ops::Sub;
10use std::path::Path;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use anyhow::Result;
15use anyhow::anyhow;
16use futures::future::BoxFuture;
17use indexmap::IndexMap;
18use ordered_float::OrderedFloat;
19use tokio::sync::mpsc;
20use tokio::sync::oneshot;
21use tokio::sync::oneshot::Receiver;
22use tokio::task::JoinSet;
23use tokio_util::sync::CancellationToken;
24use tracing::debug;
25
26use crate::Input;
27use crate::Value;
28use crate::http::Transferer;
29use crate::path::EvaluationPath;
30
31mod docker;
32mod local;
33mod lsf_apptainer;
34mod tes;
35
36pub use docker::*;
37pub use local::*;
38pub use lsf_apptainer::*;
39pub use tes::*;
40
/// The default name of a task's working directory.
pub(crate) const WORK_DIR_NAME: &str = "work";

/// The default name of the file containing a task's evaluated command.
pub(crate) const COMMAND_FILE_NAME: &str = "command";

/// The default name of the file capturing a task's standard output.
pub(crate) const STDOUT_FILE_NAME: &str = "stdout";

/// The default name of the file capturing a task's standard error.
pub(crate) const STDERR_FILE_NAME: &str = "stderr";

/// The number of initial expected task names.
///
/// This controls the initial size of the bloom filter and how many names are
/// prepopulated into a name generator.
const INITIAL_EXPECTED_NAMES: usize = 1000;
58
/// Represents constraints applied to a task's execution.
pub struct TaskExecutionConstraints {
    /// The container the task will run in.
    ///
    /// A value of `None` indicates the task will run on the host.
    pub container: Option<String>,
    /// The allocated number of CPUs; must be greater than 0.
    ///
    /// The value may be fractional for backends that support partial CPU
    /// allocations.
    pub cpu: f64,
    /// The allocated memory in bytes; must be greater than 0.
    pub memory: i64,
    /// A list with one specification per allocated GPU.
    ///
    /// The specification is execution engine-specific.
    ///
    /// If no GPUs were allocated, then the value must be an empty list.
    pub gpu: Vec<String>,
    /// A list with one specification per allocated FPGA.
    ///
    /// The specification is execution engine-specific.
    ///
    /// If no FPGAs were allocated, then the value must be an empty list.
    pub fpga: Vec<String>,
    /// A map with one entry for each disk mount point.
    ///
    /// The key is the mount point and the value is the initial amount of disk
    /// space allocated, in bytes.
    ///
    /// The execution engine must, at a minimum, provide one entry for each disk
    /// mount point requested, but may provide more.
    ///
    /// The amount of disk space available for a given mount point may increase
    /// during the lifetime of the task (e.g., autoscaling volumes provided by
    /// some cloud services).
    pub disks: IndexMap<String, i64>,
}
94
/// Represents information for spawning a task.
///
/// The shared (`Arc`) fields allow the same collections to be reused across
/// multiple spawn requests without deep copies.
pub struct TaskSpawnInfo {
    /// The command of the task.
    command: String,
    /// The inputs for the task.
    inputs: Vec<Input>,
    /// The requirements of the task.
    requirements: Arc<HashMap<String, Value>>,
    /// The hints of the task.
    hints: Arc<HashMap<String, Value>>,
    /// The environment variables of the task.
    ///
    /// An `IndexMap` preserves the declaration order of the variables.
    env: Arc<IndexMap<String, String>>,
    /// The transferer to use for uploading inputs.
    transferer: Arc<dyn Transferer>,
}
110
impl TaskSpawnInfo {
    /// Constructs a new task spawn information from its constituent parts.
    ///
    /// The `requirements`, `hints`, and `env` maps are taken as `Arc`s so
    /// callers can share them cheaply across multiple spawn infos.
    pub fn new(
        command: String,
        inputs: Vec<Input>,
        requirements: Arc<HashMap<String, Value>>,
        hints: Arc<HashMap<String, Value>>,
        env: Arc<IndexMap<String, String>>,
        transferer: Arc<dyn Transferer>,
    ) -> Self {
        Self {
            command,
            inputs,
            requirements,
            hints,
            env,
            transferer,
        }
    }
}
131
132impl fmt::Debug for TaskSpawnInfo {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        f.debug_struct("TaskSpawnInfo")
135            .field("command", &self.command)
136            .field("inputs", &self.inputs)
137            .field("requirements", &self.requirements)
138            .field("hints", &self.hints)
139            .field("env", &self.env)
140            .field("transferer", &"<transferer>")
141            .finish()
142    }
143}
144
/// Represents a request to spawn a task.
#[derive(Debug)]
pub struct TaskSpawnRequest {
    /// The id of the task being spawned.
    id: String,
    /// The information for the task to spawn.
    info: TaskSpawnInfo,
    /// The attempt number for the spawn request.
    ///
    /// The first attempt is numbered 0.
    attempt: u64,
    /// The attempt directory for the task's execution.
    attempt_dir: PathBuf,
    /// The root directory for the evaluation.
    root_dir: PathBuf,
    /// The temp directory for the evaluation.
    temp_dir: PathBuf,
}
161
162impl TaskSpawnRequest {
163    /// Creates a new task spawn request.
164    pub fn new(
165        id: String,
166        info: TaskSpawnInfo,
167        attempt: u64,
168        attempt_dir: PathBuf,
169        root_dir: PathBuf,
170        temp_dir: PathBuf,
171    ) -> Self {
172        Self {
173            id,
174            info,
175            attempt,
176            attempt_dir,
177            root_dir,
178            temp_dir,
179        }
180    }
181
182    /// The identifier of the task being spawned.
183    pub fn id(&self) -> &str {
184        &self.id
185    }
186
187    /// Gets the command for the task.
188    pub fn command(&self) -> &str {
189        &self.info.command
190    }
191
192    /// Gets the inputs for the task.
193    pub fn inputs(&self) -> &[Input] {
194        &self.info.inputs
195    }
196
197    /// Gets the requirements of the task.
198    pub fn requirements(&self) -> &HashMap<String, Value> {
199        &self.info.requirements
200    }
201
202    /// Gets the hints of the task.
203    pub fn hints(&self) -> &HashMap<String, Value> {
204        &self.info.hints
205    }
206
207    /// Gets the environment variables of the task.
208    pub fn env(&self) -> &IndexMap<String, String> {
209        &self.info.env
210    }
211
212    /// Gets the transferer to use for uploading inputs.
213    pub fn transferer(&self) -> &Arc<dyn Transferer> {
214        &self.info.transferer
215    }
216
217    /// Gets the attempt number for the task's execution.
218    ///
219    /// The attempt number starts at 0.
220    pub fn attempt(&self) -> u64 {
221        self.attempt
222    }
223
224    /// Gets the attempt directory for the task's execution.
225    pub fn attempt_dir(&self) -> &Path {
226        &self.attempt_dir
227    }
228
229    /// The root directory for the evaluation.
230    pub fn root_dir(&self) -> &Path {
231        &self.root_dir
232    }
233
234    /// The temp directory for the evaluation.
235    pub fn temp_dir(&self) -> &Path {
236        &self.temp_dir
237    }
238}
239
/// Represents the result of a task's execution.
#[derive(Debug)]
pub struct TaskExecutionResult {
    /// Stores the task process exit code.
    pub exit_code: i32,
    /// The task's working directory.
    pub work_dir: EvaluationPath,
    /// The value of the task's stdout file.
    pub stdout: Value,
    /// The value of the task's stderr file.
    pub stderr: Value,
}
252
/// Represents a task execution backend.
///
/// Implementations must be `Send + Sync` so a backend can be shared across
/// concurrently-evaluating tasks.
pub trait TaskExecutionBackend: Send + Sync {
    /// Gets the maximum concurrent tasks supported by the backend.
    fn max_concurrency(&self) -> u64;

    /// Gets the execution constraints given a task's requirements and hints.
    ///
    /// Returns an error if the task cannot be constrained for the execution
    /// environment or if the task specifies invalid requirements.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints>;

    /// Gets the guest (container) inputs directory of the backend.
    ///
    /// Returns `None` if the backend does not execute tasks in a container.
    ///
    /// The returned path is expected to be Unix style and end with a trailing
    /// forward slash (`/`).
    fn guest_inputs_dir(&self) -> Option<&'static str>;

    /// Determines if the backend needs local inputs.
    ///
    /// Backends that run tasks locally or from a shared file system will return
    /// `true`.
    fn needs_local_inputs(&self) -> bool;

    /// Spawns a task with the execution backend.
    ///
    /// Returns a oneshot receiver for awaiting the completion of the task.
    ///
    /// The supplied cancellation token may be used by the backend to abort the
    /// task's execution.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<Receiver<Result<TaskExecutionResult>>>;

    /// Performs cleanup operations after task execution completes.
    ///
    /// Returns `None` if no cleanup is required.
    ///
    /// The default implementation performs no cleanup.
    fn cleanup<'a>(
        &'a self,
        work_dir: &'a EvaluationPath,
        token: CancellationToken,
    ) -> Option<BoxFuture<'a, ()>> {
        // Suppress unused-parameter warnings; the defaults are intentionally
        // no-ops for backends without cleanup requirements.
        let _ = work_dir;
        let _ = token;
        None
    }
}
303
/// A trait implemented by backend requests.
///
/// The task manager uses the reported CPU and memory reservations to decide
/// whether a request can run now or must be parked until resources free up.
trait TaskManagerRequest: Send + Sync + 'static {
    /// Gets the requested CPU allocation from the request.
    fn cpu(&self) -> f64;

    /// Gets the requested memory allocation from the request, in bytes.
    fn memory(&self) -> u64;

    /// Runs the request.
    ///
    /// Consumes the request and resolves with the task's execution result.
    fn run(self) -> impl Future<Output = Result<TaskExecutionResult>> + Send;
}
315
/// Represents a response internal to the task manager.
///
/// Carries the original reservation so the manager can return the CPU and
/// memory to the available pool once the task finishes.
struct TaskManagerResponse {
    /// The previous CPU allocation from the request.
    cpu: f64,
    /// The previous memory allocation from the request, in bytes.
    memory: u64,
    /// The result of the task's execution.
    result: Result<TaskExecutionResult>,
    /// The channel to send the task's execution result back on.
    tx: oneshot::Sender<Result<TaskExecutionResult>>,
}
327
/// Represents state used by the task manager.
struct TaskManagerState<Req> {
    /// The amount of available CPU remaining.
    ///
    /// `OrderedFloat` is used so CPU weights satisfy the `Ord` bound required
    /// by [`fit_longest_range`].
    cpu: OrderedFloat<f64>,
    /// The amount of available memory remaining, in bytes.
    memory: u64,
    /// The set of spawned tasks.
    spawned: JoinSet<TaskManagerResponse>,
    /// The queue of parked spawn requests.
    ///
    /// Each entry pairs the request with the channel used to deliver its
    /// eventual result.
    parked: VecDeque<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
}
339
340impl<Req> TaskManagerState<Req> {
341    /// Constructs a new task manager state with the given total CPU and memory.
342    fn new(cpu: u64, memory: u64) -> Self {
343        Self {
344            cpu: OrderedFloat(cpu as f64),
345            memory,
346            spawned: Default::default(),
347            parked: Default::default(),
348        }
349    }
350
351    /// Determines if the resources are unlimited.
352    fn unlimited(&self) -> bool {
353        self.cpu == u64::MAX as f64 && self.memory == u64::MAX
354    }
355}
356
/// Responsible for managing tasks based on available host resources.
///
/// The manager itself only holds the sending half of an unbounded channel;
/// the queue is processed by a background tokio task (see
/// `TaskManager::run_request_queue`).
#[derive(Debug)]
struct TaskManager<Req> {
    /// The sender for new spawn requests.
    tx: mpsc::UnboundedSender<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
}
363
impl<Req> TaskManager<Req>
where
    Req: TaskManagerRequest,
{
    /// Constructs a new task manager with the given total CPU, maximum CPU per
    /// request, total memory, and maximum memory per request.
    ///
    /// Spawns a background task that processes the request queue; the queue
    /// loop exits when the manager (the sole sender) is dropped.
    fn new(cpu: u64, max_cpu: u64, memory: u64, max_memory: u64) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            Self::run_request_queue(rx, cpu, max_cpu, memory, max_memory).await;
        });

        Self { tx }
    }

    /// Constructs a new task manager that does not limit requests based on
    /// available resources.
    ///
    /// Implemented by passing `u64::MAX` for the totals, which
    /// [`TaskManagerState::unlimited`] recognizes as "unlimited"; the per-task
    /// maximums are still enforced.
    fn new_unlimited(max_cpu: u64, max_memory: u64) -> Self {
        Self::new(u64::MAX, max_cpu, u64::MAX, max_memory)
    }

    /// Sends a request to the task manager's queue.
    ///
    /// A send error (queue task gone) is deliberately ignored; in that case
    /// the `completed` sender is dropped and the caller's receiver resolves
    /// with an error.
    fn send(&self, request: Req, completed: oneshot::Sender<Result<TaskExecutionResult>>) {
        self.tx.send((request, completed)).ok();
    }

    /// Runs the request queue.
    ///
    /// Loops until the request channel closes, interleaving new spawn requests
    /// with completions of already-spawned tasks; completions return their
    /// reservations to the pool and may unpark queued requests.
    async fn run_request_queue(
        mut rx: mpsc::UnboundedReceiver<(Req, oneshot::Sender<Result<TaskExecutionResult>>)>,
        cpu: u64,
        max_cpu: u64,
        memory: u64,
        max_memory: u64,
    ) {
        let mut state = TaskManagerState::new(cpu, memory);

        loop {
            // If there aren't any spawned tasks, wait for a spawn request only
            if state.spawned.is_empty() {
                assert!(
                    state.parked.is_empty(),
                    "there can't be any parked requests if there are no spawned tasks"
                );
                match rx.recv().await {
                    Some((req, completed)) => {
                        Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, completed);
                        continue;
                    }
                    // Channel closed: the manager was dropped, so shut down
                    None => break,
                }
            }

            // Otherwise, wait for a spawn request or a completed task
            tokio::select! {
                request = rx.recv() => {
                    match request {
                        Some((req, completed)) => {
                            Self::handle_spawn_request(&mut state, max_cpu, max_memory, req, completed);
                        }
                        None => break,
                    }
                }
                // NOTE(review): the `Some(Ok(..))` pattern means a `JoinError`
                // (e.g. a panicked task) disables this branch for the current
                // poll and the failed task's cpu/memory reservation is never
                // returned to the pool — confirm whether request futures can
                // panic in practice.
                Some(Ok(response)) = state.spawned.join_next() => {
                    // Return the completed task's reservation before notifying
                    // the requester and attempting to unpark waiting tasks
                    if !state.unlimited() {
                        state.cpu += response.cpu;
                        state.memory += response.memory;
                    }

                    response.tx.send(response.result).ok();
                    Self::spawn_parked_tasks(&mut state, max_cpu, max_memory);
                }
            }
        }
    }

    /// Handles a spawn request by either parking it (not enough resources
    /// currently available) or by spawning it.
    ///
    /// Requests exceeding the per-task maximums are rejected immediately by
    /// sending an error on the `completed` channel.
    fn handle_spawn_request(
        state: &mut TaskManagerState<Req>,
        max_cpu: u64,
        max_memory: u64,
        request: Req,
        completed: oneshot::Sender<Result<TaskExecutionResult>>,
    ) {
        // Ensure the request does not exceed the maximum CPU
        let cpu = request.cpu();
        if cpu > max_cpu as f64 {
            completed
                .send(Err(anyhow!(
                    "requested task CPU count of {cpu} exceeds the maximum CPU count of {max_cpu}",
                )))
                .ok();
            return;
        }

        // Ensure the request does not exceed the maximum memory
        let memory = request.memory();
        if memory > max_memory {
            completed
                .send(Err(anyhow!(
                    "requested task memory of {memory} byte{s} exceeds the maximum memory of \
                     {max_memory}",
                    s = if memory == 1 { "" } else { "s" }
                )))
                .ok();
            return;
        }

        if !state.unlimited() {
            // If the request can't be processed due to resource constraints, park the
            // request for now. When a task completes and resources become available,
            // we'll unpark the request
            if cpu > state.cpu.into() || memory > state.memory {
                debug!(
                    "parking task due to insufficient resources: task reserves {cpu} CPU(s) and \
                     {memory} bytes of memory but there are only {cpu_remaining} CPU(s) and \
                     {memory_remaining} bytes of memory available",
                    cpu_remaining = state.cpu,
                    memory_remaining = state.memory
                );
                state.parked.push_back((request, completed));
                return;
            }

            // Decrement the resource counts and spawn the task
            state.cpu -= cpu;
            state.memory -= memory;
            debug!(
                "spawning task with {cpu} CPUs and {memory} bytes of memory remaining",
                cpu = state.cpu,
                memory = state.memory
            );
        }

        // The response echoes the reservation so the queue loop can return it
        // to the pool on completion
        state.spawned.spawn(async move {
            TaskManagerResponse {
                cpu: request.cpu(),
                memory: request.memory(),
                result: request.run().await,
                tx: completed,
            }
        });
    }

    /// Responsible for spawning parked tasks.
    ///
    /// Uses [`fit_longest_range`] to select the largest subset of parked
    /// requests that fits within the currently available CPU and memory.
    fn spawn_parked_tasks(state: &mut TaskManagerState<Req>, max_cpu: u64, max_memory: u64) {
        if state.parked.is_empty() {
            return;
        }

        debug!(
            "attempting to unpark tasks with {cpu} CPUs and {memory} bytes of memory available",
            cpu = state.cpu,
            memory = state.memory,
        );

        // This algorithm is intended to unpark the greatest number of tasks.
        //
        // It first finds the greatest subset of tasks that are constrained by CPU and
        // then by memory.
        //
        // Next it finds the greatest subset of tasks that are constrained by memory and
        // then by CPU.
        //
        // It then unparks whichever subset is greater.
        //
        // The process is repeated until both subsets reach zero length.
        loop {
            let cpu_by_memory_len = {
                // Start by finding the longest range in the parked set that could run based on
                // CPU reservation
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                // Next, find the longest subset of that subset that could run based on memory
                // reservation
                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
                .len()
            };

            // Next, find the longest range in the parked set that could run based on memory
            // reservation
            let memory_by_cpu =
                fit_longest_range(state.parked.make_contiguous(), state.memory, |(r, ..)| {
                    r.memory()
                });

            // Next, find the longest subset of that subset that could run based on CPU
            // reservation
            let memory_by_cpu = fit_longest_range(
                &mut state.parked.make_contiguous()[memory_by_cpu],
                state.cpu,
                |(r, ..)| OrderedFloat(r.cpu()),
            );

            // If both subsets are empty, break out
            if cpu_by_memory_len == 0 && memory_by_cpu.is_empty() {
                break;
            }

            // Check to see which subset is greater (for equivalence, use the one we don't
            // need to refit for)
            let range = if memory_by_cpu.len() >= cpu_by_memory_len {
                memory_by_cpu
            } else {
                // We need to refit because the above calculation of `memory_by_cpu` mutated the
                // parked list
                let range =
                    fit_longest_range(state.parked.make_contiguous(), state.cpu, |(r, ..)| {
                        OrderedFloat(r.cpu())
                    });

                fit_longest_range(
                    &mut state.parked.make_contiguous()[range],
                    state.memory,
                    |(r, ..)| r.memory(),
                )
            };

            debug!("unparking {len} task(s)", len = range.len());

            // `fit_longest_range` always returns a range starting at zero, so
            // the chosen tasks sit at the front of the queue
            assert_eq!(
                range.start, 0,
                "expected the fit tasks to be at the front of the queue"
            );
            for _ in range {
                let (request, completed) = state.parked.pop_front().unwrap();

                debug!(
                    "unparking task with reservation of {cpu} CPU(s) and {memory} bytes of memory",
                    cpu = request.cpu(),
                    memory = request.memory(),
                );

                Self::handle_spawn_request(state, max_cpu, max_memory, request, completed);
            }
        }
    }
}
610
/// Determines the longest range in a slice where the sum of the weights of the
/// elements in the returned range is less than or equal to the supplied total
/// weight.
///
/// The returned range always starts at zero as this algorithm will partially
/// sort the slice.
///
/// Due to the partial sorting, the provided slice will have its elements
/// rearranged. As the function modifies the slice in-place, this function does
/// not make any allocations.
///
/// # Implementation
///
/// This function is implemented using a modified quick sort algorithm as a
/// solution to the more general "0/1 knapsack" problem where each item has an
/// equal profit value; this maximizes for the number of items to put
/// into the knapsack (i.e. longest range that fits).
///
/// Using a uniform random pivot point, it partitions the input into two sides:
/// the left side where all weights are less than the pivot and the right side
/// where all weights are equal to or greater than the pivot.
///
/// It then checks to see if the total weight of the left side is less than or
/// equal to the total remaining weight; if it is, every element in
/// the left side is considered as part of the output and it recurses on the
/// right side.
///
/// If the total weight of the left side is greater than the remaining weight
/// budget, it can completely ignore the right side and instead recurse on the
/// left side.
///
/// The algorithm stops when the partition size reaches zero.
///
/// # Panics
///
/// Panics if the supplied weight is a negative value.
fn fit_longest_range<T, F, W>(slice: &mut [T], total_weight: W, mut weight_fn: F) -> Range<usize>
where
    F: FnMut(&T) -> W,
    W: Ord + Add<Output = W> + Sub<Output = W> + Default,
{
    /// Partitions the slice so that the weight of every element to the left
    /// of the pivot is less than the pivot's weight and every element to the
    /// right of the pivot is greater than or equal to the pivot's weight.
    ///
    /// Returns the pivot index, pivot weight, and the sum of the left side
    /// element's weights.
    ///
    /// This is a Lomuto-style partition with a uniformly random pivot.
    fn partition<T, F, W>(
        slice: &mut [T],
        weight_fn: &mut F,
        mut low: usize,
        high: usize,
    ) -> (usize, W, W)
    where
        F: FnMut(&T) -> W,
        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
    {
        assert!(low < high);

        // Swap a random element (the pivot) in the remaining range with the high
        slice.swap(high, rand::random_range(low..high));

        let pivot_weight = weight_fn(&slice[high]);
        let mut sum_weight = W::default();
        let range = low..=high;
        // Note: the pivot at `high` is included in this scan but its weight is
        // never strictly less than itself, so it is never swapped left
        for i in range {
            let weight = weight_fn(&slice[i]);
            // If the weight belongs on the left side of the pivot, swap
            if weight < pivot_weight {
                slice.swap(i, low);
                low += 1;
                sum_weight = sum_weight.add(weight);
            }
        }

        // Place the pivot between the two partitions
        slice.swap(low, high);
        (low, pivot_weight, sum_weight)
    }

    /// Recursively partitions `slice[low..=high]`, growing `end` by the number
    /// of elements that fit in the remaining weight budget.
    ///
    /// Because each accepted partition is a prefix of smaller-weighted
    /// elements, the accepted elements always form a prefix of the slice.
    fn recurse_fit_maximal_range<T, F, W>(
        slice: &mut [T],
        mut remaining_weight: W,
        weight_fn: &mut F,
        low: usize,
        high: usize,
        end: &mut usize,
    ) where
        F: FnMut(&T) -> W,
        W: Ord + Add<Output = W> + Sub<Output = W> + Default,
    {
        // Base case: a single element either fits or it doesn't
        if low == high {
            let weight = weight_fn(&slice[low]);
            if weight <= remaining_weight {
                *end += 1;
            }

            return;
        }

        if low < high {
            let (pivot, pivot_weight, sum) = partition(slice, weight_fn, low, high);
            if sum <= remaining_weight {
                // Everything up to the pivot can be included
                *end += pivot - low;
                remaining_weight = remaining_weight.sub(sum);

                // Check to see if the pivot itself can be included
                if pivot_weight <= remaining_weight {
                    *end += 1;
                    remaining_weight = remaining_weight.sub(pivot_weight);
                }

                // Recurse on the right side
                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, pivot + 1, high, end);
            } else if pivot > 0 {
                // Otherwise, we can completely disregard the right side (including the pivot)
                // and recurse on the left
                recurse_fit_maximal_range(slice, remaining_weight, weight_fn, low, pivot - 1, end);
            }
        }
    }

    assert!(
        total_weight >= W::default(),
        "total weight cannot be negative"
    );

    if slice.is_empty() {
        return 0..0;
    }

    let mut end = 0;
    recurse_fit_maximal_range(
        slice,
        total_weight,
        &mut weight_fn,
        0,
        slice.len() - 1, // won't underflow due to empty check
        &mut end,
    );

    0..end
}
754
#[cfg(test)]
mod test {
    use super::*;

    /// An empty slice always yields an empty range.
    #[test]
    fn fit_empty_slice() {
        let r = fit_longest_range(&mut [], 100, |i| *i);
        assert!(r.is_empty());
    }

    /// A negative total weight violates the function's precondition.
    #[test]
    #[should_panic(expected = "total weight cannot be negative")]
    fn fit_negative_panic() {
        fit_longest_range(&mut [0], -1, |i| *i);
    }

    /// No element fits within the budget, so the range is empty.
    #[test]
    fn no_fit() {
        let r = fit_longest_range(&mut [100, 101, 102], 99, |i| *i);
        assert!(r.is_empty());
    }

    /// Every element fits when the budget covers the total sum.
    #[test]
    fn fit_all() {
        let r = fit_longest_range(&mut [1, 2, 3, 4, 5], 15, |i| *i);
        assert_eq!(r.len(), 5);

        let r = fit_longest_range(&mut [5, 4, 3, 2, 1], 20, |i| *i);
        assert_eq!(r.len(), 5);
    }

    /// A partial fit keeps the lightest elements in front and leaves the
    /// heavier ones outside the returned range.
    #[test]
    fn fit_some() {
        let s = &mut [8, 2, 2, 3, 2, 1, 2, 4, 1];
        let r = fit_longest_range(s, 10, |i| *i);
        assert_eq!(r.len(), 6);
        assert_eq!(s[r.start..r.end].iter().copied().sum::<i32>(), 10);
        assert!(s[r.end..].contains(&8));
        assert!(s[r.end..].contains(&4));
        assert!(s[r.end..].contains(&3));
    }

    /// `u64::MAX` totals are recognized as unlimited resources.
    #[test]
    fn unlimited_state() {
        let manager_state = TaskManagerState::<()>::new(u64::MAX, u64::MAX);
        assert!(manager_state.unlimited());
    }
}