use json::JsonValue;
use log::warn;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
/// A unit of work: takes the executing worker's id and produces a JSON result.
type Job = Box<dyn FnOnce(usize) -> JsonValue + 'static + Send>;
/// Control messages sent from the pool to its workers over the shared channel.
enum Message {
    // Tell a worker to stop looping and return its accumulated results.
    End,
    // A job for the worker to run; its result is collected into the worker's list.
    NewJob(Job),
}
/// Handle to one pool thread.
struct Worker {
    // Worker index (also passed to each job); kept only for debugging.
    _id: usize,
    // Join handle yielding the results of every job this worker ran.
    // `Option` so the handle can be `take()`n and joined exactly once.
    t: Option<JoinHandle<Vec<JsonValue>>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn(move || {
let mut list = vec![];
let mut count_flag: i64 = 0;
loop {
let message = match receiver.lock() {
Ok(guard) => match guard.recv() {
Ok(msg) => msg,
Err(_) => break,
},
Err(_) => break,
};
match message {
Message::NewJob(job) => {
list.push(job(id));
count_flag += 1;
if count_flag == 10000 {
warn!("Worker-new循环次数: 1w,强制退出");
break;
}
}
Message::End => {
break;
}
}
}
list
});
Worker {
_id: id,
t: Some(t),
}
}
}
/// Fixed-size thread pool that runs `FnOnce(usize) -> JsonValue` jobs and
/// gathers every job's result when the pool is shut down.
pub struct Pool {
    // One handle per spawned worker thread.
    workers: Vec<Worker>,
    // Number of workers; also how many `End` messages a shutdown must send.
    max_workers: usize,
    // Producer side of the shared job channel; workers share the receiver.
    sender: mpsc::Sender<Message>,
}
impl Pool {
    /// Creates a pool with `max_workers` threads sharing one job channel.
    ///
    /// `max_workers == 0` is tolerated (a warning is printed and an empty
    /// pool is returned) — callers and tests rely on this not panicking.
    pub fn new(max_workers: usize) -> Pool {
        if max_workers == 0 {
            println!("max_workers 必须大于0")
        }
        let (tx, rx) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(rx));
        let mut workers = Vec::with_capacity(max_workers);
        for i in 0..max_workers {
            workers.push(Worker::new(i, Arc::clone(&receiver)));
        }
        Pool {
            workers,
            max_workers,
            sender: tx,
        }
    }

    /// Queues a job for execution; its result is collected at shutdown.
    pub fn execute<F>(&self, f: F)
    where
        F: 'static + Send + FnOnce(usize) -> JsonValue,
    {
        // A send error means every worker already exited; drop the job.
        let _ = self.sender.send(Message::NewJob(Box::new(f)));
    }

    /// Broadcasts `End` once per worker so each worker drains queued jobs
    /// and then exits. (Shared by `end` and `insert_all`.)
    fn shutdown(&self) {
        for _ in 0..self.max_workers {
            let _ = self.sender.send(Message::End);
        }
    }

    /// Stops all workers and returns every job result as one JSON array.
    /// Workers whose thread panicked are silently skipped.
    pub fn end(&mut self) -> JsonValue {
        self.shutdown();
        let mut list = vec![];
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                if let Ok(data) = t.join() {
                    list.extend(data);
                }
            }
        }
        JsonValue::from(list)
    }

    /// Stops all workers and splits each `[id, values]` job result into
    /// (ids, comma-joined values). The values string keeps its historical
    /// leading comma: `,(…),(…)`.
    pub fn insert_all(&mut self) -> (Vec<String>, String) {
        // Local import: append in place instead of rebuilding the string.
        use std::fmt::Write;
        self.shutdown();
        let mut list = String::new();
        let mut id = vec![];
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                if let Ok(data) = t.join() {
                    for item in data.iter() {
                        id.push(item[0].to_string());
                        // was `list = format!("{},{}", list, item[1].clone())`:
                        // O(n²) reallocation plus a needless clone. `write!`
                        // appends the same ",{}" text in amortized O(1).
                        let _ = write!(list, ",{}", item[1]);
                    }
                }
            }
        }
        (id, list)
    }
}
#[cfg(test)]
mod tests {
    //! Behavioral specification for `Pool`: construction, job execution,
    //! result collection (`end`), and the id/values split (`insert_all`).
    use super::*;
    use json::{array, object, JsonValue};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    #[test]
    fn new_with_1_worker() {
        let pool = Pool::new(1);
        assert_eq!(pool.max_workers, 1);
        assert_eq!(pool.workers.len(), 1);
    }

    #[test]
    fn new_with_2_workers() {
        let pool = Pool::new(2);
        assert_eq!(pool.max_workers, 2);
        assert_eq!(pool.workers.len(), 2);
    }

    #[test]
    fn new_with_4_workers() {
        let pool = Pool::new(4);
        assert_eq!(pool.max_workers, 4);
        assert_eq!(pool.workers.len(), 4);
    }

    // Pins the contract that a zero-sized pool only warns; it must not panic.
    #[test]
    fn new_with_0_workers_does_not_panic() {
        let pool = Pool::new(0);
        assert_eq!(pool.max_workers, 0);
        assert_eq!(pool.workers.len(), 0);
    }

    #[test]
    fn execute_single_job_and_end() {
        let mut pool = Pool::new(1);
        pool.execute(|_worker_id| JsonValue::from(42));
        let result = pool.end();
        assert!(result.is_array());
        assert_eq!(result.len(), 1);
        assert_eq!(result[0], 42);
    }

    // Results may arrive in any order, so compare the sorted values.
    #[test]
    fn execute_multiple_jobs_single_worker() {
        let mut pool = Pool::new(1);
        for i in 0..5 {
            pool.execute(move |_| JsonValue::from(i));
        }
        let result = pool.end();
        assert_eq!(result.len(), 5);
        let mut values: Vec<i32> = (0..5).map(|i| result[i].as_i32().unwrap()).collect();
        values.sort();
        assert_eq!(values, vec![0, 1, 2, 3, 4]);
    }

    // The atomic counter proves every queued job actually ran exactly once.
    #[test]
    fn execute_multiple_jobs_multiple_workers() {
        let mut pool = Pool::new(4);
        let counter = Arc::new(AtomicUsize::new(0));
        for _ in 0..20 {
            let c = Arc::clone(&counter);
            pool.execute(move |_| {
                c.fetch_add(1, Ordering::SeqCst);
                JsonValue::from(1)
            });
        }
        let result = pool.end();
        assert_eq!(result.len(), 20);
        assert_eq!(counter.load(Ordering::SeqCst), 20);
    }

    #[test]
    fn end_with_no_jobs_returns_empty_array() {
        let mut pool = Pool::new(2);
        let result = pool.end();
        assert!(result.is_array());
        assert_eq!(result.len(), 0);
    }

    // Every job must be handed a worker id in [0, max_workers).
    #[test]
    fn jobs_receive_valid_worker_id() {
        let mut pool = Pool::new(4);
        for _ in 0..20 {
            pool.execute(|worker_id| {
                assert!(worker_id < 4);
                JsonValue::from(worker_id)
            });
        }
        let result = pool.end();
        assert_eq!(result.len(), 20);
        for i in 0..result.len() {
            let wid = result[i].as_usize().unwrap();
            assert!(wid < 4);
        }
    }

    #[test]
    fn execute_returns_complex_json() {
        let mut pool = Pool::new(2);
        pool.execute(|_| object! { "name": "alice", "age": 30 });
        pool.execute(|_| object! { "name": "bob", "age": 25 });
        let result = pool.end();
        assert_eq!(result.len(), 2);
        let mut names: Vec<String> = (0..2)
            .map(|i| result[i]["name"].as_str().unwrap().to_string())
            .collect();
        names.sort();
        assert_eq!(names, vec!["alice", "bob"]);
    }

    // Pins the leading-comma format of the joined values string.
    #[test]
    fn insert_all_single_job() {
        let mut pool = Pool::new(1);
        pool.execute(|_| array!["id_1", "(1,'hello')"]);
        let (ids, values) = pool.insert_all();
        assert_eq!(ids, vec!["id_1"]);
        assert_eq!(values, ",(1,'hello')");
    }

    #[test]
    fn insert_all_multiple_jobs() {
        let mut pool = Pool::new(1);
        pool.execute(|_| array!["id_a", "(1,'a')"]);
        pool.execute(|_| array!["id_b", "(2,'b')"]);
        pool.execute(|_| array!["id_c", "(3,'c')"]);
        let (ids, values) = pool.insert_all();
        assert_eq!(ids.len(), 3);
        assert!(ids.contains(&"id_a".to_string()));
        assert!(ids.contains(&"id_b".to_string()));
        assert!(ids.contains(&"id_c".to_string()));
        assert!(values.contains("(1,'a')"));
        assert!(values.contains("(2,'b')"));
        assert!(values.contains("(3,'c')"));
    }

    #[test]
    fn insert_all_no_jobs_returns_empty() {
        let mut pool = Pool::new(2);
        let (ids, values) = pool.insert_all();
        assert!(ids.is_empty());
        assert!(values.is_empty());
    }

    #[test]
    fn insert_all_multiple_workers() {
        let mut pool = Pool::new(3);
        for i in 0..9 {
            let id_str = format!("id_{}", i);
            let val_str = format!("({},'v{}')", i, i);
            pool.execute(move |_| array![id_str.as_str(), val_str.as_str()]);
        }
        let (ids, values) = pool.insert_all();
        assert_eq!(ids.len(), 9);
        for i in 0..9 {
            assert!(ids.contains(&format!("id_{}", i)));
            assert!(values.contains(&format!("({},'v{}')", i, i)));
        }
    }

    #[test]
    fn worker_count_matches_requested() {
        for n in [1, 2, 3, 5, 8] {
            let mut pool = Pool::new(n);
            assert_eq!(pool.workers.len(), n);
            assert_eq!(pool.max_workers, n);
            pool.end();
        }
    }
}