use crate::{
disposable::{
Disposable, boxed_disposal::BoxedDisposal, callback_disposal::CallbackDisposal,
subscription::Subscription,
},
observable::Observable,
observer::{Observer, Termination},
safe_lock, safe_lock_option_observer,
scheduler::Scheduler,
utils::{
types::{Mutable, MutableHelper, NecessarySendSync, Shared},
unsub_after_termination::subscribe_unsub_after_termination,
},
};
use educe::Educe;
use std::time::Duration;
/// Error type seen by observers subscribed to a `Timeout` observable.
///
/// It is either a timeout raised by the operator's own timer or an error
/// coming from the source observable.
#[derive(Educe)]
#[educe(Debug, Clone, PartialEq, Eq)]
pub enum Error<E> {
/// Sent by the timer callback when a timer fires while its version is still
/// the current one.
Timeout,
/// Wraps an error of type `E` that the source observable terminated with.
SourceError(E),
}
/// Observable adapter that forwards a source observable and terminates the
/// downstream observer with `Error::Timeout` when one of its timers fires.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct Timeout<OE, S> {
/// The wrapped source observable.
source: OE,
/// Delay passed to the scheduler for every timer this operator creates.
duration: Duration,
/// Scheduler on which the timers are scheduled.
scheduler: S,
}
impl<OE, S> Timeout<OE, S> {
pub fn new(source: OE, duration: Duration, scheduler: S) -> Self {
Self {
source,
duration,
scheduler,
}
}
}
/// `Timeout` is itself an observable: it emits the source's values and
/// reports failures as `Error<E>` (source error or timeout).
impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, Error<E>> for Timeout<OE, S>
where
    OE: Observable<'or, 'static, T, E>,
    S: Scheduler,
{
    /// Subscribes `observer` to the source through a `TimeoutObserver` and
    /// schedules the first timer.
    ///
    /// The returned subscription combines the source subscription with the
    /// shared timeout state, so disposing it also runs the state's `dispose`.
    fn subscribe(
        self,
        observer: impl Observer<T, Error<E>> + NecessarySendSync + 'static,
    ) -> Subscription<'static> {
        subscribe_unsub_after_termination(observer, |observer| {
            // State shared between the source-facing observer and every timer.
            let shared_state = Shared::new(Mutable::new(TimeoutContext {
                timer_state: TimerState::Initialized,
                version: 0,
            }));
            // The downstream observer is shared as well: both the source path
            // and the timer callbacks need to reach it.
            let downstream = Shared::new(Mutable::new(Some(observer)));

            let upstream_observer = TimeoutObserver {
                observer: downstream.clone(),
                duration: self.duration,
                scheduler: self.scheduler.clone(),
                context: shared_state.clone(),
            };
            // The source is subscribed while the state is still `Initialized`,
            // so values it delivers during this call are forwarded without
            // touching any timer.
            let source_subscription = self.source.subscribe(upstream_observer);

            // First timer, tagged with version 0 (the initial version).
            let initial_timer = create_timer(
                0,
                downstream.clone(),
                self.duration,
                self.scheduler.clone(),
                shared_state.clone(),
            );
            let previous_state = safe_lock!(
                mem_replace: shared_state,
                timer_state,
                TimerState::Scheduled(initial_timer)
            );
            match previous_state {
                // `Initialized`: the expected case, nothing more to do.
                // `DidTimeout`: the timer callback ran before its handle was stored.
                // NOTE(review): in the `DidTimeout` case the replace above has
                // already overwritten the state with `Scheduled` — confirm
                // that this is intended.
                TimerState::Initialized | TimerState::DidTimeout => {}
                TimerState::Scheduled(_) | TimerState::Disposed => unreachable!(),
            }

            source_subscription + shared_state
        })
    }
}
/// Lifecycle of the timeout timer, stored inside `TimeoutContext`.
enum TimerState {
/// Initial state set by `subscribe`, before the first timer handle is stored.
/// Values arriving in this state are forwarded without touching any timer.
Initialized,
/// A timer has been created; the payload is the handle used to dispose it.
Scheduled(BoxedDisposal<'static>),
/// Set by the timer callback when it fires with the current version.
/// Values arriving in this state are ignored.
DidTimeout,
/// Set when the shared context is disposed. Values arriving in this state
/// are ignored.
Disposed,
}
/// Mutable state shared by the source-facing observer, the timer callbacks and
/// the subscription's disposal.
struct TimeoutContext {
/// Current phase of the timer; see `TimerState`.
timer_state: TimerState,
/// Starts at 0 and is incremented each time `on_next` replaces the timer.
/// A timer callback captures the version it was created with and does nothing
/// if it no longer matches.
version: usize,
}
/// Disposing the shared timeout state marks it `Disposed` and disposes the
/// timer handle that was stored, if any.
impl Disposable for Shared<Mutable<TimeoutContext>> {
    fn dispose(self) {
        // Swap in `Disposed` and inspect whatever state was there before.
        let previous_state = safe_lock!(mem_replace: self, timer_state, TimerState::Disposed);
        match previous_state {
            TimerState::Scheduled(pending_timer) => pending_timer.dispose(),
            // The timer already fired; there is no handle left to dispose.
            TimerState::DidTimeout => {}
            // Treated as impossible here: disposal before the first timer was
            // stored, or a second disposal.
            TimerState::Initialized | TimerState::Disposed => unreachable!(),
        }
    }
}
/// Observer handed to the source observable; it forwards events to the
/// downstream observer and replaces the timer on every value.
struct TimeoutObserver<OR, S> {
/// Downstream observer, shared with the timer callbacks and held in an `Option`.
observer: Shared<Mutable<Option<OR>>>,
/// Delay used when a replacement timer is created.
duration: Duration,
/// Scheduler used when a replacement timer is created.
scheduler: S,
/// Timer state and version shared with the timers and the subscription.
context: Shared<Mutable<TimeoutContext>>,
}
impl<T, E, OR, S> Observer<T, E> for TimeoutObserver<OR, S>
where
OR: Observer<T, Error<E>> + NecessarySendSync + 'static,
S: Scheduler,
{
fn on_next(&mut self, value: T) {
self.context
.lock_mut(|mut lock| match &mut lock.timer_state {
TimerState::Initialized => {
drop(lock);
safe_lock_option_observer!(on_next: self.observer, value);
}
TimerState::Scheduled(disposal) => {
let disposal = std::mem::replace(
disposal,
BoxedDisposal::new(CallbackDisposal::new(|| {})), );
disposal.dispose();
lock.version += 1;
let timer = create_timer(
lock.version,
self.observer.clone(),
self.duration,
self.scheduler.clone(),
self.context.clone(),
);
lock.timer_state = TimerState::Scheduled(timer);
drop(lock);
safe_lock_option_observer!(on_next: self.observer, value);
}
TimerState::DidTimeout => {}
TimerState::Disposed => {}
});
}
fn on_termination(self, termination: Termination<E>) {
match termination {
Termination::Completed => {
safe_lock_option_observer!(on_termination: self.observer, Termination::Completed);
}
Termination::Error(error) => {
safe_lock_option_observer!(on_termination: self.observer, Termination::Error(Error::SourceError(error)));
}
}
}
}
/// Schedules a timeout callback on `scheduler` after `duration` and returns
/// the boxed handle for disposing it.
///
/// `version` is the context version this timer belongs to. When the callback
/// runs and the context's version differs, the callback does nothing.
/// Otherwise it sets the state to `DidTimeout`, releases the context lock,
/// terminates `observer` with `Error::Timeout`, and disposes the handle that
/// was stored in the previous `Scheduled` state.
fn create_timer<T, E, OR, S>(
    version: usize,
    observer: Shared<Mutable<Option<OR>>>,
    duration: Duration,
    scheduler: S,
    context: Shared<Mutable<TimeoutContext>>,
) -> BoxedDisposal<'static>
where
    OR: Observer<T, Error<E>> + NecessarySendSync + 'static,
    S: Scheduler,
{
    BoxedDisposal::new(scheduler.schedule(
        move || {
            context.lock_mut(|mut lock| {
                // Only the timer carrying the current version may fire.
                if lock.version == version {
                    let previous_state =
                        std::mem::replace(&mut lock.timer_state, TimerState::DidTimeout);
                    // Release the context lock before calling the observer.
                    drop(lock);
                    safe_lock_option_observer!(
                        on_termination: observer,
                        Termination::Error(Error::Timeout)
                    );
                    match previous_state {
                        TimerState::Scheduled(stored_handle) => stored_handle.dispose(),
                        // NOTE(review): a previous `Disposed` state is
                        // overwritten by `DidTimeout` above and the timeout is
                        // still forwarded — confirm that this is intended.
                        TimerState::Initialized | TimerState::Disposed => {}
                        TimerState::DidTimeout => unreachable!(),
                    }
                }
            });
        },
        Some(duration),
    ))
}