queuingtask 0.3.0

queueingtask is a library for running different threads in order in Rust
Documentation
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::sync::PoisonError;
use std::sync::mpsc;
use std::sync::mpsc::{Sender};
use std::thread::JoinHandle;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

#[derive(Clone, Copy, Eq, PartialOrd, PartialEq, Debug)]
pub enum Notify {
	Started(usize),
	Go,
	Terminated(usize),
	Quit,
}
pub struct ThreadQueue {
	sender_map:Arc<RwLock<HashMap<usize,(Sender<Notify>,bool)>>>,
	sender:Sender<Notify>,
	last_id:Arc<RwLock<usize>>,
	working:Arc<AtomicBool>,
	busy_count:Arc<AtomicUsize>,
	worker_handler:Option<JoinHandle<()>>
}
impl ThreadQueue {
	pub fn new() -> ThreadQueue {
		let (ss,sr) = mpsc::channel();

		let last_id = Arc::new(RwLock::new(0));
		let working = Arc::new(AtomicBool::new(true));
		let busy_count = Arc::new(AtomicUsize::new(0));

		let sender_map = Arc::new(RwLock::new(HashMap::<usize,(Sender<Notify>,bool)>::new()));

		let h = {
			let sender_map = Arc::clone(&sender_map);
			let working = Arc::clone(&working);
			let busy_count = Arc::clone(&busy_count);

			let mut current_id = 0usize;

			let (ws,wr) = mpsc::channel();

			let h = std::thread::spawn(move || {
				ws.send(()).unwrap();

				loop {
					match sr.recv().unwrap() {
						Notify::Started(id) => {
							if id == current_id {
								match sender_map.read().unwrap().get(&id) {
									Some((ref sender, false)) => {
										sender.send(Notify::Go).unwrap();
									},
									_ => ()
								}

							}
						},
						Notify::Terminated(id) => {
							if id == current_id {
								match sender_map.write() {
									Ok(mut map) => {
										map.remove(&id);
										let id = id + 1;
										current_id = id;
										match map.get_mut(&id) {
											Some((ref sender, ref mut started)) => {
												*started = true;
												sender.send(Notify::Go).unwrap();
											},
											None => ()
										}
									},
									Err(ref e) => {
										panic!("{:?}", e);
									}
								};

								busy_count.fetch_sub(1, Ordering::Release);

								if !working.load(Ordering::Acquire) && busy_count.load(Ordering::Acquire) == 0 {
									break;
								}
							}
						},
						Notify::Quit if busy_count.load(Ordering::Acquire) > 0 => {

						},
						Notify::Quit if sender_map.read().unwrap().is_empty() => {
							break;
						},
						Notify::Quit => {
							panic!("There are still threads waiting to be processed.");
						}
						_ => (),
					}
				}
			});

			wr.recv().unwrap();

			h
		};

		ThreadQueue {
			sender_map:sender_map,
			sender:ss,
			last_id:last_id,
			working:working,
			busy_count:Arc::clone(&busy_count),
			worker_handler:Some(h)
		}
	}

	pub fn submit<F,T>(&mut self,f:F) ->
		Result<JoinHandle<T>,PoisonError<RwLockWriteGuard<'_,HashMap<usize,(Sender<Notify>,bool)>>>>
		where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static {

		let (cs,cr) = mpsc::channel();

		match self.last_id.write().unwrap() {
			mut last_id => {
				{
					self.sender_map.write()?.insert(*last_id, (cs.clone(),false));
				}

				let ss = self.sender.clone();

				let id = *last_id;

				let (on_start_sender,on_start_receiver) = mpsc::channel();

				let busy_count = Arc::clone(&self.busy_count);

				let r = std::thread::spawn(move || {
					ss.send(Notify::Started(id)).unwrap();
					busy_count.fetch_add(1, Ordering::Release);
					on_start_sender.send(()).unwrap();
					cr.recv().unwrap();

					let r = f();

					ss.send(Notify::Terminated(id)).unwrap();
					r
				});

				*last_id += 1;

				on_start_receiver.recv().unwrap();

				Ok(r)
			}
		}
	}
}
impl Drop for ThreadQueue {
	fn drop(&mut self) {
		let r = self.sender.send(Notify::Quit);
		self.working.store(false, Ordering::Release);
		self.worker_handler.take().map(|h| {
			h.join().unwrap()
		}).unwrap();
		r.unwrap();
	}
}