use crate::Stream;
use std::borrow::Borrow;
use std::future::poll_fn;
use std::hash::Hash;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
#[derive(Debug)]
pub struct StreamMap<K, V> {
entries: Vec<(K, V)>,
}
impl<K, V> StreamMap<K, V> {
pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
self.entries.iter()
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
self.entries.iter_mut()
}
pub fn new() -> StreamMap<K, V> {
StreamMap { entries: vec![] }
}
pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
StreamMap {
entries: Vec::with_capacity(capacity),
}
}
pub fn keys(&self) -> impl Iterator<Item = &K> {
self.iter().map(|(k, _)| k)
}
pub fn values(&self) -> impl Iterator<Item = &V> {
self.iter().map(|(_, v)| v)
}
pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
self.iter_mut().map(|(_, v)| v)
}
pub fn capacity(&self) -> usize {
self.entries.capacity()
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn clear(&mut self) {
self.entries.clear();
}
pub fn insert(&mut self, k: K, stream: V) -> Option<V>
where
K: Hash + Eq,
{
let ret = self.remove(&k);
self.entries.push((k, stream));
ret
}
pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
for i in 0..self.entries.len() {
if self.entries[i].0.borrow() == k {
return Some(self.entries.swap_remove(i).1);
}
}
None
}
pub fn contains_key<Q>(&self, k: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
for i in 0..self.entries.len() {
if self.entries[i].0.borrow() == k {
return true;
}
}
false
}
}
impl<K, V> StreamMap<K, V>
where
K: Unpin,
V: Stream + Unpin,
{
fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
let mut idx = start;
for _ in 0..self.entries.len() {
let (_, stream) = &mut self.entries[idx];
match Pin::new(stream).poll_next(cx) {
Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))),
Poll::Ready(None) => {
self.entries.swap_remove(idx);
if idx == self.entries.len() {
idx = 0;
} else if idx < start && start <= self.entries.len() {
idx = idx.wrapping_add(1) % self.entries.len();
}
}
Poll::Pending => {
idx = idx.wrapping_add(1) % self.entries.len();
}
}
}
if self.entries.is_empty() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}
impl<K, V> Default for StreamMap<K, V> {
fn default() -> Self {
Self::new()
}
}
impl<K, V> StreamMap<K, V>
where
K: Clone + Unpin,
V: Stream + Unpin,
{
pub async fn next_many(&mut self, buffer: &mut Vec<(K, V::Item)>, limit: usize) -> usize {
poll_fn(|cx| self.poll_next_many(cx, buffer, limit)).await
}
pub fn poll_next_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<(K, V::Item)>,
limit: usize,
) -> Poll<usize> {
if limit == 0 || self.entries.is_empty() {
return Poll::Ready(0);
}
let mut added = 0;
let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
let mut idx = start;
while added < limit {
let mut should_loop = false;
for _ in 0..self.entries.len() {
let (_, stream) = &mut self.entries[idx];
match Pin::new(stream).poll_next(cx) {
Poll::Ready(Some(val)) => {
added += 1;
let key = self.entries[idx].0.clone();
buffer.push((key, val));
should_loop = true;
idx = idx.wrapping_add(1) % self.entries.len();
}
Poll::Ready(None) => {
self.entries.swap_remove(idx);
if idx == self.entries.len() {
idx = 0;
} else if idx < start && start <= self.entries.len() {
idx = idx.wrapping_add(1) % self.entries.len();
}
}
Poll::Pending => {
idx = idx.wrapping_add(1) % self.entries.len();
}
}
}
if !should_loop {
break;
}
}
if added > 0 {
Poll::Ready(added)
} else if self.entries.is_empty() {
Poll::Ready(0)
} else {
Poll::Pending
}
}
}
impl<K, V> Stream for StreamMap<K, V>
where
K: Clone + Unpin,
V: Stream + Unpin,
{
type Item = (K, V::Item);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
let key = self.entries[idx].0.clone();
Poll::Ready(Some((key, val)))
} else {
Poll::Ready(None)
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let mut ret = (0, Some(0));
for (_, stream) in &self.entries {
let hint = stream.size_hint();
ret.0 += hint.0;
match (ret.1, hint.1) {
(Some(a), Some(b)) => ret.1 = Some(a + b),
(Some(_), None) => ret.1 = None,
_ => {}
}
}
ret
}
}
impl<K, V> FromIterator<(K, V)> for StreamMap<K, V>
where
K: Hash + Eq,
{
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
let iterator = iter.into_iter();
let (lower_bound, _) = iterator.size_hint();
let mut stream_map = Self::with_capacity(lower_bound);
for (key, value) in iterator {
stream_map.insert(key, value);
}
stream_map
}
}
impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
fn extend<T>(&mut self, iter: T)
where
T: IntoIterator<Item = (K, V)>,
{
self.entries.extend(iter);
}
}
mod rand {
use std::cell::Cell;
mod loom {
#[cfg(not(loom))]
pub(crate) mod rand {
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering::Relaxed;
static COUNTER: AtomicU32 = AtomicU32::new(1);
pub(crate) fn seed() -> u64 {
RandomState::new().hash_one(COUNTER.fetch_add(1, Relaxed))
}
}
#[cfg(loom)]
pub(crate) mod rand {
pub(crate) fn seed() -> u64 {
1
}
}
}
#[derive(Debug)]
pub(crate) struct FastRand {
one: Cell<u32>,
two: Cell<u32>,
}
impl FastRand {
pub(crate) fn new(seed: u64) -> FastRand {
let one = (seed >> 32) as u32;
let mut two = seed as u32;
if two == 0 {
two = 1;
}
FastRand {
one: Cell::new(one),
two: Cell::new(two),
}
}
pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
(mul >> 32) as u32
}
fn fastrand(&self) -> u32 {
let mut s1 = self.one.get();
let s0 = self.two.get();
s1 ^= s1 << 17;
s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
self.one.set(s0);
self.two.set(s1);
s0.wrapping_add(s1)
}
}
pub(crate) fn thread_rng_n(n: u32) -> u32 {
thread_local! {
static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
}
THREAD_RNG.with(|rng| rng.fastrand_n(n))
}
}