use std::future::Future;
use std::pin::Pin;
use std::task::{
Context,
Poll
};
use futures::FutureExt;
use futures::stream::{
futures_unordered as fun,
FuturesUnordered,
Stream,
StreamExt
};
use tokio::task::{
JoinHandle,
JoinError,
spawn
};
/// A set of spawned tasks, each tagged with a caller-supplied handle of type `T`.
///
/// `R` is the output type of the tracked tasks. Completed entries are retrieved
/// as `(handle, join result)` pairs, either with `join_next` or by polling the
/// registry as a `Stream`.
#[derive(Debug)]
pub struct Registry<R, T> {
    /// Every entry currently held by the registry.
    inner: FuturesUnordered<Entry<R, T>>,
}
/// A spawned task paired with the handle value that identifies it.
///
/// As a future it resolves to the handle together with the task's join result.
#[derive(Debug)]
pub struct Entry<R, T> {
    /// The caller-supplied handle; taken (leaving `None`) when the entry's
    /// future resolves.
    handle: Option<T>,
    /// Join handle of the underlying Tokio task.
    task: JoinHandle<R>,
}
/// Borrowing iterator over the handles stored in a [`Registry`].
///
/// Yields `&T` for each entry.
pub struct Iter<'a, R, T> {
    /// Iterator over the registry's entries, from which handles are projected.
    inner: fun::Iter<'a, Entry<R, T>>,
}
/// Mutably borrowing iterator over the handles stored in a [`Registry`].
///
/// Yields `&mut T` for each entry.
pub struct IterMut<'a, R, T> {
    /// Mutable iterator over the registry's entries, from which handles are projected.
    inner: fun::IterMut<'a, Entry<R, T>>,
}
/// Owning iterator over a [`Registry`].
///
/// Yields each handle together with the `JoinHandle` of its task.
pub struct IntoIter<R, T> {
    /// Owning iterator over the registry's entries.
    inner: fun::IntoIter<Entry<R, T>>,
}
impl<R, T> Registry<R, T> {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self {
            inner: FuturesUnordered::new(),
        }
    }

    /// Spawns `fut` as a new task and tracks it under `handle`.
    ///
    /// The task is started through [`Entry::launch`], which calls
    /// `tokio::task::spawn`.
    pub fn launch<F>(&mut self, handle: T, fut: F)
    where
        F: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        self.inner.push(Entry::launch(handle, fut));
    }

    /// Tracks an already-spawned `task` under `handle`.
    pub fn insert(&mut self, handle: T, task: JoinHandle<R>) {
        self.inner.push(Entry {
            handle: Some(handle),
            task,
        });
    }

    /// Calls `JoinHandle::abort` on every task whose handle satisfies `predicate`.
    ///
    /// Entries are not removed by this call; they stay in the registry until
    /// they are yielded by [`Registry::join_next`] or the `Stream` impl.
    ///
    /// # Panics
    ///
    /// Panics if an entry no longer holds its handle. Within this type a handle
    /// is only taken when the entry's future resolves, so hitting this indicates
    /// a broken invariant rather than a recoverable condition.
    pub fn abort<P: FnMut(&T) -> bool>(&self, mut predicate: P) {
        for entry in self.inner.iter() {
            let handle = entry
                .handle
                .as_ref()
                .expect("Entry is missing its handle before completion");
            if predicate(handle) {
                entry.task.abort();
            }
        }
    }

    /// Waits for the next entry to complete.
    ///
    /// Returns the entry's handle together with the task's join result, or
    /// `None` when the underlying `FuturesUnordered` stream yields `None`.
    pub async fn join_next(&mut self) -> Option<(T, Result<R, JoinError>)> {
        self.inner.next().await
    }

    /// Returns `true` if the registry currently holds no entries.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns an iterator over shared references to the stored handles.
    pub fn iter(&self) -> Iter<'_, R, T> {
        Iter {
            inner: self.inner.iter(),
        }
    }

    /// Returns an iterator over mutable references to the stored handles.
    pub fn iter_mut(&mut self) -> IterMut<'_, R, T> {
        IterMut {
            inner: self.inner.iter_mut(),
        }
    }
}
impl<R, T> Default for Registry<R, T> {
fn default() -> Self {Self::new()}
}
impl<R, T> Stream for Registry<R, T> {
    /// The handle of a finished entry together with its task's join result.
    type Item = (T, Result<R, JoinError>);

    /// Polls the underlying `FuturesUnordered` for the next completed entry.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.inner.poll_next_unpin(cx)
    }

    /// Forwards the inner stream's size hint instead of falling back to the
    /// trait's default `(0, None)`, so adapters that pre-size buffers see the
    /// bounds reported by the underlying `FuturesUnordered`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
impl<R, T> Entry<R, T> {
    /// Spawns `fut` with `tokio::task::spawn` and pairs the resulting join
    /// handle with `handle`.
    ///
    /// NOTE(review): Tokio documents that `spawn` panics when called outside a
    /// runtime context; callers presumably need to be inside one. Confirm
    /// against the Tokio version in use.
    pub fn launch<F>(handle: T, fut: F) -> Self
    where
        F: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        Self {
            handle: Some(handle),
            task: spawn(fut),
        }
    }
}
// `Entry` is declared `Unpin` for every `R` and `T`, including a `T` that is
// itself not `Unpin`. The `poll` implementation in this file reaches its fields
// through `Pin::get_mut` and polls the `JoinHandle` with `poll_unpin`, so it
// never hands out a pinned reference to `handle`.
// NOTE(review): the `FuturesUnordered` iterator types used in this file appear
// to require `Fut: Unpin` — confirm against the futures documentation.
impl<R, T> Unpin for Entry<R, T> {}
impl<R, T> Future for Entry<R, T> {
    /// The entry's handle together with the join result of its task.
    type Output = (T, Result<R, JoinError>);

    /// Polls the wrapped task; once it is ready, hands back the stored handle
    /// alongside the join result.
    ///
    /// # Panics
    ///
    /// Panics if the task reports readiness after the handle has already been
    /// taken, i.e. the entry was polled again after it completed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        match this.task.poll_unpin(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(outcome) => {
                // The handle is moved out exactly once, on completion.
                let handle = this.handle.take().expect("Entry polled after completion");
                Poll::Ready((handle, outcome))
            }
        }
    }
}
impl<'a, R, T> IntoIterator for &'a Registry<R, T> {
    type Item = &'a T;
    type IntoIter = Iter<'a, R, T>;

    /// Iterates over shared references to the handles held by the registry.
    fn into_iter(self) -> Self::IntoIter {
        Iter {
            inner: self.inner.iter(),
        }
    }
}
impl<'a, R, T> Iterator for Iter<'a, R, T> {
    type Item = &'a T;

    /// Returns a shared reference to the next entry's handle, or `None` once
    /// the underlying iterator is exhausted.
    ///
    /// # Panics
    ///
    /// Panics if an entry no longer holds its handle. In this file a handle is
    /// only taken when the entry's future resolves, so this signals a broken
    /// invariant.
    fn next(&mut self) -> Option<Self::Item> {
        let entry = self.inner.next()?;
        Some(
            entry
                .handle
                .as_ref()
                .expect("Entry is missing its handle before completion"),
        )
    }

    /// Forwards the inner iterator's bounds; each inner item maps to exactly
    /// one handle, so the hint carries over unchanged and lets consumers such
    /// as `collect` pre-allocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
impl<'a, R, T> IntoIterator for &'a mut Registry<R, T> {
    type Item = &'a mut T;
    type IntoIter = IterMut<'a, R, T>;

    /// Iterates over mutable references to the handles held by the registry.
    fn into_iter(self) -> Self::IntoIter {
        IterMut {
            inner: self.inner.iter_mut(),
        }
    }
}
impl<'a, R, T> Iterator for IterMut<'a, R, T> {
    type Item = &'a mut T;

    /// Returns a mutable reference to the next entry's handle, or `None` once
    /// the underlying iterator is exhausted.
    ///
    /// # Panics
    ///
    /// Panics if an entry no longer holds its handle. In this file a handle is
    /// only taken when the entry's future resolves, so this signals a broken
    /// invariant.
    fn next(&mut self) -> Option<Self::Item> {
        let entry = self.inner.next()?;
        Some(
            entry
                .handle
                .as_mut()
                .expect("Entry is missing its handle before completion"),
        )
    }

    /// Forwards the inner iterator's bounds; each inner item maps to exactly
    /// one handle, so the hint carries over unchanged and lets consumers such
    /// as `collect` pre-allocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
impl<R, T> IntoIterator for Registry<R, T> {
    type Item = (T, JoinHandle<R>);
    type IntoIter = IntoIter<R, T>;

    /// Consumes the registry, yielding each handle with its task's `JoinHandle`.
    fn into_iter(self) -> Self::IntoIter {
        let inner = IntoIterator::into_iter(self.inner);
        IntoIter { inner }
    }
}
impl<R, T> Iterator for IntoIter<R, T> {
type Item = (T, JoinHandle<R>);
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().map(|entry| (entry.handle.unwrap(), entry.task))
}
}