qiniu-http-client 0.2.2

Qiniu HTTP Client for Rust
Documentation
use anyhow::Result;
use once_cell::sync::Lazy;
use std::{
    collections::{HashMap, VecDeque},
    sync::Mutex,
};

type Task = Box<dyn FnOnce() + Send + 'static>;
type TasksDequeue = VecDeque<Task>;
type TasksMap = HashMap<String, TasksDequeue>;

static THREADS_MAP_LOCK: Lazy<Mutex<TasksMap>> = Lazy::new(Default::default);

pub(super) fn spawn<F: FnOnce() + Send + 'static>(task_name: String, f: F) -> Result<()> {
    let mut threads_map = THREADS_MAP_LOCK.lock().unwrap();
    if let Some(dequeue) = threads_map.get_mut(&task_name) {
        dequeue.push_back(Box::new(f));
        return Ok(());
    } else {
        let mut dequeue = TasksDequeue::with_capacity(1);
        dequeue.push_back(Box::new(f));
        threads_map.insert(task_name.to_owned(), dequeue);
        return spawn_inner(task_name);
    }

    fn spawn_inner(task_name: String) -> Result<()> {
        _spawn(task_name.to_owned(), move || {
            while let Some(task) = get_task(&task_name) {
                task();
            }
        })
    }

    fn get_task(task_name: &str) -> Option<Task> {
        let mut threads_map = THREADS_MAP_LOCK.lock().unwrap();
        if let Some(dequeue) = threads_map.get_mut(task_name) {
            if let Some(task) = dequeue.pop_front() {
                return Some(task);
            }
            threads_map.remove(task_name);
        }
        None
    }

    #[cfg(not(feature = "async"))]
    fn _spawn<F: FnOnce() + Send + 'static>(task_name: String, f: F) -> Result<()> {
        std::thread::Builder::new()
            .name(task_name)
            .spawn(f)
            .map(|_| ())
            .map_err(|err| err.into())
    }

    #[cfg(feature = "async")]
    fn _spawn<F: FnOnce() + Send + 'static>(_task_name: String, f: F) -> Result<()> {
        async_std::task::spawn_blocking(f);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        thread::sleep,
        time::Duration,
    };

    #[test]
    fn test_spawn() -> Result<()> {
        env_logger::builder().is_test(true).try_init().ok();

        let spawned_task_1 = Arc::new(AtomicUsize::new(0));
        let spawned_task_2 = Arc::new(AtomicUsize::new(0));

        for i in 0..1000usize {
            let spawned_task_1 = spawned_task_1.to_owned();
            spawn("task1".to_owned(), move || {
                if i == 0 {
                    sleep(Duration::from_secs(1));
                }
                spawned_task_1.fetch_add(1, Ordering::Relaxed);
            })?;
            let spawned_task_2 = spawned_task_2.to_owned();
            spawn("task2".to_owned(), move || {
                if i == 0 {
                    sleep(Duration::from_secs(1));
                }
                spawned_task_2.fetch_add(1, Ordering::Relaxed);
            })?;
        }

        assert_eq!(spawned_task_1.load(Ordering::Relaxed), 0);
        assert_eq!(spawned_task_2.load(Ordering::Relaxed), 0);

        sleep(Duration::from_secs(2));

        assert_eq!(spawned_task_1.load(Ordering::Relaxed), 1000);
        assert_eq!(spawned_task_2.load(Ordering::Relaxed), 1000);

        Ok(())
    }
}