//! rspl is a stream-processor language based on the one discussed in [Generalising Monads to Arrows](https://www.sciencedirect.com/science/article/pii/S0167642399000234), using Rust as meta-language.
//!
//! ## Design
//!
//! Essentially, rspl is a way to encode functions from streams to streams such that control is syntactically explicit (as in ordinary continuation-passing style), refining the orthodox functional approach to stream processing with combinators like 'map'.
//! More precisely, the idea of this stream-processor language is to split the processing of streams into two parts:
//! One part for reading (getting) the first element of an input stream to direct the further processing.
//! Another part for writing (putting) something to the output stream and offering to process some input stream if needed.
//! Combining these parts in various ways makes it possible to flexibly construct stream processors as programs.
//! The following graphic illustrates how those two different kinds of stream processors ('getting' and 'putting') work (a textual description is contained in the docs of [`StreamProcessor`]):
//!
//! <pre>
//! h--t1--t2--t3--...                   ha--t1--t2--t3--...
//! -                                    -
//! |                                    |
//! | Get(h |-> [SP](h))                 | Put(hb, LAZY-[SP])
//! |                                    |
//! v                                    |
//! t1--t2--t3--...                      |   t1--t2--t3--...
//! -                                    |   -
//! |                                    v   |
//! | [SP](h) = Get(_)                   hb--| LAZY-[SP]() = Get(_)
//! |                                        |
//! v                                        v
//! ...                                      ...
//!
//!
//! h--t1--t2--t3--...                   ha--t1--t2--t3--...
//! -                                    -
//! |                                    |
//! | Get(h |-> [SP](h))                 | Put(hb, LAZY-[SP])
//! |                                    |
//! v                                    |
//! h--t1--t2--t3--...                   |   ha--t1--t2--t3--...
//! -                                    |   -
//! |                                    v   |
//! | [SP](h) = Put(_, _)                hb--| LAZY-[SP]() = Put(_, _)
//! |                                        |
//! v                                        v
//! ...                                      ...
//! </pre>
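//!
//! The consumption rules shown in the graphic can also be sketched as a small, self-contained model.
//! It is only an illustration under simplifying assumptions: `Sp`, `run` and `twice` are hypothetical names, the input is a finite slice of `u32`s rather than a [`Stream`], and the output is collected eagerly instead of lazily.
//! It is not how rspl itself is implemented.
//!
//! ```rust
//! // Hypothetical, simplified model of the two constructs; these are not rspl's actual types.
//! enum Sp {
//!     Get(Box<dyn FnOnce(u32) -> Sp>),
//!     Put(u32, Box<dyn FnOnce() -> Sp>),
//! }
//!
//! // Run `sp` on a finite input until `limit` outputs exist or the input is exhausted.
//! fn run(mut sp: Sp, input: &[u32], limit: usize) -> Vec<u32> {
//!     let mut out = Vec::new();
//!     let mut pos = 0; // index of the current head of the input
//!     let mut first = true; // the very first construct sees the whole input
//!     while out.len() < limit {
//!         match sp {
//!             Sp::Get(f) => {
//!                 // Every `Get` except an initial one works on the tail of the input.
//!                 if !first {
//!                     pos += 1;
//!                 }
//!                 match input.get(pos) {
//!                     Some(&x) => sp = f(x),
//!                     None => break,
//!                 }
//!             }
//!             Sp::Put(b, lazy_sp) => {
//!                 // `Put` writes to the output without consuming any input.
//!                 out.push(b);
//!                 sp = lazy_sp();
//!             }
//!         }
//!         first = false;
//!     }
//!     out
//! }
//!
//! // Emit every input element twice: one `Get` followed by two `Put`s.
//! fn twice() -> Sp {
//!     Sp::Get(Box::new(|x: u32| {
//!         Sp::Put(x, Box::new(move || Sp::Put(x, Box::new(twice))))
//!     }))
//! }
//! ```
//!
//! In this model, `run(twice(), &[1, 2, 3], 5)` yields `[1, 1, 2, 2, 3]`: a `Put` following a `Get` or a `Put` leaves the input untouched, whereas each later `Get` first moves on to the tail.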
//!
//! Remarkably, the language constructs are somewhat dual and loosely correspond to (dual) programming paradigms:
//! - The `Get`-construct relates to event-driven programming as it reacts to input (events) eagerly.
//! - The `Put`-construct relates to demand-driven[^1] programming as it generates output (demands) iteratively by need.
//!
//! This will be discussed further in the [Examples](#examples)-section.
//!
//! ## Usage
//!
//! To program a rspl-[`StreamProcessor`] you just have to compose the constructors [`StreamProcessor::Get`]/[`get`](`StreamProcessor::get`) and [`StreamProcessor::Put`]/[`put`](`StreamProcessor::put`) in the right way.
//! For a somewhat more high-level programming experience you might wish to look at the [`combinators`]-module.
//! The program can then be evaluated with the [`eval`](`StreamProcessor::eval`)-method on some kind of input stream.
//! The 'kind' of input stream is either your own implementation of the [`Stream`]-interface or one from the submodules of the [`streams`]-module.
//! Either way, as a result, evaluation produces an [`InfiniteList`] (lazily).
//! To observe streams - and in particular infinite lists - you can destruct them with the [`head`](`Stream::head`)- and [`tail`](`Stream::tail`)-methods of the stream interface.
//! Moreover, there are various functions helping with the destruction and construction of streams.
//!
//! ## Examples
//!
//! As alluded to in the [Design](#design)-section, rspl supports orthodox 'combinator-driven' stream processing as it is known from list processing with combinators like [`compose`](`combinators::compose`), [`filter`](`combinators::filter`) and [`map`](`combinators::map`).
//! For example, it is possible to first filter some 'bad' elements out of a stream in order to safely iterate some function over the resulting stream afterwards in a combinatorial way.
//! Such a [usage](#usage) of rspl looks like:
//!
//! ```
//! use rspl::combinators::{compose, filter, map};
//! use rspl::streams::infinite_lists::InfiniteList;
//! use rspl::streams::Stream;
//! use rspl::StreamProcessor;
//!
//! let is_greater_zero = |n: &usize| *n > 0;
//! let minus_one = |n: usize| n - 1;
//!
//! let zeroes = compose(filter(is_greater_zero), map(minus_one))
//!     .eval(InfiniteList::cons(0, || InfiniteList::constant(1)));
//!
//! assert_eq!(*zeroes.head(), 0);
//! ```
//!
//! More interestingly, rspl can also serve as a framework for the nifty idea of
//! - event-driven programming with state machines as suggested [here](https://barrgroup.com/Embedded-Systems/How-To/State-Machines-Event-Driven-Systems).
//!   Abstractly, that [usage](#usage) of rspl looks as follows:
//!
//!   ```
//!   use rspl::streams::infinite_lists::InfiniteList;
//!   use rspl::streams::Stream;
//!   use rspl::StreamProcessor;
//!
//!   #[derive(Copy, Clone)]
//!   enum Event {
//!       Event1,
//!       Event2,
//!   }
//!
//!   fn action() -> bool {
//!       true
//!   }
//!
//!   fn state_1<'a>() -> StreamProcessor<'a, Event, bool> {
//!       fn transition<'a>(event: Event) -> StreamProcessor<'a, Event, bool> {
//!           match event {
//!               Event::Event1 => StreamProcessor::put(action(), state_1),
//!               Event::Event2 => state_2(),
//!           }
//!       }
//!
//!       StreamProcessor::get(transition)
//!   }
//!
//!   fn state_2<'a>() -> StreamProcessor<'a, Event, bool> {
//!       fn transition<'a>(event: Event) -> StreamProcessor<'a, Event, bool> {
//!           match event {
//!               Event::Event1 => state_1(),
//!               Event::Event2 => StreamProcessor::put(false, state_2),
//!           }
//!       }
//!
//!       StreamProcessor::get(transition)
//!   }
//!
//!   let event_loop = state_2().eval(InfiniteList::constant(Event::Event1));
//!
//!   assert!(event_loop.head());
//!   ```
//!
//!   A slightly more concrete example using that pattern is available as an [integration test](https://github.com/shtsoft/rspl/blob/master/tests/events.rs).
//!   And a full-blown concrete example of a pelican crossing can be found [here (as .md file)](https://github.com/shtsoft/rspl/blob/master/examples/pelican.md) and [here (as .rs file)](https://github.com/shtsoft/rspl/blob/master/examples/pelican.rs).
//!   Notably, it uses rspl to encode effectful hierarchical state machines with an effect-handling mechanism inspired by capability passing.
//! - demand-driven programming with generators as suggested [here](https://www.cse.chalmers.se/~rjmh/Papers/whyfp.pdf).
//!   Abstractly, that [usage](#usage) of rspl looks as follows:
//!
//!   ```
//!   use rspl::streams::infinite_lists::InfiniteList;
//!   use rspl::streams::Stream;
//!   use rspl::StreamProcessor;
//!
//!   struct State {
//!       toggle: bool,
//!   }
//!
//!   fn action(state: &mut State) {
//!       state.toggle = !state.toggle;
//!   }
//!
//!   fn pre_action(state: State) -> State {
//!       state
//!   }
//!
//!   fn post_action(state: State) -> State {
//!       state
//!   }
//!
//!   fn generator_name<'a>(mut state: State) -> StreamProcessor<'a, (), bool> {
//!       state = pre_action(state);
//!       StreamProcessor::get(|_| {
//!           action(&mut state);
//!           StreamProcessor::put(state.toggle, || generator_name(post_action(state)))
//!       })
//!   }
//!
//!   let generations = generator_name(State { toggle: false }).eval(InfiniteList::constant(()));
//!
//!   assert!(generations.head());
//!   ```
//!
//!   A slightly more concrete example using that pattern is available as an [integration test](https://github.com/shtsoft/rspl/blob/master/tests/demands.rs).
//!   And a full-blown concrete example of a heat index control system can be found [here (as .md file)](https://github.com/shtsoft/rspl/blob/master/examples/hics.md) and [here (as .rs file)](https://github.com/shtsoft/rspl/blob/master/examples/hics.rs).
//!
//! [^1]: Look at [Codata in Action](https://www.microsoft.com/en-us/research/uploads/prod/2020/01/CoDataInAction.pdf) for some more explanation on that term.

#![cfg_attr(not(feature = "std"), no_std)]
extern crate alloc;

pub mod combinators;

pub mod streams;

use streams::infinite_lists::InfiniteList;
use streams::Stream;

use alloc::boxed::Box;

/// [`Lazy<T>`] is the type of thunks, that is, deferred computations yielding a `T`.
type Lazy<'a, T> = dyn FnOnce() -> T + 'a;

/// [`StreamProcessor<A, B>`] defines (the syntax of) a language describing the domain of stream processors, that is, terms which can be interpreted to turn streams of type `A` into streams of type `B`.
pub enum StreamProcessor<'a, A: 'a, B> {
    /// This stream processor first reads the `A` from the head of the input stream and subsequently applies its function argument to that element yielding a stream processor.
    /// The resulting stream processor is then used to process the input stream further depending on its shape: if it is a
    /// - [`Get`](`StreamProcessor::Get`), it is applied to the tail of the input stream.
    /// - [`Put`](`StreamProcessor::Put`), it is applied to the whole input stream.
    Get(Box<dyn FnOnce(A) -> StreamProcessor<'a, A, B> + 'a>),
    /// This stream processor writes the `B` from its first argument to the output list.
    /// Then, to construct the rest of the output list, it uses its second argument to process the input stream depending on its shape: if it is a
    /// - [`Get`](`StreamProcessor::Get`), it is applied to the tail of the input stream.
    /// - [`Put`](`StreamProcessor::Put`), it is applied to the whole input stream.
    Put(B, Box<Lazy<'a, StreamProcessor<'a, A, B>>>),
}

impl<'a, A, B> StreamProcessor<'a, A, B> {
    /// The same as [`StreamProcessor::Get`] but with boxing of `f` hidden to make the resulting code less verbose.
    #[inline]
    pub fn get<F>(f: F) -> Self
    where
        F: FnOnce(A) -> Self + 'a,
    {
        StreamProcessor::Get(Box::new(f))
    }

    /// The same as [`StreamProcessor::Put`] but with boxing of `lazy_sp` hidden to make the resulting code less verbose.
    #[inline]
    pub fn put<T>(b: B, lazy_sp: T) -> Self
    where
        B: 'a,
        T: FnOnce() -> Self + 'a,
    {
        StreamProcessor::Put(b, Box::new(lazy_sp))
    }
}

impl<'a, A, B> StreamProcessor<'a, A, B> {
    /// Evaluate `self` on an input stream, essentially implementing a semantics of [`StreamProcessor<A, B>`].
    /// - `stream` is the input stream.
    ///
    /// Note that the function can block the current thread if the respective implementation of [`Stream::tail`] can.
    ///
    /// # Panics
    ///
    /// A panic may occur if
    /// - the stream processor contains Rust-terms which can panic.
    /// - the respective implementation of [`Stream::head`] or [`Stream::tail`] can panic.
    ///
    /// # Examples
    ///
    /// Negating a stream of `true`s to obtain a stream of `false`s:
    ///
    /// ```
    /// use rspl::StreamProcessor;
    ///
    /// fn negate<'a>() -> StreamProcessor<'a, bool, bool> {
    ///     StreamProcessor::get(|b: bool| StreamProcessor::put(!b, negate))
    /// }
    ///
    /// let trues = rspl::streams::infinite_lists::InfiniteList::constant(true);
    ///
    /// negate().eval(trues);
    /// ```
    pub fn eval<S: Stream<A> + 'a>(mut self, mut stream: S) -> InfiniteList<'a, B>
    where
        A: Clone,
    {
        // This implementation deviates from the original for two reasons:
        // - Rust does not guarantee tail-recursion elimination and rspl wants to prevent
        //   stack-overflows as much as possible. Hence the loop in lieu of recursion.
        // - Unlike in the original implementation, rspl programs can operate on streams where
        //   taking the tail can block. So, the question arising here is when to take the tail of
        //   the input. The answer is, as late as possible, that is, only if the next step is
        //   'getting'. That way 'putting' is not hindered, which is as it should be when taking
        //   rspl's idea of separating input processing from output processing seriously.
        loop {
            match self {
                StreamProcessor::Get(f) => {
                    self = f(stream.head().clone());
                    while let StreamProcessor::Get(f) = self {
                        stream = stream.tail();
                        self = f(stream.head().clone());
                    }
                    continue;
                }
                StreamProcessor::Put(b, lazy_sp) => {
                    return InfiniteList::Cons(
                        b,
                        Box::new(|| {
                            let sp = lazy_sp();
                            if let StreamProcessor::Get(_) = sp {
                                stream = stream.tail();
                            }
                            Self::eval(sp, stream)
                        }),
                    )
                }
            }
        }
    }
}

#[cfg(feature = "std")]
#[cfg(test)]
mod tests {
    use super::*;
    use combinators::map;
    use streams::overeager_receivers::OvereagerReceiver;

    use crate::assert_head_eq;
    use crate::assert_tail_starts_with;
    use crate::enqueue;

    const fn id<X>(x: X) -> X {
        x
    }

    #[test]
    fn test_get() {
        assert!(matches!(
            StreamProcessor::get(|_: ()| { map(id) }),
            StreamProcessor::Get(_)
        ));
    }

    #[test]
    fn test_put() {
        assert!(matches!(
            StreamProcessor::put((), || map(id)),
            StreamProcessor::Put(_, _)
        ));
    }

    #[test]
    fn test_eval() {
        let sp = StreamProcessor::get(|n: usize| {
            StreamProcessor::put(n, || {
                StreamProcessor::get(|n1: usize| {
                    StreamProcessor::get(move |n2: usize| {
                        if n1 < n2 {
                            StreamProcessor::put(n2, move || StreamProcessor::put(n1, || map(id)))
                        } else {
                            StreamProcessor::put(n1, move || StreamProcessor::put(n2, || map(id)))
                        }
                    })
                })
            })
        });

        let (tx, stream) = OvereagerReceiver::channel(0, 0);
        enqueue!(tx, [1, 2]);

        let mut result = sp.eval(stream);
        assert_head_eq!(result, 0);
        assert_tail_starts_with!(result, [2, 1]);
    }

    #[test]
    #[should_panic]
    fn test_eval_panic() {
        let sp = StreamProcessor::get(|b: bool| {
            StreamProcessor::put(if b { panic!() } else { b }, || map(id))
        });

        let trues = InfiniteList::constant(true);

        sp.eval(trues);
    }
}