1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
//! Progressable [`SubscribersStore`].
pub mod guarded;
pub mod processed;
use std::{cell::RefCell, rc::Rc};
use futures::{channel::mpsc, stream::LocalBoxStream};
use crate::{subscribers_store::SubscribersStore, ObservableCell};
pub use self::{
guarded::{Guard, Guarded},
processed::{AllProcessed, Processed},
};
/// [`SubscribersStore`] for progressable collections/field.
///
/// Will provided [`Guarded`] with an updated data to the
/// [`SubscribersStore::subscribe()`] [`Stream`].
///
/// You can wait for updates processing with a
/// [`SubStore::when_all_processed()`] method.
///
/// [`Stream`]: futures::Stream
#[derive(Debug)]
pub struct SubStore<T> {
/// All subscribers of this store.
store: RefCell<Vec<mpsc::UnboundedSender<Guarded<T>>>>,
/// Manager recognizing when all sent updates are processed.
counter: Rc<ObservableCell<u32>>,
}
// Implemented manually to omit redundant `T: Default` trait bound, imposed by
// `#[derive(Default)]`.
impl<T> Default for SubStore<T> {
fn default() -> Self {
Self {
store: RefCell::new(Vec::new()),
counter: Rc::new(ObservableCell::new(0)),
}
}
}
impl<T> SubStore<T> {
/// Returns [`Future`] resolving when all subscribers processes update.
///
/// [`Future`]: std::future::Future
pub fn when_all_processed(&self) -> Processed<'static> {
let counter = Rc::clone(&self.counter);
Processed::new(Box::new(move || {
let counter = Rc::clone(&counter);
// `async move` required to capture `Rc` into the created `Future`,
// avoiding dropping it in-place.
Box::pin(async move {
_ = counter.when_eq(0).await;
})
}))
}
}
impl<T> SubscribersStore<T, Guarded<T>> for SubStore<T>
where
T: Clone + 'static,
{
fn send_update(&self, value: T) {
self.store
.borrow_mut()
.retain(|sub| sub.unbounded_send(self.wrap(value.clone())).is_ok());
}
fn subscribe(&self) -> LocalBoxStream<'static, Guarded<T>> {
let (tx, rx) = mpsc::unbounded();
self.store.borrow_mut().push(tx);
Box::pin(rx)
}
fn wrap(&self, value: T) -> Guarded<T> {
Guarded::wrap(value, Rc::clone(&self.counter))
}
}