use crate::{Signal, SubscriberNotification};
use bitflags::bitflags;
bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct SubscriberStateBits: u8 {
const WAITING = 0b00000001;
const PRIMED = 0b00000010;
const COMPLETED = 0b00000100;
const ERRORED = 0b00001000;
const UNSUBSCRIBED = 0b00010000;
}
}
impl Default for SubscriberStateBits {
#[inline]
fn default() -> Self {
SubscriberStateBits::WAITING
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct SubscriberState {
state: SubscriberStateBits,
}
impl SubscriberState {
#[inline]
pub fn is_waiting(&self) -> bool {
self.state.contains(SubscriberStateBits::WAITING)
}
#[inline]
pub fn is_primed(&self) -> bool {
self.state.contains(SubscriberStateBits::PRIMED)
}
#[inline]
pub fn is_completed(&self) -> bool {
self.state.contains(SubscriberStateBits::COMPLETED)
}
#[inline]
pub fn is_completed_but_not_primed(&self) -> bool {
self.is_completed() && !self.is_primed()
}
#[inline]
pub fn is_errored(&self) -> bool {
self.state.contains(SubscriberStateBits::ERRORED)
}
#[inline]
pub fn is_unsubscribed(&self) -> bool {
self.state.contains(SubscriberStateBits::UNSUBSCRIBED)
}
#[inline]
pub fn is_closed(&self) -> bool {
self.is_unsubscribed() || self.is_finished()
}
#[inline]
pub fn is_finished(&self) -> bool {
self.is_completed() || self.is_errored()
}
#[inline]
pub fn is_closed_but_primed(&self) -> bool {
self.is_closed() && self.is_primed()
}
#[inline]
pub fn is_closed_but_not_primed(&self) -> bool {
self.is_closed() && !self.is_primed()
}
#[inline]
pub fn is_closed_but_not_primed_and_not_completed(&self) -> bool {
self.is_closed() && !self.is_primed() && !self.is_completed()
}
#[inline]
pub fn is_closed_but_not_completed(&self) -> bool {
(self.is_unsubscribed() || self.is_errored()) && !self.is_completed()
}
#[inline]
pub fn is_closed_but_not_errored(&self) -> bool {
(self.is_unsubscribed() || self.is_completed()) && !self.is_errored()
}
#[inline]
pub fn is_closed_but_not_completed_and_primed(&self) -> bool {
self.is_closed_but_not_completed() && self.is_primed()
}
#[inline]
pub fn next(&mut self) {
debug_assert!(
!self.is_closed(),
"It should not be possible that an already closed observable nexts!"
);
self.state.remove(SubscriberStateBits::WAITING);
self.state.insert(SubscriberStateBits::PRIMED);
}
#[inline]
pub fn complete(&mut self) {
debug_assert!(
!self.is_closed(),
"It should not be possible that an already closed observable completes!"
);
self.state.remove(SubscriberStateBits::WAITING);
self.state.insert(SubscriberStateBits::COMPLETED);
}
#[inline]
pub fn error(&mut self) {
debug_assert!(
!self.is_closed(),
"It should not be possible that an already closed observable errors!"
);
self.state.remove(SubscriberStateBits::WAITING);
self.state.insert(SubscriberStateBits::ERRORED);
}
#[inline]
pub fn unsubscribe(&mut self) {
debug_assert!(
!self.is_unsubscribed(),
"It should not be possible that an already unsubscribed observable unsubscribes!"
);
self.state.remove(SubscriberStateBits::WAITING);
self.state.insert(SubscriberStateBits::UNSUBSCRIBED);
}
#[inline]
pub fn unsubscribe_if_not_already(&mut self) {
self.state.remove(SubscriberStateBits::WAITING);
self.state.insert(SubscriberStateBits::UNSUBSCRIBED);
}
pub fn update_with_notification<In, InError>(
&mut self,
notification: &SubscriberNotification<In, InError>,
) where
In: Signal,
InError: Signal,
{
match notification {
SubscriberNotification::Unsubscribe => self.unsubscribe(),
SubscriberNotification::Complete => self.complete(),
SubscriberNotification::Error(_) => self.error(),
SubscriberNotification::Next(_) => self.next(),
}
}
pub fn notification_matches_state<In, InError>(
&mut self,
notification: &SubscriberNotification<In, InError>,
) -> bool
where
In: Signal,
InError: Signal,
{
match notification {
SubscriberNotification::Unsubscribe => self.is_unsubscribed(),
SubscriberNotification::Complete => self.is_completed(),
SubscriberNotification::Error(_) => self.is_errored(),
SubscriberNotification::Next(_) => self.is_primed(),
}
}
pub fn update_with_notification_would_be_invalid<In, InError>(
&self,
notification: &SubscriberNotification<In, InError>,
) -> bool
where
In: Signal,
InError: Signal,
{
match notification {
SubscriberNotification::Unsubscribe => self.is_unsubscribed(),
SubscriberNotification::Complete
| SubscriberNotification::Error(_)
| SubscriberNotification::Next(_) => self.is_closed(),
}
}
}
#[cfg(test)]
mod test {
use crate::SubscriberState;
fn mute_panic<R>(fun: impl FnOnce() -> R) -> R {
let hook = std::panic::take_hook();
std::panic::set_hook(Box::new(|_| {}));
let result = fun();
std::panic::set_hook(hook);
result
}
#[test]
fn it_should_be_waiting_by_default() {
assert!(SubscriberState::default().is_waiting());
}
mod next {
use super::*;
#[test]
fn it_should_not_be_waiting_once_nexted() {
let mut state = SubscriberState::default();
state.next();
assert!(!state.is_waiting());
assert!(!state.is_closed());
state.next();
state.next();
assert!(!state.is_waiting());
assert!(!state.is_closed());
}
#[test]
#[should_panic]
fn it_should_panic_when_nexting_after_complete() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.complete();
state.next();
});
}
#[test]
#[should_panic]
fn it_should_panic_when_nexting_after_error() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.error();
state.next();
});
}
#[test]
#[should_panic]
fn it_should_panic_when_nexting_after_unsubscribe() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.unsubscribe();
state.next();
});
}
}
mod complete {
use super::*;
#[test]
fn it_should_not_be_waiting_once_completed() {
let mut state = SubscriberState::default();
state.complete();
assert!(state.is_completed());
assert!(state.is_closed_but_not_errored());
assert!(!state.is_waiting());
}
#[test]
#[should_panic]
fn it_should_panic_when_completing_twice() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.complete();
state.complete();
});
}
#[test]
#[should_panic]
fn it_should_panic_when_completing_after_error() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.error();
state.complete();
});
}
#[test]
#[should_panic]
fn it_should_panic_when_completing_after_unsubscribe() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.unsubscribe();
state.complete();
});
}
}
mod error {
use super::*;
#[test]
fn it_should_not_be_waiting_once_errored() {
let mut state = SubscriberState::default();
state.error();
assert!(state.is_errored());
assert!(state.is_closed_but_not_completed());
assert!(!state.is_waiting());
}
#[test]
#[should_panic]
fn it_should_panic_when_erroring_after_complete() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.complete();
state.error();
});
}
#[test]
#[should_panic]
fn it_should_panic_when_erroring_twice() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.error();
state.error();
});
}
#[test]
#[should_panic]
fn it_should_panic_when_erroring_after_unsubscribe() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.unsubscribe();
state.error();
});
}
}
mod unsubscribe {
use super::*;
#[test]
fn it_should_not_be_waiting_once_unsubscribed() {
let mut state = SubscriberState::default();
state.unsubscribe();
assert!(state.is_unsubscribed());
assert!(!state.is_waiting());
}
#[test]
fn it_can_unsubscribe_after_completed() {
let mut state = SubscriberState::default();
state.complete();
state.unsubscribe();
assert!(state.is_completed());
assert!(state.is_unsubscribed());
assert!(!state.is_waiting());
}
#[test]
fn it_can_unsubscribe_after_errored() {
let mut state = SubscriberState::default();
state.error();
state.unsubscribe();
assert!(state.is_errored());
assert!(state.is_unsubscribed());
assert!(!state.is_waiting());
}
#[test]
#[should_panic]
fn it_should_panic_when_unsubscribed_twice() {
mute_panic(|| {
let mut state = SubscriberState::default();
state.unsubscribe();
state.unsubscribe();
});
}
}
}