use std::net::Ipv4Addr;
use std::sync::{
mpsc::{self, Sender},
Arc, Mutex,
};
use std::thread;
use actix_web::web::Data;
use log::{error, info};
use crate::{
models::{Light, LightRequest, LightingResponse, Payload},
Error, Result, Storage,
};
pub enum DispatchMessage {
Job((Ipv4Addr, LightRequest, Sender<ReplyMessage>)),
Shutdown,
}
pub enum ReplyMessage {
Reply(LightingResponse),
Shutdown,
}
pub struct Worker {
tx: Sender<DispatchMessage>,
reply_tx: Sender<ReplyMessage>,
thread: Option<thread::JoinHandle<()>>,
reply_thread: Option<thread::JoinHandle<()>>,
}
fn send_reply(resp: Result<LightingResponse>, tx: Sender<ReplyMessage>) {
match resp {
Ok(resp) => {
if let Err(e) = tx.send(ReplyMessage::Reply(resp)) {
error!("Failed to sync response: {:?}", e);
}
}
Err(e) => {
error!("Lighting error: {}", e);
}
};
}
fn handle_request(ip: Ipv4Addr, request: LightRequest, tx: Sender<ReplyMessage>) {
let light = Light::new(ip, None);
let payload = Payload::from(&request);
if payload.is_valid() {
send_reply(light.set(&payload), tx.clone());
}
if let Some(power) = request.power() {
send_reply(light.set_power(power), tx);
}
}
impl Worker {
pub fn new(data: Data<Mutex<Storage>>) -> Self {
let (tx, rx) = mpsc::channel::<DispatchMessage>();
let (reply_tx, reply_rx) = mpsc::channel::<ReplyMessage>();
let pool = ThreadPool::new(4);
let handle = thread::spawn(move || {
for msg in rx {
match msg {
DispatchMessage::Job(msg) => {
pool.execute(move || {
handle_request(msg.0, msg.1, msg.2);
});
}
DispatchMessage::Shutdown => {
return;
}
}
}
});
let reply_handle = thread::spawn(move || {
for msg in reply_rx {
match msg {
ReplyMessage::Reply(resp) => {
let mut data = data.lock().unwrap();
data.process_reply(&resp);
}
ReplyMessage::Shutdown => {
return;
}
}
}
});
Worker {
tx,
reply_tx,
thread: Some(handle),
reply_thread: Some(reply_handle),
}
}
pub fn create_task(&mut self, ip: Ipv4Addr, req: LightRequest) -> Result<()> {
match self
.tx
.send(DispatchMessage::Job((ip, req, self.reply_tx.clone())))
{
Ok(_) => {}
Err(e) => return Err(Error::Dispatch(e)),
}
Ok(())
}
pub fn queue_update(&mut self, resp: LightingResponse) -> Result<()> {
match self.reply_tx.send(ReplyMessage::Reply(resp)) {
Ok(_) => Ok(()),
Err(e) => Err(Error::Reply(e)),
}
}
}
impl Drop for Worker {
fn drop(&mut self) {
info!("shutting down dispatch");
if let Err(e) = self.tx.send(DispatchMessage::Shutdown) {
error!("Failed to send dispatch shutdown: {}", e);
}
if let Some(thread) = self.thread.take() {
thread.join().unwrap_or_else(|_| {
error!("failed to shutdown dispatch");
});
}
if let Err(e) = self.reply_tx.send(ReplyMessage::Shutdown) {
error!("Failed to send response listener shutdown: {}", e);
}
if let Some(thread) = self.reply_thread.take() {
thread.join().unwrap_or_else(|_| {
error!("failed to shutdown response listener");
});
}
}
}
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
enum Message {
Job(Box<dyn FnBox + Send + 'static>),
Shutdown,
}
struct ThreadPool {
runners: Vec<Runner>,
sender: Sender<Message>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut runners = Vec::with_capacity(size);
for id in 0..size {
runners.push(Runner::new(id, Arc::clone(&receiver)));
}
ThreadPool { runners, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
self.sender.send(Message::Job(Box::new(f))).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
info!("shutting down runners");
for _ in &mut self.runners {
self.sender.send(Message::Shutdown).unwrap();
}
for runner in &mut self.runners {
if let Some(thread) = runner.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Runner {
thread: Option<thread::JoinHandle<()>>,
}
impl Runner {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Self {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
match job {
Message::Job(j) => {
j.call_box();
}
Message::Shutdown => {
info!("runner {id} shutting down");
return;
}
}
});
Runner {
thread: Some(thread),
}
}
}