StreamProcessor

Enum StreamProcessor 

pub enum StreamProcessor<'a, A: 'a, B> {
    Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>),
    Put(B, Box<dyn FnOnce() -> StreamProcessor<'a, A, B> + 'a>),
}

StreamProcessor<A, B> defines (the syntax of) a language for describing stream processors, that is, terms that can be interpreted to turn streams with elements of type A into streams with elements of type B.

Variants

Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>)

This stream processor first reads the A at the head of the input stream and then applies its function argument to that element, yielding a new stream processor. The resulting stream processor then processes the input stream further, depending on its shape: if it is a

  • Get, it is applied to the tail of the input stream.
  • Put, it is applied to the whole input stream.

Put(B, Box<dyn FnOnce() -> StreamProcessor<'a, A, B> + 'a>)

This stream processor writes the B from its first argument to the output list. Then, to construct the rest of the output list, it forces its second argument and uses the resulting stream processor to process the input stream, depending on the shape of that result: if it is a

  • Get, it is applied to the tail of the input stream.
  • Put, it is applied to the whole input stream.
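
The shape rules for both variants can be tried out without the crate. The following is a minimal sketch under stated assumptions: the enum is re-declared locally to mirror the definition above, take_outputs is a hypothetical helper (not part of rspl) that follows the rules literally over a slice instead of a Stream, and pairwise_sum is an illustrative term.

```rust
// Local re-declaration mirroring the enum above, so the sketch compiles without `rspl`.
enum StreamProcessor<'a, A: 'a, B> {
    Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>),
    Put(B, Box<dyn FnOnce() -> StreamProcessor<'a, A, B> + 'a>),
}

// Hypothetical helper (not part of rspl): interprets `sp` over `input` until
// `n` outputs have been written or a `Get` finds no element left to read.
fn take_outputs<'a, A: Clone, B>(
    mut sp: StreamProcessor<'a, A, B>,
    input: &[A],
    n: usize,
) -> Vec<B> {
    let mut pos = 0; // index of the current head of the input
    let mut out = Vec::with_capacity(n);
    while out.len() < n {
        let next = match sp {
            // `Get` reads the head and applies its function to it.
            StreamProcessor::Get(f) => match input.get(pos) {
                Some(a) => f(a.clone()),
                None => break,
            },
            // `Put` writes its first argument, then forces its second one.
            StreamProcessor::Put(b, lazy_sp) => {
                out.push(b);
                if out.len() == n {
                    break;
                }
                lazy_sp()
            }
        };
        // Shape rule: a following `Get` is applied to the tail of the input,
        // a following `Put` to the whole (unchanged) input.
        if let StreamProcessor::Get(_) = next {
            pos += 1;
        }
        sp = next;
    }
    out
}

// Illustrative term: reads two elements, then writes their sum, forever.
fn pairwise_sum<'a>() -> StreamProcessor<'a, i32, i32> {
    StreamProcessor::Get(Box::new(|x: i32| {
        StreamProcessor::Get(Box::new(move |y: i32| {
            StreamProcessor::Put(x + y, Box::new(|| pairwise_sum()))
        }))
    }))
}
```

With this helper, take_outputs(pairwise_sum(), &[1, 2, 3, 4, 5, 6], 3) collects the sums of consecutive pairs. Bounding the number of outputs is a choice of the sketch; it keeps non-terminating terms such as pairwise_sum usable on finite input.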

Implementations

impl<'a, A, B> StreamProcessor<'a, A, B>

pub fn get<F>(f: F) -> Self
where F: FnOnce(A) -> Self + 'a,

The same as StreamProcessor::Get but with boxing of f hidden to make the resulting code less verbose.

pub fn put<T>(b: B, lazy_sp: T) -> Self
where B: 'a, T: FnOnce() -> Self + 'a,

The same as StreamProcessor::Put but with boxing of lazy_sp hidden to make the resulting code less verbose.
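
What "hiding the boxing" amounts to can be illustrated with a local re-declaration. This is a sketch of one plausible shape for such constructors, written to match the signatures shown above; it is not the crate's source. The terms double_verbose and double_concise and the helper first_output are hypothetical names introduced only for the comparison.

```rust
// Local re-declaration mirroring the enum, so the sketch compiles without `rspl`.
enum StreamProcessor<'a, A: 'a, B> {
    Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>),
    Put(B, Box<dyn FnOnce() -> StreamProcessor<'a, A, B> + 'a>),
}

impl<'a, A, B> StreamProcessor<'a, A, B> {
    // Assumed body: box the closure on behalf of the caller.
    fn get<F>(f: F) -> Self
    where
        F: FnOnce(A) -> Self + 'a,
    {
        StreamProcessor::Get(Box::new(f))
    }

    // Assumed body: box the suspended continuation on behalf of the caller.
    fn put<T>(b: B, lazy_sp: T) -> Self
    where
        B: 'a,
        T: FnOnce() -> Self + 'a,
    {
        StreamProcessor::Put(b, Box::new(lazy_sp))
    }
}

// The same term twice: once with explicit boxing, once via the constructors.
fn double_verbose<'a>() -> StreamProcessor<'a, u32, u32> {
    StreamProcessor::Get(Box::new(|n: u32| {
        StreamProcessor::Put(2 * n, Box::new(|| double_verbose()))
    }))
}

fn double_concise<'a>() -> StreamProcessor<'a, u32, u32> {
    StreamProcessor::get(|n: u32| StreamProcessor::put(2 * n, double_concise))
}

// Hypothetical helper: feeds one input to a leading `Get` (if any) and
// returns the value written by the resulting `Put`, if there is one.
fn first_output<'a>(sp: StreamProcessor<'a, u32, u32>, input: u32) -> Option<u32> {
    let sp = match sp {
        StreamProcessor::Get(f) => f(input),
        put => put,
    };
    match sp {
        StreamProcessor::Put(b, _) => Some(b),
        StreamProcessor::Get(_) => None,
    }
}
```

Both definitions build the same term; the second only drops the Box::new calls at each use site.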

impl<'a, A, B> StreamProcessor<'a, A, B>

pub fn eval<S: Stream<A> + 'a>(self, stream: S) -> InfiniteList<'a, B>
where A: Clone,

Evaluates self on an input stream, thereby implementing a semantics for StreamProcessor<A, B>.

  • stream is the input stream.

Note that the function can block the current thread if the respective implementation of Stream::tail can.

Panics

A panic may occur if

  • the stream processor contains Rust terms that can panic.
  • the respective implementation of Stream::head or Stream::tail can panic.

Examples

Negating a stream of trues to obtain a stream of falses:

use rspl::StreamProcessor;

fn negate<'a>() -> StreamProcessor<'a, bool, bool> {
    StreamProcessor::get(|b: bool| StreamProcessor::put(!b, negate))
}

let trues = rspl::streams::infinite_lists::InfiniteList::constant(true);

negate().eval(trues);
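
How an evaluation like the one above can produce its output lazily can be sketched in isolation. Everything in the block is a local stand-in and an assumption rather than the crate's code: StreamProcessor is re-declared, LazyList stands in for InfiniteList, eval_sketch for eval, take is a hypothetical helper, and the input is a plain Iterator instead of a Stream. Each Get simply pulls the next item, which agrees with the shape rules of the variants only for terms that begin with a Get. Pulling an item is also the point where the evaluation blocks or panics if the input does.

```rust
// Local stand-ins so the sketch compiles without `rspl`.
enum StreamProcessor<'a, A: 'a, B> {
    Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>),
    Put(B, Box<dyn FnOnce() -> StreamProcessor<'a, A, B> + 'a>),
}

// Hypothetical stand-in for `InfiniteList`: a head plus a suspended tail.
enum LazyList<'a, B> {
    Cons(B, Box<dyn FnOnce() -> LazyList<'a, B> + 'a>),
}

// Sketch of a lazy evaluation: runs `sp` until the first `Put`, returns its
// value as the head, and suspends the rest of the evaluation in the tail.
fn eval_sketch<'a, A, B, I>(mut sp: StreamProcessor<'a, A, B>, mut input: I) -> LazyList<'a, B>
where
    A: 'a,
    B: 'a,
    I: Iterator<Item = A> + 'a,
{
    loop {
        match sp {
            StreamProcessor::Get(f) => {
                // May block if the iterator blocks; panics if the input ends.
                let a = input.next().expect("input stream ended");
                sp = f(a);
            }
            StreamProcessor::Put(b, lazy_sp) => {
                // Nothing after this `Put` runs until the tail is forced.
                return LazyList::Cons(b, Box::new(move || eval_sketch(lazy_sp(), input)));
            }
        }
    }
}

// Hypothetical helper: collects the first `n` elements, forcing only the
// tails needed to reach them.
fn take<'a, B>(mut list: LazyList<'a, B>, n: usize) -> Vec<B> {
    let mut out = Vec::with_capacity(n);
    while out.len() < n {
        let LazyList::Cons(b, rest) = list;
        out.push(b);
        if out.len() == n {
            break;
        }
        list = rest();
    }
    out
}

// The `negate` term from the example above, written against the local enum.
fn negate<'a>() -> StreamProcessor<'a, bool, bool> {
    StreamProcessor::Get(Box::new(|b: bool| {
        StreamProcessor::Put(!b, Box::new(|| negate()))
    }))
}
```

Taking three elements of eval_sketch(negate(), std::iter::repeat(true)) gives three falses. Only as much of the input is consumed as those three elements require.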
Examples found in repository
examples/hics.rs (line 78)
74        fn execute(self, mut cs: impl System<'a, Space>, epsilon: f64) {
75            let mut status;
76
77            // Here the measurements are generated (lazily).
78            let mut positions = cs.meter().eval(InfiniteList::constant(()));
79
80            loop {
81                // Here the actual measurement is made.
82                positions = positions.tail();
83                let position = *positions.head();
84
85                status = cs.quantity(position);
86                let setpoint = cs.reference();
87                let deviation = status - setpoint;
88
89                if f64::abs(deviation) < epsilon {
90                    break;
91                }
92
93                cs = cs.controller(deviation, status, position);
94
95                thread::sleep(self.dwell_time);
96            }
97        }
98    }
99}
100
101use control::Strategy;
102
103use rspl::streams::infinite_lists::InfiniteList;
104use rspl::streams::Stream;
105use rspl::StreamProcessor;
106
107use std::sync::atomic::{AtomicU64, Ordering};
108use std::sync::{Arc, Mutex};
109use std::thread;
110use std::time::Duration;
111
112use crossbeam::channel;
113use crossbeam::channel::Sender;
114
115// This constant is the window of tolerance for the heat index.
116const EPSILON: f64 = 0.5;
117
118const REFERENCE_HEAT_INDEX_DAY: f64 = 91.0;
119const REFERENCE_HEAT_INDEX_NIGHT: f64 = 83.0;
120
121const MINIMAL_TEMPERATURE: f64 = 80.0;
122const MINIMAL_HUMIDITY: f64 = 50.0;
123
124const INITIAL_TEMPERATURE: f64 = 87.0;
125const INITIAL_HUMIDITY: f64 = 72.0;
126
127const ACTUATOR_DECREASE: HeatIndexSpace = HeatIndexSpace {
128    temperature: 0.25,
129    humidity: 1.5,
130};
131const NATURAL_INCREASE: HeatIndexSpace = HeatIndexSpace {
132    temperature: 0.02,
133    humidity: 0.1,
134};
135
136// This block defines time-related constants. In particular, note that the `TICK` is intended to
137// represent 10 real seconds.
138const TICK_LENGTH: u64 = 5; // in (real) millis
139const TICK: u64 = 1;
140const DAY: u64 = 8640 * TICK;
141const DWELL_TIME: u64 = 6 * TICK;
142const CONTROL_PERIOD: u64 = 180 * TICK;
143const NATURAL_INCREASE_PERIOD: u64 = 3 * TICK;
144
145const UNSAFE_BARRIER: usize = 100_000;
146const SERVICE_BARRIER: usize = UNSAFE_BARRIER - 5000;
147
148type HeatIndex = f64;
149type Time = u64;
150type Clock = AtomicU64;
151
152#[derive(Copy, Clone)]
153struct HeatIndexSpace {
154    temperature: f64, // in degree Fahrenheit
155    humidity: f64,    // in percent
156}
157
158// This type defines the output signals of the hics. A signal is either status information
159// (`Show(...)`) or orders for the actuator to execute (`Dehumidify` and `Cool`).
160enum HeatIndexSignal {
161    Show(Time, HeatIndex),
162    Dehumidify,
163    Cool,
164}
165
166// This type is the actual hics. Essentially, it is the communication interface to its environment.
167#[derive(Clone)]
168struct Hics {
169    clock_finger: Arc<Clock>,
170    thermohygrometer_finger: Arc<Mutex<HeatIndexSpace>>,
171    signals_s: Sender<HeatIndexSignal>,
172}
173
174impl<'a> control::System<'a, HeatIndexSpace> for Hics {
175    fn meter(&self) -> StreamProcessor<'a, (), HeatIndexSpace> {
176        fn read_out<'a, X: 'a + Copy>(finger: Arc<Mutex<X>>) -> StreamProcessor<'a, (), X> {
177            StreamProcessor::Put(
178                *Arc::clone(&finger).lock().unwrap(),
179                Box::new(|| read_out(finger)),
180            )
181        }
182
183        read_out(Arc::clone(&self.thermohygrometer_finger))
184    }
185    fn reference(&self) -> f64 {
186        let time = self.clock_finger.load(Ordering::SeqCst);
187
188        if time % DAY < DAY / 2 {
189            REFERENCE_HEAT_INDEX_DAY
190        } else {
191            REFERENCE_HEAT_INDEX_NIGHT
192        }
193    }
194    fn quantity(&self, position: HeatIndexSpace) -> f64 {
195        // The body is the heat index formula from https://en.wikipedia.org/wiki/Heat_index.
196        const C_1: f64 = -42.379;
197        const C_2: f64 = 2.049_015_23;
198        const C_3: f64 = 10.143_331_27;
199        const C_4: f64 = -0.224_755_41;
200        const C_5: f64 = -0.006_837_83;
201        const C_6: f64 = -0.054_817_17;
202        const C_7: f64 = 0.001_228_74;
203        const C_8: f64 = 0.000_852_82;
204        const C_9: f64 = -0.000_001_99;
205
206        let t = position.temperature;
207        let r = position.humidity;
208
209        C_1 + C_2 * t
210            + C_3 * r
211            + C_4 * t * r
212            + C_5 * t * t
213            + C_6 * r * r
214            + C_7 * t * t * r
215            + C_8 * t * r * r
216            + C_9 * t * t * r * r
217    }
218    fn controller(self, deviation: f64, status: f64, position: HeatIndexSpace) -> Self {
219        let time = self.clock_finger.load(Ordering::SeqCst);
220        self.signals_s
221            .send(HeatIndexSignal::Show(time, status))
222            .unwrap();
223
224        if deviation > 0.0 {
225            if position.humidity > MINIMAL_HUMIDITY {
226                self.signals_s.send(HeatIndexSignal::Dehumidify).unwrap();
227            } else if position.temperature > MINIMAL_TEMPERATURE {
228                self.signals_s.send(HeatIndexSignal::Cool).unwrap();
229            }
230        }
231
232        self
233    }
234}
235
236#[allow(clippy::assertions_on_constants)]
237fn driver(hics: Hics) {
238    fn control<'a>(hics: Hics, mut counter: usize) -> StreamProcessor<'a, (), usize> {
239        control::MeasureOnDemand {
240            dwell_time: Duration::from_millis(DWELL_TIME * TICK_LENGTH),
241        }
242        .execute(hics.clone(), EPSILON);
243
244        counter += 1;
245
246        StreamProcessor::Put(counter, Box::new(move || control(hics, counter)))
247    }
248
249    assert!(UNSAFE_BARRIER > SERVICE_BARRIER);
250
251    // Here the runs of the hics are generated (lazily).
252    let mut runs = control(hics, 0).eval(InfiniteList::constant(()));
253
254    loop {
255        thread::sleep(Duration::from_millis(CONTROL_PERIOD * TICK_LENGTH));
256
257        // Here, an iteration of the hics is started.
258        runs = runs.tail();
259        let run_count = *runs.head();
260
261        if run_count > SERVICE_BARRIER {
262            if run_count > UNSAFE_BARRIER {
263                break;
264            }
265            println!(
266                "Warning: Service needed. ({} runs > {} runs)",
267                run_count, SERVICE_BARRIER
268            );
269        }
270    }
271}
More examples
examples/pelican.rs (line 315)
310fn driver<S>(events: S, tfeedback: &Sender<Feedback>)
311where
312    S: Stream<Event>,
313{
314    // This code starts the machine.
315    let mut capabilities = pelican_machine::on().eval(events);
316
317    loop {
318        // The following match provides the machine with capabilities. Most notably, it tells the
319        // feedback loop to trigger a `Timeout`-event after some time.
320        match *capabilities.head() {
321            Capability::SetVehicleLights(color) => println!("Vehicles: {}", color),
322            Capability::SetPedestrianLights(color) => println!("Pedestrians: {}", color),
323            Capability::EmitTimeoutAfter(length) => {
324                tfeedback.send(Feedback::TimeoutAfter(length)).unwrap();
325            }
326            Capability::UnexpectedTimeout(message) => {
327                eprintln!("log: unexpected timeout event ({})", message);
328            }
329            Capability::CallForHelp => {
330                println!("Call for help!");
331                break;
332            }
333            Capability::Break => break,
334        };
335        capabilities = capabilities.tail();
336    }
337}

Auto Trait Implementations

impl<'a, A, B> Freeze for StreamProcessor<'a, A, B>
where B: Freeze,

impl<'a, A, B> !RefUnwindSafe for StreamProcessor<'a, A, B>

impl<'a, A, B> !Send for StreamProcessor<'a, A, B>

impl<'a, A, B> !Sync for StreamProcessor<'a, A, B>

impl<'a, A, B> Unpin for StreamProcessor<'a, A, B>
where B: Unpin,

impl<'a, A, B> !UnwindSafe for StreamProcessor<'a, A, B>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.