use std::sync::{mpsc, Arc};
use common::prelude::*;
use common::state::State;
use common::structs::HealthDetails;
use processor::scheduler::{Scheduler, SchedulerInput};
#[cfg(test)]
use processor::scheduler::DebugDetails;
use processor::types::{Job, JobContext};
/// Owning handle to a scheduler running on a background thread.
///
/// Holds the sending side of the scheduler's input channel together with the
/// receiving side of a completion channel used by `stop` to wait for the
/// background thread to finish.
#[derive(Debug)]
pub struct Processor<S>
where
    S: ScriptsRepositoryTrait + 'static,
{
    /// Sender used to push `SchedulerInput` messages to the scheduler.
    input: mpsc::Sender<SchedulerInput<S>>,
    /// Receives a single `()` once the scheduler's run loop has returned.
    wait: mpsc::Receiver<()>,
}
impl<S: ScriptsRepositoryTrait> Processor<S> {
    /// Starts a [`Scheduler`] on a freshly spawned thread and returns a handle to it.
    ///
    /// The scheduler is constructed inside the new thread from `max_threads`,
    /// `hooks`, `ctx` and `state`. Its input sender is handed back to the
    /// caller over a rendezvous channel, so this function blocks until the
    /// scheduler has been created.
    ///
    /// # Errors
    ///
    /// Returns an error if the background thread terminates before handing
    /// over the scheduler's input sender.
    ///
    /// # Panics
    ///
    /// The *background thread* panics if `Scheduler::run` returns an error.
    /// In that case the completion channel is closed without a message and a
    /// later [`Processor::stop`] call reports a receive error.
    pub fn new(
        max_threads: u16,
        hooks: Arc<S>,
        ctx: JobContext<S>,
        state: Arc<State>,
    ) -> Result<Self> {
        // Zero-capacity channel: the sender blocks until `recv` below picks
        // the value up, so the hand-over is a rendezvous.
        let (input_send, input_recv) = mpsc::sync_channel(0);
        let (wait_send, wait_recv) = mpsc::channel();

        ::std::thread::spawn(move || {
            let inner = Scheduler::new(max_threads, hooks, ctx, state);
            // The receiving end is kept alive by `new` until this value has
            // been received, so a failure here is a broken invariant.
            input_send.send(inner.input()).unwrap();
            inner.run().unwrap();
            // The `Processor` owning the receiver may already have been
            // dropped without calling `stop`; nobody is waiting in that case,
            // so a failed send is expected and must not panic this thread.
            let _ = wait_send.send(());
        });

        Ok(Processor {
            input: input_recv.recv()?,
            wait: wait_recv,
        })
    }

    /// Sends `SchedulerInput::StopSignal` and blocks until the background
    /// thread reports that the scheduler's run loop has returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the stop signal cannot be sent, or if the
    /// background thread ended without signalling completion (for example
    /// because it panicked).
    pub fn stop(self) -> Result<()> {
        self.input.send(SchedulerInput::StopSignal)?;
        self.wait.recv()?;
        Ok(())
    }

    /// Returns a cloneable [`ProcessorApi`] that shares this processor's
    /// input channel.
    pub fn api(&self) -> ProcessorApi<S> {
        ProcessorApi {
            input: self.input.clone(),
        }
    }
}
/// Cloneable, non-owning handle for sending `SchedulerInput` messages.
///
/// Every clone shares the same underlying channel sender.
#[derive(Debug, Clone)]
pub struct ProcessorApi<S>
where
    S: ScriptsRepositoryTrait,
{
    /// Sender used to push `SchedulerInput` messages.
    input: mpsc::Sender<SchedulerInput<S>>,
}
impl<S: ScriptsRepositoryTrait> ProcessorApi<S> {
    /// Requests a `DebugDetails` snapshot and blocks until the reply arrives.
    ///
    /// A one-shot reply channel is created for the request; its sender is
    /// shipped inside `SchedulerInput::DebugDetails`.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent or the reply channel is closed
    /// before an answer is received.
    #[cfg(test)]
    pub fn debug_details(&self) -> Result<DebugDetails<S>> {
        let (reply_tx, reply_rx) = mpsc::channel();
        let request = SchedulerInput::DebugDetails(reply_tx);
        self.input.send(request)?;
        let details = reply_rx.recv()?;
        Ok(details)
    }

    /// Sends `SchedulerInput::UpdateContext` carrying the new `ctx`.
    ///
    /// # Errors
    ///
    /// Fails if the message cannot be sent over the input channel.
    pub fn update_context(&self, ctx: JobContext<S>) -> Result<()> {
        let message = SchedulerInput::UpdateContext(ctx);
        self.input.send(message).map_err(From::from)
    }

    /// Sends `SchedulerInput::SetThreadsCount` carrying `count`.
    ///
    /// # Errors
    ///
    /// Fails if the message cannot be sent over the input channel.
    pub fn set_threads_count(&self, count: u16) -> Result<()> {
        let message = SchedulerInput::SetThreadsCount(count);
        self.input.send(message).map_err(From::from)
    }
}
impl<S: ScriptsRepositoryTrait> ProcessorApiTrait<S> for ProcessorApi<S> {
    /// Sends `SchedulerInput::Job` carrying `job` and its `priority`.
    ///
    /// Returns an error if the message cannot be sent over the input channel.
    fn queue(&self, job: Job<S>, priority: isize) -> Result<()> {
        let message = SchedulerInput::Job(job, priority);
        self.input.send(message).map_err(From::from)
    }

    /// Requests `HealthDetails` and blocks until the reply arrives.
    ///
    /// A one-shot reply channel is created for the request; its sender is
    /// shipped inside `SchedulerInput::HealthStatus`. Returns an error if the
    /// request cannot be sent or the reply channel closes without an answer.
    fn health_details(&self) -> Result<HealthDetails> {
        let (reply_tx, reply_rx) = mpsc::channel();
        let request = SchedulerInput::HealthStatus(reply_tx);
        self.input.send(request)?;
        let details = reply_rx.recv()?;
        Ok(details)
    }

    /// Sends `SchedulerInput::Cleanup`.
    ///
    /// Returns an error if the message cannot be sent over the input channel.
    fn cleanup(&self) -> Result<()> {
        self.input
            .send(SchedulerInput::Cleanup)
            .map_err(From::from)
    }

    /// Sends `SchedulerInput::Lock`.
    ///
    /// Returns an error if the message cannot be sent over the input channel.
    fn lock(&self) -> Result<()> {
        self.input.send(SchedulerInput::Lock).map_err(From::from)
    }

    /// Sends `SchedulerInput::Unlock`.
    ///
    /// Returns an error if the message cannot be sent over the input channel.
    fn unlock(&self) -> Result<()> {
        self.input.send(SchedulerInput::Unlock).map_err(From::from)
    }
}