use {
std::{
io::{Error, ErrorKind},
sync::{
Arc,
Mutex,
mpsc::{self, Receiver, RecvTimeoutError, SendError, Sender, SyncSender, TrySendError},
},
thread::{self, JoinHandle},
time::Duration,
},
crate::{Job, Result},
};
pub type ActiveLimit = u32;
type JobManagerHandle = JoinHandle<()>;
type JobRunnerHandle = JoinHandle<()>;
type JobHandle = Arc<Mutex<()>>;
#[derive(Debug)]
enum Msg<J> where J: Job {
NewJob {
job: J,
sender: Sender<Option<J>>,
},
GetJob {
sender: Sender<Option<J>>,
},
JobFinished {
handle: JobHandle,
},
StopJobManager,
StopJobManagerOnIdle,
}
#[derive(Debug)]
pub struct BlackHole<J> where J: Job {
job_manager: JobManagerHandle,
job_sender: SyncSender<Msg<J>>,
}
impl<J> BlackHole<J> where J: Job {
pub fn make(queue_limit: usize) -> Result<Self> {
Self::make_with_active_limit(available_parallelism()?, queue_limit)
}
pub fn make_with_active_limit(active_limit: ActiveLimit, queue_limit: usize) -> Result<Self> {
if active_limit == 0 {
return Err(Error::new(ErrorKind::InvalidData, "Active limit must be larger than zero"));
}
if queue_limit == 0 {
return Err(Error::new(ErrorKind::InvalidData, "Queue limit must be larger than zero"));
}
if cfg!(not(any(target_pointer_width="8", target_pointer_width="16"))) {
if queue_limit >= 2_usize.pow(32) {
crate::io::lock_write_err(__!("Warning: maybe queue limit is too large? -> {}\n", queue_limit));
}
}
let (sender, receiver) = mpsc::sync_channel::<Msg<J>>(queue_limit);
Ok(Self {
job_manager: spawn_job_manager(receiver, sender.clone(), active_limit, queue_limit)?,
job_sender: sender,
})
}
pub fn throw(&self, job: J) -> Result<Option<J>> {
let (sender, receiver) = mpsc::channel();
match self.job_sender.try_send(Msg::NewJob { job, sender }) {
Ok(()) => receiver.recv().map_err(|e| Error::new(ErrorKind::BrokenPipe, format!("Failed waiting for job manager: {}", e))),
Err(TrySendError::Full(Msg::NewJob { job, .. })) => Ok(Some(job)),
Err(TrySendError::Full(_)) => Err(Error::new(ErrorKind::Other, __!("Internal error"))),
Err(TrySendError::Disconnected(_)) => Err(Error::new(ErrorKind::BrokenPipe, "Job manager is disconnected")),
}
}
pub unsafe fn escape(self) -> Result<()> {
self.job_sender.send(Msg::StopJobManager)
.map_err(|e| Error::new(ErrorKind::BrokenPipe, format!("Failed sending stop-message to job manager: {}", e)))
}
pub fn escape_on_idle(self) -> Result<()> {
match self.job_sender.send(Msg::StopJobManagerOnIdle) {
Ok(()) => {
drop(self.job_sender);
self.job_manager.join().map_err(|err|
Error::new(ErrorKind::Other, format!("Failed to wait for job manager to finished: {:?}", err))
)
},
Err(err) => Err(Error::new(ErrorKind::BrokenPipe, format!("Failed sending stop-on-idle message to job manager: {}", err))),
}
}
}
fn spawn_job_manager<J>(
job_manager_receiver: Receiver<Msg<J>>, job_manager_sender: SyncSender<Msg<J>>, active_limit: ActiveLimit, queue_limit: usize,
) -> Result<JobManagerHandle> where J: Job {
let active_limit = usize::try_from(active_limit)
.map_err(|_| Error::new(ErrorKind::Other, format!("Failed to convert active limit to usize: {}", active_limit)))?;
Ok(thread::spawn(move || {
let mut active_jobs = Vec::with_capacity(active_limit);
let mut job_queue = Vec::with_capacity(queue_limit);
let mut stop_on_idle = false;
let update_active_jobs = |jobs: &mut Vec<JobHandle>| jobs.retain(|j| if j.is_poisoned() {
crate::io::lock_write_err(__!("Found 1 panicked job\n"));
false
} else {
Arc::weak_count(j) > 1 || Arc::strong_count(j) > 1
});
let spawn_job = |job, jobs: &mut Vec<_>| {
let handle = Arc::new(Mutex::new(()));
jobs.push(handle.clone());
spawn_job_runner(job_manager_sender.clone(), job, handle);
};
loop {
match job_manager_receiver.recv_timeout(Duration::from_millis(50)) {
Ok(Msg::NewJob { job, sender }) => {
let job = if active_jobs.len() < active_limit {
spawn_job(job, &mut active_jobs);
None
} else if job_queue.len() < queue_limit {
job_queue.push(job);
None
} else {
Some(job)
};
if let Err(err) = sender.send(job) {
crate::io::lock_write_err(__!("Failed sending back job-message to BlackHole instance: {}\n", err));
}
},
Ok(Msg::GetJob { sender }) => if let Err(SendError(job)) = sender.send(job_queue.pop()) {
if let Some(job) = job {
job_queue.push(job);
}
},
Ok(Msg::JobFinished { handle }) => {
drop(handle);
update_active_jobs(&mut active_jobs);
if let Some(job) = job_queue.pop() {
spawn_job(job, &mut active_jobs);
}
if stop_on_idle && active_jobs.is_empty() && job_queue.is_empty() {
break;
}
},
Ok(Msg::StopJobManager) => {
crate::io::lock_write_err(__!(
"Warning: escaping BlackHole while there are {} active job(s) and unknown jobs in queue\n", active_jobs.len(),
));
break;
},
Ok(Msg::StopJobManagerOnIdle) => {
stop_on_idle = true;
if active_jobs.is_empty() && job_queue.is_empty() {
break;
}
},
Err(RecvTimeoutError::Timeout) => {
update_active_jobs(&mut active_jobs);
if stop_on_idle && active_jobs.is_empty() && job_queue.is_empty() {
break;
}
},
Err(RecvTimeoutError::Disconnected) => break,
};
}
}))
}
fn spawn_job_runner<J>(job_manager_sender: SyncSender<Msg<J>>, mut job: J, handle: JobHandle) -> JobRunnerHandle where J: Job {
thread::spawn(move || {
let (sender, receiver) = mpsc::channel();
loop {
crate::run_to_end(job);
if let Err(err) = job_manager_sender.send(Msg::GetJob { sender: sender.clone() }) {
crate::io::lock_write_err(__!("Failed to send message to job manager: {:?}\n", err));
break;
}
match receiver.recv() {
Ok(Some(new_job)) => job = new_job,
Ok(None) => break,
Err(err) => {
crate::io::lock_write_err(__!("Failed to receive message from job manager: {:?}\n", err));
break;
},
};
}
if let Err(err) = job_manager_sender.send(Msg::JobFinished { handle }) {
crate::io::lock_write_err(__!("Failed to send job-finished message to job manager: {}\n", err));
}
})
}
pub fn available_parallelism() -> Result<ActiveLimit> {
ActiveLimit::try_from(thread::available_parallelism()?.get()).map_err(|e| Error::new(ErrorKind::Other, __!("{}", e)))
}