logicline/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3
4use std::{borrow::Cow, marker::PhantomData};
5#[cfg(feature = "recording")]
6use std::{
7    collections::{btree_map, BTreeMap},
8    mem,
9    sync::{atomic, Arc},
10};
11
12#[cfg(feature = "recording")]
13use serde::{Deserialize, Serialize};
14
15#[cfg(feature = "recording")]
16mod recording;
17#[cfg(feature = "recording")]
18pub use recording::{InputKind, LineState, Snapshot, SnapshotFormatter, StepState, StepStateInfo};
19#[cfg(feature = "recording")]
20use serde_json::Value;
21
22/// The process global state
23pub mod global {
24    #[cfg(feature = "exporter")]
25    const DEFAULT_PORT: u16 = 9001;
26
27    #[cfg(feature = "exporter")]
28    use std::net::{IpAddr, ToSocketAddrs};
29
30    use super::{Processor, Rack};
31    use once_cell::sync::Lazy;
32
33    #[cfg(feature = "locking-default")]
34    use parking_lot::Mutex;
35
36    #[cfg(feature = "locking-rt")]
37    use parking_lot_rt::Mutex;
38
39    #[cfg(feature = "locking-rt-safe")]
40    use rtsc::pi::Mutex;
41
42    static GLOBAL_LADDER: Lazy<Mutex<Rack>> = Lazy::new(|| Mutex::new(Rack::new()));
43
44    #[cfg(all(feature = "recording", feature = "exporter"))]
45    static SNAPSHOT_FORMATTER: once_cell::sync::OnceCell<
46        Box<dyn super::recording::SnapshotFormatter>,
47    > = once_cell::sync::OnceCell::new();
48
49    #[cfg(all(feature = "recording", feature = "exporter"))]
50    /// Sets the snapshot formatter for the global rack state (used for exporter only)
51    ///
52    /// # Panics
53    ///
54    /// Panics if the snapshot formatter is already set
55    pub fn set_snapshot_formatter(formatter: Box<dyn super::recording::SnapshotFormatter>) {
56        SNAPSHOT_FORMATTER
57            .set(formatter)
58            .unwrap_or_else(|_| panic!("Snapshot formatter already set"));
59    }
60
61    /// Sets the recording state for the global rack state
62    #[cfg(feature = "recording")]
63    pub fn set_recording(recording: bool) {
64        GLOBAL_LADDER.lock().set_recording(recording);
65    }
66
67    /// Is the global rack state recording
68    #[cfg(feature = "recording")]
69    pub fn is_recording() -> bool {
70        GLOBAL_LADDER.lock().is_recording()
71    }
72
73    /// Creates a snapshot of the global state
74    #[cfg(feature = "recording")]
75    pub fn snapshot() -> super::Snapshot {
76        GLOBAL_LADDER.lock().snapshot()
77    }
78    ///
79    /// Creates a filtered snapshot of the global state
80    #[cfg(feature = "recording")]
81    pub fn snapshot_filtered<P>(predicate: P) -> super::Snapshot
82    where
83        P: Fn(&super::LineState) -> bool,
84    {
85        GLOBAL_LADDER.lock().snapshot_filtered(predicate)
86    }
87
88    /// Creates a new processor for the global state
89    pub fn processor() -> Processor {
90        GLOBAL_LADDER.lock().processor()
91    }
92
93    /// Stores the state of the processor in the global state
94    #[cfg(feature = "recording")]
95    pub fn ingress(processor: &mut Processor) {
96        GLOBAL_LADDER.lock().ingress(processor);
97    }
98
99    #[cfg(not(feature = "recording"))]
100    /// When the recording feature is disabled, this function does nothing
101    pub fn ingress(_processor: &mut Processor) {}
102
103    /// Installs the exporter (HTTP server) on the default address (all interfaces, 9001)
104    #[cfg(feature = "exporter")]
105    pub fn install_exporter() -> Result<(), Box<dyn std::error::Error>> {
106        install_exporter_on((IpAddr::from([0, 0, 0, 0]), DEFAULT_PORT))
107    }
108
109    /// Installs the exporter (HTTP server) on the specified address
110    #[cfg(feature = "exporter")]
111    pub fn install_exporter_on<A: ToSocketAddrs>(
112        addr: A,
113    ) -> Result<(), Box<dyn std::error::Error>> {
114        let server = rouille::Server::new(addr, move |request| {
115            if request.method() != "GET" {
116                return rouille::Response::empty_406();
117            }
118            if request.url() == "/state" {
119                let mut snapshot = snapshot();
120                if let Some(formatter) = SNAPSHOT_FORMATTER.get() {
121                    snapshot = formatter.format(snapshot);
122                }
123                return rouille::Response::json(&snapshot)
124                    .with_additional_header("Access-Control-Allow-Origin", "*")
125                    .with_additional_header("Access-Control-Allow-Methods", "GET, OPTIONS")
126                    .with_additional_header("Access-Control-Allow-Headers", "Content-Type");
127            }
128            #[cfg(feature = "exporter-ui")]
129            if request.url() == "/" {
130                return rouille::Response::html(include_str!("../ll-default-view/dist/index.html"));
131            }
132            rouille::Response::empty_404()
133        })
134        .map_err(|e| e.to_string())?;
135        std::thread::Builder::new()
136            .name("ll-exporter".to_string())
137            .spawn(move || {
138                server.run();
139            })?;
140        Ok(())
141    }
142}
143
144/// Logical step in the line
145pub struct Step<'p, INPUT> {
146    active: bool,
147    input: Option<INPUT>,
148    processor: Option<&'p mut Processor>,
149    line_name: Option<Cow<'static, str>>,
150}
151
152/// Operation helpers
153pub mod ops {
154
155    /// Logical NOT operation. In case if the input is `Some`, returns `None`, otherwise returns
156    /// `Some(())`
157    pub fn not(input: Option<()>) -> Option<()> {
158        if input.is_some() {
159            None
160        } else {
161            Some(())
162        }
163    }
164}
165
166#[cfg(feature = "recording")]
167/// When the recording feature is enabled, inputs must implement the [`serde::Serialize`] trait
168pub trait StepInput: Serialize {}
169
170#[cfg(feature = "recording")]
171impl<T> StepInput for T where T: Serialize {}
172
173#[cfg(not(feature = "recording"))]
174/// When the recording feature is disabled, the trait is empty
175pub trait StepInput {}
176
177#[cfg(not(feature = "recording"))]
178impl<T> StepInput for T {}
179
180impl<'p, INPUT> Step<'p, INPUT>
181where
182    INPUT: StepInput,
183{
184    /// Returns if the step is active (can be passed)
185    pub fn is_active(&self) -> bool {
186        self.active
187    }
188    /// Creates a new step
189    pub fn new(value: INPUT) -> Self {
190        Step {
191            input: Some(value),
192            active: true,
193            processor: None,
194            line_name: None,
195        }
196    }
197
198    #[cfg(feature = "recording")]
199    fn processor_is_recording(&self) -> bool {
200        self.processor
201            .as_ref()
202            .is_some_and(|processor| processor.is_recording())
203    }
204
205    #[cfg(feature = "recording")]
206    fn line_state_mut(&mut self) -> Option<&mut LineState> {
207        let processor = self.processor.as_mut()?;
208        let line_name = self.line_name.as_ref()?;
209        processor.result.get_mut(line_name)
210    }
211
212    /// Passes the step in case if any of the actions returns `Some`
213    #[allow(clippy::missing_panics_doc)]
214    pub fn then_any<OUTPUT, A, A2, F, F2>(mut self, action1: A, action2: A2) -> Step<'p, OUTPUT>
215    where
216        A: Into<Action<'p, F, INPUT, OUTPUT>>,
217        F: FnOnce(INPUT) -> Option<OUTPUT>,
218        A2: Into<Action<'p, F2, INPUT, OUTPUT>>,
219        F: FnOnce(INPUT) -> Option<OUTPUT>,
220        F2: FnOnce(INPUT) -> Option<OUTPUT>,
221        INPUT: Clone,
222    {
223        #[allow(unused_mut)]
224        let mut action1 = action1.into();
225        #[cfg(feature = "recording")]
226        let input_kind1 = action1.input_kind();
227        #[cfg(feature = "recording")]
228        let recorded_input1 = self
229            .processor_is_recording()
230            .then(|| action1.take_recorded_input_serialized(self.input.as_ref()))
231            .unwrap_or_default();
232        #[allow(unused_mut)]
233        let mut action2 = action2.into();
234        #[cfg(feature = "recording")]
235        let input_kind2 = action2.input_kind();
236        #[cfg(feature = "recording")]
237        let recorded_input2 = self
238            .processor_is_recording()
239            .then(|| action2.take_recorded_input_serialized(self.input.as_ref()))
240            .unwrap_or_default();
241        if !self.active || self.input.is_none() {
242            #[cfg(feature = "recording")]
243            {
244                if let Some(l) = self.line_state_mut() {
245                    let step_states = vec![
246                        StepStateInfo::new(action1.name, None::<()>, input_kind1, false),
247                        StepStateInfo::new(action2.name, None::<()>, input_kind2, false),
248                    ];
249                    l.extend(step_states);
250                }
251            }
252            return Step {
253                input: None,
254                active: false,
255                processor: self.processor,
256                line_name: self.line_name,
257            };
258        }
259        let action_input = self.input.take().unwrap();
260        let mut next_input = None;
261        #[cfg(feature = "recording")]
262        let mut step_states = Vec::with_capacity(2);
263        if let Some(output) = (action1.f)(action_input.clone()) {
264            next_input = Some(output);
265            #[cfg(feature = "recording")]
266            step_states.push(StepStateInfo::new_with_serialized_input(
267                action1.name,
268                recorded_input1,
269                input_kind1,
270                true,
271            ));
272        } else {
273            #[cfg(feature = "recording")]
274            step_states.push(StepStateInfo::new_with_serialized_input(
275                action1.name,
276                recorded_input1,
277                input_kind1,
278                false,
279            ));
280        }
281        if let Some(output) = (action2.f)(action_input) {
282            if next_input.is_none() {
283                next_input = Some(output);
284            }
285            #[cfg(feature = "recording")]
286            step_states.push(StepStateInfo::new_with_serialized_input(
287                action2.name,
288                recorded_input2,
289                input_kind2,
290                true,
291            ));
292        } else {
293            #[cfg(feature = "recording")]
294            step_states.push(StepStateInfo::new_with_serialized_input(
295                action2.name,
296                recorded_input2,
297                input_kind2,
298                false,
299            ));
300        }
301        #[cfg(feature = "recording")]
302        if let Some(l) = self.line_state_mut() {
303            l.extend(step_states);
304        }
305        Step {
306            active: next_input.is_some(),
307            input: next_input,
308            processor: self.processor,
309            line_name: self.line_name,
310        }
311    }
312
313    /// Passes the step in case if the action returns `Some`
314    #[allow(clippy::missing_panics_doc)]
315    pub fn then<OUTPUT, A, F>(mut self, action: A) -> Step<'p, OUTPUT>
316    where
317        A: Into<Action<'p, F, INPUT, OUTPUT>>,
318        F: FnOnce(INPUT) -> Option<OUTPUT>,
319    {
320        #[allow(unused_mut)]
321        let mut action = action.into();
322        #[cfg(feature = "recording")]
323        let input_kind = action.input_kind();
324        #[cfg(feature = "recording")]
325        macro_rules! record_processed {
326            ($name:expr, $passed:expr, $input:expr) => {
327                if let Some(l) = self.line_state_mut() {
328                    l.push_step_state(action.name, $input, input_kind, $passed);
329                }
330            };
331        }
332        if !self.active || self.input.is_none() {
333            #[cfg(feature = "recording")]
334            record_processed!(action.name, false, Value::Null);
335            return Step {
336                input: None,
337                active: false,
338                processor: self.processor,
339                line_name: self.line_name,
340            };
341        }
342        #[cfg(feature = "recording")]
343        let recorded_input = self
344            .processor_is_recording()
345            .then(|| action.take_recorded_input_serialized(self.input.as_ref()))
346            .unwrap_or_default();
347        if let Some(output) = (action.f)(self.input.take().unwrap()) {
348            #[cfg(feature = "recording")]
349            record_processed!(action.name, true, recorded_input);
350            Step {
351                input: Some(output),
352                active: true,
353                processor: self.processor,
354                line_name: self.line_name,
355            }
356        } else {
357            #[cfg(feature = "recording")]
358            record_processed!(action.name, false, recorded_input);
359            Step {
360                input: None,
361                active: false,
362                processor: self.processor,
363                line_name: self.line_name,
364            }
365        }
366    }
367}
368
369#[allow(dead_code)]
370/// Action is a function wrapper that can be used in a step
371pub struct Action<'a, F, INPUT, OUTPUT>
372where
373    F: FnOnce(INPUT) -> Option<OUTPUT>,
374{
375    f: F,
376    name: Cow<'static, str>,
377    #[cfg(feature = "recording")]
378    recorded_input: Option<&'a dyn erased_serde::Serialize>,
379    #[cfg(not(feature = "recording"))]
380    _recorded_input: PhantomData<&'a ()>,
381    _input: PhantomData<INPUT>,
382}
383
384impl<F, INPUT, OUTPUT> From<F> for Action<'_, F, INPUT, OUTPUT>
385where
386    F: FnOnce(INPUT) -> Option<OUTPUT>,
387{
388    fn from(function: F) -> Self {
389        Action::new("", function)
390    }
391}
392
393/// Creates a new action, in case if the name is not provided, the function name will be used as
394/// the name of the action (in case of closures it is recommended to always provide a name to get
395/// it clear and readable).
396#[macro_export]
397macro_rules! action {
398    ($f: expr) => {
399        $crate::Action::new(stringify!($f), $f)
400    };
401    ($name: expr, $f: expr) => {
402        $crate::Action::new($name, $f)
403    };
404}
405
406impl<'a, F, INPUT, OUTPUT> Action<'a, F, INPUT, OUTPUT>
407where
408    F: FnOnce(INPUT) -> Option<OUTPUT>,
409{
410    /// Creates a new action
411    pub fn new(name: impl Into<Cow<'static, str>>, f: F) -> Self {
412        Action {
413            f,
414            name: name.into(),
415            #[cfg(feature = "recording")]
416            recorded_input: None,
417            #[cfg(not(feature = "recording"))]
418            _recorded_input: PhantomData,
419            _input: PhantomData,
420        }
421    }
422    /// Sets the recorded (actual) input for the action function
423    #[cfg(feature = "recording")]
424    pub fn with_recorded_input<V>(mut self, input: &'a V) -> Self
425    where
426        V: Serialize,
427    {
428        self.recorded_input = Some(input);
429        self
430    }
431    #[cfg(not(feature = "recording"))]
432    #[allow(unused_mut)]
433    /// When the recording feature is disabled, this function does nothing
434    pub fn with_recorded_input<V>(mut self, _input: &'a V) -> Self {
435        self
436    }
437    #[cfg(feature = "recording")]
438    // WARNING: must be called before the input is taken
439    fn input_kind(&self) -> InputKind {
440        if self.recorded_input.is_some() {
441            InputKind::External
442        } else {
443            InputKind::Flow
444        }
445    }
446    #[cfg(feature = "recording")]
447    fn take_recorded_input_serialized(&mut self, fallback: Option<&INPUT>) -> Value
448    where
449        INPUT: StepInput,
450    {
451        if let Some(i) = self.recorded_input.take() {
452            serde_json::to_value(i).unwrap_or_default()
453        } else {
454            serde_json::to_value(fallback).unwrap_or_default()
455        }
456    }
457}
458
459#[derive(Default, Debug, Clone)]
460#[cfg_attr(feature = "recording", derive(Serialize, Deserialize))]
461/// State of the process or a group of logic lines. Acts as a factory for [`Processor`] instances.
462/// Shares recording state with the created processors.
463pub struct Rack {
464    #[cfg(feature = "recording")]
465    lines: BTreeMap<Cow<'static, str>, LineState>,
466    #[serde(skip)]
467    #[cfg(feature = "recording")]
468    recording: Arc<atomic::AtomicBool>,
469}
470
471impl Rack {
472    /// Creates a new state
473    pub fn new() -> Self {
474        Self::default()
475    }
476    /// Record the state of the lines and reset the processor
477    #[allow(unused_variables)]
478    pub fn ingress(&mut self, processor: &mut Processor) {
479        #[cfg(feature = "recording")]
480        self.lines.extend(mem::take(&mut processor.result));
481        #[cfg(not(feature = "recording"))]
482        processor.reset();
483    }
484    /// Returns the state of the line
485    #[cfg(feature = "recording")]
486    pub fn line_state(&self, name: &str) -> Option<&LineState> {
487        self.lines.get(name)
488    }
489    /// Returns all states of the lines
490    #[cfg(feature = "recording")]
491    pub fn lines(&self) -> &BTreeMap<Cow<'static, str>, LineState> {
492        &self.lines
493    }
494    /// Creates a snapshot of the current state of the lines
495    #[cfg(feature = "recording")]
496    pub fn snapshot(&self) -> Snapshot {
497        Snapshot {
498            lines: self.lines.clone(),
499        }
500    }
501    /// Creates a filtered snapshot of the current state of the lines
502    #[cfg(feature = "recording")]
503    pub fn snapshot_filtered<P>(&self, predicate: P) -> Snapshot
504    where
505        P: Fn(&LineState) -> bool,
506    {
507        let lines = self
508            .lines
509            .iter()
510            .filter(|(_, line)| predicate(line))
511            .map(|(name, line)| (name.clone(), line.clone()))
512            .collect();
513        Snapshot { lines }
514    }
515    /// Creates a new processor
516    pub fn processor(&self) -> Processor {
517        Processor {
518            #[cfg(feature = "recording")]
519            result: BTreeMap::new(),
520            #[cfg(feature = "recording")]
521            recording: Arc::clone(&self.recording),
522        }
523    }
524
525    /// Enables recording for the state
526    #[cfg(feature = "recording")]
527    pub fn with_recording_enabled(self) -> Self {
528        self.recording.store(true, atomic::Ordering::SeqCst);
529        self
530    }
531
532    /// Sets the recording state for the state
533    #[cfg(feature = "recording")]
534    pub fn set_recording(&mut self, recording: bool) {
535        self.recording.store(recording, atomic::Ordering::SeqCst);
536    }
537
538    /// Returns `true` if the rack is recording
539    #[cfg(feature = "recording")]
540    pub fn is_recording(&self) -> bool {
541        self.recording.load(atomic::Ordering::SeqCst)
542    }
543}
544
545/// Processor is an instance which creates logical lines
546#[derive(Default)]
547pub struct Processor {
548    #[cfg(feature = "recording")]
549    result: BTreeMap<Cow<'static, str>, LineState>,
550    #[cfg(feature = "recording")]
551    recording: Arc<atomic::AtomicBool>,
552}
553
554impl Processor {
555    /// Creates a new processor (state-independent)
556    pub fn new() -> Self {
557        Self::default()
558    }
559    /// Resets the processor recordings
560    pub fn reset(&mut self) {
561        #[cfg(feature = "recording")]
562        self.result.clear();
563    }
564    /// Returns the state of the line
565    #[cfg(feature = "recording")]
566    pub fn line_state(&self, name: &str) -> Option<&LineState> {
567        self.result.get(name)
568    }
569    /// Returns `true` if the processor is recording
570    #[cfg(feature = "recording")]
571    pub fn is_recording(&self) -> bool {
572        self.recording.load(atomic::Ordering::SeqCst)
573    }
574    /// Creates a new logical line
575    pub fn line<INPUT>(&mut self, name: impl Into<Cow<'static, str>>, input: INPUT) -> Step<INPUT> {
576        let name = name.into();
577        #[cfg(feature = "recording")]
578        if self.is_recording() {
579            match self.result.entry(name.clone()) {
580                btree_map::Entry::Vacant(entry) => {
581                    entry.insert(LineState::new(name.clone()));
582                }
583                btree_map::Entry::Occupied(mut entry) => {
584                    entry.get_mut().clear();
585                }
586            }
587        }
588        Step {
589            input: Some(input),
590            active: true,
591            processor: Some(self),
592            line_name: Some(name),
593        }
594    }
595}
596
597#[cfg(test)]
598mod test {
599    use super::Rack;
600    #[cfg(feature = "recording")]
601    use serde::Serialize;
602
603    #[test]
604    fn test_lines() {
605        #[allow(clippy::cast_lossless, clippy::unnecessary_wraps)]
606        fn get_temp1(data: &ModbusData) -> Option<f32> {
607            Some(data.temperature_1 as f32 / 10.0)
608        }
609
610        #[allow(clippy::cast_lossless, clippy::unnecessary_wraps)]
611        fn get_temp2(data: &ModbusData) -> Option<f32> {
612            Some(data.temperature_2 as f32 / 10.0)
613        }
614
615        #[allow(clippy::unnecessary_wraps)]
616        fn temperature_critical(temp: f32) -> Option<()> {
617            if temp > 90. {
618                Some(())
619            } else {
620                None
621            }
622        }
623
624        fn voltage_critical(voltage: u16) -> Option<()> {
625            if voltage > 300 {
626                Some(())
627            } else {
628                None
629            }
630        }
631
632        #[cfg_attr(feature = "recording", derive(Serialize))]
633        struct ModbusData {
634            temperature_1: u16,
635            voltage_1: u16,
636            temperature_2: u16,
637            voltage_2: u16,
638        }
639
640        let modbus_data = ModbusData {
641            temperature_1: 950,
642            voltage_1: 395,
643            temperature_2: 250,
644            voltage_2: 295,
645        };
646        let mut state = Rack::new();
647        let mut processor = state.processor();
648        let mut line1_active = true;
649        let mut line2_active = true;
650        #[cfg(feature = "recording")]
651        state.set_recording(true);
652        assert!(processor
653            .line("line1", &modbus_data)
654            .then(action!(get_temp1))
655            .then(action!(temperature_critical))
656            .then(
657                action!("voltage", |()| Some(modbus_data.voltage_1))
658                    .with_recorded_input(&modbus_data.voltage_1)
659            )
660            .then(action!(voltage_critical))
661            .then(action!("OFF", |()| {
662                line1_active = false;
663                Some(())
664            }))
665            .is_active());
666        assert!(!processor
667            .line("line2", &modbus_data)
668            .then(get_temp2)
669            .then(action!(temperature_critical))
670            .then(
671                action!("voltage", |()| Some(modbus_data.voltage_2))
672                    .with_recorded_input(&modbus_data.voltage_2)
673            )
674            .then(action!(voltage_critical))
675            .then(action!("OFF", |()| {
676                line2_active = false;
677                Some(())
678            }))
679            .is_active());
680        assert!(!line1_active);
681        assert!(line2_active);
682        state.ingress(&mut processor);
683    }
684}