use super::{task::Task, MappedStreams};
use crate::mapped_streams::StMut;
use bisetmap::BisetMap;
use core::hash::Hash;
use futures_core::Stream;
use futures_task::Poll;
use futures_util::stream::StreamFuture;
use std::{marker::PhantomData, pin::Pin, sync::Arc, task::ready};
pub struct BiMultiMapStreams<L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin> {
bi_multi_map: BisetMap<L, R>,
streams: MappedStreams<(L, R), St>,
}
impl<L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin> BiMultiMapStreams<L, R, St> {
pub fn new() -> Self {
Self {
bi_multi_map: BisetMap::new(),
streams: MappedStreams::new(),
}
}
pub fn insert(&mut self, left: L, right: R, stream: St) -> bool {
self.bi_multi_map.insert(left.clone(), right.clone());
self.streams.insert((left, right), stream)
}
pub fn insert_pin(&mut self, left: L, right: R, stream: St) -> Option<St>
where
St: Unpin,
{
let ret = self.streams.replace((left.clone(), right.clone()), stream);
if ret.is_some() {
self.bi_multi_map.insert(left, right);
}
ret
}
pub fn contains(&self, left: &L, right: &R) -> bool {
self.bi_multi_map.contains(&left, &right)
}
pub fn contains_left(&self, left: &L) -> bool {
self.bi_multi_map.key_exists(&left)
}
pub fn contains_right(&self, right: &R) -> bool {
self.bi_multi_map.value_exists(&right)
}
pub fn remove(&mut self, left: &L, right: &R) -> Option<St>
where
St: Unpin,
{
self.bi_multi_map.remove(left, right);
self.streams.remove(&(left.clone(), right.clone()))
}
pub fn cancel(&mut self, left: &L, right: &R) -> bool {
self.bi_multi_map.remove(left, right);
self.streams.cancel(&(left.clone(), right.clone()))
}
pub fn get_mut(&mut self, left: &L, right: &R) -> Option<StMut<(L, R), St>>
where
St: Unpin,
{
self.streams.get_mut(&(left.clone(), right.clone()))
}
pub fn get_right_mut<'a>(&'a mut self, left: &L) -> RightIterMut<'a, L, R, St>
where
St: Unpin,
{
RightIterMut::new(self, left)
}
pub fn get_left_mut<'a>(&'a mut self, right: &R) -> LeftIterMut<'a, L, R, St>
where
St: Unpin,
{
LeftIterMut::new(self, right)
}
pub fn left_cancel(&mut self, left: &L) -> Vec<R> {
let rights = self.bi_multi_map.delete(left);
rights.iter().for_each(|right| {
self.streams.cancel(&(left.clone(), right.clone()));
});
rights
}
pub fn right_cancel(&mut self, right: &R) -> Vec<L> {
let lefts = self.bi_multi_map.rev_delete(right);
lefts.iter().for_each(|left| {
self.streams.cancel(&(left.clone(), right.clone()));
});
lefts
}
pub fn left_remove(&mut self, left: &L) -> Vec<(R, St)>
where
St: Unpin,
{
let rights = self.bi_multi_map.delete(left);
rights
.iter()
.map(|right| {
(
right.clone(),
self.streams.remove(&(left.clone(), right.clone())).unwrap(),
)
})
.collect()
}
pub fn right_remove(&mut self, right: &R) -> Vec<(L, St)>
where
St: Unpin,
{
let lefts = self.bi_multi_map.rev_delete(right);
lefts
.iter()
.map(|left| {
(
left.clone(),
self.streams.remove(&(left.clone(), right.clone())).unwrap(),
)
})
.collect()
}
pub fn right_len(&self) -> usize {
self.bi_multi_map.rev_len()
}
pub fn left_len(&self) -> usize {
self.bi_multi_map.len()
}
pub fn len(&self) -> usize {
self.streams.len()
}
pub fn is_empty(&self) -> bool {
self.bi_multi_map.is_empty()
}
pub fn clear(&mut self) {
self.bi_multi_map.clear();
self.streams.clear();
}
pub fn collect_keys(&mut self) -> Vec<(L, R)> {
self.bi_multi_map.flat_collect()
}
}
impl<L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin> Stream
for BiMultiMapStreams<L, R, St>
{
type Item = (L, R, Option<St::Item>);
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut futures_task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match ready!(Pin::new(&mut self.streams).poll_next(cx)) {
Some(((left, right), output)) => {
self.bi_multi_map.remove(&left, &right);
Poll::Ready(Some((left, right, output)))
}
None => Poll::Ready(None),
}
}
}
pub struct RightIterMut<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin> {
inner: Vec<(R, *const Task<(L, R), StreamFuture<St>>)>,
_marker: PhantomData<&'a mut MappedStreams<(L, R), St>>,
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin>
RightIterMut<'a, L, R, St>
{
fn new(streams: &'a mut BiMultiMapStreams<L, R, St>, left: &L) -> Self {
let rights = streams.bi_multi_map.get(left);
let mut tasks = vec![];
rights.into_iter().for_each(|right| {
let key = (left.clone(), right);
let hash_task = streams.streams.inner.hash_set.get(&key).unwrap();
tasks.push((key.1, Arc::as_ptr(&hash_task.inner)));
});
RightIterMut {
inner: tasks,
_marker: PhantomData,
}
}
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin> Iterator
for RightIterMut<'a, L, R, St>
{
type Item = (R, StMut<'a, (L, R), St>);
fn next(&mut self) -> Option<Self::Item> {
self.inner
.pop()
.map(|(left, task)| (left, StMut::new_from_ptr(task)))
}
}
pub struct LeftIterMut<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin> {
inner: Vec<(L, *const Task<(L, R), StreamFuture<St>>)>,
_marker: PhantomData<&'a mut MappedStreams<(L, R), St>>,
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin> Iterator
for LeftIterMut<'a, L, R, St>
{
type Item = (L, StMut<'a, (L, R), St>);
fn next(&mut self) -> Option<Self::Item> {
self.inner
.pop()
.map(|(right, task)| (right, StMut::new_from_ptr(task)))
}
}
impl<'a, L: Clone + Hash + Eq, R: Clone + Hash + Eq, St: Stream + Unpin> LeftIterMut<'a, L, R, St> {
fn new(streams: &'a mut BiMultiMapStreams<L, R, St>, right: &R) -> Self {
let lefts = streams.bi_multi_map.rev_get(right);
let mut tasks = vec![];
lefts.into_iter().for_each(|left| {
let key = (left, right.clone());
let hash_task = streams.streams.inner.hash_set.get(&key).unwrap();
tasks.push((key.0, Arc::as_ptr(&hash_task.inner)));
});
LeftIterMut {
inner: tasks,
_marker: PhantomData,
}
}
}
#[cfg(test)]
#[allow(unused_imports)]
pub mod tests {
use crate::BiMultiMapStreams;
use super::MappedStreams;
use futures::executor::block_on;
use futures_core::Stream;
use futures_lite::FutureExt as FutureLiteExt;
use futures_task::Poll;
use futures_timer::Delay;
use futures_util::StreamExt;
use std::time::Duration;
use std::{pin::Pin, task::ready};
struct DelayStream {
num: u8,
interval: u64,
fut: Option<Delay>,
}
impl DelayStream {
fn new(num: u8, interval: u64) -> Self {
DelayStream {
num,
interval,
fut: None,
}
}
}
impl Stream for DelayStream {
type Item = ();
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut futures_task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match &mut self.fut {
Some(fut) => {
ready!(Pin::new(fut).poll(cx));
self.fut = None;
self.num = self.num - 1;
Poll::Ready(Some(()))
}
None => match self.num {
0 => Poll::Ready(None),
_ => {
println!("interval: {}", self.interval);
self.fut = Some(Delay::new(Duration::from_millis(self.interval)));
self.poll_next(cx)
}
},
}
}
}
#[test]
fn map_streams() {
let mut streams = BiMultiMapStreams::new();
let s1 = futures::stream::iter(vec![1]);
let s2 = futures::stream::iter(vec![2]);
streams.insert(1, 1, s1);
streams.insert(2, 2, s2);
let output: Vec<_> = block_on(streams.collect());
assert_eq!(
output,
vec![(1, 1, Some(1)), (2, 2, Some(2)), (1, 1, None), (2, 2, None)]
);
}
#[test]
fn mutate_streams() {
let mut streams = BiMultiMapStreams::new();
streams.insert(1, 1, DelayStream::new(1, 500));
streams.insert(2, 1, DelayStream::new(1, 600));
streams.get_mut(&1, &1).unwrap().interval = 700;
assert_eq!(block_on(streams.next()), Some((2, 1, Some(()))));
assert_eq!(block_on(streams.next()), Some((2, 1, None)));
assert_eq!(block_on(streams.next()), Some((1, 1, Some(()))));
assert_eq!(block_on(streams.next()), Some((1, 1, None)));
assert!(streams.is_empty());
}
#[test]
fn remove_streams() {
let mut streams: BiMultiMapStreams<i32, i32, DelayStream> = BiMultiMapStreams::new();
streams.insert(1, 1, DelayStream::new(1, 50));
streams.insert(2, 1, DelayStream::new(1, 60));
streams.remove(&1, &1);
assert_eq!(block_on(streams.next()), Some((2, 1, Some(()))));
}
}