use std::{marker::PhantomData, slice::Iter};
use futures::stream::LocalBoxStream;
use crate::subscribers_store::{
common, progressable,
progressable::{processed::AllProcessed, Processed},
SubscribersStore,
};
/// Reactive [`Vec`] whose subscribers are kept in a
/// [`progressable::SubStore`] and whose `on_push()`/`on_remove()` streams
/// yield [`progressable::Guarded`] values instead of bare `T`.
pub type ProgressableVec<T> =
Vec<T, progressable::SubStore<T>, progressable::Guarded<T>>;
/// Reactive [`Vec`] whose subscribers are kept in a [`common::SubStore`] and
/// whose `on_push()`/`on_remove()` streams yield plain `T` values.
pub type ObservableVec<T> = Vec<T, common::SubStore<T>, T>;
/// Reactive vector wrapping a [`std::vec::Vec`].
///
/// Every pushed value is handed to the `on_push_subs` store and every removed
/// value to the `on_remove_subs` store. On drop, all values still stored are
/// handed to `on_remove_subs` as well.
///
/// `T` is the stored element type, `S` is the subscribers store and `O` is
/// the item type of the streams returned by `on_push()`/`on_remove()`.
#[derive(Debug)]
pub struct Vec<T, S: SubscribersStore<T, O>, O> {
/// Underlying storage holding the actual values.
store: std::vec::Vec<T>,
/// Subscribers store that receives every value passed to `push()`.
on_push_subs: S,
/// Subscribers store that receives every value taken out by `remove()`
/// or drained when this [`Vec`] is dropped.
on_remove_subs: S,
/// Marker for the output type `O`, which appears only in the bound on `S`
/// and in no real field.
_output: PhantomData<O>,
}
impl<T: Clone + 'static> ProgressableVec<T> {
    /// Returns the [`Processed`] future obtained from the `on_push`
    /// subscribers store.
    #[inline]
    pub fn when_push_processed(&self) -> Processed<'static> {
        self.on_push_subs.when_all_processed()
    }

    /// Returns the [`Processed`] future obtained from the `on_remove`
    /// subscribers store.
    #[inline]
    pub fn when_remove_processed(&self) -> Processed<'static> {
        self.on_remove_subs.when_all_processed()
    }

    /// Returns an [`AllProcessed`] future combining the remove and push
    /// [`Processed`] futures (in that order) via
    /// `crate::when_all_processed()`.
    #[inline]
    pub fn when_all_processed(&self) -> AllProcessed<'static> {
        crate::when_all_processed(vec![
            self.on_remove_subs.when_all_processed().into(),
            self.on_push_subs.when_all_processed().into(),
        ])
    }
}
impl<T, S, O> Vec<T, S, O>
where
    S: SubscribersStore<T, O>,
{
    /// Creates a new [`Vec`] through its [`Default`] implementation.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns an iterator over shared references to the stored values, in
    /// storage order.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.store.iter()
    }

    /// Subscribes to the store that `push()` sends its values to and returns
    /// the resulting stream.
    #[inline]
    pub fn on_push(&self) -> LocalBoxStream<'static, O> {
        self.on_push_subs.subscribe()
    }

    /// Subscribes to the store that `remove()` sends its values to and
    /// returns the resulting stream.
    #[inline]
    pub fn on_remove(&self) -> LocalBoxStream<'static, O> {
        self.on_remove_subs.subscribe()
    }
}
impl<T, S, O> Vec<T, S, O>
where
    T: Clone,
    S: SubscribersStore<T, O>,
    O: 'static,
{
    /// Appends a clone of `value` to the end of the underlying vector and
    /// then sends `value` itself to the `on_push` subscribers store.
    pub fn push(&mut self, value: T) {
        self.store.push(value.clone());
        self.on_push_subs.send_update(value);
    }

    /// Removes the value at `index` from the underlying vector, sends a clone
    /// of it to the `on_remove` subscribers store and returns the removed
    /// value.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds, exactly as
    /// [`std::vec::Vec::remove()`] does.
    pub fn remove(&mut self, index: usize) -> T {
        let value = self.store.remove(index);
        self.on_remove_subs.send_update(value.clone());
        value
    }

    /// Returns a stream yielding every value stored at the moment of the
    /// call, each one passed through the `on_push` subscribers store's
    /// `wrap()`.
    ///
    /// The snapshot is built eagerly, so values pushed after this call are
    /// not part of the returned stream.
    #[inline]
    pub fn replay_on_push(&self) -> LocalBoxStream<'static, O> {
        // Clone element by element rather than cloning the whole backing
        // vector first: this avoids one intermediate allocation.
        let replayed: std::vec::Vec<O> = self
            .store
            .iter()
            .cloned()
            .map(|value| self.on_push_subs.wrap(value))
            .collect();
        Box::pin(futures::stream::iter(replayed))
    }
}
impl<T, S, O> Default for Vec<T, S, O>
where
    S: SubscribersStore<T, O>,
{
    /// Creates an empty [`Vec`] with freshly defaulted subscribers stores.
    #[inline]
    fn default() -> Self {
        // The push store is created before the remove store.
        let on_push_subs = S::default();
        let on_remove_subs = S::default();
        Self {
            store: std::vec::Vec::new(),
            on_push_subs,
            on_remove_subs,
            _output: PhantomData,
        }
    }
}
impl<T, S, O> From<std::vec::Vec<T>> for Vec<T, S, O>
where
    S: SubscribersStore<T, O>,
{
    /// Wraps the given [`std::vec::Vec`] as the storage of a new reactive
    /// [`Vec`] with freshly defaulted subscribers stores.
    #[inline]
    fn from(store: std::vec::Vec<T>) -> Self {
        Self {
            store,
            on_push_subs: S::default(),
            on_remove_subs: S::default(),
            _output: PhantomData,
        }
    }
}
impl<'a, T, S, O> IntoIterator for &'a Vec<T, S, O>
where
    S: SubscribersStore<T, O>,
{
    type Item = &'a T;
    type IntoIter = Iter<'a, T>;

    /// Iterates over shared references to the stored values, in storage
    /// order.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        self.store.as_slice().iter()
    }
}
impl<T, S: SubscribersStore<T, O>, O> Drop for Vec<T, S, O> {
    /// Drains every value still stored and sends each one, in storage order,
    /// to the `on_remove` subscribers store.
    fn drop(&mut self) {
        for leftover in self.store.drain(..) {
            self.on_remove_subs.send_update(leftover);
        }
    }
}
// No `T: Clone` bound here: borrowing the storage as a slice never clones, so
// non-`Clone` element types can use `as_ref()` too.
impl<T, S, O> AsRef<[T]> for Vec<T, S, O>
where
    S: SubscribersStore<T, O>,
{
    /// Borrows the stored values as a slice, in storage order.
    #[inline]
    fn as_ref(&self) -> &[T] {
        self.store.as_slice()
    }
}
#[cfg(test)]
mod tests {
    use futures::{poll, task::Poll, StreamExt as _};

    use super::ProgressableVec;

    /// Replayed values must not resolve `when_push_processed()` while a live
    /// `on_push()` subscription still has an undelivered update.
    #[tokio::test]
    async fn replay_on_push() {
        let mut vec = ProgressableVec::from(vec![1, 2, 3]);

        let replay_on_push = vec.replay_on_push();
        let on_push = vec.on_push();

        vec.push(4);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        // Neither collecting the replayed values nor unwrapping them is
        // expected to resolve the pending push.
        let replayed: Vec<_> = replay_on_push.collect().await;
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        let replayed: Vec<_> =
            replayed.into_iter().map(|val| val.into_inner()).collect();
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        // Only dropping the live subscription resolves it.
        drop(on_push);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));

        // The replay contains exactly the values stored before the push.
        assert_eq!(replayed.len(), 3);
        assert!(replayed.contains(&1));
        assert!(replayed.contains(&2));
        assert!(replayed.contains(&3));
    }

    /// A push stays unprocessed until the subscriber drops the guard that
    /// came with the received value.
    #[tokio::test]
    async fn when_push_processed() {
        let mut vec = ProgressableVec::new();
        // Pushed before any subscription exists.
        vec.push(0);

        let mut on_push = vec.on_push();

        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));

        vec.push(1);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        // Receiving the value alone is not enough: the guard is still alive.
        let (val, guard) = on_push.next().await.unwrap().into_parts();
        assert_eq!(val, 1);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        drop(guard);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
    }

    /// With two `on_push()` subscriptions a push is processed only after both
    /// of them have consumed the update.
    #[tokio::test]
    async fn multiple_when_push_processed_subs() {
        let mut vec = ProgressableVec::new();
        // Pushed before any subscription exists.
        vec.push(0);

        let mut on_push1 = vec.on_push();
        let mut on_push2 = vec.on_push();

        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));

        vec.push(0);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        assert_eq!(on_push1.next().await.unwrap().into_inner(), 0);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Pending);

        assert_eq!(on_push2.next().await.unwrap().into_inner(), 0);
        assert_eq!(poll!(vec.when_push_processed()), Poll::Ready(()));
    }

    /// A removal stays unprocessed until the subscriber drops the guard that
    /// came with the received value.
    #[tokio::test]
    async fn when_remove_processed() {
        let mut vec = ProgressableVec::new();
        vec.push(10);

        let mut on_remove = vec.on_remove();

        assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));

        assert_eq!(vec.remove(0), 10);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);

        // Receiving the value alone is not enough: the guard is still alive.
        let (val, guard) = on_remove.next().await.unwrap().into_parts();
        assert_eq!(val, 10);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);

        drop(guard);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
    }

    /// With two `on_remove()` subscriptions a removal is processed only after
    /// both of them have consumed the update.
    #[tokio::test]
    async fn multiple_when_remove_processed_subs() {
        let mut vec = ProgressableVec::new();
        vec.push(10);

        let mut on_remove1 = vec.on_remove();
        let mut on_remove2 = vec.on_remove();

        assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));

        assert_eq!(vec.remove(0), 10);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);

        assert_eq!(on_remove1.next().await.unwrap().into_inner(), 10);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Pending);

        assert_eq!(on_remove2.next().await.unwrap().into_inner(), 10);
        assert_eq!(poll!(vec.when_remove_processed()), Poll::Ready(()));
    }
}