rx_core_observable_timer 0.2.0

timer observable for rx_core
Documentation
use std::time::Duration;

use rx_core_common::{
	Scheduler, SchedulerHandle, SchedulerScheduleWorkExtension, SharedSubscriber, Subscriber,
	SubscriptionLike, WorkCancellationId,
};
use rx_core_macro_subscription_derive::RxSubscription;

#[derive(RxSubscription)]
#[rx_delegate_teardown_collection]
pub struct TimerSubscription<Destination, S>
where
	Destination: 'static + Subscriber<In = ()>,
	S: Scheduler,
{
	#[destination]
	destination: SharedSubscriber<Destination>,
	scheduler: SchedulerHandle<S>,
	cancellation_id: Option<WorkCancellationId>,
}

impl<Destination, S> TimerSubscription<Destination, S>
where
	Destination: 'static + Subscriber<In = ()>,
	S: Scheduler,
{
	pub fn new(
		destination: Destination,
		duration: Duration,
		scheduler: SchedulerHandle<S>,
	) -> Self {
		let scheduler_clone = scheduler.clone();
		let destination = SharedSubscriber::new(destination);
		let cancellation_id = {
			let mut scheduler = scheduler_clone.lock();
			let cancellation_id = scheduler.generate_cancellation_id();
			let destination_clone = destination.clone();

			scheduler.schedule_delayed_work(
				move |_, _| {
					let mut destination = destination_clone.lock();
					destination.next(());
					if !destination.is_closed() {
						destination.complete();
					}
				},
				duration,
				cancellation_id,
			);

			cancellation_id
		};

		TimerSubscription {
			destination,
			scheduler,
			cancellation_id: Some(cancellation_id),
		}
	}
}

impl<Destination, S> SubscriptionLike for TimerSubscription<Destination, S>
where
	Destination: 'static + Subscriber<In = ()>,
	S: Scheduler,
{
	fn is_closed(&self) -> bool {
		self.destination.is_closed()
	}

	fn unsubscribe(&mut self) {
		if let Some(cancellation_id) = self.cancellation_id.take() {
			self.scheduler.lock().cancel(cancellation_id);
		}

		if !self.destination.is_closed() {
			self.destination.unsubscribe();
		}
	}
}