use crate::shared::{priority::Priority, runtime::RuntimeEngine};
use futures_lite::{Stream, StreamExt};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// A group of spawned child tasks whose futures each resolve to a
/// `Result<ValueType, ErrorType>`.
///
/// Futures are handed to an internal `RuntimeEngine`; their results are read
/// back through `stream`, `first`, or the `Stream` implementation on this type.
pub struct ErrSpawnGroup<ValueType, ErrorType> {
/// Engine that receives the spawned futures and exposes their results.
runtime: RuntimeEngine<Result<ValueType, ErrorType>>,
/// Set to `true` by `cancel_all`; once set, `spawn_task_unlessed_cancelled`
/// no longer forwards futures to the engine.
pub is_cancelled: bool,
/// When `true` (the value set by `new` and `default`), dropping the group
/// first waits for all tasks before ending the engine.
wait_at_drop: bool,
}
impl<ValueType, ErrorType> ErrSpawnGroup<ValueType, ErrorType> {
    /// Creates a new, non-cancelled group backed by
    /// `RuntimeEngine::new(num_of_threads)`.
    ///
    /// `num_of_threads` is forwarded unchanged to the engine (presumably its
    /// worker-thread count; confirm against `RuntimeEngine`). The returned
    /// group waits for its tasks when dropped unless `dont_wait_at_drop` is
    /// called first.
    pub fn new(num_of_threads: usize) -> Self {
        let runtime = RuntimeEngine::new(num_of_threads);
        Self {
            runtime,
            is_cancelled: false,
            wait_at_drop: true,
        }
    }
}
impl<ValueType, ErrorType> Default for ErrSpawnGroup<ValueType, ErrorType> {
    /// Builds a non-cancelled group around `RuntimeEngine::default()`.
    ///
    /// As with `new`, the group waits for its tasks on drop until
    /// `dont_wait_at_drop` is called.
    fn default() -> Self {
        let runtime = RuntimeEngine::default();
        Self {
            runtime,
            is_cancelled: false,
            wait_at_drop: true,
        }
    }
}
impl<ValueType, ErrorType> ErrSpawnGroup<ValueType, ErrorType> {
    /// Opts out of waiting for outstanding tasks when the group is dropped.
    ///
    /// After this call the `Drop` implementation skips the wait step and only
    /// ends the engine.
    pub fn dont_wait_at_drop(&mut self) {
        // Cleared flag is read by `Drop::drop`.
        self.wait_at_drop = false;
    }
}
impl<ValueType, ErrorType> ErrSpawnGroup<ValueType, ErrorType> {
    /// Hands `closure` to the engine with the given `priority`.
    ///
    /// Despite its name, `closure` is a future; it must be `Send + 'static`
    /// and resolve to `Result<ValueType, ErrorType>`. The cancellation flag is
    /// not consulted here.
    pub fn spawn_task<F>(&mut self, priority: Priority, closure: F)
    where
        F: Future<Output = Result<ValueType, ErrorType>> + Send + 'static,
    {
        self.runtime.write_task(priority, closure);
    }

    /// Asks the engine to cancel and marks this group as cancelled.
    ///
    /// The flag is set only after `cancel()` on the engine has returned.
    pub fn cancel_all(&mut self) {
        self.runtime.cancel();
        self.is_cancelled = true;
    }

    /// Like `spawn_task`, but does nothing once `is_cancelled` is `true`.
    ///
    /// When the group is cancelled, `closure` is dropped without being handed
    /// to the engine.
    pub fn spawn_task_unlessed_cancelled<F>(&mut self, priority: Priority, closure: F)
    where
        F: Future<Output = Result<ValueType, ErrorType>> + Send + 'static,
    {
        if self.is_cancelled {
            return;
        }
        self.spawn_task(priority, closure);
    }
}
impl<ValueType, ErrorType> ErrSpawnGroup<ValueType, ErrorType> {
    /// Awaits `first()` on a freshly obtained engine stream.
    ///
    /// Returns whatever that call yields: `Some(result)` for a task result, or
    /// `None` (presumably when no result is available; confirm against the
    /// stream type's `first`).
    pub async fn first(&self) -> Option<Result<ValueType, ErrorType>> {
        let runtime = &self.runtime;
        runtime.stream().first().await
    }
}
impl<ValueType, ErrorType> ErrSpawnGroup<ValueType, ErrorType> {
    /// Returns the engine's result stream.
    ///
    /// Each item is the `Result<ValueType, ErrorType>` produced by one spawned
    /// future. A new stream handle is requested from the engine on every call.
    pub fn stream(&self) -> impl Stream<Item = Result<ValueType, ErrorType>> {
        let runtime = &self.runtime;
        runtime.stream()
    }
}
impl<ValueType, ErrorType> ErrSpawnGroup<ValueType, ErrorType> {
    /// Async-callable form of `wait_non_async`.
    ///
    /// The body contains no `.await`: the synchronous
    /// `wait_for_all_tasks` call runs to completion the first time the
    /// returned future is polled.
    // NOTE(review): if `wait_for_all_tasks` blocks the calling thread, it will
    // block the executor thread polling this future; confirm that is intended.
    pub async fn wait_for_all(&mut self) {
        self.runtime.wait_for_all_tasks();
    }

    /// Synchronously calls `wait_for_all_tasks` on the engine.
    pub fn wait_non_async(&mut self) {
        let runtime = &mut self.runtime;
        runtime.wait_for_all_tasks();
    }
}
impl<ValueType, ErrorType> ErrSpawnGroup<ValueType, ErrorType> {
    /// Returns `true` when the engine reports a task count of zero.
    pub fn is_empty(&self) -> bool {
        let task_count = self.runtime.task_count();
        task_count == 0
    }
}
impl<ValueType, ErrorType> ErrSpawnGroup<ValueType, ErrorType> {
    /// Collects exactly `of_count` results from the engine's stream.
    ///
    /// Returns an empty vector immediately when `of_count` is `0`. Otherwise
    /// results are pulled one at a time, each from a freshly requested stream
    /// handle, until `of_count` of them have been gathered.
    ///
    /// # Panics
    ///
    /// Panics when `of_count` differs from the stream's `buffer_count()` and
    /// is greater than the engine's `task_count()`.
    // NOTE(review): a `None` from `next()` is retried rather than treated as
    // end-of-stream, so if fewer than `of_count` results ever arrive this loop
    // does not terminate. That may be why the method is deprecated; confirm
    // before changing it.
    #[deprecated(since = "2.0.0", note = "Buggy")]
    pub async fn get_chunks(&self, of_count: usize) -> Vec<Result<ValueType, ErrorType>> {
        if of_count == 0 {
            return Vec::new();
        }

        // The task-count check is skipped when the buffer already holds
        // exactly `of_count` items, matching the original fast path.
        let buffer_count: usize = self.runtime.stream().buffer_count().await;
        if buffer_count != of_count && of_count > self.runtime.task_count() {
            panic!("The argument supplied cannot be greater than the number of spawned child tasks")
        }

        let mut results: Vec<Result<ValueType, ErrorType>> = Vec::with_capacity(of_count);
        while results.len() < of_count {
            if let Some(result) = self.runtime.stream().next().await {
                results.push(result);
            }
        }
        results
    }
}
impl<ValueType, ErrorType> Drop for ErrSpawnGroup<ValueType, ErrorType> {
    /// Optionally waits for all tasks, then ends the engine.
    ///
    /// The wait step runs only while `wait_at_drop` is `true`, which is the
    /// case until `dont_wait_at_drop` is called. `end()` is always called.
    fn drop(&mut self) {
        let should_wait = self.wait_at_drop;
        if should_wait {
            self.runtime.wait_for_all_tasks();
        }
        self.runtime.end();
    }
}
impl<ValueType, ErrorType> Stream for ErrSpawnGroup<ValueType, ErrorType> {
    type Item = Result<ValueType, ErrorType>;

    /// Polls the engine's stream for the next task result.
    ///
    /// A new stream handle is requested from the engine on every poll and the
    /// poll is forwarded to it with the caller's context.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let runtime = &self.runtime;
        runtime.stream().poll_next(cx)
    }
}