use super::{task::Task, FutMut, MappedFutures};
use bisetmap::BisetMap;
use core::hash::Hash;
use futures_core::{Future, Stream};
use futures_task::Poll;
use std::{marker::PhantomData, pin::Pin, sync::Arc, task::ready};
pub struct BiMultiMapFutures<L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> {
bi_multi_map: BisetMap<L, R>,
futures: MappedFutures<(L, R), Fut>,
}
impl<L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> BiMultiMapFutures<L, R, Fut> {
pub fn new() -> Self {
Self {
bi_multi_map: BisetMap::new(),
futures: MappedFutures::new(),
}
}
pub fn insert(&mut self, left: L, right: R, future: Fut) -> bool {
self.bi_multi_map.insert(left.clone(), right.clone());
self.futures.insert((left, right), future)
}
pub fn insert_pin(&mut self, left: L, right: R, future: Fut) -> Option<Fut>
where
Fut: Unpin,
{
let ret = self.futures.replace((left.clone(), right.clone()), future);
if ret.is_some() {
self.bi_multi_map.insert(left, right);
}
ret
}
pub fn contains(&self, left: &L, right: &R) -> bool {
self.bi_multi_map.contains(&left, &right)
}
pub fn contains_left(&self, left: &L) -> bool {
self.bi_multi_map.key_exists(&left)
}
pub fn contains_right(&self, right: &R) -> bool {
self.bi_multi_map.value_exists(&right)
}
pub fn remove(&mut self, left: &L, right: &R) -> Option<Fut>
where
Fut: Unpin,
{
self.bi_multi_map.remove(left, right);
self.futures.remove(&(left.clone(), right.clone()))
}
pub fn cancel(&mut self, left: &L, right: &R) -> bool {
self.bi_multi_map.remove(left, right);
self.futures.cancel(&(left.clone(), right.clone()))
}
pub fn get_mut(&mut self, left: &L, right: &R) -> Option<FutMut<(L, R), Fut>>
where
Fut: Unpin,
{
self.futures.get_mut(&(left.clone(), right.clone()))
}
pub fn get_pin_mut(&mut self, left: &L, right: &R) -> Option<Pin<FutMut<(L, R), Fut>>> {
self.futures.get_pin_mut(&(left.clone(), right.clone()))
}
pub fn get_right_mut(&mut self, left: &L) -> RightIterMut<L, R, Fut>
where
Fut: Unpin,
{
RightIterMut::new(self, left)
}
pub fn get_left_mut(&mut self, right: &R) -> LeftIterMut<L, R, Fut>
where
Fut: Unpin,
{
LeftIterMut::new(self, right)
}
pub fn get_right_pin_mut(&mut self, left: &L) -> RightIterPinMut<L, R, Fut> {
RightIterPinMut::new(self, left)
}
pub fn get_left_pin_mut(&mut self, right: &R) -> LeftIterPinMut<L, R, Fut> {
LeftIterPinMut::new(self, right)
}
pub fn left_cancel(&mut self, left: &L) -> Vec<R> {
let rights = self.bi_multi_map.delete(left);
rights.iter().for_each(|right| {
self.futures.cancel(&(left.clone(), right.clone()));
});
rights
}
pub fn right_cancel(&mut self, right: &R) -> Vec<L> {
let lefts = self.bi_multi_map.rev_delete(right);
lefts.iter().for_each(|left| {
self.futures.cancel(&(left.clone(), right.clone()));
});
lefts
}
pub fn left_remove(&mut self, left: &L) -> Vec<(R, Fut)>
where
Fut: Unpin,
{
let rights = self.bi_multi_map.delete(left);
rights
.iter()
.map(|right| {
(
right.clone(),
self.futures.remove(&(left.clone(), right.clone())).unwrap(),
)
})
.collect()
}
pub fn right_remove(&mut self, right: &R) -> Vec<(L, Fut)>
where
Fut: Unpin,
{
let lefts = self.bi_multi_map.rev_delete(right);
lefts
.iter()
.map(|left| {
(
left.clone(),
self.futures.remove(&(left.clone(), right.clone())).unwrap(),
)
})
.collect()
}
pub fn right_len(&self) -> usize {
self.bi_multi_map.rev_len()
}
pub fn left_len(&self) -> usize {
self.bi_multi_map.len()
}
pub fn len(&self) -> usize {
self.futures.len()
}
pub fn is_empty(&self) -> bool {
self.bi_multi_map.is_empty()
}
pub fn clear(&mut self) {
self.bi_multi_map.clear();
self.futures.clear();
}
pub fn collect_keys(&mut self) -> Vec<(L, R)> {
self.bi_multi_map.flat_collect()
}
}
impl<L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> Stream
for BiMultiMapFutures<L, R, Fut>
{
type Item = (L, R, Fut::Output);
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut futures_task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match ready!(Pin::new(&mut self.futures).poll_next(cx)) {
Some(((left, right), output)) => {
self.bi_multi_map.remove(&left, &right);
Poll::Ready(Some((left, right, output)))
}
None => Poll::Ready(None),
}
}
}
pub struct RightIterPinMut<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> {
inner: Vec<(R, *const Task<(L, R), Fut>)>,
_marker: PhantomData<&'a mut MappedFutures<(L, R), Fut>>,
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> RightIterPinMut<'a, L, R, Fut> {
fn new(futures: &'a mut BiMultiMapFutures<L, R, Fut>, left: &L) -> Self {
let rights = futures.bi_multi_map.get(left);
let mut tasks = vec![];
rights.into_iter().for_each(|right| {
let key = (left.clone(), right);
let hash_task = futures.futures.hash_set.get(&key).unwrap();
tasks.push((key.1, Arc::as_ptr(&hash_task.inner)));
});
RightIterPinMut {
inner: tasks,
_marker: PhantomData,
}
}
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> Iterator
for RightIterPinMut<'a, L, R, Fut>
{
type Item = (R, Pin<FutMut<'a, (L, R), Fut>>);
fn next(&mut self) -> Option<Self::Item> {
unsafe {
self.inner
.pop()
.map(|(left, task)| (left, Pin::new_unchecked(FutMut::new_from_ptr(task))))
}
}
}
pub struct LeftIterPinMut<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> {
inner: Vec<(L, *const Task<(L, R), Fut>)>,
_marker: PhantomData<&'a mut MappedFutures<(L, R), Fut>>,
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> Iterator
for LeftIterPinMut<'a, L, R, Fut>
{
type Item = (L, Pin<FutMut<'a, (L, R), Fut>>);
fn next(&mut self) -> Option<Self::Item> {
unsafe {
self.inner
.pop()
.map(|(right, task)| (right, Pin::new_unchecked(FutMut::new_from_ptr(task))))
}
}
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future> LeftIterPinMut<'a, L, R, Fut> {
fn new(futures: &'a mut BiMultiMapFutures<L, R, Fut>, right: &R) -> Self {
let lefts = futures.bi_multi_map.rev_get(right);
let mut tasks = vec![];
lefts.into_iter().for_each(|left| {
let key = (left, right.clone());
let hash_task = futures.futures.hash_set.get(&key).unwrap();
tasks.push((key.0, Arc::as_ptr(&hash_task.inner)));
});
LeftIterPinMut {
inner: tasks,
_marker: PhantomData,
}
}
}
pub struct RightIterMut<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future>(
RightIterPinMut<'a, L, R, Fut>,
);
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future + Unpin> Iterator
for RightIterMut<'a, L, R, Fut>
{
type Item = (R, FutMut<'a, (L, R), Fut>);
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|(key, fut)| (key, Pin::into_inner(fut)))
}
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future + Unpin>
RightIterMut<'a, L, R, Fut>
{
fn new(futures: &'a mut BiMultiMapFutures<L, R, Fut>, left: &L) -> Self {
Self(RightIterPinMut::new(futures, left))
}
}
pub struct LeftIterMut<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future>(
LeftIterPinMut<'a, L, R, Fut>,
);
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future + Unpin> Iterator
for LeftIterMut<'a, L, R, Fut>
{
type Item = (L, FutMut<'a, (L, R), Fut>);
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|(key, fut)| (key, Pin::into_inner(fut)))
}
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, Fut: Future + Unpin>
LeftIterMut<'a, L, R, Fut>
{
fn new(futures: &'a mut BiMultiMapFutures<L, R, Fut>, right: &R) -> Self {
Self(LeftIterPinMut::new(futures, right))
}
}
#[cfg(test)]
#[allow(unused_imports)]
pub mod tests {
use futures::executor::block_on;
use futures_timer::Delay;
use futures_util::StreamExt;
use std::time::Duration;
use super::BiMultiMapFutures;
fn insert_millis(
futures: &mut BiMultiMapFutures<u64, u64, Delay>,
key: (u64, u64),
millis: u64,
) {
futures.insert(key.0, key.1, Delay::new(Duration::from_millis(millis)));
}
#[test]
fn bi_multi_map_futures() {
let mut futures = BiMultiMapFutures::new();
insert_millis(&mut futures, (1, 1), 50);
insert_millis(&mut futures, (1, 2), 60);
insert_millis(&mut futures, (2, 3), 70);
futures.cancel(&1, &2);
let mut ones = futures.get_right_mut(&1);
ones.next().unwrap().1.reset(Duration::from_millis(80));
assert_eq!(block_on(futures.next()).unwrap(), ((2, 3, ())));
assert_eq!(block_on(futures.next()).unwrap(), ((1, 1, ())));
assert_eq!(block_on(futures.next()), None);
assert!(futures.is_empty());
}
#[test]
fn iter() {
let mut futures = BiMultiMapFutures::new();
insert_millis(&mut futures, (1, 3), 50);
insert_millis(&mut futures, (1, 2), 100);
insert_millis(&mut futures, (1, 1), 150);
futures
.get_right_mut(&1)
.for_each(|(right, mut fut)| fut.reset(Duration::from_millis(right * 100)));
assert_eq!(block_on(futures.next()).unwrap().1, 1);
assert_eq!(block_on(futures.next()).unwrap().1, 2);
assert_eq!(block_on(futures.next()).unwrap().1, 3);
assert_eq!(block_on(futures.next()), None);
}
}