minitask 0.1.1

MiniTask a simple wrapper for async tasks
use alloc::vec::Vec;
use core::{
  pin::Pin,
  task::{Context, Poll},
};
use fastrand::Rng;

use futures_core::{Stream, ready};
use pin_project_lite::pin_project;

pin_project! {
  pub struct StreamMap<K, V> {
    rng: Rng,
    entries: Vec<(K, V)>,
  }
}

impl<K, V> StreamMap<K, V> {
  pub fn new(rng: Rng) -> Self {
    StreamMap {
      rng,
      entries: Vec::new(),
    }
  }

  pub fn with_capacity(rng: Rng, capacity: usize) -> Self {
    StreamMap {
      rng,
      entries: Vec::with_capacity(capacity),
    }
  }

  pub fn is_empty(&self) -> bool {
    self.entries.is_empty()
  }

  pub fn contains<Q>(&mut self, key: &Q) -> bool
  where
    Q: PartialEq<K>,
  {
    self.entries.iter().any(|(k, _)| key == k)
  }

  pub fn insert(&mut self, key: K, value: V)
  where
    K: PartialEq,
  {
    assert!(!self.contains(&key));

    self.entries.push((key, value))
  }

  pub fn remove<Q>(&mut self, key: &Q) -> Option<(K, V)>
  where
    Q: PartialEq<K>,
  {
    self
      .entries
      .iter()
      .position(|(k, _)| key == k)
      .map(|index| self.entries.swap_remove(index))
  }
}

impl<K, V> StreamMap<K, V>
where
  V: Stream + Unpin,
{
  fn poll_next_entry(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
    let this = self.project();

    if this.entries.is_empty() {
      return Poll::Ready(None);
    }

    let start = this.rng.usize(..this.entries.len());
    let mut idx = start;

    for _ in 0..this.entries.len() {
      assert!(idx < this.entries.len());

      let (_, stream) = &mut this.entries[idx];

      match Pin::new(stream).poll_next(cx) {
        Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))),
        Poll::Ready(None) => {
          this.entries.swap_remove(idx);

          if idx == this.entries.len() {
            idx = 0;
          } else if idx < start && start <= this.entries.len() {
            idx = idx.wrapping_add(1) % this.entries.len();
          }
        }
        Poll::Pending => {
          idx = idx.wrapping_add(1) % this.entries.len();
        }
      }
    }

    if this.entries.is_empty() {
      Poll::Ready(None)
    } else {
      Poll::Pending
    }
  }
}

#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
impl<K, V> Default for StreamMap<K, V> {
  fn default() -> Self {
    Self::new(Rng::new())
  }
}

impl<K, V> Stream for StreamMap<K, V>
where
  K: Clone,
  V: Stream + Unpin,
{
  type Item = (K, V::Item);

  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    match ready!(self.as_mut().poll_next_entry(cx)) {
      Some((index, value)) => {
        let key = self.entries[index].0.clone();
        Poll::Ready(Some((key, value)))
      }
      None => Poll::Ready(None),
    }
  }
}

#[cfg(all(test, feature = "alloc"))]
mod tests {

  use alloc::{boxed::Box, collections::BTreeMap};

  use smol::stream::StreamExt;

  use super::*;

  #[test]
  fn basic_stream_map() {
    let mut streams = StreamMap::new(Rng::with_seed(1337));

    {
      let (tx, rx) = async_channel::bounded(1);

      streams.insert(1, Box::pin(rx));
      smol::spawn(async move {
        let _ = tx.send("foobar").await;
      })
      .detach();
    }

    {
      let (tx, rx) = async_channel::bounded(1);

      streams.insert(2, Box::pin(rx));
      smol::spawn(async move {
        let _ = tx.send("foobaz").await;
      })
      .detach();
    }

    assert_eq!(
      smol::block_on(streams.collect::<BTreeMap<_, _>>()),
      BTreeMap::from([(1, "foobar"), (2, "foobaz")])
    );
  }
}