use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::fmt;
use common::prelude::*;
use common::state::{IdKind, State, UniqueId};
use super::scheduled_job::ScheduledJob;
use super::types::ScriptId;
/// Outcome of offering a job to a `Thread` through `Thread::process`.
pub enum ProcessResult<S>
where
    S: ScriptsRepositoryTrait + 'static,
{
    /// The thread did not accept the job; the job is handed back to the
    /// caller inside the variant so it is not lost.
    Rejected(ScheduledJob<S>),
    /// The thread accepted the job and was woken up to run it.
    Executing,
}
impl<S> ProcessResult<S>
where
    S: ScriptsRepositoryTrait + 'static,
{
    /// Returns `true` when the job was accepted by the thread.
    ///
    /// Only compiled for tests.
    #[cfg(test)]
    pub fn executing(&self) -> bool {
        !self.rejected()
    }

    /// Returns `true` when the job was handed back to the caller.
    ///
    /// Only compiled for tests.
    #[cfg(test)]
    pub fn rejected(&self) -> bool {
        match *self {
            ProcessResult::Rejected(..) => true,
            ProcessResult::Executing => false,
        }
    }
}
/// Handle used to signal that a job has finished.
///
/// Completing means: clear the shared `busy` flag, then unpark the thread on
/// which the completer was created. By default this happens when an instance
/// is dropped; once `manual_mode()` has been called on an instance, dropping
/// it does nothing and `manual_complete()` has to be called explicitly.
///
/// The manual flag is copied by `clone()`, and every clone that is still in
/// automatic mode completes on its own drop.
#[derive(Clone)]
pub struct ThreadCompleter {
    /// Handle of the thread that called `ThreadCompleter::new`.
    thread: thread::Thread,
    /// Shared flag that is set to `false` on completion.
    busy: Arc<AtomicBool>,
    /// When `true`, dropping this instance does not complete.
    manual: bool,
}

impl ThreadCompleter {
    /// Creates a completer bound to the *calling* thread and to `busy`.
    ///
    /// The new instance starts in automatic mode.
    pub fn new(busy: Arc<AtomicBool>) -> Self {
        let thread = thread::current();
        Self {
            thread,
            busy,
            manual: false,
        }
    }

    /// Disables completion-on-drop for this instance (and for clones made
    /// from it afterwards).
    pub fn manual_mode(&mut self) {
        self.manual = true;
    }

    /// Completes right now: stores `false` into the busy flag and unparks
    /// the thread the completer was created on.
    ///
    /// The flag is cleared *before* the unpark so that the woken thread
    /// observes the updated value.
    pub fn manual_complete(&self) {
        self.busy.store(false, Ordering::SeqCst);
        self.thread.unpark();
    }
}

impl Drop for ThreadCompleter {
    /// Completes automatically unless the instance is in manual mode.
    fn drop(&mut self) {
        if self.manual {
            return;
        }
        self.manual_complete();
    }
}
/// A single worker: one OS thread that runs at most one job at a time.
///
/// Jobs are handed over through a one-slot mailbox (`communication`); the
/// worker is woken with `unpark` whenever there is something for it to do.
pub struct Thread<S>
where
    S: ScriptsRepositoryTrait + 'static,
{
    /// Identifier obtained from `State::next_id(IdKind::ThreadId)` when the
    /// thread was created.
    id: UniqueId,
    /// Join handle of the spawned worker; also used to unpark it.
    handle: thread::JoinHandle<()>,
    /// `hook_id()` of the most recent job accepted by `process`.
    last_running_id: Option<ScriptId<S>>,
    /// `true` from the moment a job is accepted until its `ThreadCompleter`
    /// completes.
    busy: Arc<AtomicBool>,
    /// Set by `stop` to make the worker loop exit.
    should_stop: Arc<AtomicBool>,
    /// One-slot mailbox: `process` stores a job here, the worker takes it.
    communication: Arc<Mutex<Option<ScheduledJob<S>>>>,
}
impl<S: ScriptsRepositoryTrait> Thread<S> {
    /// Spawns a new worker thread and returns the handle controlling it.
    ///
    /// `executor` is invoked on the worker for every accepted job, together
    /// with a clone of the worker's `ThreadCompleter`. `state` is only used
    /// to allocate the thread's id.
    ///
    /// If the worker loop terminates with an error, the error is reported
    /// with `pretty_print()` and the OS thread ends.
    pub fn new<E>(executor: E, state: &Arc<State>) -> Self
    where
        E: Fn(ScheduledJob<S>, ThreadCompleter) -> Result<()> + Send + 'static,
    {
        let id = state.next_id(IdKind::ThreadId);

        let busy = Arc::new(AtomicBool::new(false));
        let should_stop = Arc::new(AtomicBool::new(false));
        let communication = Arc::new(Mutex::new(None));

        let handle = {
            // Worker-side handles to the shared state.
            let busy = busy.clone();
            let should_stop = should_stop.clone();
            let communication = communication.clone();

            thread::spawn(move || {
                // Created inside the worker so that completing unparks the
                // worker itself.
                let completer = ThreadCompleter::new(busy.clone());
                let outcome = Self::inner_thread(
                    busy,
                    should_stop,
                    communication,
                    executor,
                    completer,
                );
                if let Err(error) = outcome {
                    error.pretty_print();
                }
            })
        };

        Thread {
            id,
            handle,
            last_running_id: None,
            busy,
            should_stop,
            communication,
        }
    }

    /// Body of the worker thread.
    ///
    /// Loops until `should_stop` is set: takes a job out of the mailbox if
    /// there is one and runs it through `executor`, otherwise parks until
    /// woken. Returns early with an error if the mailbox mutex is poisoned
    /// or if the executor fails.
    ///
    /// NOTE(review): after an early error return nothing marks this worker
    /// as dead, so `process` appears to keep accepting jobs that will never
    /// run -- worth confirming against the callers.
    fn inner_thread<E>(
        busy: Arc<AtomicBool>,
        should_stop: Arc<AtomicBool>,
        comm: Arc<Mutex<Option<ScheduledJob<S>>>>,
        executor: E,
        completer: ThreadCompleter,
    ) -> Result<()>
    where
        E: Fn(ScheduledJob<S>, ThreadCompleter) -> Result<()> + Send + 'static,
    {
        while !should_stop.load(Ordering::SeqCst) {
            // The guard returned by `lock()` is a temporary of the `if let`
            // scrutinee, so the mailbox stays locked for the whole body.
            if let Some(job) = comm.lock()?.take() {
                executor(job, completer.clone())?;

                // Still busy here means the executor's completer was not
                // completed yet (e.g. it was switched to manual mode): wait
                // to be unparked before looking at the mailbox again.
                if busy.load(Ordering::SeqCst) {
                    thread::park();
                }
                continue;
            }

            // Nothing to do. This park is deliberately placed *after* the
            // `if let` (not in an `else`), so the mailbox lock is released
            // while the worker sleeps.
            thread::park();
        }

        Ok(())
    }

    /// Offers `job` to this thread.
    ///
    /// Returns `ProcessResult::Rejected(job)` when the thread has been asked
    /// to stop, is busy, or its mailbox mutex is poisoned. Otherwise marks
    /// the thread busy, records the job's `hook_id()`, stores the job in the
    /// mailbox, wakes the worker and returns `ProcessResult::Executing`.
    pub fn process(&mut self, job: ScheduledJob<S>) -> ProcessResult<S> {
        let unavailable =
            self.should_stop.load(Ordering::SeqCst) || self.busy();
        if unavailable {
            return ProcessResult::Rejected(job);
        }

        match self.communication.lock() {
            Ok(mut slot) => {
                // Flag first, then publish the job, then wake the worker.
                self.busy.store(true, Ordering::SeqCst);
                self.last_running_id = Some(job.hook_id());
                *slot = Some(job);
                self.handle.thread().unpark();
                ProcessResult::Executing
            }
            Err(_) => ProcessResult::Rejected(job),
        }
    }

    /// Asks the worker to exit and blocks until it has done so.
    ///
    /// The result of joining (i.e. whether the worker panicked) is ignored.
    pub fn stop(self) {
        self.should_stop.store(true, Ordering::SeqCst);
        let worker = self.handle;
        worker.thread().unpark();
        let _ = worker.join();
    }

    /// Returns the id assigned to this thread at creation.
    pub fn id(&self) -> UniqueId {
        self.id
    }

    /// Returns the id recorded for the job being processed, or `None` when
    /// the thread is not busy.
    pub fn currently_running(&self) -> Option<ScriptId<S>> {
        if !self.busy() {
            return None;
        }
        self.last_running_id
    }

    /// Returns whether the thread is currently marked as busy.
    pub fn busy(&self) -> bool {
        self.busy.load(Ordering::SeqCst)
    }
}
impl<S: ScriptsRepositoryTrait> fmt::Debug for Thread<S> {
    /// Writes a one-line summary containing only the two status flags, read
    /// at the time of formatting.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let busy = self.busy();
        let should_stop = self.should_stop.load(Ordering::SeqCst);
        write!(
            f,
            "Thread {{ busy: {}, should_stop: {} }}",
            busy,
            should_stop,
        )
    }
}
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::mpsc;
    use std::thread;
    use std::time::Instant;

    use common::state::State;
    use common::serial::Serial;
    use processor::scheduled_job::ScheduledJob;
    use processor::test_utils::*;

    use super::Thread;

    /// Builds a scheduled job for the script called `name` in `repo`.
    ///
    /// Panics if the repository has no such job.
    fn job(repo: &Repository<()>, name: &str) -> ScheduledJob<Repository<()>> {
        let job = repo.job(name, ()).expect("job does not exist");
        ScheduledJob::new(job, 0, Serial::zero())
    }

    /// Creates a worker whose executor simply runs the job; the completer is
    /// dropped as soon as the executor returns.
    fn create_thread() -> Thread<Repository<()>> {
        let state = Arc::new(State::new());
        Thread::new(
            |job, _| {
                job.execute(&())?;
                Ok(())
            },
            &state,
        )
    }

    /// Polls `func` until it returns `true`, panicking with `error` once
    /// more than ten seconds have elapsed.
    fn timeout_until_true<F: Fn() -> bool>(func: F, error: &'static str) {
        let start = Instant::now();
        loop {
            if start.elapsed().as_secs() > 10 {
                // A format string is required here: `panic!(error)` with a
                // non-literal argument is deprecated and rejected by newer
                // editions.
                panic!("{}", error);
            }
            if func() {
                return;
            }
            // Give the worker thread a chance to run instead of spinning.
            thread::yield_now();
        }
    }

    /// A job handed to the thread is actually executed.
    #[test]
    fn test_thread_executes_a_job() {
        test_wrapper(|| {
            let executed = Arc::new(AtomicBool::new(false));
            let repo = Repository::new();

            let job_executed = executed.clone();
            repo.add_script("job", true, move |_| {
                job_executed.store(true, Ordering::SeqCst);
                Ok(())
            });

            let mut thread = create_thread();
            assert!(thread.process(job(&repo, "job")).executing());

            timeout_until_true(
                || !thread.busy(),
                "The thread didn't process the job",
            );
            assert!(executed.load(Ordering::SeqCst));

            thread.stop();
            Ok(())
        });
    }

    /// The thread reports itself busy while a job is blocked, and idle again
    /// once the job is released.
    #[test]
    fn test_thread_correctly_marked_as_busy() {
        test_wrapper(|| {
            let (block_send, block_recv) = mpsc::channel();
            let repo = Repository::new();

            repo.add_script("job", true, move |_| {
                block_recv.recv()?;
                Ok(())
            });

            let mut thread = create_thread();
            assert!(thread.process(job(&repo, "job")).executing());
            assert!(thread.busy());

            block_send.send(())?;
            timeout_until_true(
                || !thread.busy(),
                "The thread didn't process the job",
            );

            thread.stop();
            Ok(())
        });
    }

    /// `currently_running` returns the script id while the job runs and
    /// `None` afterwards.
    #[test]
    fn test_thread_reports_correct_running_script_id() {
        test_wrapper(|| {
            let (block_send, block_recv) = mpsc::channel();
            let repo = Repository::new();

            repo.add_script("job", true, move |_| {
                block_recv.recv()?;
                Ok(())
            });
            let script_id = repo.script_id_of("job").expect("Job should exist");

            let mut thread = create_thread();
            assert!(thread.process(job(&repo, "job")).executing());
            assert_eq!(thread.currently_running(), Some(script_id));

            block_send.send(())?;
            timeout_until_true(
                || !thread.busy(),
                "The thread didn't process the job",
            );
            assert_eq!(thread.currently_running(), None);

            thread.stop();
            Ok(())
        });
    }

    /// A second job offered while the first one is still running is
    /// rejected.
    #[test]
    fn test_thread_rejects_new_jobs_when_busy() {
        test_wrapper(|| {
            let (block_send, block_recv) = mpsc::channel();
            let repo = Repository::new();

            repo.add_script("wait", true, move |_| {
                block_recv.recv()?;
                Ok(())
            });
            repo.add_script("dummy", true, move |_| Ok(()));

            let mut thread = create_thread();
            assert!(thread.process(job(&repo, "wait")).executing());
            assert!(thread.process(job(&repo, "dummy")).rejected());

            block_send.send(())?;
            thread.stop();
            Ok(())
        });
    }

    /// The same thread can run several jobs one after the other.
    #[test]
    fn test_thread_allows_multiple_jobs_to_be_executed() {
        test_wrapper(|| {
            let counter = Arc::new(AtomicUsize::new(0));
            let repo = Repository::new();

            let counter_inner = counter.clone();
            repo.add_script("incr", true, move |_| {
                counter_inner.fetch_add(1, Ordering::SeqCst);
                Ok(())
            });

            let mut thread = create_thread();
            for _ in 0..5 {
                assert!(thread.process(job(&repo, "incr")).executing());
                timeout_until_true(
                    || !thread.busy(),
                    "The thread didn't process the job",
                );
            }
            assert_eq!(counter.load(Ordering::SeqCst), 5);

            thread.stop();
            Ok(())
        });
    }

    /// With a completer switched to manual mode the thread stays busy after
    /// the job has run, until `manual_complete` is called.
    #[test]
    fn test_thread_manual_completion() {
        test_wrapper(|| {
            let (completion_send, completion_recv) = mpsc::channel();
            let finished = Arc::new(AtomicBool::new(false));
            let repo = Repository::new();

            let finished_clone = finished.clone();
            repo.add_script("report", false, move |_| {
                finished_clone.store(true, Ordering::SeqCst);
                Ok(())
            });

            // The sender is wrapped in a mutex so the executor closure can
            // be shared with the worker thread.
            let completion_send = Arc::new(Mutex::new(completion_send));
            let mut thread = Thread::new(
                move |job, mut completion| {
                    completion.manual_mode();
                    completion_send.lock()?.send(completion)?;
                    job.execute(&())?;
                    Ok(())
                },
                &Arc::new(State::new()),
            );

            assert!(thread.process(job(&repo, "report")).executing());
            timeout_until_true(
                || finished.load(Ordering::SeqCst),
                "The thread didn't process the job",
            );
            // The job is done, but nobody completed it yet.
            assert!(thread.busy());

            let completion = completion_recv.recv()?;
            completion.manual_complete();
            assert!(!thread.busy());

            thread.stop();
            Ok(())
        });
    }
}