rspl/lib.rs
1//! rspl is a stream-processor language based on the one discussed in [Generalising Monads to Arrows](https://www.sciencedirect.com/science/article/pii/S0167642399000234) using Rust as meta-language.
2//!
3//! ## Design
4//!
5//! Essentially, rspl is a way to encode functions from streams to streams such that control is syntactically explicit (like in ordinary continuation-passing style) refining the orthodox functional approach to stream processing with combinators like 'map'.
6//! More precisely, the idea of this stream-processor language is to split the processing of streams into two parts:
7//! One part for reading (getting) the first element of an input stream to direct the further processing.
8//! Another part for writing (putting) something to the output stream and offering to process some input stream if needed.
9//! Combining these parts in various ways allows to flexibly construct stream processors as programs.
10//! The following graphic illustrates how those two different kinds of stream processors ('getting' and 'putting') work (whereas a textual description is contained in the docs of [`StreamProcessor`]):
11//!
12//! <pre>
13//! h--t1--t2--t3--... ha--t1--t2--t3--...
14//! - -
15//! | |
16//! | Get(h |-> [SP](h)) | Put(hb, LAZY-[SP])
17//! | |
18//! v |
19//! t1--t2--t3--... | t1--t2--t3--...
20//! - | -
21//! | v |
22//! | [SP](h) = Get(_) hb--| LAZY-[SP]() = Get(_)
23//! | |
24//! v v
25//! ... ...
26//!
27//!
28//! h--t1--t2--t3--... ha--t1--t2--t3--...
29//! - -
30//! | |
31//! | Get(h |-> [SP](h)) | Put(hb, LAZY-[SP])
32//! | |
33//! v |
34//! h--t1--t2--t3--... | ha--t1--t2--t3--...
35//! - | -
36//! | v |
37//! | [SP](h) = Put(_, _) hb--| LAZY-[SP]() = Put(_, _)
38//! | |
39//! v v
40//! ... ...
41//! </pre>
42//!
43//! Remarkably, the language constructs are somewhat dual and loosely correspond to (dual) programming paradigms:
44//! - The `Get`-construct relates to event-driven programming as it reacts to input (events) eagerly.
45//! - The `Put`-construct relates to demand-driven[^1] programming as it generates output (demands) iteratively by need.
46//!
47//! This will be discussed further in the [Examples](#examples)-section.
48//!
49//! ## Usage
50//!
51//! To program a rspl-[`StreamProcessor`] you just have to compose the constructors [`StreamProcessor::Get`]/[`get`](`StreamProcessor::get`) and [`StreamProcessor::Put`]/[`put`](`StreamProcessor::put`) in the right way.
52//! For a somewhat more high-level programming experience you might wish to look at the [`combinators`]-module.
53//! The program can then be evaluated with the [`eval`](`StreamProcessor::eval`)-method on some kind of input stream.
54//! The 'kind' of input stream is either your own implementation of the [`Stream`]-interface or one
55//! from the submodules of the [`streams`]-module.
56//! Either way, as result, evaluation produces an [`InfiniteList`] (lazily).
57//! To observe streams - and i.p. infinite lists - you can destruct them with the [`head`](`Stream::head`)- and [`tail`](`Stream::tail`)-methods of the stream interface.
58//! Moreover, there are various functions helping with the destruction and construction of streams.
59//!
60//! # Examples
61//!
62//! As alluded to in the [Design](#design)-section, rspl supports orthodox 'combinator-driven' stream processing as it is known from list processing with combinators like [`compose`](`combinators::compose`), [`filter`](`combinators::filter`) and [`map`](`combinators::map`).
63//! For example, it is possible to first filter some 'bad' elements out of a stream in order to safely iterate some function over the resulting stream afterwards in a combinatorial way.
64//! Such a [usage](#usage) of rspl looks like:
65//!
66//! ```
67//! use rspl::combinators::{compose, filter, map};
68//! use rspl::streams::infinite_lists::InfiniteList;
69//! use rspl::streams::Stream;
70//! use rspl::StreamProcessor;
71//!
72//! let is_greater_zero = |n: &usize| *n > 0;
73//! let minus_one = |n: usize| n - 1;
74//!
75//! let zeroes = compose(filter(is_greater_zero), map(minus_one))
76//! .eval(InfiniteList::cons(0, || InfiniteList::constant(1)));
77//!
78//! assert_eq!(*zeroes.head(), 0);
79//! ```
80//!
81//! More interestingly, rspl can also serve as a framework for the nifty idea of
82//! - event-driven programming with state machines as suggested [here](https://barrgroup.com/Embedded-Systems/How-To/State-Machines-Event-Driven-Systems).
83//! Abstractly, that [usage](#usage) of rspl looks as follows:
84//!
85//! ```
86//! use rspl::streams::infinite_lists::InfiniteList;
87//! use rspl::streams::Stream;
88//! use rspl::StreamProcessor;
89//!
90//! #[derive(Copy, Clone)]
91//! enum Event {
92//! Event1,
93//! Event2,
94//! }
95//!
96//! fn action() -> bool {
97//! true
98//! }
99//!
100//! fn state_1<'a>() -> StreamProcessor<'a, Event, bool> {
101//! fn transition<'a>(event: Event) -> StreamProcessor<'a, Event, bool> {
102//! match event {
103//! Event::Event1 => StreamProcessor::put(action(), state_1),
104//! Event::Event2 => state_2(),
105//! }
106//! }
107//!
108//! StreamProcessor::get(transition)
109//! }
110//!
111//! fn state_2<'a>() -> StreamProcessor<'a, Event, bool> {
112//! fn transition<'a>(event: Event) -> StreamProcessor<'a, Event, bool> {
113//! match event {
114//! Event::Event1 => state_1(),
115//! Event::Event2 => StreamProcessor::put(false, state_2),
116//! }
117//! }
118//!
119//! StreamProcessor::get(transition)
120//! }
121//!
122//! let event_loop = state_2().eval(InfiniteList::constant(Event::Event1));
123//!
124//! assert!(event_loop.head());
125//! ```
126//!
127//! A slightly more concrete example using that pattern is available as [integration test](https://github.com/shtsoft/rspl/blob/master/tests/events.rs).
128//! And a full-blown concrete example of a pelican crossing can be found [here (as .md file)](https://github.com/shtsoft/rspl/blob/master/examples/pelican.md) and [here (as .rs file)](https://github.com/shtsoft/rspl/blob/master/examples/pelican.rs).
129//! Notably, it uses rspl to encode effectful hierarchical state machines with a capability-passing inspired effect-handling mechanism.
130//! - demand-driven programming with generators as suggested [here](https://www.cse.chalmers.se/~rjmh/Papers/whyfp.pdf).
131//! Abstractly, that [usage](#usage) of rspl looks as follows:
132//!
133//! ```
134//! use rspl::streams::infinite_lists::InfiniteList;
135//! use rspl::streams::Stream;
136//! use rspl::StreamProcessor;
137//!
138//! struct State {
139//! toggle: bool,
140//! }
141//!
142//! fn action(state: &mut State) {
143//! state.toggle = !state.toggle;
144//! }
145//!
146//! fn pre_action(state: State) -> State {
147//! state
148//! }
149//!
150//! fn post_action(state: State) -> State {
151//! state
152//! }
153//!
154//! fn generator_name<'a>(mut state: State) -> StreamProcessor<'a, (), bool> {
155//! state = pre_action(state);
156//! StreamProcessor::get(|_| {
157//! action(&mut state);
158//! StreamProcessor::put(state.toggle, || generator_name(post_action(state)))
159//! })
160//! }
161//!
162//! let generations = generator_name(State { toggle: false }).eval(InfiniteList::constant(()));
163//!
164//! assert!(generations.head());
165//! ```
166//!
167//! A slightly more concrete example using that pattern is available as [integration test](https://github.com/shtsoft/rspl/blob/master/tests/demands.rs).
168//! And a full-blown concrete example of a heat index control system can be found [here (as .md file)](https://github.com/shtsoft/rspl/blob/master/examples/hics.md) and [here (as .rs file)](https://github.com/shtsoft/rspl/blob/master/examples/hics.rs).
169//!
170//! [^1]: Look at [Codata in Action](https://www.microsoft.com/en-us/research/uploads/prod/2020/01/CoDataInAction.pdf) for some more explanation on that term.
171
172#![cfg_attr(not(feature = "std"), no_std)]
173extern crate alloc;
174
175pub mod combinators;
176
177pub mod streams;
178
179use streams::infinite_lists::InfiniteList;
180use streams::Stream;
181
182use alloc::boxed::Box;
183
184/// [`Lazy<T>`] types thunks of type `T`.
185type Lazy<'a, T> = dyn FnOnce() -> T + 'a;
186
187/// [`StreamProcessor<A, B>`] defines (the syntax of) a language describing the domain of stream processors, that is, terms which can be interpreted to turn streams of type `A` into streams of type `B`.
188pub enum StreamProcessor<'a, A: 'a, B> {
189 /// This stream processor first reads the `A` from the head of the input stream and subsequently applies its function argument to that element yielding a stream processor.
190 /// The resulting stream processor is then used to process the input stream further depending on its shape: if it is a
191 /// - [`Get`](`StreamProcessor::Get`), it is applied to the tail of the input stream.
192 /// - [`Put`](`StreamProcessor::Put`), it is applied to the whole input stream.
193 Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>),
194 /// This stream processor writes the `B` from its first argument to the output list.
195 /// Then, to construct the rest of the output list, it uses its second argument to process the input stream depending on its shape: if it is a
196 /// - [`Get`](`StreamProcessor::Get`), it is applied to the tail of the input stream.
197 /// - [`Put`](`StreamProcessor::Put`), it is applied to the whole input stream.
198 Put(B, Box<Lazy<'a, StreamProcessor<'a, A, B>>>),
199}
200
201impl<'a, A, B> StreamProcessor<'a, A, B> {
202 /// The same as [`StreamProcessor::Get`] but with boxing of `f` hidden to make the resulting code less verbose.
203 #[inline]
204 pub fn get<F>(f: F) -> Self
205 where
206 F: FnOnce(A) -> Self + 'a,
207 {
208 StreamProcessor::Get(Box::new(f))
209 }
210
211 /// The same as [`StreamProcessor::Put`] but with boxing of `lazy_sp` hidden to make the resulting code less verbose.
212 #[inline]
213 pub fn put<T>(b: B, lazy_sp: T) -> Self
214 where
215 B: 'a,
216 T: FnOnce() -> Self + 'a,
217 {
218 StreamProcessor::Put(b, Box::new(lazy_sp))
219 }
220}
221
222impl<'a, A, B> StreamProcessor<'a, A, B> {
223 /// Evaluate `self` on an input stream essentially implementing a semantic of [`StreamProcessor<A, B>`].
224 /// - `stream` is the input stream.
225 ///
226 /// Note that the function can block the current thread if the respective implementation of [`Stream::tail`] can.
227 ///
228 /// # Panics
229 ///
230 /// A panic may occur if
231 /// - the stream processor contains Rust-terms which can panic.
232 /// - the respective implementation of [`Stream::head`] or [`Stream::tail`] can panic.
233 ///
234 /// # Examples
235 ///
236 /// Negating a stream of `true`s to obtain a stream of `false`s:
237 ///
238 /// ```
239 /// use rspl::StreamProcessor;
240 ///
241 /// fn negate<'a>() -> StreamProcessor<'a, bool, bool> {
242 /// StreamProcessor::get(|b: bool| StreamProcessor::put(!b, negate))
243 /// }
244 ///
245 /// let trues = rspl::streams::infinite_lists::InfiniteList::constant(true);
246 ///
247 /// negate().eval(trues);
248 /// ```
249 pub fn eval<S: Stream<A> + 'a>(mut self, mut stream: S) -> InfiniteList<'a, B>
250 where
251 A: Clone,
252 {
253 // This implementation deviates from the original for two reasons:
254 // - Rust does not guarantee tail-recursion elimination and rspl wants to prevent
255 // stack-overflows as much as possible. Therefore the loop in lieu of recursion.
256 // - There are streams rspl programs can operate on where taking the tail can block, as
257 // opposed to the original implementation. So, the question arising here is when to take
258 // the tail of the input. The answer is, as late as possible, that is, only if the next
259 // step is 'getting'. Because then 'putting' is not hindered. And this is as it should be
260 // if taking rspl's idea of seperating input processing from output processing serious.
261 loop {
262 match self {
263 StreamProcessor::Get(f) => {
264 self = f(stream.head().clone());
265 while let StreamProcessor::Get(f) = self {
266 stream = stream.tail();
267 self = f(stream.head().clone());
268 }
269 continue;
270 }
271 StreamProcessor::Put(b, lazy_sp) => {
272 return InfiniteList::Cons(
273 b,
274 Box::new(|| {
275 let sp = lazy_sp();
276 if let StreamProcessor::Get(_) = sp {
277 stream = stream.tail();
278 }
279 Self::eval(sp, stream)
280 }),
281 )
282 }
283 }
284 }
285 }
286}
287
288#[cfg(feature = "std")]
289#[cfg(test)]
290mod tests {
291 use super::*;
292 use combinators::map;
293 use streams::overeager_receivers::OvereagerReceiver;
294
295 use crate::assert_head_eq;
296 use crate::assert_tail_starts_with;
297 use crate::enqueue;
298
299 const fn id<X>(x: X) -> X {
300 x
301 }
302
303 #[test]
304 fn test_get() {
305 assert!(matches!(
306 StreamProcessor::get(|_: ()| { map(id) }),
307 StreamProcessor::Get(_)
308 ));
309 }
310
311 #[test]
312 fn test_put() {
313 assert!(matches!(
314 StreamProcessor::put((), || map(id)),
315 StreamProcessor::Put(_, _)
316 ));
317 }
318
319 #[test]
320 fn test_eval() {
321 let sp = StreamProcessor::get(|n: usize| {
322 StreamProcessor::put(n, || {
323 StreamProcessor::get(|n1: usize| {
324 StreamProcessor::get(move |n2: usize| {
325 if n1 < n2 {
326 StreamProcessor::put(n2, move || StreamProcessor::put(n1, || map(id)))
327 } else {
328 StreamProcessor::put(n1, move || StreamProcessor::put(n2, || map(id)))
329 }
330 })
331 })
332 })
333 });
334
335 let (tx, stream) = OvereagerReceiver::channel(0, 0);
336 enqueue!(tx, [1, 2]);
337
338 let mut result = sp.eval(stream);
339 assert_head_eq!(result, 0);
340 assert_tail_starts_with!(result, [2, 1]);
341 }
342
343 #[test]
344 #[should_panic]
345 fn test_eval_panic() {
346 let sp = StreamProcessor::get(|b: bool| {
347 StreamProcessor::put(if b { panic!() } else { b }, || map(id))
348 });
349
350 let trues = InfiniteList::constant(true);
351
352 sp.eval(trues);
353 }
354}