rx-rust 0.3.0

Reactive Programming in Rust inspired by ReactiveX https://reactivex.io/
Documentation
use super::{Subject, publish_subject::PublishSubject};
use crate::safe_lock;
use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
use crate::{
    disposable::subscription::Subscription,
    observable::Observable,
    observer::{Observer, Termination},
};
use educe::Educe;
use std::collections::VecDeque;

/// Buffers emissions and replays them to late subscribers.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct ReplaySubject<'or, T, E> {
    values: Shared<Mutable<VecDeque<T>>>,
    buffer_size: Option<usize>,
    publish_subject: PublishSubject<'or, T, E>,
}

impl<T, E> ReplaySubject<'_, T, E> {
    pub fn new(buffer_size: Option<usize>) -> Self {
        let vec = match buffer_size {
            Some(size) => VecDeque::with_capacity(size),
            None => VecDeque::new(),
        };
        Self {
            values: Shared::new(Mutable::new(vec)),
            buffer_size,
            publish_subject: PublishSubject::default(),
        }
    }
}

impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for ReplaySubject<'or, T, E>
where
    T: Clone + NecessarySendSync + 'sub,
    E: Clone + NecessarySendSync + 'sub,
    'or: 'sub,
{
    fn subscribe(
        self,
        mut observer: impl Observer<T, E> + NecessarySendSync + 'or,
    ) -> Subscription<'sub> {
        if let Some(terminated) = self.terminated() {
            match &terminated {
                Termination::Completed => {
                    let values = safe_lock!(clone: self.values);
                    for value in values {
                        observer.on_next(value);
                    }
                }
                Termination::Error(_) => {}
            }
            observer.on_termination(terminated);
            Subscription::default()
        } else {
            let values = safe_lock!(clone: self.values);
            for value in values {
                observer.on_next(value);
            }
            self.publish_subject.subscribe(observer)
        }
    }
}

impl<T, E> Observer<T, E> for ReplaySubject<'_, T, E>
where
    T: Clone + NecessarySendSync,
    E: Clone + NecessarySendSync,
{
    fn on_next(&mut self, value: T) {
        if self.terminated().is_none() {
            self.values.lock_mut(|mut lock| {
                if let Some(buffer_size) = self.buffer_size {
                    if lock.len() == buffer_size {
                        if lock.pop_front().is_some() {
                            // only push if the buffer is not 0
                            lock.push_back(value.clone());
                        }
                    } else {
                        lock.push_back(value.clone());
                    }
                } else {
                    lock.push_back(value.clone());
                }
            });
            self.publish_subject.on_next(value);
        }
    }

    fn on_termination(self, termination: Termination<E>) {
        self.publish_subject.on_termination(termination);
    }
}

impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for ReplaySubject<'or, T, E>
where
    T: Clone + NecessarySendSync + 'sub,
    E: Clone + NecessarySendSync + 'sub,
    'or: 'sub,
{
    fn terminated(&self) -> Option<Termination<E>>
    where
        E: Clone,
    {
        self.publish_subject.terminated()
    }
}