// rx-rust 0.3.0
//
// Reactive Programming in Rust inspired by ReactiveX: https://reactivex.io/
// Documentation
use super::Scheduler;
use crate::{disposable::Disposable, utils::types::NecessarySendSync};
use std::time::Duration;

/// Leverages a Tokio runtime handle to drive scheduled tasks.
///
/// Futures are spawned onto the runtime behind this handle, and sleep timers
/// are constructed inside that runtime's context.
impl Scheduler for tokio::runtime::Handle {
    /// Spawns a task that invokes `task` repeatedly, paced by a Tokio interval.
    ///
    /// * `task` - called with a zero-based invocation counter; returning
    ///   `true` ends the loop.
    /// * `period` - period of the underlying `tokio::time::interval`.
    /// * `delay` - optional pause awaited before the interval is created.
    ///
    /// A Tokio interval completes its first tick immediately, so the first
    /// invocation happens as soon as the optional delay has elapsed.
    ///
    /// Returns the disposable produced by `schedule_future`.
    ///
    /// NOTE(review): `tokio::time::interval` panics when `period` is zero;
    /// here that panic would occur inside the spawned task. Confirm whether
    /// callers may pass a zero period.
    fn schedule_periodically(
        &self,
        mut task: impl FnMut(usize) -> bool + NecessarySendSync + 'static,
        period: Duration,
        delay: Option<Duration>,
    ) -> impl Disposable + NecessarySendSync + 'static {
        let this = self.clone();
        self.schedule_future(async move {
            if let Some(delay) = delay {
                this.sleep(delay).await;
            }
            let mut ticker = tokio::time::interval(period);
            let mut count: usize = 0;
            loop {
                ticker.tick().await;
                // A `true` result is the task's request to stop.
                if task(count) {
                    break;
                }
                count += 1;
            }
        })
    }

    /// Spawns `future` onto the runtime behind this handle.
    ///
    /// Returns the value of `Handle::spawn`, which is used as the disposable.
    fn schedule_future(
        &self,
        future: impl Future<Output = ()> + NecessarySendSync + 'static,
    ) -> impl Disposable + NecessarySendSync + 'static {
        self.spawn(future)
    }

    /// Creates a future that completes once `duration` has elapsed.
    ///
    /// The timer is constructed inside this handle's runtime context, so the
    /// method may be called from a thread that is not currently running
    /// inside a Tokio runtime.
    fn sleep(&self, duration: Duration) -> impl Future + NecessarySendSync + 'static {
        // `tokio::time::sleep` needs a runtime context when it is constructed
        // and panics without one. Entering this handle's context for the
        // duration of the call makes the timer use this runtime. The guard is
        // dropped on return, never held across an await.
        let _context = self.enter();
        tokio::time::sleep(duration)
    }
}

/// Exposes a spawned Tokio task through the `Disposable` interface:
/// disposing the handle aborts the task it refers to.
impl<T> Disposable for tokio::task::JoinHandle<T> {
    /// Calls `abort` on the task; the handle is consumed and dropped afterwards.
    fn dispose(self) {
        tokio::task::JoinHandle::abort(&self);
    }
}