rspl 0.1.3

A stream processor language.
Documentation
# HICS

This example implements a Heat Index Control System (hics).
More precisely, the example implements a system to keep the heat index (a quantity depending on the real temperature and the humidity via a certain mapping) bearable in a room located in a region with a hot and damp climate.\
The system's specification is that the heat index has to be brought down periodically to a certain value within a window of tolerance depending on the time of day, by first trying to dehumidify the room and, if that does not suffice, by also cooling down the room.
Furthermore, a thermohygrometer and a clock must be accessible for the implementation.\
The hics implementation presented in this example measures temperature and humidity, as well as the time, to decide if it has to take action.
And if it actuates something, it waits for it to take effect in order to decide whether to repeat or go idle for a period.
Moreover, it follows the structuring suggestions from [Why Functional Programming Matters](https://www.cse.chalmers.se/~rjmh/Papers/whyfp.pdf).
There they describe how the use of higher-order functions and lazy evaluation can greatly improve the modularity of programs.
They illustrate that point by modularizing on-demand computations of perhaps infinite objects like real numbers and game trees, exploiting the fact that in languages with first-class functions and lazy evaluation, everything is a generator in some sense.
As the hics described above shares that 'on-demand computation' aspect - most notably, it measures (that is, reads out the thermohygrometer) on demand - these modularization techniques will apply in this case, too.
Adapting the techniques to rspl and Rust yields a modular implementation of this hics in Rust.

The intention of the example is to demonstrate rspl's applicability in demand-driven[^1] programming (with generators, of course).

Now that we have said what we are going to implement and why, let us explain our techniques before presenting the code applying those techniques.
To this end, we split the following discussion into two parts.
First, we introduce the general design pattern of encoding generators in rspl.
This is done without referring to the specific example of control systems.
Then, second, before discussing the actual hics code, we briefly discuss the code's overall structure.\
So, first, rspl's stream processors can encode a sort of generator: given a sufficiently general definition of a generator, you can think of any stream processor a generator because output is generated on demand in an incremental manner.
But even for the more specific definition of generators as functions which can remember state information between calls, rspl's stream processors offer an encoding.
To understand how, first note that rspl's stream processors would implement the `Fn`-trait if Rust allowed users to implement that trait freely.
This is because implementing a stream processor from `A` to `B` in rspl corresponds to defining a function from `Stream<A>` to `Stream<B>`.
So, if `A` is the input signature and `B` the yield type of a generator, then that generator could be encoded as a stream processor from `A` to `B`, provided that a way to remember state information is available.
Now, the perhaps most common approach to state in functional programming - and rspl is functional programming - is state-passing style[^2].
And, in fact, state passing is applicable to rspl's stream processors.
One way is to construct a stream processor using a Rust function with a single parameter representing the state, returning a stream processor that captures a computation of the perhaps manipulated state within a (lazy) recursive call, like so:
```rust
fn generator<'a, S, A, B>(mut state: S) -> StreamProcessor<'a, A, B> {
    ...;
    StreamProcessor::get(|a: A| StreamProcessor::put(..., || generator(...state...)))
}
```
The returned stream processor of such a function is a generator, where the dots represent the body of that generator.
(Note that if `A` is `()`, it can make sense to omit the `get`-part.)\
After having discussed the encoding of generators as stream processors, let us have a look at the structure of our hics implementation.
Essentially, it consists of four parts.
The first one is a module encapsulating general aspects of control systems.
The second and third parts specialize to and use those aspects for heat index control.
In particular, the second part implements the control system interface, while the third part is a driver responsible for executing that implementation according to the measure-on-demand strategy.
Finally, the fourth part is the main function simulating the hics environment and setting up the driver for the hics.

Let us now walk through the code together.

```rust
mod control {
    use rspl::streams::infinite_lists::InfiniteList;
    use rspl::streams::Stream;
    use rspl::StreamProcessor;

    use std::thread;
    use std::time::Duration;

    // This is a definition of control systems. Importantly, it requires a `meter` to generate
    // measurements on demand, and this is statically enforced by the typing.
    pub trait System<'a, Space> {
        fn meter(&self) -> StreamProcessor<'a, (), Space>;
        fn reference(&self) -> f64;
        fn quantity(&self, position: Space) -> f64;
        fn controller(self, deviation: f64, status: f64, position: Space) -> Self;
    }

    pub trait Strategy<'a, Space> {
        fn execute(self, cs: impl System<'a, Space>, epsilon: f64);
    }

    pub struct MeasureOnDemand {
        pub dwell_time: Duration,
    }

    impl<'a, Space: 'a + Copy> Strategy<'a, Space> for MeasureOnDemand {
        fn execute(self, mut cs: impl System<'a, Space>, epsilon: f64) {
            let mut status;

            // Here the measurements are generated (lazily).
            let mut positions = cs.meter().eval(InfiniteList::constant(()));

            loop {
                // Here the actual measurement is made.
                positions = positions.tail();
                let position = *positions.head();

                status = cs.quantity(position);
                let setpoint = cs.reference();
                let deviation = status - setpoint;

                if f64::abs(deviation) < epsilon {
                    break;
                }

                cs = cs.controller(deviation, status, position);

                thread::sleep(self.dwell_time);
            }
        }
    }
}

use control::Strategy;

use rspl::streams::infinite_lists::InfiniteList;
use rspl::streams::Stream;
use rspl::StreamProcessor;

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use crossbeam::channel;
use crossbeam::channel::Sender;

// This constant is the window of tolerance for the heat index.
const EPSILON: f64 = 0.5;

const REFERENCE_HEAT_INDEX_DAY: f64 = 91.0;
const REFERENCE_HEAT_INDEX_NIGHT: f64 = 83.0;

const MINIMAL_TEMPERATURE: f64 = 80.0;
const MINIMAL_HUMIDITY: f64 = 50.0;

const INITIAL_TEMPERATURE: f64 = 87.0;
const INITIAL_HUMIDITY: f64 = 72.0;

const ACTUATOR_DECREASE: HeatIndexSpace = HeatIndexSpace {
    temperature: 0.25,
    humidity: 1.5,
};
const NATURAL_INCREASE: HeatIndexSpace = HeatIndexSpace {
    temperature: 0.02,
    humidity: 0.1,
};

// This block defines time-related constants. In particular, note that the `TICK` is intended to
// represent 10 real seconds.
const TICK_LENGTH: u64 = 5; // in (real) millis
const TICK: u64 = 1;
const DAY: u64 = 8640 * TICK;
const DWELL_TIME: u64 = 6 * TICK;
const CONTROL_PERIOD: u64 = 180 * TICK;
const NATURAL_INCREASE_PERIOD: u64 = 3 * TICK;

const UNSAFE_BARRIER: usize = 100_000;
const SERVICE_BARRIER: usize = UNSAFE_BARRIER - 5000;

type HeatIndex = f64;
type Time = u64;
type Clock = AtomicU64;

#[derive(Copy, Clone)]
struct HeatIndexSpace {
    temperature: f64, // in degree Fahrenheit
    humidity: f64,    // in percent
}

// This type defines the output signals of the hics. A signal is either status information
// (`Show(...)`) or orders for the actuator to execute (`Dehumidify` and `Cool`).
enum HeatIndexSignal {
    Show(Time, HeatIndex),
    Dehumidify,
    Cool,
}

// This type is the actual hics. Essentially, it is the communication interface to its environment.
#[derive(Clone)]
struct Hics {
    clock_finger: Arc<Clock>,
    thermohygrometer_finger: Arc<Mutex<HeatIndexSpace>>,
    signals_s: Sender<HeatIndexSignal>,
}

impl<'a> control::System<'a, HeatIndexSpace> for Hics {
    fn meter(&self) -> StreamProcessor<'a, (), HeatIndexSpace> {
        fn read_out<'a, X: 'a + Copy>(finger: Arc<Mutex<X>>) -> StreamProcessor<'a, (), X> {
            StreamProcessor::Put(
                *Arc::clone(&finger).lock().unwrap(),
                Box::new(|| read_out(finger)),
            )
        }

        read_out(Arc::clone(&self.thermohygrometer_finger))
    }
    fn reference(&self) -> f64 {
        let time = self.clock_finger.load(Ordering::SeqCst);

        if time % DAY < DAY / 2 {
            REFERENCE_HEAT_INDEX_DAY
        } else {
            REFERENCE_HEAT_INDEX_NIGHT
        }
    }
    fn quantity(&self, position: HeatIndexSpace) -> f64 {
        // The body is the heat index formula from https://en.wikipedia.org/wiki/Heat_index.
        const C_1: f64 = -42.379;
        const C_2: f64 = 2.049_015_23;
        const C_3: f64 = 10.143_331_27;
        const C_4: f64 = -0.224_755_41;
        const C_5: f64 = -0.006_837_83;
        const C_6: f64 = -0.054_817_17;
        const C_7: f64 = 0.001_228_74;
        const C_8: f64 = 0.000_852_82;
        const C_9: f64 = -0.000_001_99;

        let t = position.temperature;
        let r = position.humidity;

        C_1 + C_2 * t
            + C_3 * r
            + C_4 * t * r
            + C_5 * t * t
            + C_6 * r * r
            + C_7 * t * t * r
            + C_8 * t * r * r
            + C_9 * t * t * r * r
    }
    fn controller(self, deviation: f64, status: f64, position: HeatIndexSpace) -> Self {
        let time = self.clock_finger.load(Ordering::SeqCst);
        self.signals_s
            .send(HeatIndexSignal::Show(time, status))
            .unwrap();

        if deviation > 0.0 {
            if position.humidity > MINIMAL_HUMIDITY {
                self.signals_s.send(HeatIndexSignal::Dehumidify).unwrap();
            } else if position.temperature > MINIMAL_TEMPERATURE {
                self.signals_s.send(HeatIndexSignal::Cool).unwrap();
            }
        }

        self
    }
}

#[allow(clippy::assertions_on_constants)]
fn driver(hics: Hics) {
    fn control<'a>(hics: Hics, mut counter: usize) -> StreamProcessor<'a, (), usize> {
        control::MeasureOnDemand {
            dwell_time: Duration::from_millis(DWELL_TIME * TICK_LENGTH),
        }
        .execute(hics.clone(), EPSILON);

        counter += 1;

        StreamProcessor::Put(counter, Box::new(move || control(hics, counter)))
    }

    assert!(UNSAFE_BARRIER > SERVICE_BARRIER);

    // Here the runs of the hics are generated (lazily).
    let mut runs = control(hics, 0).eval(InfiniteList::constant(()));

    loop {
        thread::sleep(Duration::from_millis(CONTROL_PERIOD * TICK_LENGTH));

        // Here, an iteration of the hics is started.
        runs = runs.tail();
        let run_count = *runs.head();

        if run_count > SERVICE_BARRIER {
            if run_count > UNSAFE_BARRIER {
                break;
            }
            println!(
                "Warning: Service needed. ({} runs > {} runs)",
                run_count, SERVICE_BARRIER
            );
        }
    }
}

fn main() {
    fn print_heat_index_event(time: Time, heat_index: HeatIndex) {
        let red = |x| (x * 16.0 - 1350.0) as u8;
        let green = 25;
        let blue = |x| (1450.0 - x * 16.0) as u8;
        // The cryptic part of the following `format!(...)` is just an ANSI escape code to get
        // `format!({:.1}°F, heat_index)` with a truecolor R(ed)G(reen)B(lue) background.
        let degree = format!(
            "\x1b[48;2;{};{};{}m{:.1}°F\x1b[0m",
            red(heat_index),
            green,
            blue(heat_index),
            heat_index,
        );

        let to_minutes = |x| (x as f64 / 6.0) % 1440.0;
        let time = format!("6am plus {:.1} minutes", to_minutes(time));

        println!("Heat Index Event: {} at {}", degree, time);
    }

    let clock = Arc::new(AtomicU64::new(0));
    let clock_finger = Arc::clone(&clock);

    let thermohygrometer = Arc::new(Mutex::new(HeatIndexSpace {
        temperature: INITIAL_TEMPERATURE,
        humidity: INITIAL_HUMIDITY,
    }));
    let thermohygrometer_finger = Arc::clone(&thermohygrometer);

    let (signals_s, signals_r) = channel::unbounded();

    let hics = Hics {
        clock_finger,
        thermohygrometer_finger,
        signals_s,
    };

    let _clock_simulator = thread::spawn(move || loop {
        thread::sleep(Duration::from_millis(TICK_LENGTH));

        clock.store(clock.load(Ordering::SeqCst) + TICK, Ordering::SeqCst);
    });

    let _thermohygrometer_simulator = thread::spawn(move || {
        let thermohygrometer_finger = Arc::clone(&thermohygrometer);

        // This is the climate effect actuated by the hics.
        let _actuator_simulator = thread::spawn(move || loop {
            let signal = signals_r.recv().unwrap();

            let mut position = thermohygrometer_finger.lock().unwrap();
            match signal {
                HeatIndexSignal::Show(time, heat_index) => print_heat_index_event(time, heat_index),
                HeatIndexSignal::Dehumidify => position.humidity -= ACTUATOR_DECREASE.humidity,
                HeatIndexSignal::Cool => position.temperature -= ACTUATOR_DECREASE.temperature,
            }
        });

        // This is the climate effect actuated by nature.
        loop {
            thread::sleep(Duration::from_millis(NATURAL_INCREASE_PERIOD * TICK_LENGTH));

            let mut position = thermohygrometer.lock().unwrap();
            position.humidity += NATURAL_INCREASE.humidity;
            position.temperature += NATURAL_INCREASE.temperature;
        }
    });

    driver(hics);
}
```

Finally, let us conclude with the key take-away:
rspl can encode generators and is hence suited for demand-driven programming.
However, it is not so clear why one should use rspl to encode generators in general.
Indeed, this is also not quite what we wanted to show.
The idea is rather to show that rspl's stream processors can naturally incorporate demand-driven programming, making them particularly useful for stream-processing problems with demand-driven aspects.
It might be that the hics implemented here is not the best possible example to do so.
But it is the best we could come up with as of yet that is real-world enough, while still being focused on the `put`-construct of rspl.

[^1]: Look at [Codata in Action]https://www.microsoft.com/en-us/research/uploads/prod/2020/01/CoDataInAction.pdf for some more explanation on that term.
[^2]: Also see the concept of monads, which kind of subsumes foobar-passing style.