pub enum StreamProcessor<'a, A: 'a, B> {
Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>),
Put(B, Box<dyn FnOnce() -> StreamProcessor<'a, A, B> + 'a>),
}
Expand description
StreamProcessor<A, B> defines (the syntax of) a language describing the domain of stream processors, that is, terms which can be interpreted to turn streams of type A into streams of type B.
Variants§
Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>)
This stream processor first reads the A from the head of the input stream and subsequently applies its function argument to that element yielding a stream processor.
The resulting stream processor is then used to process the input stream further depending on its shape, that is, on whether it is a `Get` or a `Put`.
Put(B, Box<dyn FnOnce() -> StreamProcessor<'a, A, B> + 'a>)
This stream processor writes the B from its first argument to the output list.
Then, to construct the rest of the output list, it uses its second argument to process the input stream depending on its shape, that is, on whether it is a `Get` or a `Put`.
Implementations§
Source§impl<'a, A, B> StreamProcessor<'a, A, B>
impl<'a, A, B> StreamProcessor<'a, A, B>
Source
pub fn get<F>(f: F) -> Self
where
    F: FnOnce(A) -> Self + 'a,
The same as StreamProcessor::Get but with boxing of f hidden to make the resulting code less verbose.
Source
pub fn put<T>(b: B, lazy_sp: T) -> Self
where
    B: 'a,
    T: FnOnce() -> Self + 'a,
The same as StreamProcessor::Put but with boxing of lazy_sp hidden to make the resulting code less verbose.
Source§impl<'a, A, B> StreamProcessor<'a, A, B>
impl<'a, A, B> StreamProcessor<'a, A, B>
Source
pub fn eval<S: Stream<A> + 'a>(self, stream: S) -> InfiniteList<'a, B>
where
    A: Clone,
Evaluate self on an input stream essentially implementing a semantic of StreamProcessor<A, B>.
`stream` is the input stream.
Note that the function can block the current thread if the respective implementation of Stream::tail can.
§Panics
A panic may occur if
- the stream processor contains Rust-terms which can panic.
- the respective implementation of `Stream::head` or `Stream::tail` can panic.
§Examples
Negating a stream of trues to obtain a stream of falses:
use rspl::StreamProcessor;
fn negate<'a>() -> StreamProcessor<'a, bool, bool> {
    // Read one boolean, write its negation, then carry on with a fresh `negate`.
    StreamProcessor::get(|value: bool| {
        let negated = !value;
        StreamProcessor::put(negated, negate)
    })
}
let trues = rspl::streams::infinite_lists::InfiniteList::constant(true);
negate().eval(trues);
Examples found in repository?
74 fn execute(self, mut cs: impl System<'a, Space>, epsilon: f64) {
75 let mut status;
76
77 // Here the measurements are generated (lazily).
78 let mut positions = cs.meter().eval(InfiniteList::constant(()));
79
80 loop {
81 // Here the actual measurement is made.
82 positions = positions.tail();
83 let position = *positions.head();
84
85 status = cs.quantity(position);
86 let setpoint = cs.reference();
87 let deviation = status - setpoint;
88
89 if f64::abs(deviation) < epsilon {
90 break;
91 }
92
93 cs = cs.controller(deviation, status, position);
94
95 thread::sleep(self.dwell_time);
96 }
97 }
98 }
99}
100
101use control::Strategy;
102
103use rspl::streams::infinite_lists::InfiniteList;
104use rspl::streams::Stream;
105use rspl::StreamProcessor;
106
107use std::sync::atomic::{AtomicU64, Ordering};
108use std::sync::{Arc, Mutex};
109use std::thread;
110use std::time::Duration;
111
112use crossbeam::channel;
113use crossbeam::channel::Sender;
114
115// This constant is the window of tolerance for the heat index. `driver` passes it to `execute`.
116const EPSILON: f64 = 0.5;
117
118const REFERENCE_HEAT_INDEX_DAY: f64 = 91.0; // returned by `reference` during the first half of a `DAY`
119const REFERENCE_HEAT_INDEX_NIGHT: f64 = 83.0; // returned by `reference` during the second half of a `DAY`
120
121const MINIMAL_TEMPERATURE: f64 = 80.0; // `controller` only orders cooling above this temperature
122const MINIMAL_HUMIDITY: f64 = 50.0; // `controller` only orders dehumidifying above this humidity
123
124const INITIAL_TEMPERATURE: f64 = 87.0; // NOTE(review): not used in the visible part; presumably the starting reading
125const INITIAL_HUMIDITY: f64 = 72.0; // NOTE(review): not used in the visible part; presumably the starting reading
126
127const ACTUATOR_DECREASE: HeatIndexSpace = HeatIndexSpace { // NOTE(review): presumably the change per actuator order; confirm
128 temperature: 0.25,
129 humidity: 1.5,
130};
131const NATURAL_INCREASE: HeatIndexSpace = HeatIndexSpace { // NOTE(review): presumably applied every `NATURAL_INCREASE_PERIOD`; confirm
132 temperature: 0.02,
133 humidity: 0.1,
134};
135
136// This block defines time-related constants. In particular, note that the `TICK` is intended to
137// represent 10 real seconds.
138const TICK_LENGTH: u64 = 5; // in (real) millis
139const TICK: u64 = 1;
140const DAY: u64 = 8640 * TICK; // 8640 ticks of 10 seconds each, i.e. 24 hours
141const DWELL_TIME: u64 = 6 * TICK; // in ticks; `driver` turns it into the strategy's `dwell_time`
142const CONTROL_PERIOD: u64 = 180 * TICK; // in ticks; pause between two runs in `driver`
143const NATURAL_INCREASE_PERIOD: u64 = 3 * TICK; // in ticks; not used in the visible part
144
145const UNSAFE_BARRIER: usize = 100_000; // `driver` stops once the run count exceeds this
146const SERVICE_BARRIER: usize = UNSAFE_BARRIER - 5000; // `driver` prints a service warning above this run count
147
148type HeatIndex = f64;
149type Time = u64; // a clock reading, see `Clock`
150type Clock = AtomicU64;
151
152#[derive(Copy, Clone)]
153struct HeatIndexSpace { // A (temperature, humidity) pair; `quantity` computes the heat index from one.
154 temperature: f64, // in degree Fahrenheit
155 humidity: f64, // in percent
156}
157
158// This type defines the output signals of the hics. A signal is either status information
159// (`Show(...)`) or orders for the actuator to execute (`Dehumidify` and `Cool`).
160enum HeatIndexSignal {
161 Show(Time, HeatIndex), // clock reading and status; `controller` sends this before any order
162 Dehumidify, // sent by `controller` if the deviation is positive and humidity exceeds `MINIMAL_HUMIDITY`
163 Cool, // sent instead if humidity is not above its minimum but temperature exceeds `MINIMAL_TEMPERATURE`
164}
165
166// This type is the actual hics. Essentially, it is the communication interface to its environment.
167#[derive(Clone)]
168struct Hics {
169 clock_finger: Arc<Clock>, // shared clock; loaded atomically in `reference` and `controller`
170 thermohygrometer_finger: Arc<Mutex<HeatIndexSpace>>, // shared current reading; copied out under the lock in `meter`
171 signals_s: Sender<HeatIndexSignal>, // sending end of the channel on which `controller` emits its signals
172}
173
174impl<'a> control::System<'a, HeatIndexSpace> for Hics {
175 fn meter(&self) -> StreamProcessor<'a, (), HeatIndexSpace> {
176 fn read_out<'a, X: 'a + Copy>(finger: Arc<Mutex<X>>) -> StreamProcessor<'a, (), X> {
177 StreamProcessor::Put(
178 *Arc::clone(&finger).lock().unwrap(),
179 Box::new(|| read_out(finger)),
180 )
181 }
182
183 read_out(Arc::clone(&self.thermohygrometer_finger))
184 }
185 fn reference(&self) -> f64 {
186 let time = self.clock_finger.load(Ordering::SeqCst);
187
188 if time % DAY < DAY / 2 {
189 REFERENCE_HEAT_INDEX_DAY
190 } else {
191 REFERENCE_HEAT_INDEX_NIGHT
192 }
193 }
194 fn quantity(&self, position: HeatIndexSpace) -> f64 {
195 // The body is the heat index formula from https://en.wikipedia.org/wiki/Heat_index.
196 const C_1: f64 = -42.379;
197 const C_2: f64 = 2.049_015_23;
198 const C_3: f64 = 10.143_331_27;
199 const C_4: f64 = -0.224_755_41;
200 const C_5: f64 = -0.006_837_83;
201 const C_6: f64 = -0.054_817_17;
202 const C_7: f64 = 0.001_228_74;
203 const C_8: f64 = 0.000_852_82;
204 const C_9: f64 = -0.000_001_99;
205
206 let t = position.temperature;
207 let r = position.humidity;
208
209 C_1 + C_2 * t
210 + C_3 * r
211 + C_4 * t * r
212 + C_5 * t * t
213 + C_6 * r * r
214 + C_7 * t * t * r
215 + C_8 * t * r * r
216 + C_9 * t * t * r * r
217 }
218 fn controller(self, deviation: f64, status: f64, position: HeatIndexSpace) -> Self {
219 let time = self.clock_finger.load(Ordering::SeqCst);
220 self.signals_s
221 .send(HeatIndexSignal::Show(time, status))
222 .unwrap();
223
224 if deviation > 0.0 {
225 if position.humidity > MINIMAL_HUMIDITY {
226 self.signals_s.send(HeatIndexSignal::Dehumidify).unwrap();
227 } else if position.temperature > MINIMAL_TEMPERATURE {
228 self.signals_s.send(HeatIndexSignal::Cool).unwrap();
229 }
230 }
231
232 self
233 }
234}
235
236#[allow(clippy::assertions_on_constants)]
237fn driver(hics: Hics) {
238 fn control<'a>(hics: Hics, mut counter: usize) -> StreamProcessor<'a, (), usize> {
239 control::MeasureOnDemand {
240 dwell_time: Duration::from_millis(DWELL_TIME * TICK_LENGTH),
241 }
242 .execute(hics.clone(), EPSILON);
243
244 counter += 1;
245
246 StreamProcessor::Put(counter, Box::new(move || control(hics, counter)))
247 }
248
249 assert!(UNSAFE_BARRIER > SERVICE_BARRIER);
250
251 // Here the runs of the hics are generated (lazily).
252 let mut runs = control(hics, 0).eval(InfiniteList::constant(()));
253
254 loop {
255 thread::sleep(Duration::from_millis(CONTROL_PERIOD * TICK_LENGTH));
256
257 // Here, an iteration of the hics is started.
258 runs = runs.tail();
259 let run_count = *runs.head();
260
261 if run_count > SERVICE_BARRIER {
262 if run_count > UNSAFE_BARRIER {
263 break;
264 }
265 println!(
266 "Warning: Service needed. ({} runs > {} runs)",
267 run_count, SERVICE_BARRIER
268 );
269 }
270 }
271}More examples
310fn driver<S>(events: S, tfeedback: &Sender<Feedback>)
311where
312 S: Stream<Event>,
313{
314 // This code starts the machine.
315 let mut capabilities = pelican_machine::on().eval(events);
316
317 loop {
318 // The following match provides the machine with capabilities. Most notably, it tells the
319 // feedback loop to trigger a `Timeout`-event after some time.
320 match *capabilities.head() {
321 Capability::SetVehicleLights(color) => println!("Vehicles: {}", color),
322 Capability::SetPedestrianLights(color) => println!("Pedestrians: {}", color),
323 Capability::EmitTimeoutAfter(length) => {
324 tfeedback.send(Feedback::TimeoutAfter(length)).unwrap();
325 }
326 Capability::UnexpectedTimeout(message) => {
327 eprintln!("log: unexpected timeout event ({})", message);
328 }
329 Capability::CallForHelp => {
330 println!("Call for help!");
331 break;
332 }
333 Capability::Break => break,
334 };
335 capabilities = capabilities.tail();
336 }
337}