br-db 1.8.71

This is an ORM database library for MySQL, MSSQL, and SQLite.
Documentation
use json::JsonValue;
use log::warn;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;

/// A unit of work for the pool: a one-shot closure that is called with the id
/// of the worker running it and returns a `JsonValue` result.
///
/// The `Send + 'static` bounds let the closure be moved onto a worker thread.
type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;

/// Messages sent over the channel from the pool to its worker threads.
enum Message {
    /// Tells the receiving worker to leave its loop and return its results.
    End,
    /// Carries a job for the receiving worker to run.
    NewJob(Job),
}

/// A single worker thread owned by the pool.
struct Worker {
    /// Identifier given to the worker at construction; the same value is
    /// passed to every job the worker runs.
    _id: usize,
    /// Handle of the worker thread, which yields the results of the jobs it
    /// ran. Wrapped in `Option` so the handle can be taken out and joined.
    t: Option<JoinHandle<Vec<JsonValue>>>,
}

impl Worker {
    /// Spawns a worker thread that pulls messages from the shared `receiver`.
    ///
    /// Each `Message::NewJob` is run with this worker's `id` and its result is
    /// appended to a per-worker list. The thread finishes, returning that list,
    /// when any of the following happens:
    ///
    /// * it receives `Message::End`;
    /// * the mutex cannot be locked or the channel is closed;
    /// * it has run 10000 jobs (a warning is logged in that case).
    ///
    /// NOTE(review): once the 10000-job cap is hit this worker stops taking
    /// jobs; anything still queued is left for the other workers — confirm
    /// that this is the intended behaviour.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let handle = thread::spawn(move || {
            let mut results = Vec::new();
            let mut processed: i64 = 0;
            loop {
                // The guard lives only inside the closure, so the lock is
                // released before the job itself runs.
                let next = receiver.lock().ok().and_then(|rx| rx.recv().ok());
                let job = match next {
                    Some(Message::NewJob(job)) => job,
                    Some(Message::End) | None => break,
                };
                results.push(job(id));
                processed += 1;
                if processed == 10000 {
                    warn!("Worker-new循环次数: 1w,强制退出");
                    break;
                }
            }
            results
        });
        Worker {
            _id: id,
            t: Some(handle),
        }
    }
}

/// A fixed-size thread pool whose jobs each produce a `JsonValue`.
///
/// Jobs are queued with `execute`; the results are gathered when the pool is
/// shut down through `end` or `insert_all`.
pub struct Pool {
    /// The worker threads, one per requested worker.
    workers: Vec<Worker>,
    /// Number of workers requested at construction; also the number of
    /// `Message::End` messages sent on shutdown.
    max_workers: usize,
    /// Sending half of the channel shared by all workers.
    sender: mpsc::Sender<Message>,
}

impl Pool {
    pub fn new(max_workers: usize) -> Pool {
        if max_workers == 0 {
            println!("max_workers 必须大于0")
        }
        let (tx, rx) = mpsc::channel();
        let mut workers = Vec::with_capacity(max_workers);
        let receiver = Arc::new(Mutex::new(rx));
        for i in 0..max_workers {
            workers.push(Worker::new(i, Arc::clone(&receiver)));
        }
        Pool {
            workers,
            max_workers,
            sender: tx,
        }
    }
    pub fn execute<F>(&self, f: F)
    where
        F: 'static + Send + FnOnce(usize) -> JsonValue,
    {
        let job = Message::NewJob(Box::new(f));
        let _ = self.sender.send(job);
    }
    pub fn end(&mut self) -> JsonValue {
        for _ in 0..self.max_workers {
            let _ = self.sender.send(Message::End);
        }
        let mut list = vec![];
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                if let Ok(data) = t.join() {
                    list.extend(data);
                }
            }
        }
        JsonValue::from(list)
    }
    pub fn insert_all(&mut self) -> (Vec<String>, String) {
        for _ in 0..self.max_workers {
            let _ = self.sender.send(Message::End);
        }
        let mut list = String::new();
        let mut id = vec![];
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                if let Ok(data) = t.join() {
                    for item in data.iter() {
                        id.push(item[0].to_string());
                        list = format!("{},{}", list, item[1].clone());
                    }
                }
            }
        }
        (id, list)
    }
}

// Unit tests for `Pool`: construction, job execution, and the two shutdown
// paths (`end` and `insert_all`).
#[cfg(test)]
mod tests {
    use super::*;
    use json::{array, object, JsonValue};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Construction: the pool records the requested size and creates that many workers.
    #[test]
    fn new_with_1_worker() {
        let pool = Pool::new(1);
        assert_eq!(pool.max_workers, 1);
        assert_eq!(pool.workers.len(), 1);
    }

    #[test]
    fn new_with_2_workers() {
        let pool = Pool::new(2);
        assert_eq!(pool.max_workers, 2);
        assert_eq!(pool.workers.len(), 2);
    }

    #[test]
    fn new_with_4_workers() {
        let pool = Pool::new(4);
        assert_eq!(pool.max_workers, 4);
        assert_eq!(pool.workers.len(), 4);
    }

    // A zero-sized pool is constructed without panicking and has no workers.
    #[test]
    fn new_with_0_workers_does_not_panic() {
        let pool = Pool::new(0);
        assert_eq!(pool.max_workers, 0);
        assert_eq!(pool.workers.len(), 0);
    }

    // One job yields a one-element JSON array holding that job's result.
    #[test]
    fn execute_single_job_and_end() {
        let mut pool = Pool::new(1);
        pool.execute(|_worker_id| JsonValue::from(42));
        let result = pool.end();
        assert!(result.is_array());
        assert_eq!(result.len(), 1);
        assert_eq!(result[0], 42);
    }

    // All queued jobs run on a single worker; values are sorted before comparing.
    #[test]
    fn execute_multiple_jobs_single_worker() {
        let mut pool = Pool::new(1);
        for i in 0..5 {
            pool.execute(move |_| JsonValue::from(i));
        }
        let result = pool.end();
        assert_eq!(result.len(), 5);
        let mut values: Vec<i32> = (0..5).map(|i| result[i].as_i32().unwrap()).collect();
        values.sort();
        assert_eq!(values, vec![0, 1, 2, 3, 4]);
    }

    // Every job runs exactly once across several workers; a shared atomic
    // counter confirms the number of executions.
    #[test]
    fn execute_multiple_jobs_multiple_workers() {
        let mut pool = Pool::new(4);
        let counter = Arc::new(AtomicUsize::new(0));
        for _ in 0..20 {
            let c = Arc::clone(&counter);
            pool.execute(move |_| {
                c.fetch_add(1, Ordering::SeqCst);
                JsonValue::from(1)
            });
        }
        let result = pool.end();
        assert_eq!(result.len(), 20);
        assert_eq!(counter.load(Ordering::SeqCst), 20);
    }

    // Ending a pool that ran nothing still returns an (empty) array.
    #[test]
    fn end_with_no_jobs_returns_empty_array() {
        let mut pool = Pool::new(2);
        let result = pool.end();
        assert!(result.is_array());
        assert_eq!(result.len(), 0);
    }

    // The id passed to a job is always below the number of workers.
    #[test]
    fn jobs_receive_valid_worker_id() {
        let mut pool = Pool::new(4);
        for _ in 0..20 {
            pool.execute(|worker_id| {
                assert!(worker_id < 4);
                JsonValue::from(worker_id)
            });
        }
        let result = pool.end();
        assert_eq!(result.len(), 20);
        for i in 0..result.len() {
            let wid = result[i].as_usize().unwrap();
            assert!(wid < 4);
        }
    }

    // Object results come back intact; names are sorted because the order
    // across workers is not asserted.
    #[test]
    fn execute_returns_complex_json() {
        let mut pool = Pool::new(2);
        pool.execute(|_| object! { "name": "alice", "age": 30 });
        pool.execute(|_| object! { "name": "bob", "age": 25 });
        let result = pool.end();
        assert_eq!(result.len(), 2);
        let mut names: Vec<String> = (0..2)
            .map(|i| result[i]["name"].as_str().unwrap().to_string())
            .collect();
        names.sort();
        assert_eq!(names, vec!["alice", "bob"]);
    }

    // `insert_all` splits a `[id, value]` result; the values string starts
    // with a comma.
    #[test]
    fn insert_all_single_job() {
        let mut pool = Pool::new(1);
        pool.execute(|_| array!["id_1", "(1,'hello')"]);
        let (ids, values) = pool.insert_all();
        assert_eq!(ids, vec!["id_1"]);
        assert_eq!(values, ",(1,'hello')");
    }

    // Several results on one worker: every id and value fragment is present.
    #[test]
    fn insert_all_multiple_jobs() {
        let mut pool = Pool::new(1);
        pool.execute(|_| array!["id_a", "(1,'a')"]);
        pool.execute(|_| array!["id_b", "(2,'b')"]);
        pool.execute(|_| array!["id_c", "(3,'c')"]);
        let (ids, values) = pool.insert_all();
        assert_eq!(ids.len(), 3);
        assert!(ids.contains(&"id_a".to_string()));
        assert!(ids.contains(&"id_b".to_string()));
        assert!(ids.contains(&"id_c".to_string()));
        assert!(values.contains("(1,'a')"));
        assert!(values.contains("(2,'b')"));
        assert!(values.contains("(3,'c')"));
    }

    // With no jobs, both the id list and the values string are empty.
    #[test]
    fn insert_all_no_jobs_returns_empty() {
        let mut pool = Pool::new(2);
        let (ids, values) = pool.insert_all();
        assert!(ids.is_empty());
        assert!(values.is_empty());
    }

    // Results from several workers are all collected; only membership is
    // checked, not order.
    #[test]
    fn insert_all_multiple_workers() {
        let mut pool = Pool::new(3);
        for i in 0..9 {
            let id_str = format!("id_{}", i);
            let val_str = format!("({},'v{}')", i, i);
            pool.execute(move |_| array![id_str.as_str(), val_str.as_str()]);
        }
        let (ids, values) = pool.insert_all();
        assert_eq!(ids.len(), 9);
        for i in 0..9 {
            assert!(ids.contains(&format!("id_{}", i)));
            assert!(values.contains(&format!("({},'v{}')", i, i)));
        }
    }

    // The worker count matches the request for a range of sizes; each pool is
    // ended so its threads are joined before the next one is built.
    #[test]
    fn worker_count_matches_requested() {
        for n in [1, 2, 3, 5, 8] {
            let mut pool = Pool::new(n);
            assert_eq!(pool.workers.len(), n);
            assert_eq!(pool.max_workers, n);
            pool.end();
        }
    }
}