secop_core/
module.rs

1// -----------------------------------------------------------------------------
2// Rust SECoP playground
3//
4// This program is free software; you can redistribute it and/or modify it under
5// the terms of the GNU General Public License as published by the Free Software
6// Foundation; either version 2 of the License, or (at your option) any later
7// version.
8//
9// This program is distributed in the hope that it will be useful, but WITHOUT
10// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11// FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
12// details.
13//
14// You should have received a copy of the GNU General Public License along with
15// this program; if not, write to the Free Software Foundation, Inc.,
16// 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17//
18// Module authors:
19//   Georg Brandl <g.brandl@fz-juelich.de>
20//
21// -----------------------------------------------------------------------------
22//
23//! This module contains basic module functionality.
24
25use std::fmt;
26use std::ops::Deref;
27use std::time::{Duration, Instant};
28use log::*;
29use serde_json::{Value, json};
30use derive_new::new;
31use mlzutil::time::localtime;
32use crossbeam_channel::{tick, Receiver, select};
33
34use crate::config::{ModuleConfig, Visibility};
35use crate::errors::Error;
36use crate::proto::Msg;
37use crate::server::{ReqReceiver, ModRepSender};
38use crate::types::TypeDesc;
39
40/// Data that every module requires.
41#[derive(new, Clone)]
42pub struct ModInternals {
43    name: String,
44    config: ModuleConfig,
45    req_receiver: ReqReceiver,
46    rep_sender: ModRepSender,
47    poll_tickers: (Receiver<Instant>, Receiver<Instant>),
48}
49
50impl ModInternals {
51    pub fn name(&self) -> &str {
52        &self.name
53    }
54    pub fn config(&self) -> &ModuleConfig {
55        &self.config
56    }
57    pub fn class(&self) -> &str {
58        &self.config.class
59    }
60    pub fn req_receiver(&self) -> &ReqReceiver {
61        &self.req_receiver
62    }
63}
64
65/// Cache for a single parameter value.
66#[derive(Default)]
67pub struct CachedParam<T> {
68    data: T,
69    time: f64,
70}
71
72impl<T> Deref for CachedParam<T> {
73    type Target = T;
74    fn deref(&self) -> &T {
75        &self.data
76    }
77}
78
79impl<T> fmt::Display for CachedParam<T> where T: fmt::Display {
80    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
81        write!(fmt, "{}", self.data)
82    }
83}
84
85impl<T> fmt::Debug for CachedParam<T> where T: fmt::Debug {
86    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
87        write!(fmt, "{:?}", self.data)
88    }
89}
90
91impl<T: PartialEq + Clone> CachedParam<T> {
92    pub fn new(value: T) -> Self {
93        Self { data: value, time: localtime() }
94    }
95
96    pub fn set(&mut self, value: T) {
97        self.time = localtime();
98        self.data = value;
99    }
100
101    /// Gets a newly determined value for this parameter, which is then cached,
102    /// possibly an update message is sent, and the value is returned JSONified
103    /// for sending in a reply.
104    pub fn update<TD: TypeDesc<Repr=T>>(&mut self, value: T, td: &TD) -> Result<(Value, f64, bool), Error> {
105        self.time = localtime();
106        let is_update = if value != self.data {
107            self.data = value.clone();
108            true
109        } else {
110            false
111        };
112        Ok((td.to_json(value)?, self.time, is_update))
113    }
114
115    pub fn time(&self) -> f64 {
116        self.time
117    }
118}
119
120/// Part of the Module trait to be implemented by user.
121pub trait Module : ModuleBase {
122    fn create(internals: ModInternals) -> Result<Self, Error> where Self: Sized;
123    fn setup(&mut self) -> Result<(), Error>;
124    fn teardown(&mut self);
125}
126
127/// Part of the Module trait that is implemented by the derive macro.
128pub trait ModuleBase {
129    /// Return the descriptive data for this module (a JSON object).
130    fn describe(&self) -> Value;
131    /// Execute a command.
132    fn command(&mut self, cmd: &str, args: Value) -> Result<Value, Error>;
133    /// Read a parameter and possibly emit an update message.
134    fn read(&mut self, param: &str) -> Result<Value, Error>;
135    /// Change a parameter and possibly emit an update message.
136    fn change(&mut self, param: &str, value: Value) -> Result<Value, Error>;
137    // TODO: is a result necessary?
138    /// Initialize cached values for all parameters.
139    fn init_params(&mut self) -> Result<(), Error>;
140    /// Get a list of updates for all parameters, which must be sent upon
141    /// activation of the module.
142    fn activate_updates(&mut self) -> Vec<Msg>;
143
144    /// Poll parameters.  If device is busy, parameters that participate in
145    /// busy-poll are not polled.
146    fn poll_normal(&mut self, n: usize);
147    /// Poll parameters that participate in busy-poll if device status is busy.
148    fn poll_busy(&mut self, n: usize);
149
150    /// Return a reference to the module internals.  Even though we require
151    /// the internals to be a member with a fixed name, the member is not
152    /// known in the `run` method below.
153    fn internals(&self) -> &ModInternals;
154    fn internals_mut(&mut self) -> &mut ModInternals;
155    #[inline]
156    fn name(&self) -> &str { &self.internals().name }
157    #[inline]
158    fn config(&self) -> &ModuleConfig { &self.internals().config }
159
160    /// Determine and set the initial value for a parameter.
161    ///
162    /// This is quite complex since we have multiple sources (defaults from
163    /// code, config file, hardware) and multiple ways of using them (depending
164    /// on whether the parameter is writable at runtime).
165    fn init_parameter<T: Clone + PartialEq>(
166        &mut self, param: &str, cached: impl Fn(&mut Self) -> &mut CachedParam<T>,
167        partype: &impl TypeDesc<Repr=T>, update: impl Fn(&mut Self, T) -> Result<(), Error>,
168        swonly: bool, readonly: bool, default: Option<impl Fn() -> T>
169    ) -> Result<(), Error> {
170        if swonly {
171            let value = if let Some(def) = default {
172                if let Some(val) = self.config().parameters.get(param) {
173                    debug!("initializing value for param {} (from config)", param);
174                    partype.from_json(val)?
175                } else {
176                    debug!("initializing value for param {} (from default)", param);
177                    def().into()
178                }
179            } else {
180                // must be mandatory
181                debug!("initializing value for param {} (from config)", param);
182                partype.from_json(&self.config().parameters[param])?
183            };
184            cached(self).set(value);
185            if !readonly {
186                let value = cached(self).clone();
187                update(self, value)?;
188            }
189        } else {
190            if !readonly {
191                if let Some(def) = default {
192                    let value = if let Some(val) = self.config().parameters.get(param) {
193                        debug!("initializing value for param {} (from config)", param);
194                        val.clone()
195                    } else {
196                        debug!("initializing value for param {} (from default)", param);
197                        partype.to_json(def().into())?
198                    };
199                    // This will emit an update message, but since the server is starting
200                    // up, we can assume it hasn't been activated yet.
201                    self.change(param, value)?;
202                } else {
203                    if let Some(val) = self.config().parameters.get(param) {
204                        debug!("initializing value for param {} (from config)", param);
205                        let val = val.clone();
206                        self.change(param, val)?;
207                    } else {
208                        debug!("initializing value for param {} (from hardware)", param);
209                        self.read(param)?;
210                    }
211                }
212            } else {
213                debug!("initializing value for param {} (from hardware)", param);
214                self.read(param)?;
215            }
216        }
217        Ok(())
218    }
219
220    /// Send a general update message back to the dispatcher, which decides if
221    /// and where to send it on.
222    fn send_update(&self, param: &str, value: Value, tstamp: f64) {
223        self.internals().rep_sender.send(
224            (None, Msg::Update { module: self.name().into(),
225                                 param: param.into(),
226                                 data: json!([value, {"t": tstamp}]) })).unwrap();
227    }
228
229    /// Updates the regular poll interval to the given value in seconds, and the
230    /// busy poll interval to 1/5 of it.
231    ///
232    /// This is like an ordinary `update_param` method, but on the trait since
233    /// it is always implemented the same.
234    fn update_pollinterval(&mut self, val: f64) -> Result<(), Error> {
235        self.internals_mut().poll_tickers = (
236            tick(Duration::from_millis((val * 1000.) as u64)),
237            tick(Duration::from_millis((val * 200.) as u64)),
238        );
239        Ok(())
240    }
241
242    /// Runs the main loop for the module, which does the following:
243    ///
244    /// * Initialize the module parameters
245    /// * Handle incoming requests
246    /// * Poll parameters periodically
247    fn run(mut self) where Self: Sized + Module {
248        mlzlog::set_thread_prefix(format!("[{}] ", self.name()));
249
250        // Do initialization steps.  On failure, we panic, which will be caught
251        // upstream and retries are scheduled accordingly.
252        if let Err(e) = self.init_params() {
253            panic!("error initializing params: {}", e);
254        }
255        if let Err(e) = self.setup() {
256            panic!("setup failed: {}", e);
257        }
258
259        // Tell the dispatcher how to describe ourselves.  If the visibility is
260        // "none", the module is assumed to be internal-use only.
261        if self.config().visibility != Visibility::None {
262            self.internals().rep_sender.send(
263                (None, Msg::Describing { id: self.name().into(),
264                                         structure: self.describe() })).unwrap();
265        }
266
267        let mut poll_normal_counter = 0usize;
268        let mut poll_busy_counter = 0usize;
269
270        loop {
271            select! {
272                recv(self.internals().req_receiver) -> res => if let Ok((hid, req)) = res {
273                    // These are the only messages that are handled here.  They all
274                    // generate a reply, which is sent back to the dispatcher.
275                    let rep = match req.1 {
276                        Msg::Read { module, param } => match self.read(&param) {
277                            Ok(data) => Msg::Update { module, param, data },
278                            Err(e) => e.into_msg(req.0),
279                        },
280                        Msg::Change { module, param, value } => match self.change(&param, value) {
281                            Ok(data) => Msg::Changed { module, param, data },
282                            Err(e) => e.into_msg(req.0),
283                        },
284                        Msg::Do { module, command, arg } => match self.command(&command, arg) {
285                            Ok(data) => Msg::Done { module, command, data },
286                            Err(e) => e.into_msg(req.0),
287                        },
288                        Msg::Activate { module } => {
289                            Msg::InitUpdates { module: module,
290                                               updates: self.activate_updates() }
291                        },
292                        _ => {
293                            warn!("message should not arrive here: {}", req);
294                            continue;
295                        }
296                    };
297                    self.internals().rep_sender.send((Some(hid), rep)).unwrap();
298                },
299                // TODO: decide if polling "atomically" (i.e. all parameters at once)
300                // is ok, since it could delay client requests.
301                recv(self.internals().poll_tickers.0) -> _ => {
302                    self.poll_normal(poll_normal_counter);
303                    poll_normal_counter = poll_normal_counter.wrapping_add(1);
304                },
305                recv(self.internals().poll_tickers.1) -> _ => {
306                    self.poll_busy(poll_busy_counter);
307                    poll_busy_counter = poll_busy_counter.wrapping_add(1);
308                }
309            }
310        }
311    }
312}