synd-term 0.3.2

terminal feed viewer
Documentation
use std::{collections::VecDeque, num::NonZero};

use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt as _};

use crate::command::Command;

pub(crate) type JobFuture = BoxFuture<'static, anyhow::Result<Command>>;

pub(crate) struct Jobs {
    futures: FuturesUnordered<JobFuture>,
    delay_queue: VecDeque<JobFuture>,
    concurrent_limit: NonZero<usize>,
}

impl Jobs {
    pub fn new(concurrent_limit: NonZero<usize>) -> Self {
        Self {
            futures: FuturesUnordered::new(),
            delay_queue: VecDeque::new(),
            concurrent_limit,
        }
    }

    pub(crate) fn push(&mut self, job: JobFuture) {
        self.delay_queue.push_back(job);

        while self.concurrent_limit.get() > self.futures.len() {
            let Some(job) = self.delay_queue.pop_front() else {
                break;
            };

            self.futures.push(job);
        }

        tracing::trace!(
            "Job delay_queue: {} futures: {}",
            self.delay_queue.len(),
            self.futures.len()
        );
    }

    pub(crate) async fn next(&mut self) -> Option<anyhow::Result<Command>> {
        debug_assert!(self.concurrent_limit.get() >= self.futures.len());

        match self.futures.next().await {
            Some(result) => {
                if let Some(job) = self.delay_queue.pop_front() {
                    self.futures.push(job);
                }
                Some(result)
            }
            None => None,
        }
    }

    #[cfg(feature = "integration")]
    pub(crate) fn is_empty(&self) -> bool {
        self.futures.is_empty() && self.delay_queue.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use futures_util::FutureExt as _;

    use super::*;
    use std::future;

    #[tokio::test]
    async fn respect_concurrent_limit() {
        let mut job = Jobs::new(NonZero::new(2).unwrap());

        for _ in 0..3 {
            job.push(future::ready(Ok(Command::Nop)).boxed());
        }

        assert_eq!(job.futures.len(), 2);
        assert_eq!(job.delay_queue.len(), 1);

        let mut count = 0;
        loop {
            if let Some(result) = job.next().await {
                assert!(matches!(result, Ok(Command::Nop)));
                count += 1;
            }
            if count == 3 {
                break;
            }
        }
    }
}