use anyhow::Result;
use rayon::prelude::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
use std::{
sync::atomic::{AtomicUsize, Ordering},
time::{Duration, Instant},
};
use crate::{errors::rpc_timeout, signal::Waiter};
pub struct TimeoutTrigger {
start: Instant,
timeout: Duration,
}
impl TimeoutTrigger {
pub fn new(timeout: Duration) -> TimeoutTrigger {
TimeoutTrigger {
start: Instant::now(),
timeout,
}
}
#[inline]
pub fn check(&self) -> Result<()> {
Waiter::shutdown_check()?;
if self.start.elapsed() >= self.timeout {
return Err(rpc_timeout(&self.timeout));
}
Ok(())
}
}
pub fn timeout_collect<'a, T>(
timeout: &TimeoutTrigger,
iter: Box<dyn Iterator<Item = T> + 'a>,
) -> Result<Vec<T>> {
iter.enumerate()
.map(|(i, item)| {
if i % 1000 == 0 {
timeout.check()?;
}
Ok(item)
})
.collect()
}
pub fn par_timeout_collect<'a, ITEM: std::marker::Send, CONTAINER: FromParallelIterator<ITEM>>(
timeout: &TimeoutTrigger,
iter: impl 'a + IntoParallelIterator<Item = ITEM>,
) -> Result<CONTAINER> {
let i: AtomicUsize = AtomicUsize::new(0);
iter.into_par_iter()
.map(|item| {
if i.fetch_add(1, Ordering::Relaxed) % 1000 == 0 {
timeout.check()?;
}
Ok(item)
})
.collect()
}
pub fn par_timeout_collect_vec<'a, ITEM: std::marker::Send>(
timeout: &TimeoutTrigger,
iter: impl 'a + IntoParallelIterator<Item = ITEM>,
) -> Result<Vec<ITEM>> {
par_timeout_collect::<ITEM, Vec<ITEM>>(timeout, iter)
}
#[allow(clippy::boxed_local)]
pub fn par_timeout_collect_boxed<'a, T: std::marker::Send>(
timeout: &TimeoutTrigger,
iter: Box<impl 'a + IntoParallelIterator<Item = T>>,
) -> Result<Vec<T>> {
let i: AtomicUsize = AtomicUsize::new(0);
iter.into_par_iter()
.map(|item| {
if i.fetch_add(1, Ordering::Relaxed) % 1000 == 0 {
timeout.check()?;
}
Ok(item)
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread::sleep;
#[cfg(feature = "with-benchmarks")]
use test::Bencher;
#[test]
fn test_timeout() {
let timeout = TimeoutTrigger::new(Duration::from_millis(50));
assert!(!timeout.check().is_err());
sleep(Duration::from_millis(100));
assert!(timeout.check().is_err());
}
#[cfg(feature = "with-benchmarks")]
#[bench]
fn bench_timeout_check(b: &mut Bencher) {
let timeout = TimeoutTrigger::new(Duration::from_secs(60));
b.iter(|| {
let x: Vec<Result<()>> = (0..100_000).map(|_| timeout.check()).collect();
x
})
}
}