// rostrum 8.0.0
//
// An efficient implementation of Electrum Server with token support.
// Documentation
use anyhow::Result;
use rayon::prelude::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
use std::{
    sync::atomic::{AtomicUsize, Ordering},
    time::{Duration, Instant},
};

use crate::{errors::rpc_timeout, signal::Waiter};

/**
 * Deadline tracker for a single long-running operation.
 *
 * Remembers when it was created and how long the operation is allowed to
 * run; `check` compares the time elapsed since creation against that limit.
 *
 * Derives `Debug` and `Clone` so the trigger can be logged and duplicated;
 * both fields are plain standard-library time values.
 */
#[derive(Debug, Clone)]
pub struct TimeoutTrigger {
    /** Moment the trigger was created; elapsed time is measured from here. */
    start: Instant,
    /** Maximum time that may pass since `start` before `check` reports a timeout. */
    timeout: Duration,
}

impl TimeoutTrigger {
    /**
     * Create a trigger whose clock starts now and which expires once
     * `timeout` has elapsed.
     */
    pub fn new(timeout: Duration) -> Self {
        let start = Instant::now();
        Self { start, timeout }
    }

    /**
     * Verify that the operation may keep running.
     *
     * Returns the error from `Waiter::shutdown_check` if that check fails
     * (presumably a pending server shutdown; see `Waiter`), or the error
     * built by `rpc_timeout` once the elapsed time has reached the
     * configured limit. Otherwise returns `Ok(())`.
     */
    #[inline]
    pub fn check(&self) -> Result<()> {
        // The shutdown check runs first, so its error takes precedence over a timeout.
        Waiter::shutdown_check()?;

        // The limit is inclusive: reaching it exactly already counts as timed out.
        let expired = self.start.elapsed() >= self.timeout;
        if expired {
            Err(rpc_timeout(&self.timeout))
        } else {
            Ok(())
        }
    }
}

/**
 * Drain `iter` into a `Vec`, consulting `timeout` once per 1000 elements.
 *
 * The check runs for the elements at index 0, 1000, 2000, ... (after the
 * element has been pulled from the iterator, before it is stored). The first
 * failing check aborts the collection and its error is returned; everything
 * gathered so far is dropped. An empty iterator performs no check at all.
 */
pub fn timeout_collect<'a, T>(
    timeout: &TimeoutTrigger,
    iter: Box<dyn Iterator<Item = T> + 'a>,
) -> Result<Vec<T>> {
    // Number of elements between two consecutive timeout checks.
    const CHECK_INTERVAL: usize = 1000;

    let mut collected = Vec::new();
    for (index, element) in iter.enumerate() {
        if index % CHECK_INTERVAL == 0 {
            timeout.check()?;
        }
        collected.push(element);
    }
    Ok(collected)
}

/**
 * Collect a parallel iterator into `CONTAINER`, consulting `timeout` once
 * per 1000 processed elements.
 *
 * Elements are counted with a single counter shared by every invocation of
 * the mapping closure, so the 1000-element cadence applies to the total
 * number of elements processed, not to each worker separately. A failing
 * check turns the whole collection into an `Err`.
 */
pub fn par_timeout_collect<'a, ITEM, CONTAINER>(
    timeout: &TimeoutTrigger,
    iter: impl 'a + IntoParallelIterator<Item = ITEM>,
) -> Result<CONTAINER>
where
    ITEM: Send,
    CONTAINER: FromParallelIterator<ITEM>,
{
    let processed = AtomicUsize::new(0);

    let guarded = |item: ITEM| -> Result<ITEM> {
        // Relaxed suffices: only the count itself matters, nothing else is
        // published through this counter.
        let index = processed.fetch_add(1, Ordering::Relaxed);
        if index % 1000 == 0 {
            timeout.check()?;
        }
        Ok(item)
    };

    iter.into_par_iter().map(guarded).collect()
}

/**
 * Convenience form of `par_timeout_collect` that always gathers the elements
 * into a `Vec`, sparing callers from naming the container type.
 */
pub fn par_timeout_collect_vec<'a, ITEM>(
    timeout: &TimeoutTrigger,
    iter: impl 'a + IntoParallelIterator<Item = ITEM>,
) -> Result<Vec<ITEM>>
where
    ITEM: Send,
{
    // The container type is inferred from this function's return type.
    par_timeout_collect(timeout, iter)
}

/**
 * Collect items in iterator, but check for timeout every 1000 elements.
 */
#[allow(clippy::boxed_local)]
pub fn par_timeout_collect_boxed<'a, T: std::marker::Send>(
    timeout: &TimeoutTrigger,
    iter: Box<impl 'a + IntoParallelIterator<Item = T>>,
) -> Result<Vec<T>> {
    let i: AtomicUsize = AtomicUsize::new(0);

    iter.into_par_iter()
        .map(|item| {
            if i.fetch_add(1, Ordering::Relaxed) % 1000 == 0 {
                timeout.check()?;
            }
            Ok(item)
        })
        .collect()
}

#[cfg(test)]
mod tests {

    use super::*;
    use std::thread::sleep;
    #[cfg(feature = "with-benchmarks")]
    use test::Bencher;

    /**
     * A fresh trigger passes its check; once more than the limit has elapsed
     * the same trigger reports an error.
     */
    #[test]
    fn test_timeout() {
        let limit = Duration::from_millis(50);
        let trigger = TimeoutTrigger::new(limit);
        // NOTE(review): also relies on `Waiter::shutdown_check` succeeding here.
        assert!(trigger.check().is_ok());

        // Sleep for twice the limit so the deadline has certainly passed.
        sleep(limit * 2);
        assert!(trigger.check().is_err());
    }

    /**
     * Measures the cost of 100 000 consecutive checks against a trigger that
     * is far from expiring.
     */
    #[cfg(feature = "with-benchmarks")]
    #[bench]
    fn bench_timeout_check(b: &mut Bencher) {
        let trigger = TimeoutTrigger::new(Duration::from_secs(60));
        b.iter(|| {
            (0..100_000)
                .map(|_| trigger.check())
                .collect::<Vec<Result<()>>>()
        })
    }
}