use crate::{
constants::CHANNEL_CLOSED,
types::{Message, MessageReceiver},
MessageSender, QueueError,
};
use std::{
sync::mpsc::{self, TrySendError},
thread,
time::Duration,
};
use log::{error, trace, warn};
pub struct SingleWorkerTimeout<T: Send> {
thread: Option<thread::JoinHandle<()>>,
sender: MessageSender<T>,
}
impl<T> SingleWorkerTimeout<T>
where
T: 'static + Send,
{
pub fn new<C, F>(
max_jobs_queue: usize,
context: C,
handler: F,
timeout: Duration,
) -> SingleWorkerTimeout<T>
where
C: Send + 'static,
F: Fn(&mut C, Option<T>) + Send + 'static,
{
trace!("Creating WorkerPool...");
assert!(max_jobs_queue > 0);
let (sender, receiver) = mpsc::sync_channel(max_jobs_queue);
let worker =
thread::spawn(move || SingleWorkerTimeout::run(receiver, context, handler, timeout));
trace!("SingleWorkerTimeout created");
SingleWorkerTimeout {
thread: Some(worker),
sender,
}
}
pub fn send(sender: &MessageSender<T>, job: T) -> Result<(), QueueError<T>> {
match sender.try_send(Message::NewJob(job)) {
Ok(()) => {
trace!("Job added to the queue");
Ok(())
}
Err(TrySendError::Full(t)) => match t {
Message::NewJob(job) => Err(QueueError::Full(job)),
Message::Terminate => unreachable!(),
},
Err(TrySendError::Disconnected(_)) => {
error!("{}", CHANNEL_CLOSED);
panic!("{}", CHANNEL_CLOSED);
}
}
}
pub fn clone_sender(&self) -> MessageSender<T> {
self.sender.clone()
}
pub fn stop(&self) {
self.sender.send(Message::Terminate).unwrap();
}
pub fn join(&mut self) {
trace!("Joining SingleWorkerTimeout...");
match self.thread.take() {
Some(joiner) => match joiner.join() {
Ok(_) => trace!("Worker joined"),
Err(e) => error!("Error while joining Worker - {:?}", e),
},
None => {
warn!("Attempt to already joined worker, ignoring");
}
}
}
fn run<C, F>(receiver: MessageReceiver<T>, mut context: C, handler: F, timeout: Duration)
where
C: Send + 'static,
F: Fn(&mut C, Option<T>) + Send + 'static,
{
loop {
let message = receiver.recv_timeout(timeout);
match message {
Ok(Message::NewJob(job)) => {
trace!("SingleWorkerTimeout received job");
handler(&mut context, Some(job));
}
Ok(Message::Terminate) => {
trace!("SingleWorkerTimeout was told to terminate");
break;
}
Err(_) => {
handler(&mut context, None);
}
}
}
}
}