logicline/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3
4use std::{borrow::Cow, marker::PhantomData};
5#[cfg(feature = "recording")]
6use std::{
7    collections::{btree_map, BTreeMap},
8    mem,
9    sync::{atomic, Arc},
10};
11
12#[cfg(feature = "recording")]
13use serde::{Deserialize, Serialize};
14
15#[cfg(feature = "recording")]
16mod recording;
17#[cfg(feature = "recording")]
18pub use recording::{InputKind, LineState, Snapshot, SnapshotFormatter, StepState, StepStateInfo};
19#[cfg(feature = "recording")]
20use serde_json::Value;
21
22/// The process global state
23pub mod global {
24    #[cfg(feature = "exporter")]
25    const DEFAULT_PORT: u16 = 9001;
26
27    #[cfg(feature = "exporter")]
28    use std::net::{IpAddr, ToSocketAddrs};
29
30    use super::{Processor, Rack};
31    use once_cell::sync::Lazy;
32
33    #[cfg(feature = "locking-default")]
34    use parking_lot::Mutex;
35
36    #[cfg(feature = "locking-rt")]
37    use parking_lot_rt::Mutex;
38
39    #[cfg(feature = "locking-rt-safe")]
40    use rtsc::pi::Mutex;
41
42    static GLOBAL_LADDER: Lazy<Mutex<Rack>> = Lazy::new(|| Mutex::new(Rack::new()));
43
44    #[cfg(all(feature = "recording", feature = "exporter"))]
45    static SNAPSHOT_FORMATTER: once_cell::sync::OnceCell<
46        Box<dyn super::recording::SnapshotFormatter>,
47    > = once_cell::sync::OnceCell::new();
48
49    #[cfg(all(feature = "recording", feature = "exporter"))]
50    /// Sets the snapshot formatter for the global rack state (used for exporter only)
51    ///
52    /// # Panics
53    ///
54    /// Panics if the snapshot formatter is already set
55    pub fn set_snapshot_formatter(formatter: Box<dyn super::recording::SnapshotFormatter>) {
56        SNAPSHOT_FORMATTER
57            .set(formatter)
58            .unwrap_or_else(|_| panic!("Snapshot formatter already set"));
59    }
60
61    /// Sets the recording state for the global rack state
62    #[cfg(feature = "recording")]
63    pub fn set_recording(recording: bool) {
64        GLOBAL_LADDER.lock().set_recording(recording);
65    }
66
67    /// Creates a snapshot of the global state
68    #[cfg(feature = "recording")]
69    pub fn snapshot() -> super::Snapshot {
70        GLOBAL_LADDER.lock().snapshot()
71    }
72    ///
73    /// Creates a filtered snapshot of the global state
74    #[cfg(feature = "recording")]
75    pub fn snapshot_filtered<P>(predicate: P) -> super::Snapshot
76    where
77        P: Fn(&super::LineState) -> bool,
78    {
79        GLOBAL_LADDER.lock().snapshot_filtered(predicate)
80    }
81
82    /// Creates a new processor for the global state
83    pub fn processor() -> Processor {
84        GLOBAL_LADDER.lock().processor()
85    }
86
87    /// Stores the state of the processor in the global state
88    #[cfg(feature = "recording")]
89    pub fn ingress(processor: &mut Processor) {
90        GLOBAL_LADDER.lock().ingress(processor);
91    }
92
93    #[cfg(not(feature = "recording"))]
94    /// When the recording feature is disabled, this function does nothing
95    pub fn ingress(_processor: &mut Processor) {}
96
97    /// Installs the exporter (HTTP server) on the default address (all interfaces, 9001)
98    #[cfg(feature = "exporter")]
99    pub fn install_exporter() -> Result<(), Box<dyn std::error::Error>> {
100        install_exporter_on((IpAddr::from([0, 0, 0, 0]), DEFAULT_PORT))
101    }
102
103    /// Installs the exporter (HTTP server) on the specified address
104    #[cfg(feature = "exporter")]
105    pub fn install_exporter_on<A: ToSocketAddrs>(
106        addr: A,
107    ) -> Result<(), Box<dyn std::error::Error>> {
108        let server = rouille::Server::new(addr, move |request| {
109            if request.method() != "GET" {
110                return rouille::Response::empty_406();
111            }
112            if request.url() == "/state" {
113                let mut snapshot = snapshot();
114                if let Some(formatter) = SNAPSHOT_FORMATTER.get() {
115                    snapshot = formatter.format(snapshot);
116                }
117                return rouille::Response::json(&snapshot)
118                    .with_additional_header("Access-Control-Allow-Origin", "*")
119                    .with_additional_header("Access-Control-Allow-Methods", "GET, OPTIONS")
120                    .with_additional_header("Access-Control-Allow-Headers", "Content-Type");
121            }
122            #[cfg(feature = "exporter-ui")]
123            if request.url() == "/" {
124                return rouille::Response::html(include_str!("../ll-default-view/dist/index.html"));
125            }
126            rouille::Response::empty_404()
127        })
128        .map_err(|e| e.to_string())?;
129        std::thread::Builder::new()
130            .name("ll-exporter".to_string())
131            .spawn(move || {
132                server.run();
133            })?;
134        Ok(())
135    }
136}
137
138/// Logical step in the line
139pub struct Step<'p, INPUT> {
140    active: bool,
141    input: Option<INPUT>,
142    processor: Option<&'p mut Processor>,
143    line_name: Option<Cow<'static, str>>,
144}
145
146/// Operation helpers
147pub mod ops {
148
149    /// Logical NOT operation. In case if the input is `Some`, returns `None`, otherwise returns
150    /// `Some(())`
151    pub fn not(input: Option<()>) -> Option<()> {
152        if input.is_some() {
153            None
154        } else {
155            Some(())
156        }
157    }
158}
159
160#[cfg(feature = "recording")]
161/// When the recording feature is enabled, inputs must implement the [`serde::Serialize`] trait
162pub trait StepInput: Serialize {}
163
164#[cfg(feature = "recording")]
165impl<T> StepInput for T where T: Serialize {}
166
167#[cfg(not(feature = "recording"))]
168/// When the recording feature is disabled, the trait is empty
169pub trait StepInput {}
170
171#[cfg(not(feature = "recording"))]
172impl<T> StepInput for T {}
173
174impl<'p, INPUT> Step<'p, INPUT>
175where
176    INPUT: StepInput,
177{
178    /// Returns if the step is active (can be passed)
179    pub fn is_active(&self) -> bool {
180        self.active
181    }
182    /// Creates a new step
183    pub fn new(value: INPUT) -> Self {
184        Step {
185            input: Some(value),
186            active: true,
187            processor: None,
188            line_name: None,
189        }
190    }
191
192    #[cfg(feature = "recording")]
193    fn processor_is_recording(&self) -> bool {
194        self.processor
195            .as_ref()
196            .is_some_and(|processor| processor.is_recording())
197    }
198
199    #[cfg(feature = "recording")]
200    fn line_state_mut(&mut self) -> Option<&mut LineState> {
201        let processor = self.processor.as_mut()?;
202        let line_name = self.line_name.as_ref()?;
203        processor.result.get_mut(line_name)
204    }
205
206    /// Passes the step in case if any of the actions returns `Some`
207    #[allow(clippy::missing_panics_doc)]
208    pub fn then_any<OUTPUT, A, A2, F, F2>(mut self, action1: A, action2: A2) -> Step<'p, OUTPUT>
209    where
210        A: Into<Action<'p, F, INPUT, OUTPUT>>,
211        F: FnOnce(INPUT) -> Option<OUTPUT>,
212        A2: Into<Action<'p, F2, INPUT, OUTPUT>>,
213        F: FnOnce(INPUT) -> Option<OUTPUT>,
214        F2: FnOnce(INPUT) -> Option<OUTPUT>,
215        INPUT: Clone,
216    {
217        #[allow(unused_mut)]
218        let mut action1 = action1.into();
219        #[cfg(feature = "recording")]
220        let input_kind1 = action1.input_kind();
221        #[cfg(feature = "recording")]
222        let recorded_input1 = self
223            .processor_is_recording()
224            .then(|| action1.take_recorded_input_serialized(self.input.as_ref()))
225            .unwrap_or_default();
226        #[allow(unused_mut)]
227        let mut action2 = action2.into();
228        #[cfg(feature = "recording")]
229        let input_kind2 = action2.input_kind();
230        #[cfg(feature = "recording")]
231        let recorded_input2 = self
232            .processor_is_recording()
233            .then(|| action2.take_recorded_input_serialized(self.input.as_ref()))
234            .unwrap_or_default();
235        if !self.active || self.input.is_none() {
236            #[cfg(feature = "recording")]
237            {
238                if let Some(l) = self.line_state_mut() {
239                    let step_states = vec![
240                        StepStateInfo::new(action1.name, None::<()>, input_kind1, false),
241                        StepStateInfo::new(action2.name, None::<()>, input_kind2, false),
242                    ];
243                    l.extend(step_states);
244                }
245            }
246            return Step {
247                input: None,
248                active: false,
249                processor: self.processor,
250                line_name: self.line_name,
251            };
252        }
253        let action_input = self.input.take().unwrap();
254        let mut next_input = None;
255        #[cfg(feature = "recording")]
256        let mut step_states = Vec::with_capacity(2);
257        if let Some(output) = (action1.f)(action_input.clone()) {
258            next_input = Some(output);
259            #[cfg(feature = "recording")]
260            step_states.push(StepStateInfo::new_with_serialized_input(
261                action1.name,
262                recorded_input1,
263                input_kind1,
264                true,
265            ));
266        } else {
267            #[cfg(feature = "recording")]
268            step_states.push(StepStateInfo::new_with_serialized_input(
269                action1.name,
270                recorded_input1,
271                input_kind1,
272                false,
273            ));
274        }
275        if let Some(output) = (action2.f)(action_input) {
276            if next_input.is_none() {
277                next_input = Some(output);
278            }
279            #[cfg(feature = "recording")]
280            step_states.push(StepStateInfo::new_with_serialized_input(
281                action2.name,
282                recorded_input2,
283                input_kind2,
284                true,
285            ));
286        } else {
287            #[cfg(feature = "recording")]
288            step_states.push(StepStateInfo::new_with_serialized_input(
289                action2.name,
290                recorded_input2,
291                input_kind2,
292                false,
293            ));
294        }
295        #[cfg(feature = "recording")]
296        if let Some(l) = self.line_state_mut() {
297            l.extend(step_states);
298        }
299        Step {
300            active: next_input.is_some(),
301            input: next_input,
302            processor: self.processor,
303            line_name: self.line_name,
304        }
305    }
306
307    /// Passes the step in case if the action returns `Some`
308    #[allow(clippy::missing_panics_doc)]
309    pub fn then<OUTPUT, A, F>(mut self, action: A) -> Step<'p, OUTPUT>
310    where
311        A: Into<Action<'p, F, INPUT, OUTPUT>>,
312        F: FnOnce(INPUT) -> Option<OUTPUT>,
313    {
314        #[allow(unused_mut)]
315        let mut action = action.into();
316        #[cfg(feature = "recording")]
317        let input_kind = action.input_kind();
318        #[cfg(feature = "recording")]
319        macro_rules! record_processed {
320            ($name:expr, $passed:expr, $input:expr) => {
321                if let Some(l) = self.line_state_mut() {
322                    l.push_step_state(action.name, $input, input_kind, $passed);
323                }
324            };
325        }
326        if !self.active || self.input.is_none() {
327            #[cfg(feature = "recording")]
328            record_processed!(action.name, false, Value::Null);
329            return Step {
330                input: None,
331                active: false,
332                processor: self.processor,
333                line_name: self.line_name,
334            };
335        }
336        #[cfg(feature = "recording")]
337        let recorded_input = self
338            .processor_is_recording()
339            .then(|| action.take_recorded_input_serialized(self.input.as_ref()))
340            .unwrap_or_default();
341        if let Some(output) = (action.f)(self.input.take().unwrap()) {
342            #[cfg(feature = "recording")]
343            record_processed!(action.name, true, recorded_input);
344            Step {
345                input: Some(output),
346                active: true,
347                processor: self.processor,
348                line_name: self.line_name,
349            }
350        } else {
351            #[cfg(feature = "recording")]
352            record_processed!(action.name, false, recorded_input);
353            Step {
354                input: None,
355                active: false,
356                processor: self.processor,
357                line_name: self.line_name,
358            }
359        }
360    }
361}
362
363#[allow(dead_code)]
364/// Action is a function wrapper that can be used in a step
365pub struct Action<'a, F, INPUT, OUTPUT>
366where
367    F: FnOnce(INPUT) -> Option<OUTPUT>,
368{
369    f: F,
370    name: Cow<'static, str>,
371    #[cfg(feature = "recording")]
372    recorded_input: Option<&'a dyn erased_serde::Serialize>,
373    #[cfg(not(feature = "recording"))]
374    _recorded_input: PhantomData<&'a ()>,
375    _input: PhantomData<INPUT>,
376}
377
378impl<F, INPUT, OUTPUT> From<F> for Action<'_, F, INPUT, OUTPUT>
379where
380    F: FnOnce(INPUT) -> Option<OUTPUT>,
381{
382    fn from(function: F) -> Self {
383        Action::new("", function)
384    }
385}
386
387/// Creates a new action, in case if the name is not provided, the function name will be used as
388/// the name of the action (in case of closures it is recommended to always provide a name to get
389/// it clear and readable).
390#[macro_export]
391macro_rules! action {
392    ($f: expr) => {
393        $crate::Action::new(stringify!($f), $f)
394    };
395    ($name: expr, $f: expr) => {
396        $crate::Action::new($name, $f)
397    };
398}
399
400impl<'a, F, INPUT, OUTPUT> Action<'a, F, INPUT, OUTPUT>
401where
402    F: FnOnce(INPUT) -> Option<OUTPUT>,
403{
404    /// Creates a new action
405    pub fn new(name: impl Into<Cow<'static, str>>, f: F) -> Self {
406        Action {
407            f,
408            name: name.into(),
409            #[cfg(feature = "recording")]
410            recorded_input: None,
411            #[cfg(not(feature = "recording"))]
412            _recorded_input: PhantomData,
413            _input: PhantomData,
414        }
415    }
416    /// Sets the recorded (actual) input for the action function
417    #[cfg(feature = "recording")]
418    pub fn with_recorded_input<V>(mut self, input: &'a V) -> Self
419    where
420        V: Serialize,
421    {
422        self.recorded_input = Some(input);
423        self
424    }
425    #[cfg(not(feature = "recording"))]
426    #[allow(unused_mut)]
427    /// When the recording feature is disabled, this function does nothing
428    pub fn with_recorded_input<V>(mut self, _input: &'a V) -> Self {
429        self
430    }
431    #[cfg(feature = "recording")]
432    // WARNING: must be called before the input is taken
433    fn input_kind(&self) -> InputKind {
434        if self.recorded_input.is_some() {
435            InputKind::External
436        } else {
437            InputKind::Flow
438        }
439    }
440    #[cfg(feature = "recording")]
441    fn take_recorded_input_serialized(&mut self, fallback: Option<&INPUT>) -> Value
442    where
443        INPUT: StepInput,
444    {
445        if let Some(i) = self.recorded_input.take() {
446            serde_json::to_value(i).unwrap_or_default()
447        } else {
448            serde_json::to_value(fallback).unwrap_or_default()
449        }
450    }
451}
452
453#[derive(Default, Debug, Clone)]
454#[cfg_attr(feature = "recording", derive(Serialize, Deserialize))]
455/// State of the process or a group of logic lines. Acts as a factory for [`Processor`] instances.
456/// Shares recording state with the created processors.
457pub struct Rack {
458    #[cfg(feature = "recording")]
459    lines: BTreeMap<Cow<'static, str>, LineState>,
460    #[serde(skip)]
461    #[cfg(feature = "recording")]
462    recording: Arc<atomic::AtomicBool>,
463}
464
465impl Rack {
466    /// Creates a new state
467    pub fn new() -> Self {
468        Self::default()
469    }
470    /// Record the state of the lines and reset the processor
471    #[allow(unused_variables)]
472    pub fn ingress(&mut self, processor: &mut Processor) {
473        #[cfg(feature = "recording")]
474        self.lines.extend(mem::take(&mut processor.result));
475        #[cfg(not(feature = "recording"))]
476        processor.reset();
477    }
478    /// Returns the state of the line
479    #[cfg(feature = "recording")]
480    pub fn line_state(&self, name: &str) -> Option<&LineState> {
481        self.lines.get(name)
482    }
483    /// Returns all states of the lines
484    #[cfg(feature = "recording")]
485    pub fn lines(&self) -> &BTreeMap<Cow<'static, str>, LineState> {
486        &self.lines
487    }
488    /// Creates a snapshot of the current state of the lines
489    #[cfg(feature = "recording")]
490    pub fn snapshot(&self) -> Snapshot {
491        Snapshot {
492            lines: self.lines.clone(),
493        }
494    }
495    /// Creates a filtered snapshot of the current state of the lines
496    #[cfg(feature = "recording")]
497    pub fn snapshot_filtered<P>(&self, predicate: P) -> Snapshot
498    where
499        P: Fn(&LineState) -> bool,
500    {
501        let lines = self
502            .lines
503            .iter()
504            .filter(|(_, line)| predicate(line))
505            .map(|(name, line)| (name.clone(), line.clone()))
506            .collect();
507        Snapshot { lines }
508    }
509    /// Creates a new processor
510    pub fn processor(&self) -> Processor {
511        Processor {
512            #[cfg(feature = "recording")]
513            result: BTreeMap::new(),
514            #[cfg(feature = "recording")]
515            recording: Arc::clone(&self.recording),
516        }
517    }
518
519    /// Enables recording for the state
520    #[cfg(feature = "recording")]
521    pub fn with_recording_enabled(self) -> Self {
522        self.recording.store(true, atomic::Ordering::SeqCst);
523        self
524    }
525
526    /// Sets the recording state for the state
527    #[cfg(feature = "recording")]
528    pub fn set_recording(&mut self, recording: bool) {
529        self.recording.store(recording, atomic::Ordering::SeqCst);
530    }
531}
532
533/// Processor is an instance which creates logical lines
534#[derive(Default)]
535pub struct Processor {
536    #[cfg(feature = "recording")]
537    result: BTreeMap<Cow<'static, str>, LineState>,
538    #[cfg(feature = "recording")]
539    recording: Arc<atomic::AtomicBool>,
540}
541
542impl Processor {
543    /// Creates a new processor (state-independent)
544    pub fn new() -> Self {
545        Self::default()
546    }
547    /// Resets the processor recordings
548    pub fn reset(&mut self) {
549        #[cfg(feature = "recording")]
550        self.result.clear();
551    }
552    /// Returns the state of the line
553    #[cfg(feature = "recording")]
554    pub fn line_state(&self, name: &str) -> Option<&LineState> {
555        self.result.get(name)
556    }
557    /// Returns `true` if the processor is recording
558    #[cfg(feature = "recording")]
559    pub fn is_recording(&self) -> bool {
560        self.recording.load(atomic::Ordering::SeqCst)
561    }
562    /// Creates a new logical line
563    pub fn line<INPUT>(&mut self, name: impl Into<Cow<'static, str>>, input: INPUT) -> Step<INPUT> {
564        let name = name.into();
565        #[cfg(feature = "recording")]
566        if self.is_recording() {
567            match self.result.entry(name.clone()) {
568                btree_map::Entry::Vacant(entry) => {
569                    entry.insert(LineState::new(name.clone()));
570                }
571                btree_map::Entry::Occupied(mut entry) => {
572                    entry.get_mut().clear();
573                }
574            }
575        }
576        Step {
577            input: Some(input),
578            active: true,
579            processor: Some(self),
580            line_name: Some(name),
581        }
582    }
583}
584
585#[cfg(test)]
586mod test {
587    use super::Rack;
588    #[cfg(feature = "recording")]
589    use serde::Serialize;
590
591    #[test]
592    fn test_lines() {
593        #[allow(clippy::cast_lossless, clippy::unnecessary_wraps)]
594        fn get_temp1(data: &ModbusData) -> Option<f32> {
595            Some(data.temperature_1 as f32 / 10.0)
596        }
597
598        #[allow(clippy::cast_lossless, clippy::unnecessary_wraps)]
599        fn get_temp2(data: &ModbusData) -> Option<f32> {
600            Some(data.temperature_2 as f32 / 10.0)
601        }
602
603        #[allow(clippy::unnecessary_wraps)]
604        fn temperature_critical(temp: f32) -> Option<()> {
605            if temp > 90. {
606                Some(())
607            } else {
608                None
609            }
610        }
611
612        fn voltage_critical(voltage: u16) -> Option<()> {
613            if voltage > 300 {
614                Some(())
615            } else {
616                None
617            }
618        }
619
620        #[cfg_attr(feature = "recording", derive(Serialize))]
621        struct ModbusData {
622            temperature_1: u16,
623            voltage_1: u16,
624            temperature_2: u16,
625            voltage_2: u16,
626        }
627
628        let modbus_data = ModbusData {
629            temperature_1: 950,
630            voltage_1: 395,
631            temperature_2: 250,
632            voltage_2: 295,
633        };
634        let mut state = Rack::new();
635        let mut processor = state.processor();
636        let mut line1_active = true;
637        let mut line2_active = true;
638        #[cfg(feature = "recording")]
639        state.set_recording(true);
640        assert!(processor
641            .line("line1", &modbus_data)
642            .then(action!(get_temp1))
643            .then(action!(temperature_critical))
644            .then(
645                action!("voltage", |()| Some(modbus_data.voltage_1))
646                    .with_recorded_input(&modbus_data.voltage_1)
647            )
648            .then(action!(voltage_critical))
649            .then(action!("OFF", |()| {
650                line1_active = false;
651                Some(())
652            }))
653            .is_active());
654        assert!(!processor
655            .line("line2", &modbus_data)
656            .then(get_temp2)
657            .then(action!(temperature_critical))
658            .then(
659                action!("voltage", |()| Some(modbus_data.voltage_2))
660                    .with_recorded_input(&modbus_data.voltage_2)
661            )
662            .then(action!(voltage_critical))
663            .then(action!("OFF", |()| {
664                line2_active = false;
665                Some(())
666            }))
667            .is_active());
668        assert!(!line1_active);
669        assert!(line2_active);
670        state.ingress(&mut processor);
671    }
672}