use std::fmt;
use std::ops::Deref;
use std::time::{Duration, Instant};
use log::*;
use serde_json::{Value, json};
use derive_new::new;
use mlzutil::time::localtime;
use crossbeam_channel::{tick, Receiver, select};
use crate::config::{ModuleConfig, Visibility};
use crate::errors::Error;
use crate::proto::Msg;
use crate::server::{ReqReceiver, ModRepSender};
use crate::types::TypeDesc;
/// Per-module plumbing shared by every module implementation: identity,
/// configuration and the channels/tickers used by `ModuleBase::run`.
///
/// The `new` derive comes from the `derive_new` crate; it presumably generates
/// a `new(...)` constructor taking the fields in declaration order, so the
/// field order below is part of the constructor's signature — TODO confirm.
#[derive(new, Clone)]
pub struct ModInternals {
// Module name; used as the log thread prefix and as the module id in
// outgoing messages.
name: String,
// Configuration of this module (its `class`, `parameters` and `visibility`
// fields are read in this file).
config: ModuleConfig,
// Receiving end for requests addressed to this module; consumed by `run`.
req_receiver: ReqReceiver,
// Sending end for replies, updates and the initial description.
rep_sender: ModRepSender,
// Pair of tickers: `.0` triggers `poll_normal`, `.1` triggers `poll_busy`.
// Replaced as a whole by `ModuleBase::update_pollinterval`.
poll_tickers: (Receiver<Instant>, Receiver<Instant>),
}
impl ModInternals {
    /// The module's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The module's configuration.
    pub fn config(&self) -> &ModuleConfig {
        let Self { config, .. } = self;
        config
    }

    /// The `class` entry of the module's configuration.
    pub fn class(&self) -> &str {
        let cfg = &self.config;
        &cfg.class
    }

    /// The channel end on which requests for this module arrive.
    pub fn req_receiver(&self) -> &ReqReceiver {
        let Self { req_receiver, .. } = self;
        req_receiver
    }
}
/// A parameter value together with the time it was last stored.
///
/// The wrapper dereferences to the inner value, and its `Display`/`Debug`
/// output is exactly that of the inner value.
///
/// Note: this type does not implement `Clone`, so calling `.clone()` on a
/// `CachedParam<T>` resolves through `Deref` to `T::clone` and yields a `T`.
#[derive(Default)]
pub struct CachedParam<T> {
    /// The cached value.
    data: T,
    /// Timestamp of the last store, as returned by `localtime()`
    /// (`0.0` for a `Default`-constructed instance).
    time: f64,
}

impl<T> Deref for CachedParam<T> {
    type Target = T;

    /// Borrows the cached value.
    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T: fmt::Display> fmt::Display for CachedParam<T> {
    /// Formats the cached value, forwarding the caller's format spec
    /// (width, fill, precision, ...) to the inner value.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // Delegating instead of `write!(fmt, "{}", ..)` keeps the formatter's
        // flags; a nested `write!` would start from a blank format spec.
        fmt::Display::fmt(&self.data, fmt)
    }
}

impl<T: fmt::Debug> fmt::Debug for CachedParam<T> {
    /// Debug-formats the cached value, forwarding flags such as the
    /// alternate (`{:#?}`) pretty-printing flag to the inner value.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.data, fmt)
    }
}
impl<T: PartialEq + Clone> CachedParam<T> {
    /// Creates a cache entry holding `value`, stamped with the current
    /// `localtime()`.
    pub fn new(value: T) -> Self {
        let time = localtime();
        Self { data: value, time }
    }

    /// Stores `value` and refreshes the timestamp.
    pub fn set(&mut self, value: T) {
        *self = Self::new(value);
    }

    /// Refreshes the timestamp, stores `value` if it differs from the cached
    /// one, and converts it to JSON via `td`.
    ///
    /// Returns `(json, timestamp, changed)`, where `changed` tells whether the
    /// cached value was replaced. The cache and timestamp are updated before
    /// the JSON conversion, so they stay updated even if the conversion fails.
    pub fn update<TD: TypeDesc<Repr=T>>(&mut self, value: T, td: &TD) -> Result<(Value, f64, bool), Error> {
        let stamp = localtime();
        self.time = stamp;

        let changed = value != self.data;
        if changed {
            self.data = value.clone();
        }

        let json = td.to_json(value)?;
        Ok((json, stamp, changed))
    }

    /// Timestamp of the last store.
    pub fn time(&self) -> f64 {
        self.time
    }
}
/// Lifecycle hooks a concrete module provides on top of `ModuleBase`.
pub trait Module : ModuleBase {
/// Builds the module from its `ModInternals`.
fn create(internals: ModInternals) -> Result<Self, Error> where Self: Sized;
/// Called by `ModuleBase::run` after `init_params` has succeeded and before
/// the module description is sent; an error makes `run` panic.
fn setup(&mut self) -> Result<(), Error>;
/// Counterpart to `setup`.
/// NOTE(review): no call to this method is visible in this part of the
/// file — confirm where (or whether) it is invoked.
fn teardown(&mut self);
}
/// Behaviour common to every module: parameter/command access, parameter
/// initialization, update notification and the main request/poll loop (`run`).
///
/// Methods without a body are supplied by each module; the provided methods
/// are built on top of them and on the module's `ModInternals`.
pub trait ModuleBase {
    /// Returns the JSON structure that `run` sends in the `Describing` message.
    fn describe(&self) -> Value;
    /// Executes command `cmd` with argument `args`; `run` calls this for `Msg::Do`.
    fn command(&mut self, cmd: &str, args: Value) -> Result<Value, Error>;
    /// Reads parameter `param`; `run` calls this for `Msg::Read`.
    fn read(&mut self, param: &str) -> Result<Value, Error>;
    /// Changes parameter `param` to `value`; `run` calls this for `Msg::Change`.
    fn change(&mut self, param: &str, value: Value) -> Result<Value, Error>;
    /// Initializes all parameters; the first thing `run` calls.
    fn init_params(&mut self) -> Result<(), Error>;
    /// Produces the messages that `run` places into `Msg::InitUpdates` in
    /// response to `Msg::Activate`.
    fn activate_updates(&mut self) -> Vec<Msg>;
    /// Poll hook invoked whenever the first poll ticker fires; `n` is a call
    /// counter starting at 0 that wraps on overflow.
    fn poll_normal(&mut self, n: usize);
    /// Poll hook invoked whenever the second poll ticker fires; `n` is a call
    /// counter starting at 0 that wraps on overflow.
    fn poll_busy(&mut self, n: usize);
    /// Shared access to the module's internals.
    fn internals(&self) -> &ModInternals;
    /// Exclusive access to the module's internals.
    fn internals_mut(&mut self) -> &mut ModInternals;

    /// The module's name.
    #[inline]
    fn name(&self) -> &str { &self.internals().name }

    /// The module's configuration.
    #[inline]
    fn config(&self) -> &ModuleConfig { &self.internals().config }

    /// Gives parameter `param` its initial value.
    ///
    /// * `cached` - accessor for the parameter's cache slot on `self`.
    /// * `partype` - type descriptor converting between `T` and JSON.
    /// * `update` - callback receiving the initial value of a writable
    ///   software-only parameter.
    /// * `swonly` - when true, the value is taken from the configuration (or
    ///   `default`) and written straight into the cache; when false, the
    ///   value is established through `change` / `read`.
    /// * `readonly` - when true, no `update` / `change` call is made.
    /// * `default` - optional producer of a fallback value, used only when
    ///   the configuration has no entry for `param`.
    ///
    /// # Errors
    ///
    /// Propagates errors from JSON conversion, `update`, `change` and `read`.
    ///
    /// # Panics
    ///
    /// A software-only parameter without `default` indexes
    /// `config().parameters[param]` directly; this presumably panics when the
    /// configuration lacks the entry — TODO confirm against the map type.
    fn init_parameter<T: Clone + PartialEq>(
        &mut self, param: &str, cached: impl Fn(&mut Self) -> &mut CachedParam<T>,
        partype: &impl TypeDesc<Repr=T>, update: impl Fn(&mut Self, T) -> Result<(), Error>,
        swonly: bool, readonly: bool, default: Option<impl Fn() -> T>
    ) -> Result<(), Error> {
        if swonly {
            let value = if let Some(def) = default {
                if let Some(val) = self.config().parameters.get(param) {
                    debug!("initializing value for param {} (from config)", param);
                    partype.from_json(val)?
                } else {
                    debug!("initializing value for param {} (from default)", param);
                    def()
                }
            } else {
                debug!("initializing value for param {} (from config)", param);
                partype.from_json(&self.config().parameters[param])?
            };
            cached(self).set(value);
            if !readonly {
                // `CachedParam` is not `Clone`, so this clones the inner `T`
                // through `Deref`.
                let value = cached(self).clone();
                update(self, value)?;
            }
        } else {
            if !readonly {
                if let Some(def) = default {
                    let value = if let Some(val) = self.config().parameters.get(param) {
                        debug!("initializing value for param {} (from config)", param);
                        val.clone()
                    } else {
                        debug!("initializing value for param {} (from default)", param);
                        partype.to_json(def())?
                    };
                    self.change(param, value)?;
                } else {
                    if let Some(val) = self.config().parameters.get(param) {
                        debug!("initializing value for param {} (from config)", param);
                        // Clone ends the borrow of `self` before the `&mut self` call.
                        let val = val.clone();
                        self.change(param, val)?;
                    } else {
                        debug!("initializing value for param {} (from hardware)", param);
                        self.read(param)?;
                    }
                }
            } else {
                debug!("initializing value for param {} (from hardware)", param);
                self.read(param)?;
            }
        }
        Ok(())
    }

    /// Sends an unsolicited `Msg::Update` for `param` whose data is
    /// `[value, {"t": tstamp}]`.
    ///
    /// # Panics
    ///
    /// Panics if sending on the reply channel fails.
    fn send_update(&self, param: &str, value: Value, tstamp: f64) {
        self.internals().rep_sender.send(
            (None, Msg::Update { module: self.name().into(),
                                 param: param.into(),
                                 data: json!([value, {"t": tstamp}]) })).unwrap();
    }

    /// Replaces both poll tickers: the first fires every `val` seconds, the
    /// second every `val / 5` seconds. Always returns `Ok(())`.
    fn update_pollinterval(&mut self, val: f64) -> Result<(), Error> {
        // Float-to-integer `as` casts saturate and map NaN to 0, so a
        // negative or NaN `val` yields a zero-length interval.
        // NOTE(review): confirm callers validate `val` before calling this.
        self.internals_mut().poll_tickers = (
            tick(Duration::from_millis((val * 1000.) as u64)),
            tick(Duration::from_millis((val * 200.) as u64)),
        );
        Ok(())
    }

    /// Runs the module: initializes parameters, calls `setup`, sends the
    /// module description (unless its visibility is `Visibility::None`), then
    /// loops forever serving requests and poll ticks.
    ///
    /// # Panics
    ///
    /// Panics if `init_params` or `setup` fails, or if sending on the reply
    /// channel fails.
    fn run(mut self) where Self: Sized + Module {
        mlzlog::set_thread_prefix(format!("[{}] ", self.name()));

        if let Err(e) = self.init_params() {
            panic!("error initializing params: {}", e);
        }
        if let Err(e) = self.setup() {
            panic!("setup failed: {}", e);
        }

        if self.config().visibility != Visibility::None {
            self.internals().rep_sender.send(
                (None, Msg::Describing { id: self.name().into(),
                                         structure: self.describe() })).unwrap();
        }

        let mut poll_normal_counter = 0usize;
        let mut poll_busy_counter = 0usize;

        loop {
            select! {
                // A receive error is ignored here.
                // NOTE(review): confirm what happens when all request senders
                // are dropped; the loop has no exit path.
                recv(self.internals().req_receiver) -> res => if let Ok((hid, req)) = res {
                    // `req.1` is the parsed message; `req.0` is handed to
                    // `Error::into_msg` to build the error reply.
                    let rep = match req.1 {
                        Msg::Read { module, param } => match self.read(&param) {
                            Ok(data) => Msg::Update { module, param, data },
                            Err(e) => e.into_msg(req.0),
                        },
                        Msg::Change { module, param, value } => match self.change(&param, value) {
                            Ok(data) => Msg::Changed { module, param, data },
                            Err(e) => e.into_msg(req.0),
                        },
                        Msg::Do { module, command, arg } => match self.command(&command, arg) {
                            Ok(data) => Msg::Done { module, command, data },
                            Err(e) => e.into_msg(req.0),
                        },
                        Msg::Activate { module } => {
                            Msg::InitUpdates { module,
                                               updates: self.activate_updates() }
                        },
                        _ => {
                            warn!("message should not arrive here: {}", req);
                            continue;
                        }
                    };
                    self.internals().rep_sender.send((Some(hid), rep)).unwrap();
                },
                recv(self.internals().poll_tickers.0) -> _ => {
                    self.poll_normal(poll_normal_counter);
                    poll_normal_counter = poll_normal_counter.wrapping_add(1);
                },
                recv(self.internals().poll_tickers.1) -> _ => {
                    self.poll_busy(poll_busy_counter);
                    poll_busy_counter = poll_busy_counter.wrapping_add(1);
                }
            }
        }
    }
}