1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3
4use std::{borrow::Cow, marker::PhantomData};
5#[cfg(feature = "recording")]
6use std::{
7 collections::{btree_map, BTreeMap},
8 mem,
9 sync::{atomic, Arc},
10};
11
12#[cfg(feature = "recording")]
13use serde::{Deserialize, Serialize};
14
15#[cfg(feature = "recording")]
16mod recording;
17#[cfg(feature = "recording")]
18pub use recording::{InputKind, LineState, Snapshot, SnapshotFormatter, StepState, StepStateInfo};
19#[cfg(feature = "recording")]
20use serde_json::Value;
21
22pub mod global {
24 #[cfg(feature = "exporter")]
25 const DEFAULT_PORT: u16 = 9001;
26
27 #[cfg(feature = "exporter")]
28 use std::net::{IpAddr, ToSocketAddrs};
29
30 use super::{Processor, Rack};
31 use once_cell::sync::Lazy;
32
33 #[cfg(feature = "locking-default")]
34 use parking_lot::Mutex;
35
36 #[cfg(feature = "locking-rt")]
37 use parking_lot_rt::Mutex;
38
39 #[cfg(feature = "locking-rt-safe")]
40 use rtsc::pi::Mutex;
41
42 static GLOBAL_LADDER: Lazy<Mutex<Rack>> = Lazy::new(|| Mutex::new(Rack::new()));
43
44 #[cfg(all(feature = "recording", feature = "exporter"))]
45 static SNAPSHOT_FORMATTER: once_cell::sync::OnceCell<
46 Box<dyn super::recording::SnapshotFormatter>,
47 > = once_cell::sync::OnceCell::new();
48
49 #[cfg(all(feature = "recording", feature = "exporter"))]
50 pub fn set_snapshot_formatter(formatter: Box<dyn super::recording::SnapshotFormatter>) {
56 SNAPSHOT_FORMATTER
57 .set(formatter)
58 .unwrap_or_else(|_| panic!("Snapshot formatter already set"));
59 }
60
61 #[cfg(feature = "recording")]
63 pub fn set_recording(recording: bool) {
64 GLOBAL_LADDER.lock().set_recording(recording);
65 }
66
67 #[cfg(feature = "recording")]
69 pub fn is_recording() -> bool {
70 GLOBAL_LADDER.lock().is_recording()
71 }
72
73 #[cfg(feature = "recording")]
75 pub fn snapshot() -> super::Snapshot {
76 GLOBAL_LADDER.lock().snapshot()
77 }
78 #[cfg(feature = "recording")]
81 pub fn snapshot_filtered<P>(predicate: P) -> super::Snapshot
82 where
83 P: Fn(&super::LineState) -> bool,
84 {
85 GLOBAL_LADDER.lock().snapshot_filtered(predicate)
86 }
87
88 pub fn processor() -> Processor {
90 GLOBAL_LADDER.lock().processor()
91 }
92
93 #[cfg(feature = "recording")]
95 pub fn ingress(processor: &mut Processor) {
96 GLOBAL_LADDER.lock().ingress(processor);
97 }
98
99 #[cfg(not(feature = "recording"))]
100 pub fn ingress(_processor: &mut Processor) {}
102
103 #[cfg(feature = "exporter")]
105 pub fn install_exporter() -> Result<(), Box<dyn std::error::Error>> {
106 install_exporter_on((IpAddr::from([0, 0, 0, 0]), DEFAULT_PORT))
107 }
108
109 #[cfg(feature = "exporter")]
111 pub fn install_exporter_on<A: ToSocketAddrs>(
112 addr: A,
113 ) -> Result<(), Box<dyn std::error::Error>> {
114 let server = rouille::Server::new(addr, move |request| {
115 if request.method() != "GET" {
116 return rouille::Response::empty_406();
117 }
118 if request.url() == "/state" {
119 let mut snapshot = snapshot();
120 if let Some(formatter) = SNAPSHOT_FORMATTER.get() {
121 snapshot = formatter.format(snapshot);
122 }
123 return rouille::Response::json(&snapshot)
124 .with_additional_header("Access-Control-Allow-Origin", "*")
125 .with_additional_header("Access-Control-Allow-Methods", "GET, OPTIONS")
126 .with_additional_header("Access-Control-Allow-Headers", "Content-Type");
127 }
128 #[cfg(feature = "exporter-ui")]
129 if request.url() == "/" {
130 return rouille::Response::html(include_str!("../ll-default-view/dist/index.html"));
131 }
132 rouille::Response::empty_404()
133 })
134 .map_err(|e| e.to_string())?;
135 std::thread::Builder::new()
136 .name("ll-exporter".to_string())
137 .spawn(move || {
138 server.run();
139 })?;
140 Ok(())
141 }
142}
143
144pub struct Step<'p, INPUT> {
146 active: bool,
147 input: Option<INPUT>,
148 processor: Option<&'p mut Processor>,
149 line_name: Option<Cow<'static, str>>,
150}
151
/// Helper operations usable as actions.
pub mod ops {

    /// Logical negation: maps `Some(())` to `None` and `None` to `Some(())`.
    pub fn not(input: Option<()>) -> Option<()> {
        match input {
            Some(()) => None,
            None => Some(()),
        }
    }
}
165
/// Marker for values that can be passed through a [`Step`].
///
/// With the `recording` feature the value must be serializable.
#[cfg(feature = "recording")]
pub trait StepInput: Serialize {}

#[cfg(feature = "recording")]
impl<T: Serialize> StepInput for T {}

/// Marker for values that can be passed through a [`Step`].
///
/// Without the `recording` feature every type qualifies.
#[cfg(not(feature = "recording"))]
pub trait StepInput {}

#[cfg(not(feature = "recording"))]
impl<T> StepInput for T {}
179
180impl<'p, INPUT> Step<'p, INPUT>
181where
182 INPUT: StepInput,
183{
184 pub fn is_active(&self) -> bool {
186 self.active
187 }
188 pub fn new(value: INPUT) -> Self {
190 Step {
191 input: Some(value),
192 active: true,
193 processor: None,
194 line_name: None,
195 }
196 }
197
198 #[cfg(feature = "recording")]
199 fn processor_is_recording(&self) -> bool {
200 self.processor
201 .as_ref()
202 .is_some_and(|processor| processor.is_recording())
203 }
204
205 #[cfg(feature = "recording")]
206 fn line_state_mut(&mut self) -> Option<&mut LineState> {
207 let processor = self.processor.as_mut()?;
208 let line_name = self.line_name.as_ref()?;
209 processor.result.get_mut(line_name)
210 }
211
212 #[allow(clippy::missing_panics_doc)]
214 pub fn then_any<OUTPUT, A, A2, F, F2>(mut self, action1: A, action2: A2) -> Step<'p, OUTPUT>
215 where
216 A: Into<Action<'p, F, INPUT, OUTPUT>>,
217 F: FnOnce(INPUT) -> Option<OUTPUT>,
218 A2: Into<Action<'p, F2, INPUT, OUTPUT>>,
219 F: FnOnce(INPUT) -> Option<OUTPUT>,
220 F2: FnOnce(INPUT) -> Option<OUTPUT>,
221 INPUT: Clone,
222 {
223 #[allow(unused_mut)]
224 let mut action1 = action1.into();
225 #[cfg(feature = "recording")]
226 let input_kind1 = action1.input_kind();
227 #[cfg(feature = "recording")]
228 let recorded_input1 = self
229 .processor_is_recording()
230 .then(|| action1.take_recorded_input_serialized(self.input.as_ref()))
231 .unwrap_or_default();
232 #[allow(unused_mut)]
233 let mut action2 = action2.into();
234 #[cfg(feature = "recording")]
235 let input_kind2 = action2.input_kind();
236 #[cfg(feature = "recording")]
237 let recorded_input2 = self
238 .processor_is_recording()
239 .then(|| action2.take_recorded_input_serialized(self.input.as_ref()))
240 .unwrap_or_default();
241 if !self.active || self.input.is_none() {
242 #[cfg(feature = "recording")]
243 {
244 if let Some(l) = self.line_state_mut() {
245 let step_states = vec![
246 StepStateInfo::new(action1.name, None::<()>, input_kind1, false),
247 StepStateInfo::new(action2.name, None::<()>, input_kind2, false),
248 ];
249 l.extend(step_states);
250 }
251 }
252 return Step {
253 input: None,
254 active: false,
255 processor: self.processor,
256 line_name: self.line_name,
257 };
258 }
259 let action_input = self.input.take().unwrap();
260 let mut next_input = None;
261 #[cfg(feature = "recording")]
262 let mut step_states = Vec::with_capacity(2);
263 if let Some(output) = (action1.f)(action_input.clone()) {
264 next_input = Some(output);
265 #[cfg(feature = "recording")]
266 step_states.push(StepStateInfo::new_with_serialized_input(
267 action1.name,
268 recorded_input1,
269 input_kind1,
270 true,
271 ));
272 } else {
273 #[cfg(feature = "recording")]
274 step_states.push(StepStateInfo::new_with_serialized_input(
275 action1.name,
276 recorded_input1,
277 input_kind1,
278 false,
279 ));
280 }
281 if let Some(output) = (action2.f)(action_input) {
282 if next_input.is_none() {
283 next_input = Some(output);
284 }
285 #[cfg(feature = "recording")]
286 step_states.push(StepStateInfo::new_with_serialized_input(
287 action2.name,
288 recorded_input2,
289 input_kind2,
290 true,
291 ));
292 } else {
293 #[cfg(feature = "recording")]
294 step_states.push(StepStateInfo::new_with_serialized_input(
295 action2.name,
296 recorded_input2,
297 input_kind2,
298 false,
299 ));
300 }
301 #[cfg(feature = "recording")]
302 if let Some(l) = self.line_state_mut() {
303 l.extend(step_states);
304 }
305 Step {
306 active: next_input.is_some(),
307 input: next_input,
308 processor: self.processor,
309 line_name: self.line_name,
310 }
311 }
312
313 #[allow(clippy::missing_panics_doc)]
315 pub fn then<OUTPUT, A, F>(mut self, action: A) -> Step<'p, OUTPUT>
316 where
317 A: Into<Action<'p, F, INPUT, OUTPUT>>,
318 F: FnOnce(INPUT) -> Option<OUTPUT>,
319 {
320 #[allow(unused_mut)]
321 let mut action = action.into();
322 #[cfg(feature = "recording")]
323 let input_kind = action.input_kind();
324 #[cfg(feature = "recording")]
325 macro_rules! record_processed {
326 ($name:expr, $passed:expr, $input:expr) => {
327 if let Some(l) = self.line_state_mut() {
328 l.push_step_state(action.name, $input, input_kind, $passed);
329 }
330 };
331 }
332 if !self.active || self.input.is_none() {
333 #[cfg(feature = "recording")]
334 record_processed!(action.name, false, Value::Null);
335 return Step {
336 input: None,
337 active: false,
338 processor: self.processor,
339 line_name: self.line_name,
340 };
341 }
342 #[cfg(feature = "recording")]
343 let recorded_input = self
344 .processor_is_recording()
345 .then(|| action.take_recorded_input_serialized(self.input.as_ref()))
346 .unwrap_or_default();
347 if let Some(output) = (action.f)(self.input.take().unwrap()) {
348 #[cfg(feature = "recording")]
349 record_processed!(action.name, true, recorded_input);
350 Step {
351 input: Some(output),
352 active: true,
353 processor: self.processor,
354 line_name: self.line_name,
355 }
356 } else {
357 #[cfg(feature = "recording")]
358 record_processed!(action.name, false, recorded_input);
359 Step {
360 input: None,
361 active: false,
362 processor: self.processor,
363 line_name: self.line_name,
364 }
365 }
366 }
367}
368
/// A named function applied by a [`Step`], optionally with a separately
/// supplied value to record as its input.
// The `where` clause has to stay on the struct: `OUTPUT` appears in no field
// and is only tied to the type through the bound on `F`.
#[allow(dead_code)]
pub struct Action<'a, F, INPUT, OUTPUT>
where
    F: FnOnce(INPUT) -> Option<OUTPUT>,
{
    /// Function executed on the step input.
    f: F,
    /// Name stored with the recorded step state.
    name: Cow<'static, str>,
    /// Value recorded instead of the step input, if one was supplied.
    #[cfg(feature = "recording")]
    recorded_input: Option<&'a dyn erased_serde::Serialize>,
    /// Keeps the `'a` lifetime in use when nothing is recorded.
    #[cfg(not(feature = "recording"))]
    _recorded_input: PhantomData<&'a ()>,
    /// Keeps the `INPUT` parameter in use.
    _input: PhantomData<INPUT>,
}
383
384impl<F, INPUT, OUTPUT> From<F> for Action<'_, F, INPUT, OUTPUT>
385where
386 F: FnOnce(INPUT) -> Option<OUTPUT>,
387{
388 fn from(function: F) -> Self {
389 Action::new("", function)
390 }
391}
392
/// Builds an [`Action`](crate::Action).
///
/// * `action!(f)` names the action after the source text of `f`.
/// * `action!(name, f)` uses the given name.
#[macro_export]
macro_rules! action {
    ($function:expr) => {
        $crate::Action::new(stringify!($function), $function)
    };
    ($label:expr, $function:expr) => {
        $crate::Action::new($label, $function)
    };
}
405
406impl<'a, F, INPUT, OUTPUT> Action<'a, F, INPUT, OUTPUT>
407where
408 F: FnOnce(INPUT) -> Option<OUTPUT>,
409{
410 pub fn new(name: impl Into<Cow<'static, str>>, f: F) -> Self {
412 Action {
413 f,
414 name: name.into(),
415 #[cfg(feature = "recording")]
416 recorded_input: None,
417 #[cfg(not(feature = "recording"))]
418 _recorded_input: PhantomData,
419 _input: PhantomData,
420 }
421 }
422 #[cfg(feature = "recording")]
424 pub fn with_recorded_input<V>(mut self, input: &'a V) -> Self
425 where
426 V: Serialize,
427 {
428 self.recorded_input = Some(input);
429 self
430 }
431 #[cfg(not(feature = "recording"))]
432 #[allow(unused_mut)]
433 pub fn with_recorded_input<V>(mut self, _input: &'a V) -> Self {
435 self
436 }
437 #[cfg(feature = "recording")]
438 fn input_kind(&self) -> InputKind {
440 if self.recorded_input.is_some() {
441 InputKind::External
442 } else {
443 InputKind::Flow
444 }
445 }
446 #[cfg(feature = "recording")]
447 fn take_recorded_input_serialized(&mut self, fallback: Option<&INPUT>) -> Value
448 where
449 INPUT: StepInput,
450 {
451 if let Some(i) = self.recorded_input.take() {
452 serde_json::to_value(i).unwrap_or_default()
453 } else {
454 serde_json::to_value(fallback).unwrap_or_default()
455 }
456 }
457}
458
/// Storage for the recorded state of all lines.
///
/// The recording flag lives in an `Arc`, so it is shared with every clone of
/// the rack and with the processors created from it.
#[derive(Default, Debug, Clone)]
#[cfg_attr(feature = "recording", derive(Serialize, Deserialize))]
pub struct Rack {
    /// Latest recorded state of each line, keyed by line name.
    #[cfg(feature = "recording")]
    lines: BTreeMap<Cow<'static, str>, LineState>,
    /// Shared on/off switch for recording; not serialized.
    #[cfg(feature = "recording")]
    #[serde(skip)]
    recording: Arc<atomic::AtomicBool>,
}
470
471impl Rack {
472 pub fn new() -> Self {
474 Self::default()
475 }
476 #[allow(unused_variables)]
478 pub fn ingress(&mut self, processor: &mut Processor) {
479 #[cfg(feature = "recording")]
480 self.lines.extend(mem::take(&mut processor.result));
481 #[cfg(not(feature = "recording"))]
482 processor.reset();
483 }
484 #[cfg(feature = "recording")]
486 pub fn line_state(&self, name: &str) -> Option<&LineState> {
487 self.lines.get(name)
488 }
489 #[cfg(feature = "recording")]
491 pub fn lines(&self) -> &BTreeMap<Cow<'static, str>, LineState> {
492 &self.lines
493 }
494 #[cfg(feature = "recording")]
496 pub fn snapshot(&self) -> Snapshot {
497 Snapshot {
498 lines: self.lines.clone(),
499 }
500 }
501 #[cfg(feature = "recording")]
503 pub fn snapshot_filtered<P>(&self, predicate: P) -> Snapshot
504 where
505 P: Fn(&LineState) -> bool,
506 {
507 let lines = self
508 .lines
509 .iter()
510 .filter(|(_, line)| predicate(line))
511 .map(|(name, line)| (name.clone(), line.clone()))
512 .collect();
513 Snapshot { lines }
514 }
515 pub fn processor(&self) -> Processor {
517 Processor {
518 #[cfg(feature = "recording")]
519 result: BTreeMap::new(),
520 #[cfg(feature = "recording")]
521 recording: Arc::clone(&self.recording),
522 }
523 }
524
525 #[cfg(feature = "recording")]
527 pub fn with_recording_enabled(self) -> Self {
528 self.recording.store(true, atomic::Ordering::SeqCst);
529 self
530 }
531
532 #[cfg(feature = "recording")]
534 pub fn set_recording(&mut self, recording: bool) {
535 self.recording.store(recording, atomic::Ordering::SeqCst);
536 }
537
538 #[cfg(feature = "recording")]
540 pub fn is_recording(&self) -> bool {
541 self.recording.load(atomic::Ordering::SeqCst)
542 }
543}
544
/// Evaluates lines and, with the `recording` feature, collects their states
/// until they are handed over to a [`Rack`].
#[derive(Default)]
pub struct Processor {
    /// States of the lines evaluated so far, keyed by line name.
    #[cfg(feature = "recording")]
    result: BTreeMap<Cow<'static, str>, LineState>,
    /// Recording switch; shared with the rack when created by [`Rack::processor`].
    #[cfg(feature = "recording")]
    recording: Arc<atomic::AtomicBool>,
}
553
554impl Processor {
555 pub fn new() -> Self {
557 Self::default()
558 }
559 pub fn reset(&mut self) {
561 #[cfg(feature = "recording")]
562 self.result.clear();
563 }
564 #[cfg(feature = "recording")]
566 pub fn line_state(&self, name: &str) -> Option<&LineState> {
567 self.result.get(name)
568 }
569 #[cfg(feature = "recording")]
571 pub fn is_recording(&self) -> bool {
572 self.recording.load(atomic::Ordering::SeqCst)
573 }
574 pub fn line<INPUT>(&mut self, name: impl Into<Cow<'static, str>>, input: INPUT) -> Step<INPUT> {
576 let name = name.into();
577 #[cfg(feature = "recording")]
578 if self.is_recording() {
579 match self.result.entry(name.clone()) {
580 btree_map::Entry::Vacant(entry) => {
581 entry.insert(LineState::new(name.clone()));
582 }
583 btree_map::Entry::Occupied(mut entry) => {
584 entry.get_mut().clear();
585 }
586 }
587 }
588 Step {
589 input: Some(input),
590 active: true,
591 processor: Some(self),
592 line_name: Some(name),
593 }
594 }
595}
596
#[cfg(test)]
mod test {
    use super::Rack;
    #[cfg(feature = "recording")]
    use serde::Serialize;

    /// Evaluates two lines over the same data: the first passes every check
    /// and runs its final action, the second stops at the temperature check.
    #[test]
    fn test_lines() {
        #[cfg_attr(feature = "recording", derive(Serialize))]
        struct ModbusData {
            temperature_1: u16,
            voltage_1: u16,
            temperature_2: u16,
            voltage_2: u16,
        }

        // Temperatures are stored in tenths of a unit.
        #[allow(clippy::unnecessary_wraps)]
        fn get_temp1(data: &ModbusData) -> Option<f32> {
            Some(f32::from(data.temperature_1) / 10.0)
        }

        #[allow(clippy::unnecessary_wraps)]
        fn get_temp2(data: &ModbusData) -> Option<f32> {
            Some(f32::from(data.temperature_2) / 10.0)
        }

        fn temperature_critical(temp: f32) -> Option<()> {
            (temp > 90.).then_some(())
        }

        fn voltage_critical(voltage: u16) -> Option<()> {
            (voltage > 300).then_some(())
        }

        let modbus_data = ModbusData {
            temperature_1: 950,
            voltage_1: 395,
            temperature_2: 250,
            voltage_2: 295,
        };
        let mut state = Rack::new();
        let mut processor = state.processor();
        #[cfg(feature = "recording")]
        state.set_recording(true);
        let mut line1_active = true;
        let mut line2_active = true;

        let line1_tripped = processor
            .line("line1", &modbus_data)
            .then(action!(get_temp1))
            .then(action!(temperature_critical))
            .then(
                action!("voltage", |()| Some(modbus_data.voltage_1))
                    .with_recorded_input(&modbus_data.voltage_1),
            )
            .then(action!(voltage_critical))
            .then(action!("OFF", |()| {
                line1_active = false;
                Some(())
            }))
            .is_active();
        assert!(line1_tripped);

        let line2_tripped = processor
            .line("line2", &modbus_data)
            .then(get_temp2)
            .then(action!(temperature_critical))
            .then(
                action!("voltage", |()| Some(modbus_data.voltage_2))
                    .with_recorded_input(&modbus_data.voltage_2),
            )
            .then(action!(voltage_critical))
            .then(action!("OFF", |()| {
                line2_active = false;
                Some(())
            }))
            .is_active();
        assert!(!line2_tripped);

        assert!(!line1_active);
        assert!(line2_active);
        state.ingress(&mut processor);
    }
}