1use std::fmt;
26use std::ops::Deref;
27use std::time::{Duration, Instant};
28use log::*;
29use serde_json::{Value, json};
30use derive_new::new;
31use mlzutil::time::localtime;
32use crossbeam_channel::{tick, Receiver, select};
33
34use crate::config::{ModuleConfig, Visibility};
35use crate::errors::Error;
36use crate::proto::Msg;
37use crate::server::{ReqReceiver, ModRepSender};
38use crate::types::TypeDesc;
39
40#[derive(new, Clone)]
42pub struct ModInternals {
43 name: String,
44 config: ModuleConfig,
45 req_receiver: ReqReceiver,
46 rep_sender: ModRepSender,
47 poll_tickers: (Receiver<Instant>, Receiver<Instant>),
48}
49
50impl ModInternals {
51 pub fn name(&self) -> &str {
52 &self.name
53 }
54 pub fn config(&self) -> &ModuleConfig {
55 &self.config
56 }
57 pub fn class(&self) -> &str {
58 &self.config.class
59 }
60 pub fn req_receiver(&self) -> &ReqReceiver {
61 &self.req_receiver
62 }
63}
64
/// A parameter value cached together with the time it was last stored.
///
/// The derived `Default` yields `T::default()` with a timestamp of `0.0`.
///
/// Note: this type deliberately does not implement `Clone`; callers that
/// write `cached.clone()` obtain a clone of the inner `T` via `Deref`.
#[derive(Default)]
pub struct CachedParam<T> {
    /// The cached value.
    data: T,
    /// Timestamp of the last store, as returned by `localtime()`
    /// (presumably seconds since the epoch -- TODO confirm).
    time: f64,
}
71
72impl<T> Deref for CachedParam<T> {
73 type Target = T;
74 fn deref(&self) -> &T {
75 &self.data
76 }
77}
78
79impl<T> fmt::Display for CachedParam<T> where T: fmt::Display {
80 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
81 write!(fmt, "{}", self.data)
82 }
83}
84
85impl<T> fmt::Debug for CachedParam<T> where T: fmt::Debug {
86 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
87 write!(fmt, "{:?}", self.data)
88 }
89}
90
91impl<T: PartialEq + Clone> CachedParam<T> {
92 pub fn new(value: T) -> Self {
93 Self { data: value, time: localtime() }
94 }
95
96 pub fn set(&mut self, value: T) {
97 self.time = localtime();
98 self.data = value;
99 }
100
101 pub fn update<TD: TypeDesc<Repr=T>>(&mut self, value: T, td: &TD) -> Result<(Value, f64, bool), Error> {
105 self.time = localtime();
106 let is_update = if value != self.data {
107 self.data = value.clone();
108 true
109 } else {
110 false
111 };
112 Ok((td.to_json(value)?, self.time, is_update))
113 }
114
115 pub fn time(&self) -> f64 {
116 self.time
117 }
118}
119
120pub trait Module : ModuleBase {
122 fn create(internals: ModInternals) -> Result<Self, Error> where Self: Sized;
123 fn setup(&mut self) -> Result<(), Error>;
124 fn teardown(&mut self);
125}
126
127pub trait ModuleBase {
129 fn describe(&self) -> Value;
131 fn command(&mut self, cmd: &str, args: Value) -> Result<Value, Error>;
133 fn read(&mut self, param: &str) -> Result<Value, Error>;
135 fn change(&mut self, param: &str, value: Value) -> Result<Value, Error>;
137 fn init_params(&mut self) -> Result<(), Error>;
140 fn activate_updates(&mut self) -> Vec<Msg>;
143
144 fn poll_normal(&mut self, n: usize);
147 fn poll_busy(&mut self, n: usize);
149
150 fn internals(&self) -> &ModInternals;
154 fn internals_mut(&mut self) -> &mut ModInternals;
155 #[inline]
156 fn name(&self) -> &str { &self.internals().name }
157 #[inline]
158 fn config(&self) -> &ModuleConfig { &self.internals().config }
159
160 fn init_parameter<T: Clone + PartialEq>(
166 &mut self, param: &str, cached: impl Fn(&mut Self) -> &mut CachedParam<T>,
167 partype: &impl TypeDesc<Repr=T>, update: impl Fn(&mut Self, T) -> Result<(), Error>,
168 swonly: bool, readonly: bool, default: Option<impl Fn() -> T>
169 ) -> Result<(), Error> {
170 if swonly {
171 let value = if let Some(def) = default {
172 if let Some(val) = self.config().parameters.get(param) {
173 debug!("initializing value for param {} (from config)", param);
174 partype.from_json(val)?
175 } else {
176 debug!("initializing value for param {} (from default)", param);
177 def().into()
178 }
179 } else {
180 debug!("initializing value for param {} (from config)", param);
182 partype.from_json(&self.config().parameters[param])?
183 };
184 cached(self).set(value);
185 if !readonly {
186 let value = cached(self).clone();
187 update(self, value)?;
188 }
189 } else {
190 if !readonly {
191 if let Some(def) = default {
192 let value = if let Some(val) = self.config().parameters.get(param) {
193 debug!("initializing value for param {} (from config)", param);
194 val.clone()
195 } else {
196 debug!("initializing value for param {} (from default)", param);
197 partype.to_json(def().into())?
198 };
199 self.change(param, value)?;
202 } else {
203 if let Some(val) = self.config().parameters.get(param) {
204 debug!("initializing value for param {} (from config)", param);
205 let val = val.clone();
206 self.change(param, val)?;
207 } else {
208 debug!("initializing value for param {} (from hardware)", param);
209 self.read(param)?;
210 }
211 }
212 } else {
213 debug!("initializing value for param {} (from hardware)", param);
214 self.read(param)?;
215 }
216 }
217 Ok(())
218 }
219
220 fn send_update(&self, param: &str, value: Value, tstamp: f64) {
223 self.internals().rep_sender.send(
224 (None, Msg::Update { module: self.name().into(),
225 param: param.into(),
226 data: json!([value, {"t": tstamp}]) })).unwrap();
227 }
228
229 fn update_pollinterval(&mut self, val: f64) -> Result<(), Error> {
235 self.internals_mut().poll_tickers = (
236 tick(Duration::from_millis((val * 1000.) as u64)),
237 tick(Duration::from_millis((val * 200.) as u64)),
238 );
239 Ok(())
240 }
241
242 fn run(mut self) where Self: Sized + Module {
248 mlzlog::set_thread_prefix(format!("[{}] ", self.name()));
249
250 if let Err(e) = self.init_params() {
253 panic!("error initializing params: {}", e);
254 }
255 if let Err(e) = self.setup() {
256 panic!("setup failed: {}", e);
257 }
258
259 if self.config().visibility != Visibility::None {
262 self.internals().rep_sender.send(
263 (None, Msg::Describing { id: self.name().into(),
264 structure: self.describe() })).unwrap();
265 }
266
267 let mut poll_normal_counter = 0usize;
268 let mut poll_busy_counter = 0usize;
269
270 loop {
271 select! {
272 recv(self.internals().req_receiver) -> res => if let Ok((hid, req)) = res {
273 let rep = match req.1 {
276 Msg::Read { module, param } => match self.read(¶m) {
277 Ok(data) => Msg::Update { module, param, data },
278 Err(e) => e.into_msg(req.0),
279 },
280 Msg::Change { module, param, value } => match self.change(¶m, value) {
281 Ok(data) => Msg::Changed { module, param, data },
282 Err(e) => e.into_msg(req.0),
283 },
284 Msg::Do { module, command, arg } => match self.command(&command, arg) {
285 Ok(data) => Msg::Done { module, command, data },
286 Err(e) => e.into_msg(req.0),
287 },
288 Msg::Activate { module } => {
289 Msg::InitUpdates { module: module,
290 updates: self.activate_updates() }
291 },
292 _ => {
293 warn!("message should not arrive here: {}", req);
294 continue;
295 }
296 };
297 self.internals().rep_sender.send((Some(hid), rep)).unwrap();
298 },
299 recv(self.internals().poll_tickers.0) -> _ => {
302 self.poll_normal(poll_normal_counter);
303 poll_normal_counter = poll_normal_counter.wrapping_add(1);
304 },
305 recv(self.internals().poll_tickers.1) -> _ => {
306 self.poll_busy(poll_busy_counter);
307 poll_busy_counter = poll_busy_counter.wrapping_add(1);
308 }
309 }
310 }
311 }
312}