use crate::shared::{priority::Priority, runtime::RuntimeEngine};
use futures_lite::{Stream, StreamExt};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// A group of spawned tasks producing values of type `ValueType`, backed by a `RuntimeEngine`.
pub struct SpawnGroup<ValueType> {
// Engine that spawned futures are written to and that result streams are obtained from.
runtime: RuntimeEngine<ValueType>,
/// Set to `true` by `cancel_all`; `spawn_task_unlessed_cancelled` skips spawning once it is set.
pub is_cancelled: bool,
// When `true`, dropping the group calls the runtime's `wait_for_all_tasks` before `end`.
wait_at_drop: bool,
}
impl<ValueType> SpawnGroup<ValueType> {
    /// Creates a group whose runtime is built with `RuntimeEngine::new(num_of_threads)`.
    ///
    /// The new group starts out not cancelled and is configured to wait for its
    /// tasks when dropped.
    // NOTE(review): `num_of_threads` is presumably the worker-thread count of the
    // runtime; confirm against `RuntimeEngine::new`.
    pub fn new(num_of_threads: usize) -> Self {
        let runtime = RuntimeEngine::new(num_of_threads);
        Self {
            runtime,
            is_cancelled: false,
            wait_at_drop: true,
        }
    }
}
impl<ValueType> Default for SpawnGroup<ValueType> {
    /// Builds a group around `RuntimeEngine::default()`, not cancelled and
    /// configured to wait for its tasks when dropped.
    fn default() -> Self {
        let runtime = RuntimeEngine::default();
        Self {
            runtime,
            is_cancelled: false,
            wait_at_drop: true,
        }
    }
}
impl<ValueType> SpawnGroup<ValueType> {
    /// Turns off the wait that otherwise happens when the group is dropped.
    pub fn dont_wait_at_drop(&mut self) {
        self.wait_at_drop = false;
    }

    /// Asks the runtime to cancel, then records the cancellation in `is_cancelled`.
    pub fn cancel_all(&mut self) {
        // The runtime is notified first; the flag is only flipped afterwards.
        self.runtime.cancel();
        self.is_cancelled = true;
    }
}
impl<ValueType> SpawnGroup<ValueType> {
    /// Hands the future `closure` to the runtime together with its `priority`.
    ///
    /// Despite the parameter name, `closure` is a future whose output is `ValueType`.
    pub fn spawn_task(
        &mut self,
        priority: Priority,
        closure: impl Future<Output = ValueType> + Send + 'static,
    ) {
        self.runtime.write_task(priority, closure);
    }

    /// Same as [`SpawnGroup::spawn_task`], except that nothing is spawned once
    /// `is_cancelled` is `true`; in that case `closure` is simply dropped.
    pub fn spawn_task_unlessed_cancelled(
        &mut self,
        priority: Priority,
        closure: impl Future<Output = ValueType> + Send + 'static,
    ) {
        if self.is_cancelled {
            return;
        }
        self.spawn_task(priority, closure);
    }
}
impl<ValueType> SpawnGroup<ValueType> {
    /// Awaits `first()` on a stream obtained from the runtime and returns its result.
    ///
    /// Yields `None` whenever the underlying stream's `first()` does.
    pub async fn first(&self) -> Option<ValueType> {
        // A fresh stream handle is requested from the runtime on every call.
        self.runtime.stream().first().await
    }
}
impl<ValueType> SpawnGroup<ValueType> {
    /// Async-flavoured entry point that calls the runtime's `wait_for_all_tasks`.
    ///
    /// Although declared `async`, the body contains no `.await`: the call into the
    /// runtime runs synchronously when the returned future is polled.
    pub async fn wait_for_all(&mut self) {
        self.runtime.wait_for_all_tasks();
    }

    /// Calls the runtime's `wait_for_all_tasks` directly, for use outside async code.
    pub fn wait_non_async(&mut self) {
        self.runtime.wait_for_all_tasks();
    }
}
impl<ValueType> SpawnGroup<ValueType> {
    /// Returns `true` when the runtime reports a task count of zero.
    pub fn is_empty(&self) -> bool {
        let task_total = self.runtime.task_count();
        task_total == 0
    }
}
impl<ValueType> SpawnGroup<ValueType> {
    /// Returns the runtime's stream of task results, hidden behind `impl Stream`.
    pub fn stream(&self) -> impl Stream<Item = ValueType> {
        // The concrete stream type stays private to the runtime; callers only see `Stream`.
        self.runtime.stream()
    }
}
impl<ValueType> SpawnGroup<ValueType> {
    /// Collects up to `of_count` results from the runtime's stream.
    ///
    /// Returns an empty vector when `of_count` is zero. Otherwise results are pulled
    /// one at a time with `next()` until `of_count` have been gathered or the stream
    /// yields `None` (i.e. it has terminated), whichever happens first. In the latter
    /// case the returned vector holds fewer than `of_count` items.
    ///
    /// # Panics
    ///
    /// Panics when the stream's buffer count differs from `of_count` and `of_count`
    /// is greater than the runtime's task count.
    #[deprecated(since = "2.0.0", note = "Buggy")]
    pub async fn get_chunks(&self, of_count: usize) -> Vec<ValueType> {
        if of_count == 0 {
            return vec![];
        }

        // The task-count check is skipped when the buffer already holds exactly
        // `of_count` items, matching the original fast path.
        let buffer_count = self.runtime.stream().buffer_count().await;
        if buffer_count != of_count && of_count > self.runtime.task_count() {
            panic!("The argument supplied cannot be greater than the number of spawned child tasks")
        }

        let mut results: Vec<ValueType> = Vec::with_capacity(of_count);
        while results.len() < of_count {
            match self.runtime.stream().next().await {
                Some(result) => results.push(result),
                // `None` signals a terminated stream; polling it again would spin
                // forever without ever reaching `of_count`.
                None => break,
            }
        }
        results
    }
}
impl<ValueType> Drop for SpawnGroup<ValueType> {
    /// Optionally waits on the runtime's tasks, then always ends the runtime.
    fn drop(&mut self) {
        let should_wait = self.wait_at_drop;
        if should_wait {
            // Skipped when `dont_wait_at_drop` has cleared the flag.
            self.runtime.wait_for_all_tasks();
        }
        self.runtime.end();
    }
}
impl<ValueType> Stream for SpawnGroup<ValueType> {
    type Item = ValueType;

    /// Polls a stream obtained from the runtime and forwards whatever it reports.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // A fresh stream handle is requested from the runtime on every poll.
        let mut inner_stream = self.runtime.stream();
        inner_stream.poll_next(cx)
    }
}