spawn_groups 1.1.0

Structured concurrency construct written in Rust, for Rustaceans
Documentation
use crate::shared::{priority::Priority, runtime::RuntimeEngine};
use futures_lite::{Stream, StreamExt};
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

/// Spawn Group
///
/// A kind of a spawn group that spawns asynchronous child tasks that returns a value of ValueType,
/// that implicitly wait for the spawned tasks to return before being dropped unless by
/// explicitly calling ``dont_wait_at_drop()``
///
/// Child tasks are spawned by calling either ``spawn_task()`` or ``spawn_task_unless_cancelled()`` methods.
///
/// Running child tasks can be cancelled by calling ``cancel_all()`` method.
///
/// Child tasks spawned to a spawn group execute concurrently, and may be scheduled in
/// any order.
///
/// It dereferences into a ``futures`` crate ``Stream`` type where the results of each finished child task is stored and it pops out the result in First-In First-Out
/// FIFO order whenever it is being used
pub struct SpawnGroup<ValueType> {
    runtime: RuntimeEngine<ValueType>,
    /// A field that indicates if the spawn group had been cancelled
    pub is_cancelled: bool,
    wait_at_drop: bool,
}

impl<ValueType> SpawnGroup<ValueType> {
    /// Instantiates `SpawnGroup` with a specific number of threads to use in the underlying threadpool when polling futures
    ///
    /// # Parameters
    ///
    /// * `num_of_threads`: number of threads to use
    pub fn new(num_of_threads: usize) -> Self {
        Self {
            runtime: RuntimeEngine::new(num_of_threads),
            is_cancelled: false,
            wait_at_drop: true,
        }
    }
}

impl<ValueType> Default for SpawnGroup<ValueType> {
    /// Instantiates `SpawnGroup` with the number of threads as the number of cores as the system to use in the underlying threadpool when polling futures
    fn default() -> Self {
        Self {
            is_cancelled: false,
            runtime: RuntimeEngine::default(),
            wait_at_drop: true,
        }
    }
}

impl<ValueType> SpawnGroup<ValueType> {
    /// Don't implicity wait for spawned child tasks to finish before being dropped
    pub fn dont_wait_at_drop(&mut self) {
        self.wait_at_drop = false;
    }

    /// Cancels all running task in the spawn group
    pub fn cancel_all(&mut self) {
        self.runtime.cancel();
        self.is_cancelled = true;
    }
}

impl<ValueType> SpawnGroup<ValueType> {
    /// Spawns a new task into the spawn group
    /// # Parameters
    ///
    /// * `priority`: priority to use
    /// * `closure`: an async closure that return a value of type ``ValueType``
    pub fn spawn_task(
        &mut self,
        priority: Priority,
        closure: impl Future<Output = ValueType> + Send + 'static,
    ) {
        self.runtime.write_task(priority, closure);
    }

    /// Spawn a new task only if the group is not cancelled yet,
    /// otherwise does nothing
    ///
    /// # Parameters
    ///
    /// * `priority`: priority to use
    /// * `closure`: an async closure that return a value of type ``ValueType``
    pub fn spawn_task_unlessed_cancelled(
        &mut self,
        priority: Priority,
        closure: impl Future<Output = ValueType> + Send + 'static,
    ) {
        if !self.is_cancelled {
            self.runtime.write_task(priority, closure)
        }
    }
}

impl<ValueType> SpawnGroup<ValueType> {
    /// Returns the first element of the stream, or None if it is empty.
    pub async fn first(&self) -> Option<ValueType> {
        self.runtime.stream().first().await
    }
}

impl<ValueType> SpawnGroup<ValueType> {
    /// Waits for all remaining child tasks for finish.
    pub async fn wait_for_all(&mut self) {
        self.wait_non_async()
    }

    /// Waits for all remaining child tasks for finish in non async context.
    pub fn wait_non_async(&mut self) {
        self.runtime.wait_for_all_tasks();
    }
}

impl<ValueType> SpawnGroup<ValueType> {
    /// A Boolean value that indicates whether the group has any remaining tasks.
    ///
    /// At the start of the body of a ``with_spawn_group()`` call, , or before calling ``spawn_task`` or ``spawn_task_unless_cancelled`` methods
    /// the spawn group is always empty.
    ///  
    /// # Returns
    /// - true: if there's no child task still running
    /// - false: if any child task is still running
    pub fn is_empty(&self) -> bool {
        self.runtime.task_count() == 0
    }
}

impl<ValueType> SpawnGroup<ValueType> {
    /// Returns an instance of the `Stream` trait.
    pub fn stream(&self) -> impl Stream<Item = ValueType> {
        self.runtime.stream()
    }
}

impl<ValueType> SpawnGroup<ValueType> {
    /// Waits for a specific number of spawned child tasks to finish and returns their respectively result as a vector  
    ///
    /// # Panics
    /// If the `of_count` parameter is larger than the number of already spawned child tasks, this method panics
    ///
    /// Remember whenever you call either ``wait_for_all`` or ``cancel_all`` methods, the child tasks' count reverts back to zero
    ///
    /// # Parameter
    /// * `of_count`: The number of running child tasks to wait for their results to return
    ///
    /// # Returns
    /// Returns a vector of length `of_count` elements from the spawn group instance
    #[deprecated(since = "2.0.0", note = "Buggy")]
    pub async fn get_chunks(&self, of_count: usize) -> Vec<ValueType> {
        if of_count == 0 {
            return vec![];
        }
        let buffer_count = self.runtime.stream().buffer_count().await;
        if buffer_count == of_count {
            let mut count: usize = of_count;
            let mut results: Vec<ValueType> = vec![];
            while count != 0 {
                if let Some(result) = self.runtime.stream().next().await {
                    results.push(result);
                    count -= 1;
                }
            }
            return results;
        }
        if of_count > self.runtime.task_count() {
            panic!("The argument supplied cannot be greater than the number of spawned child tasks")
        }
        let mut count: usize = of_count;
        let mut results: Vec<ValueType> = vec![];
        while count != 0 {
            if let Some(result) = self.runtime.stream().next().await {
                results.push(result);
                count -= 1;
            }
        }
        results
    }
}

impl<ValueType> Drop for SpawnGroup<ValueType> {
    fn drop(&mut self) {
        if self.wait_at_drop {
            self.runtime.wait_for_all_tasks();
        }
        self.runtime.end()
    }
}

impl<ValueType> Stream for SpawnGroup<ValueType> {
    type Item = ValueType;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.runtime.stream().poll_next(cx)
    }
}