threadpool-executor 0.3.3

A threadpool executor.
Documentation
use std::{
    collections::VecDeque,
    sync::{atomic::AtomicBool, Arc},
    time::Duration,
};

mod common;

#[test]
fn test() {
    common::setup_log();
    let pool = threadpool_executor::threadpool::Builder::new()
        .core_pool_size(1)
        .maximum_pool_size(3)
        .keep_alive_time(Duration::from_secs(300))
        .exeed_limit_policy(threadpool_executor::threadpool::ExceedLimitPolicy::CallerRuns)
        .build();
    let mut expectations = VecDeque::new();
    let e = 10;
    for i in 0..e {
        let j = i.clone();
        let exp = pool
            .execute(move || {
                log::info!("Run in a thread pool!");
                std::thread::sleep(Duration::from_secs(3));
                j
            })
            .unwrap();

        expectations.push_back(exp);
    }
    for i in 0..e {
        let mut exp = expectations.pop_front().unwrap();
        assert_eq!(exp.get_result().unwrap(), i);
    }
    let f = pool.execute(|| "abc");
    assert_eq!(f.unwrap().get_result().unwrap(), "abc");
}

#[test]
fn test_panic() {
    common::setup_log();
    let pool = threadpool_executor::ThreadPool::new(1);
    let r = pool.execute(|| {
        panic!("panic!!!");
    });
    let res = r.unwrap().get_result();
    assert!(res.is_err());
    if let Err(err) = res {
        matches!(err.kind(), threadpool_executor::error::ErrorKind::Panic);
    }

    let r = pool.execute(|| "abc");
    assert_eq!(r.unwrap().get_result().unwrap(), "abc");
}

#[test]
fn test_timeout() {
    common::setup_log();
    let pool = threadpool_executor::ThreadPool::new(1);
    let r = pool.execute(|| {
        std::thread::sleep(Duration::from_secs(3));
    });
    let res = r.unwrap().get_result_timeout(Duration::from_secs(1));
    assert!(res.is_err());
    if let Err(err) = res {
        matches!(err.kind(), threadpool_executor::error::ErrorKind::TimeOut);
    }
}

#[test]
fn test_worker_exit() {
    common::setup_log();
    let pool = threadpool_executor::threadpool::Builder::new()
        .core_pool_size(1)
        .maximum_pool_size(2)
        .keep_alive_time(Duration::from_secs(1))
        .build();
    pool.execute(|| std::thread::sleep(Duration::from_secs(1)))
        .unwrap();
    pool.execute(|| std::thread::sleep(Duration::from_secs(1)))
        .unwrap();

    std::thread::sleep(Duration::from_secs(10));
    assert_eq!(1, pool.size());
}

#[test]
fn test_reject() {
    common::setup_log();
    let pool = threadpool_executor::threadpool::Builder::new()
        .core_pool_size(1)
        .maximum_pool_size(1)
        .exeed_limit_policy(threadpool_executor::threadpool::ExceedLimitPolicy::Reject)
        .build();
    let res = pool.execute(|| {
        std::thread::sleep(std::time::Duration::from_secs(3));
    });
    assert!(res.is_ok());
    let res = pool.execute(|| "a");
    assert!(res.is_err());
    if let Err(err) = res {
        matches!(
            err.kind(),
            threadpool_executor::error::ErrorKind::TaskRejected
        );
    }
}

#[test]
fn test_cancel() {
    common::setup_log();
    let pool = threadpool_executor::ThreadPool::new(1);

    let a = Arc::new(AtomicBool::new(true));
    let a1 = a.clone();
    let mut r1 = pool
        .execute(move || {
            std::thread::sleep(std::time::Duration::from_secs(3));
            a1.store(false, std::sync::atomic::Ordering::Relaxed);
        })
        .unwrap();
    std::thread::sleep(Duration::from_secs(1));
    let r1c = r1.cancel();
    assert!(r1c.is_err());
    if let Err(err) = r1c {
        matches!(
            err.kind(),
            threadpool_executor::error::ErrorKind::TaskRunning
        );
    }

    let b = Arc::new(AtomicBool::new(true));
    let b1 = b.clone();
    let mut r2 = pool
        .execute(move || b1.store(false, std::sync::atomic::Ordering::Relaxed))
        .unwrap();
    r2.cancel().unwrap();
    std::thread::sleep(Duration::from_secs(5));
    assert!(r2.is_cancelled());
    assert!(b.load(std::sync::atomic::Ordering::Relaxed));
    assert!(r1.is_done());
    assert!(!a.load(std::sync::atomic::Ordering::Relaxed));
}