1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3
4use std::{borrow::Cow, marker::PhantomData};
5#[cfg(feature = "recording")]
6use std::{
7 collections::{btree_map, BTreeMap},
8 mem,
9 sync::{atomic, Arc},
10};
11
12#[cfg(feature = "recording")]
13use serde::{Deserialize, Serialize};
14
15#[cfg(feature = "recording")]
16mod recording;
17#[cfg(feature = "recording")]
18pub use recording::{InputKind, LineState, Snapshot, SnapshotFormatter, StepState, StepStateInfo};
19#[cfg(feature = "recording")]
20use serde_json::Value;
21
22pub mod global {
24 #[cfg(feature = "exporter")]
25 const DEFAULT_PORT: u16 = 9001;
26
27 #[cfg(feature = "exporter")]
28 use std::net::{IpAddr, ToSocketAddrs};
29
30 use super::{Processor, Rack};
31 use once_cell::sync::Lazy;
32
33 #[cfg(feature = "locking-default")]
34 use parking_lot::Mutex;
35
36 #[cfg(feature = "locking-rt")]
37 use parking_lot_rt::Mutex;
38
39 #[cfg(feature = "locking-rt-safe")]
40 use rtsc::pi::Mutex;
41
42 static GLOBAL_LADDER: Lazy<Mutex<Rack>> = Lazy::new(|| Mutex::new(Rack::new()));
43
44 #[cfg(all(feature = "recording", feature = "exporter"))]
45 static SNAPSHOT_FORMATTER: once_cell::sync::OnceCell<
46 Box<dyn super::recording::SnapshotFormatter>,
47 > = once_cell::sync::OnceCell::new();
48
49 #[cfg(all(feature = "recording", feature = "exporter"))]
50 pub fn set_snapshot_formatter(formatter: Box<dyn super::recording::SnapshotFormatter>) {
56 SNAPSHOT_FORMATTER
57 .set(formatter)
58 .unwrap_or_else(|_| panic!("Snapshot formatter already set"));
59 }
60
61 #[cfg(feature = "recording")]
63 pub fn set_recording(recording: bool) {
64 GLOBAL_LADDER.lock().set_recording(recording);
65 }
66
67 #[cfg(feature = "recording")]
69 pub fn snapshot() -> super::Snapshot {
70 GLOBAL_LADDER.lock().snapshot()
71 }
72 #[cfg(feature = "recording")]
75 pub fn snapshot_filtered<P>(predicate: P) -> super::Snapshot
76 where
77 P: Fn(&super::LineState) -> bool,
78 {
79 GLOBAL_LADDER.lock().snapshot_filtered(predicate)
80 }
81
82 pub fn processor() -> Processor {
84 GLOBAL_LADDER.lock().processor()
85 }
86
87 #[cfg(feature = "recording")]
89 pub fn ingress(processor: &mut Processor) {
90 GLOBAL_LADDER.lock().ingress(processor);
91 }
92
93 #[cfg(not(feature = "recording"))]
94 pub fn ingress(_processor: &mut Processor) {}
96
97 #[cfg(feature = "exporter")]
99 pub fn install_exporter() -> Result<(), Box<dyn std::error::Error>> {
100 install_exporter_on((IpAddr::from([0, 0, 0, 0]), DEFAULT_PORT))
101 }
102
103 #[cfg(feature = "exporter")]
105 pub fn install_exporter_on<A: ToSocketAddrs>(
106 addr: A,
107 ) -> Result<(), Box<dyn std::error::Error>> {
108 let server = rouille::Server::new(addr, move |request| {
109 if request.method() != "GET" {
110 return rouille::Response::empty_406();
111 }
112 if request.url() == "/state" {
113 let mut snapshot = snapshot();
114 if let Some(formatter) = SNAPSHOT_FORMATTER.get() {
115 snapshot = formatter.format(snapshot);
116 }
117 return rouille::Response::json(&snapshot)
118 .with_additional_header("Access-Control-Allow-Origin", "*")
119 .with_additional_header("Access-Control-Allow-Methods", "GET, OPTIONS")
120 .with_additional_header("Access-Control-Allow-Headers", "Content-Type");
121 }
122 #[cfg(feature = "exporter-ui")]
123 if request.url() == "/" {
124 return rouille::Response::html(include_str!("../ll-default-view/dist/index.html"));
125 }
126 rouille::Response::empty_404()
127 })
128 .map_err(|e| e.to_string())?;
129 std::thread::Builder::new()
130 .name("ll-exporter".to_string())
131 .spawn(move || {
132 server.run();
133 })?;
134 Ok(())
135 }
136}
137
138pub struct Step<'p, INPUT> {
140 active: bool,
141 input: Option<INPUT>,
142 processor: Option<&'p mut Processor>,
143 line_name: Option<Cow<'static, str>>,
144}
145
146pub mod ops {
148
149 pub fn not(input: Option<()>) -> Option<()> {
152 if input.is_some() {
153 None
154 } else {
155 Some(())
156 }
157 }
158}
159
160#[cfg(feature = "recording")]
161pub trait StepInput: Serialize {}
163
164#[cfg(feature = "recording")]
165impl<T> StepInput for T where T: Serialize {}
166
167#[cfg(not(feature = "recording"))]
168pub trait StepInput {}
170
171#[cfg(not(feature = "recording"))]
172impl<T> StepInput for T {}
173
174impl<'p, INPUT> Step<'p, INPUT>
175where
176 INPUT: StepInput,
177{
178 pub fn is_active(&self) -> bool {
180 self.active
181 }
182 pub fn new(value: INPUT) -> Self {
184 Step {
185 input: Some(value),
186 active: true,
187 processor: None,
188 line_name: None,
189 }
190 }
191
192 #[cfg(feature = "recording")]
193 fn processor_is_recording(&self) -> bool {
194 self.processor
195 .as_ref()
196 .is_some_and(|processor| processor.is_recording())
197 }
198
199 #[cfg(feature = "recording")]
200 fn line_state_mut(&mut self) -> Option<&mut LineState> {
201 let processor = self.processor.as_mut()?;
202 let line_name = self.line_name.as_ref()?;
203 processor.result.get_mut(line_name)
204 }
205
206 #[allow(clippy::missing_panics_doc)]
208 pub fn then_any<OUTPUT, A, A2, F, F2>(mut self, action1: A, action2: A2) -> Step<'p, OUTPUT>
209 where
210 A: Into<Action<'p, F, INPUT, OUTPUT>>,
211 F: FnOnce(INPUT) -> Option<OUTPUT>,
212 A2: Into<Action<'p, F2, INPUT, OUTPUT>>,
213 F: FnOnce(INPUT) -> Option<OUTPUT>,
214 F2: FnOnce(INPUT) -> Option<OUTPUT>,
215 INPUT: Clone,
216 {
217 #[allow(unused_mut)]
218 let mut action1 = action1.into();
219 #[cfg(feature = "recording")]
220 let input_kind1 = action1.input_kind();
221 #[cfg(feature = "recording")]
222 let recorded_input1 = self
223 .processor_is_recording()
224 .then(|| action1.take_recorded_input_serialized(self.input.as_ref()))
225 .unwrap_or_default();
226 #[allow(unused_mut)]
227 let mut action2 = action2.into();
228 #[cfg(feature = "recording")]
229 let input_kind2 = action2.input_kind();
230 #[cfg(feature = "recording")]
231 let recorded_input2 = self
232 .processor_is_recording()
233 .then(|| action2.take_recorded_input_serialized(self.input.as_ref()))
234 .unwrap_or_default();
235 if !self.active || self.input.is_none() {
236 #[cfg(feature = "recording")]
237 {
238 if let Some(l) = self.line_state_mut() {
239 let step_states = vec![
240 StepStateInfo::new(action1.name, None::<()>, input_kind1, false),
241 StepStateInfo::new(action2.name, None::<()>, input_kind2, false),
242 ];
243 l.extend(step_states);
244 }
245 }
246 return Step {
247 input: None,
248 active: false,
249 processor: self.processor,
250 line_name: self.line_name,
251 };
252 }
253 let action_input = self.input.take().unwrap();
254 let mut next_input = None;
255 #[cfg(feature = "recording")]
256 let mut step_states = Vec::with_capacity(2);
257 if let Some(output) = (action1.f)(action_input.clone()) {
258 next_input = Some(output);
259 #[cfg(feature = "recording")]
260 step_states.push(StepStateInfo::new_with_serialized_input(
261 action1.name,
262 recorded_input1,
263 input_kind1,
264 true,
265 ));
266 } else {
267 #[cfg(feature = "recording")]
268 step_states.push(StepStateInfo::new_with_serialized_input(
269 action1.name,
270 recorded_input1,
271 input_kind1,
272 false,
273 ));
274 }
275 if let Some(output) = (action2.f)(action_input) {
276 if next_input.is_none() {
277 next_input = Some(output);
278 }
279 #[cfg(feature = "recording")]
280 step_states.push(StepStateInfo::new_with_serialized_input(
281 action2.name,
282 recorded_input2,
283 input_kind2,
284 true,
285 ));
286 } else {
287 #[cfg(feature = "recording")]
288 step_states.push(StepStateInfo::new_with_serialized_input(
289 action2.name,
290 recorded_input2,
291 input_kind2,
292 false,
293 ));
294 }
295 #[cfg(feature = "recording")]
296 if let Some(l) = self.line_state_mut() {
297 l.extend(step_states);
298 }
299 Step {
300 active: next_input.is_some(),
301 input: next_input,
302 processor: self.processor,
303 line_name: self.line_name,
304 }
305 }
306
307 #[allow(clippy::missing_panics_doc)]
309 pub fn then<OUTPUT, A, F>(mut self, action: A) -> Step<'p, OUTPUT>
310 where
311 A: Into<Action<'p, F, INPUT, OUTPUT>>,
312 F: FnOnce(INPUT) -> Option<OUTPUT>,
313 {
314 #[allow(unused_mut)]
315 let mut action = action.into();
316 #[cfg(feature = "recording")]
317 let input_kind = action.input_kind();
318 #[cfg(feature = "recording")]
319 macro_rules! record_processed {
320 ($name:expr, $passed:expr, $input:expr) => {
321 if let Some(l) = self.line_state_mut() {
322 l.push_step_state(action.name, $input, input_kind, $passed);
323 }
324 };
325 }
326 if !self.active || self.input.is_none() {
327 #[cfg(feature = "recording")]
328 record_processed!(action.name, false, Value::Null);
329 return Step {
330 input: None,
331 active: false,
332 processor: self.processor,
333 line_name: self.line_name,
334 };
335 }
336 #[cfg(feature = "recording")]
337 let recorded_input = self
338 .processor_is_recording()
339 .then(|| action.take_recorded_input_serialized(self.input.as_ref()))
340 .unwrap_or_default();
341 if let Some(output) = (action.f)(self.input.take().unwrap()) {
342 #[cfg(feature = "recording")]
343 record_processed!(action.name, true, recorded_input);
344 Step {
345 input: Some(output),
346 active: true,
347 processor: self.processor,
348 line_name: self.line_name,
349 }
350 } else {
351 #[cfg(feature = "recording")]
352 record_processed!(action.name, false, recorded_input);
353 Step {
354 input: None,
355 active: false,
356 processor: self.processor,
357 line_name: self.line_name,
358 }
359 }
360 }
361}
362
363#[allow(dead_code)]
364pub struct Action<'a, F, INPUT, OUTPUT>
366where
367 F: FnOnce(INPUT) -> Option<OUTPUT>,
368{
369 f: F,
370 name: Cow<'static, str>,
371 #[cfg(feature = "recording")]
372 recorded_input: Option<&'a dyn erased_serde::Serialize>,
373 #[cfg(not(feature = "recording"))]
374 _recorded_input: PhantomData<&'a ()>,
375 _input: PhantomData<INPUT>,
376}
377
378impl<F, INPUT, OUTPUT> From<F> for Action<'_, F, INPUT, OUTPUT>
379where
380 F: FnOnce(INPUT) -> Option<OUTPUT>,
381{
382 fn from(function: F) -> Self {
383 Action::new("", function)
384 }
385}
386
387#[macro_export]
391macro_rules! action {
392 ($f: expr) => {
393 $crate::Action::new(stringify!($f), $f)
394 };
395 ($name: expr, $f: expr) => {
396 $crate::Action::new($name, $f)
397 };
398}
399
400impl<'a, F, INPUT, OUTPUT> Action<'a, F, INPUT, OUTPUT>
401where
402 F: FnOnce(INPUT) -> Option<OUTPUT>,
403{
404 pub fn new(name: impl Into<Cow<'static, str>>, f: F) -> Self {
406 Action {
407 f,
408 name: name.into(),
409 #[cfg(feature = "recording")]
410 recorded_input: None,
411 #[cfg(not(feature = "recording"))]
412 _recorded_input: PhantomData,
413 _input: PhantomData,
414 }
415 }
416 #[cfg(feature = "recording")]
418 pub fn with_recorded_input<V>(mut self, input: &'a V) -> Self
419 where
420 V: Serialize,
421 {
422 self.recorded_input = Some(input);
423 self
424 }
425 #[cfg(not(feature = "recording"))]
426 #[allow(unused_mut)]
427 pub fn with_recorded_input<V>(mut self, _input: &'a V) -> Self {
429 self
430 }
431 #[cfg(feature = "recording")]
432 fn input_kind(&self) -> InputKind {
434 if self.recorded_input.is_some() {
435 InputKind::External
436 } else {
437 InputKind::Flow
438 }
439 }
440 #[cfg(feature = "recording")]
441 fn take_recorded_input_serialized(&mut self, fallback: Option<&INPUT>) -> Value
442 where
443 INPUT: StepInput,
444 {
445 if let Some(i) = self.recorded_input.take() {
446 serde_json::to_value(i).unwrap_or_default()
447 } else {
448 serde_json::to_value(fallback).unwrap_or_default()
449 }
450 }
451}
452
453#[derive(Default, Debug, Clone)]
454#[cfg_attr(feature = "recording", derive(Serialize, Deserialize))]
455pub struct Rack {
458 #[cfg(feature = "recording")]
459 lines: BTreeMap<Cow<'static, str>, LineState>,
460 #[serde(skip)]
461 #[cfg(feature = "recording")]
462 recording: Arc<atomic::AtomicBool>,
463}
464
465impl Rack {
466 pub fn new() -> Self {
468 Self::default()
469 }
470 #[allow(unused_variables)]
472 pub fn ingress(&mut self, processor: &mut Processor) {
473 #[cfg(feature = "recording")]
474 self.lines.extend(mem::take(&mut processor.result));
475 #[cfg(not(feature = "recording"))]
476 processor.reset();
477 }
478 #[cfg(feature = "recording")]
480 pub fn line_state(&self, name: &str) -> Option<&LineState> {
481 self.lines.get(name)
482 }
483 #[cfg(feature = "recording")]
485 pub fn lines(&self) -> &BTreeMap<Cow<'static, str>, LineState> {
486 &self.lines
487 }
488 #[cfg(feature = "recording")]
490 pub fn snapshot(&self) -> Snapshot {
491 Snapshot {
492 lines: self.lines.clone(),
493 }
494 }
495 #[cfg(feature = "recording")]
497 pub fn snapshot_filtered<P>(&self, predicate: P) -> Snapshot
498 where
499 P: Fn(&LineState) -> bool,
500 {
501 let lines = self
502 .lines
503 .iter()
504 .filter(|(_, line)| predicate(line))
505 .map(|(name, line)| (name.clone(), line.clone()))
506 .collect();
507 Snapshot { lines }
508 }
509 pub fn processor(&self) -> Processor {
511 Processor {
512 #[cfg(feature = "recording")]
513 result: BTreeMap::new(),
514 #[cfg(feature = "recording")]
515 recording: Arc::clone(&self.recording),
516 }
517 }
518
519 #[cfg(feature = "recording")]
521 pub fn with_recording_enabled(self) -> Self {
522 self.recording.store(true, atomic::Ordering::SeqCst);
523 self
524 }
525
526 #[cfg(feature = "recording")]
528 pub fn set_recording(&mut self, recording: bool) {
529 self.recording.store(recording, atomic::Ordering::SeqCst);
530 }
531}
532
533#[derive(Default)]
535pub struct Processor {
536 #[cfg(feature = "recording")]
537 result: BTreeMap<Cow<'static, str>, LineState>,
538 #[cfg(feature = "recording")]
539 recording: Arc<atomic::AtomicBool>,
540}
541
542impl Processor {
543 pub fn new() -> Self {
545 Self::default()
546 }
547 pub fn reset(&mut self) {
549 #[cfg(feature = "recording")]
550 self.result.clear();
551 }
552 #[cfg(feature = "recording")]
554 pub fn line_state(&self, name: &str) -> Option<&LineState> {
555 self.result.get(name)
556 }
557 #[cfg(feature = "recording")]
559 pub fn is_recording(&self) -> bool {
560 self.recording.load(atomic::Ordering::SeqCst)
561 }
562 pub fn line<INPUT>(&mut self, name: impl Into<Cow<'static, str>>, input: INPUT) -> Step<INPUT> {
564 let name = name.into();
565 #[cfg(feature = "recording")]
566 if self.is_recording() {
567 match self.result.entry(name.clone()) {
568 btree_map::Entry::Vacant(entry) => {
569 entry.insert(LineState::new(name.clone()));
570 }
571 btree_map::Entry::Occupied(mut entry) => {
572 entry.get_mut().clear();
573 }
574 }
575 }
576 Step {
577 input: Some(input),
578 active: true,
579 processor: Some(self),
580 line_name: Some(name),
581 }
582 }
583}
584
585#[cfg(test)]
586mod test {
587 use super::Rack;
588 #[cfg(feature = "recording")]
589 use serde::Serialize;
590
591 #[test]
592 fn test_lines() {
593 #[allow(clippy::cast_lossless, clippy::unnecessary_wraps)]
594 fn get_temp1(data: &ModbusData) -> Option<f32> {
595 Some(data.temperature_1 as f32 / 10.0)
596 }
597
598 #[allow(clippy::cast_lossless, clippy::unnecessary_wraps)]
599 fn get_temp2(data: &ModbusData) -> Option<f32> {
600 Some(data.temperature_2 as f32 / 10.0)
601 }
602
603 #[allow(clippy::unnecessary_wraps)]
604 fn temperature_critical(temp: f32) -> Option<()> {
605 if temp > 90. {
606 Some(())
607 } else {
608 None
609 }
610 }
611
612 fn voltage_critical(voltage: u16) -> Option<()> {
613 if voltage > 300 {
614 Some(())
615 } else {
616 None
617 }
618 }
619
620 #[cfg_attr(feature = "recording", derive(Serialize))]
621 struct ModbusData {
622 temperature_1: u16,
623 voltage_1: u16,
624 temperature_2: u16,
625 voltage_2: u16,
626 }
627
628 let modbus_data = ModbusData {
629 temperature_1: 950,
630 voltage_1: 395,
631 temperature_2: 250,
632 voltage_2: 295,
633 };
634 let mut state = Rack::new();
635 let mut processor = state.processor();
636 let mut line1_active = true;
637 let mut line2_active = true;
638 #[cfg(feature = "recording")]
639 state.set_recording(true);
640 assert!(processor
641 .line("line1", &modbus_data)
642 .then(action!(get_temp1))
643 .then(action!(temperature_critical))
644 .then(
645 action!("voltage", |()| Some(modbus_data.voltage_1))
646 .with_recorded_input(&modbus_data.voltage_1)
647 )
648 .then(action!(voltage_critical))
649 .then(action!("OFF", |()| {
650 line1_active = false;
651 Some(())
652 }))
653 .is_active());
654 assert!(!processor
655 .line("line2", &modbus_data)
656 .then(get_temp2)
657 .then(action!(temperature_critical))
658 .then(
659 action!("voltage", |()| Some(modbus_data.voltage_2))
660 .with_recorded_input(&modbus_data.voltage_2)
661 )
662 .then(action!(voltage_critical))
663 .then(action!("OFF", |()| {
664 line2_active = false;
665 Some(())
666 }))
667 .is_active());
668 assert!(!line1_active);
669 assert!(line2_active);
670 state.ingress(&mut processor);
671 }
672}