rx-rust 0.3.0

Reactive Programming in Rust inspired by ReactiveX https://reactivex.io/
Documentation
use crate::tests_utils::join_handle::JoinHandle;
use educe::Educe;
use rx_rust::utils::types::NecessarySendSync;
use rx_rust::utils::types::Shared;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

#[derive(Educe)]
#[educe(Debug, Clone, Default)]
struct TestRuntimeContext {
    alive_tasks_count: Shared<AtomicUsize>,
    #[cfg(not(feature = "single-threaded"))]
    is_spawned_late: bool,
    #[cfg(not(feature = "single-threaded"))]
    is_abort_late: bool,
    #[cfg(not(feature = "single-threaded"))]
    thread_name: Option<&'static str>,
}

cfg_if::cfg_if! {
    if #[cfg(feature = "local-pool-scheduler")] {
        use futures::executor::{LocalPool, LocalSpawner};
        use rx_rust::utils::types::Mutable;
        #[derive(Educe)]
        #[educe(Debug, Clone)]
        pub(crate) struct TestRuntime {
            pool: Shared<Mutable<LocalPool>>,
            pub(crate) spawner: LocalSpawner,
            context: TestRuntimeContext
        }
        impl Default for TestRuntime {
            fn default() -> Self {
                let pool = LocalPool::new();
                let spawner = pool.spawner();
                Self {
                    pool: Shared::new(Mutable::new(pool)),
                    spawner,
                    context: TestRuntimeContext::default(),
                }
            }
        }
    } else if #[cfg(feature = "thread-pool-scheduler")] {
        use futures::executor::ThreadPool;
        #[derive(Educe)]
        #[educe(Debug, Clone)]
        pub(crate) struct TestRuntime {
            pub(crate) pool: ThreadPool,
            context: TestRuntimeContext
        }
        impl Default for TestRuntime {
            fn default() -> Self {
                Self {
                    pool: ThreadPool::new().unwrap(),
                    context: TestRuntimeContext::default(),
                }
            }
        }
    } else {
        #[derive(Educe)]
        #[educe(Debug, Clone, Default)]
        pub(crate) struct TestRuntime {
            context: TestRuntimeContext
        }
    }

}

impl TestRuntime {
    pub(crate) fn spawn<FU>(&self, future: FU) -> JoinHandle<FU>
    where
        FU: Future + NecessarySendSync + 'static,
        FU::Output: NecessarySendSync + 'static,
    {
        let (join_handle, future) = JoinHandle::wrap(future);
        cfg_if::cfg_if! {
            if #[cfg(feature = "local-pool-scheduler")] {
                use futures::task::LocalSpawnExt;
                self.spawner.spawn_local(future).unwrap();
            } else if #[cfg(feature = "thread-pool-scheduler")] {
                use futures::task::SpawnExt;
                self.pool.spawn(future).unwrap();
            } else if #[cfg(feature = "tokio-scheduler")] {
                tokio::runtime::Handle::current().spawn(future);
            } else if #[cfg(feature = "async-std-scheduler")] {
                async_std::task::spawn(future);
            } else {
                panic!("You need to specify a feature to run tests.");
            }
        }
        join_handle
    }

    #[cfg(not(feature = "single-threaded"))]
    pub(crate) fn clone_with_thread_name(&self, thread_name: &'static str) -> Self {
        let mut cloned = self.clone();
        cloned.context.thread_name = Some(thread_name);
        cloned
    }

    pub(crate) fn get_alive_tasks_count(&self) -> usize {
        self.context.alive_tasks_count.load(Ordering::SeqCst)
    }

    pub(crate) fn alive_tasks_count_add_1(&self) {
        self.context
            .alive_tasks_count
            .fetch_add(1, Ordering::SeqCst);
    }

    pub(crate) fn alive_tasks_count_sub_1(&self) {
        self.context
            .alive_tasks_count
            .fetch_sub(1, Ordering::SeqCst);
    }

    #[cfg(not(feature = "single-threaded"))]
    pub(crate) fn is_spawned_late(&self) -> bool {
        self.context.is_spawned_late
    }

    #[cfg(not(feature = "single-threaded"))]
    pub(crate) fn is_abort_late(&self) -> bool {
        self.context.is_abort_late
    }

    #[cfg(not(feature = "single-threaded"))]
    pub(crate) fn get_expected_thread_name(&self) -> Option<&'static str> {
        self.context.thread_name
    }
}

pub(crate) fn block_on<FU>(body: impl FnOnce(TestRuntime) -> FU)
where
    FU: Future<Output = ()> + 'static,
{
    cfg_if::cfg_if! {
        if #[cfg(not(feature = "single-threaded"))] {
            use rand::random;
            let mut runtime = TestRuntime::default();
            runtime.context.is_spawned_late = random();
            runtime.context.is_abort_late = random();
        } else {
            let runtime = TestRuntime::default();
        }
    }
    cfg_if::cfg_if! {
        if #[cfg(feature = "local-pool-scheduler")] {
            use futures::task::LocalSpawnExt;
            use rx_rust::utils::types::MutableHelper;
            runtime.spawner.spawn_local(body(runtime.clone())).unwrap();
            runtime.pool.lock_mut(|mut lock| lock.run());
        } else if #[cfg(feature = "thread-pool-scheduler")] {
            futures::executor::block_on(body(runtime));
        } else if #[cfg(feature = "tokio-scheduler")] {
            tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Failed building the Runtime")
            .block_on(body(runtime));
        } else if #[cfg(feature = "async-std-scheduler")] {
            async_std::task::block_on(body(runtime));
        } else {
            panic!("You need to specify a feature to run tests.");
        }
    }
}

#[macro_export]
macro_rules! check_with_spawned_late {
    ($runtime:ident, $not_spawned:expr, $spawned:expr) => {
        cfg_if::cfg_if! {
            if #[cfg(not(feature = "single-threaded"))] {
                if $runtime.is_spawned_late() {
                    $not_spawned
                    $runtime.sleep(tests_utils::DURATION_10_MS).await;
                    $spawned
                } else {
                    $runtime.sleep(tests_utils::DURATION_3_MS).await;
                    $spawned
                }
            } else {
                $not_spawned
                $runtime.sleep(tests_utils::DURATION_3_MS).await;
                $spawned
            }
        }
    };
}

#[macro_export]
macro_rules! check_with_abort_late {
    ($runtime:ident, $not_spawned:expr, $spawned:expr) => {
        cfg_if::cfg_if! {
            if #[cfg(not(feature = "single-threaded"))] {
                if $runtime.is_abort_late() {
                    $not_spawned
                    $runtime.sleep(tests_utils::DURATION_10_MS).await;
                    $spawned
                } else {
                    $runtime.sleep(tests_utils::DURATION_3_MS).await;
                    $spawned
                }
            } else {
                $not_spawned
                $runtime.sleep(tests_utils::DURATION_3_MS).await;
                $spawned
            }
        }
    };
}

#[macro_export]
macro_rules! check_with_spawned_and_abort_late {
    ($runtime:ident, $spawned_first:expr, $abort_first:expr) => {
        #[cfg(not(feature = "single-threaded"))]
        {
            $runtime.sleep(tests_utils::DURATION_3_MS).await;
            match ($runtime.is_spawned_late(), $runtime.is_abort_late()) {
                (true, true) => {}
                (false, true) => $spawned_first,
                (true, false) => $abort_first,
                (false, false) => {}
            }
        }
    };
}