roboplc 0.6.4

Framework for PLCs and real-time micro-services
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
use std::{
    sync::{
        atomic::{AtomicI8, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

#[cfg(target_os = "linux")]
use crate::suicide;
use crate::{
    critical,
    hub::Hub,
    supervisor::Supervisor,
    thread_rt::{Builder, RTParams, Scheduling},
    Error, Result,
};
pub use roboplc_derive::WorkerOpts;
use rtsc::data_policy::DataDeliveryPolicy;
#[cfg(target_os = "linux")]
use signal_hook::{
    consts::{SIGINT, SIGTERM, SIGUSR2},
    iterator::Signals,
};
use tracing::error;

/// Controller prelude
pub mod prelude {
    pub use super::{Context, Controller, WResult, Worker, WorkerOptions};
    pub use roboplc_derive::WorkerOpts;
}

/// Result type, which must be returned by workers' `run` method
pub type WResult = std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>;

/// Handler result type
pub type HandlerResult =
    std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;

/// Sleep step (used in blocking)
pub const SLEEP_STEP: Duration = Duration::from_millis(100);

/// Controller state beacon. Can be cloned and shared with no limitations.
#[derive(Clone)]
pub struct State {
    state: Arc<AtomicI8>,
}

impl State {
    fn new() -> Self {
        Self {
            state: AtomicI8::new(ControllerStateKind::Starting as i8).into(),
        }
    }
    /// Set controller state
    pub fn set(&self, state: ControllerStateKind) {
        self.state.store(state as i8, Ordering::SeqCst);
    }
    /// Get controller state
    pub fn get(&self) -> ControllerStateKind {
        ControllerStateKind::from(self.state.load(Ordering::SeqCst))
    }
    /// Is the controller online (starting or running)
    pub fn is_online(&self) -> bool {
        self.get() >= ControllerStateKind::Starting
    }
}

impl Default for State {
    fn default() -> Self {
        Self::new()
    }
}

/// Controller state kind
#[derive(Default, Eq, PartialEq, Clone, Copy, Ord, PartialOrd)]
#[repr(i8)]
#[allow(clippy::module_name_repetitions)]
pub enum ControllerStateKind {
    #[default]
    /// The controller is starting
    Starting = 0,
    /// The controller is active (accepting tasks)
    Active = 1,
    /// The controller is running (tasks are being executed)
    Running = 2,
    /// The controller is stopping
    Stopping = -1,
    /// The controller is stopped
    Stopped = -100,
    /// The controller state is unknown
    Unknown = -128,
}

impl From<i8> for ControllerStateKind {
    fn from(v: i8) -> Self {
        match v {
            0 => ControllerStateKind::Starting,
            1 => ControllerStateKind::Active,
            2 => ControllerStateKind::Running,
            -100 => ControllerStateKind::Stopped,
            _ => ControllerStateKind::Unknown,
        }
    }
}

/// Controller, used to manage workers and their context
///
/// Generic parameter `D` is the message type for the controller's [`Hub`] messages.
/// Generic parameter `V` is the type of shared variables. If shared variables are not required, it
/// can be set to `()`.
///
pub struct Controller<D, V>
where
    D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
    V: Send + Sync + 'static,
{
    supervisor: Supervisor<()>,
    hub: Hub<D>,
    state: State,
    variables: Arc<V>,
}

impl<D, V> Controller<D, V>
where
    D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
    V: Send + Sync + 'static,
{
    /// Creates a new controller instance, variables MUST implement [`Default`] trait
    pub fn new() -> Self
    where
        V: Default,
    {
        Self {
            supervisor: <_>::default(),
            hub: <_>::default(),
            state: State::new(),
            variables: <_>::default(),
        }
    }
    /// Creates a new controller instance with a pre-defined variables object
    pub fn new_with_variables(variables: V) -> Self {
        Self {
            supervisor: <_>::default(),
            hub: <_>::default(),
            state: State::new(),
            variables: Arc::new(variables),
        }
    }
    /// Spawns a worker
    pub fn spawn_worker<W: Worker<D, V> + WorkerOptions + 'static>(
        &mut self,
        mut worker: W,
    ) -> Result<()> {
        let context = self.context();
        let mut rt_params = RTParams::new().set_scheduling(worker.worker_scheduling());
        if let Some(priority) = worker.worker_priority() {
            rt_params = rt_params.set_priority(priority);
        }
        if let Some(cpu_ids) = worker.worker_cpu_ids() {
            rt_params = rt_params.set_cpu_ids(cpu_ids);
        }
        let mut builder = Builder::new()
            .name(worker.worker_name())
            .rt_params(rt_params)
            .blocking(worker.worker_is_blocking());
        if let Some(stack_size) = worker.worker_stack_size() {
            builder = builder.stack_size(stack_size);
        }
        self.supervisor.spawn(builder, move || {
            if let Err(e) = worker.run(&context) {
                error!(worker=worker.worker_name(), error=%e, "worker terminated");
                critical(&format!(
                    "Worker {} terminated: {}",
                    worker.worker_name(),
                    e
                ));
            }
        })?;
        Ok(())
    }
    /// Spawns a task thread (non-real-time) with the default options
    pub fn spawn_task<F>(&mut self, name: &str, f: F) -> Result<()>
    where
        F: FnOnce() + Send + 'static,
    {
        self.supervisor.spawn(Builder::new().name(name), f)?;
        Ok(())
    }
    /// Registers SIGINT, SIGTERM and SIGUSR2 signals to a thread which terminates the controller
    /// with dummy handlers (see [`Controller::register_signals_with_handlers()`]).
    pub fn register_signals(&mut self, shutdown_timeout: Duration) -> Result<()> {
        self.register_signals_with_handlers(|_| {}, |_| Ok(()), shutdown_timeout)
    }
    /// Registers SIGINT, SIGUSR2 and SIGTERM signals to a thread which terminates the controller.
    ///
    /// The signals SIGINT and SIGTERM are used to gracefully terminate the controller. The SIGUSR2
    /// signal is used to perform a live reload of the executable.
    ///     
    /// Note: to properly terminate all workers must either periodically check the controller state
    /// with [`Context::is_online()`] or be marked as blocking by overriding
    /// [`WorkerOptions::worker_is_blocking()`] (or setting `blocking` to `true` in [`WorkerOpts`]
    /// derive macro).
    ///
    /// Workers that listen to hub messages may also receive a custom termination message and gracefully
    /// shut themselves down. For such functionality a custom signal handler should be implemented
    /// (See <https://github.com/roboplc/roboplc/blob/main/examples/shutdown.rs>).
    ///
    /// The thread is automatically spawned with FIFO scheduling and the highest priority on CPU 0
    /// or falled back to non-realtime.
    ///
    /// If the reload handler function returns error, the reload process is aborted. Otherwise, the
    /// executable is reloaded (see [`crate::reload_executable()`]).
    pub fn register_signals_with_handlers<SH, RH>(
        &mut self,
        #[allow(unused_variables)] shutdown_handler_fn: SH,
        #[allow(unused_variables)] reload_handler_fn: RH,
        #[allow(unused_variables)] shutdown_timeout: Duration,
    ) -> Result<()>
    where
        SH: Fn(&Context<D, V>) + Send + Sync + 'static,
        RH: Fn(&Context<D, V>) -> HandlerResult + Send + Sync + 'static,
    {
        let shutdown_handler = Arc::new(shutdown_handler_fn);
        let reload_handler = Arc::new(reload_handler_fn);
        let mut builder = Builder::new().name("RoboPLCSigRT").rt_params(
            RTParams::new()
                .set_priority(99)
                .set_scheduling(Scheduling::FIFO)
                .set_cpu_ids(&[0]),
        );
        builder.park_on_errors = true;
        macro_rules! sig_handler {
            ($shutdown_handler: expr, $reload_handler: expr) => {{
                #[cfg(target_os = "linux")]
                {
                    let context = self.context();
                    let mut signals = Signals::new([SIGTERM, SIGINT, SIGUSR2])?;
                    move || {
                        if let Some(sig) = signals.forever().next() {
                            match sig {
                                SIGTERM | SIGINT => {
                                    suicide(shutdown_timeout, true);
                                    $shutdown_handler(&context);
                                    context.terminate();
                                }
                                SIGUSR2 => {
                                    tracing::warn!("Performing live reload");
                                    if let Err(e) = $reload_handler(&context) {
                                        error!(error=%e, "reload handler");
                                    } else if let Err(e) = crate::reload_executable() {
                                        panic!("Live reload failed: {}", e);
                                    }
                                }
                                _ => unreachable!(),
                            }
                        }
                    }
                }
                #[cfg(not(target_os = "linux"))]
                {
                    move || {}
                }
            }};
        }
        #[allow(unused_variables)]
        let sh = shutdown_handler.clone();
        #[allow(unused_variables)]
        let rh = reload_handler.clone();
        if let Err(e) = self.supervisor.spawn(builder.clone(), sig_handler!(sh, rh)) {
            if !matches!(e, Error::RTSchedSetSchduler(_)) {
                return Err(e);
            }
        } else {
            return Ok(());
        }
        // fall-back to non-rt handler
        let builder = builder.name("RoboPLCSig").rt_params(RTParams::new());
        self.supervisor
            .spawn(builder, sig_handler!(shutdown_handler, reload_handler))?;
        Ok(())
    }
    fn context(&self) -> Context<D, V> {
        Context {
            hub: self.hub.clone(),
            state: self.state.clone(),
            variables: self.variables.clone(),
        }
    }
    /// Blocks until all non-blocking tasks/workers are finished. In case if signals are
    /// registered, the controller is blocked until a signal is received, despite all worker tasks
    /// are finished. Use [`Controller::block_while_online()`] instead to let controller workers
    /// shut down the controller.
    pub fn block(&mut self) {
        self.supervisor.join_all();
        self.state.set(ControllerStateKind::Stopped);
    }
    /// Blocks until the controller goes into stopping/stopped
    pub fn block_while_online(&self) {
        while self.state.is_online() {
            thread::sleep(SLEEP_STEP);
        }
        self.state.set(ControllerStateKind::Stopped);
    }
    /// Is the controller online (starting or running)
    pub fn is_online(&self) {
        self.state.is_online();
    }
    /// Sets controller state to Stopping
    pub fn terminate(&mut self) {
        self.state.set(ControllerStateKind::Stopping);
    }
    /// State beacon
    pub fn state(&self) -> &State {
        &self.state
    }
    /// Controller [`Hub`] instance
    pub fn hub(&self) -> &Hub<D> {
        &self.hub
    }
    /// Controller [`Supervisor`] instance
    pub fn supervisor(&self) -> &Supervisor<()> {
        &self.supervisor
    }
    /// Controller shared variables
    pub fn variables(&self) -> &Arc<V> {
        &self.variables
    }
}

impl<D, V> Default for Controller<D, V>
where
    D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
    V: Send + Sync + 'static + Default,
{
    fn default() -> Self {
        Self::new()
    }
}

/// The context type is used to give workers access to the controller's hub, state, and shared
/// variables.
pub struct Context<D, V>
where
    D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
    V: Send,
{
    hub: Hub<D>,
    state: State,
    variables: Arc<V>,
}

impl<D, V> Clone for Context<D, V>
where
    D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
    V: Send,
{
    fn clone(&self) -> Self {
        Self {
            hub: self.hub.clone(),
            state: self.state.clone(),
            variables: self.variables.clone(),
        }
    }
}

impl<D, V> Context<D, V>
where
    D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
    V: Send,
{
    /// Controller's hub instance
    pub fn hub(&self) -> &Hub<D> {
        &self.hub
    }
    /// Controller's shared variables
    pub fn variables(&self) -> &Arc<V> {
        &self.variables
    }
    /// Controller's state
    pub fn get_state(&self) -> ControllerStateKind {
        self.state.get()
    }
    /// Set controller's state
    pub fn set_state(&self, state: ControllerStateKind) {
        self.state.set(state);
    }
    /// Is the controller online (starting or running)
    pub fn is_online(&self) -> bool {
        self.state.is_online()
    }
    /// Sets controller state to Stopping
    pub fn terminate(&self) {
        self.state.set(ControllerStateKind::Stopping);
    }
}

/// The trait which MUST be implemented by all workers
pub trait Worker<D: DataDeliveryPolicy + Clone + Send + Sync + 'static, V: Send>:
    Send + Sync
{
    /// The worker's main function, started by [`Controller::spawn_worker()`]. If the function
    /// returns an error, the process is terminated using [`critical()`].
    fn run(&mut self, context: &Context<D, V>) -> WResult;
}

/// The trait which MUST be implemented by all workers
pub trait WorkerOptions {
    /// A mandatory method, an unique name for the worker
    fn worker_name(&self) -> &str;
    /// The stack size for the worker thread
    fn worker_stack_size(&self) -> Option<usize> {
        None
    }
    /// The [`Scheduling`] policy for the worker thread
    fn worker_scheduling(&self) -> Scheduling {
        Scheduling::default()
    }
    /// The scheduled priority for the worker thread
    fn worker_priority(&self) -> Option<i32> {
        None
    }
    /// The CPU ID(s) affinity for the worker thread
    fn worker_cpu_ids(&self) -> Option<&[usize]> {
        None
    }
    /// A hint for task supervisors that the worker blocks the thread (e.g. listens to a socket or
    /// has got a big interval in the main loop, does not return any useful result and should not
    /// be joined)
    fn worker_is_blocking(&self) -> bool {
        false
    }
}