dvcompute/simulation/stream/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use crate::simulation::event::*;
8use crate::simulation::process::*;
9
10/// Random streams.
11pub mod random;
12
13/// Additional operations.
14pub mod ops;
15
16/// Return a stream of values generated by processes supplied by the specified function.
17pub fn repeat_process<T, F, M>(f: F) -> Stream<T>
18    where F: Fn() -> M + 'static,
19          M: Process<Item = T> + 'static,
20          T: 'static
21{
22    let y = {
23        let comp = f();
24        comp.and_then(move |a| {
25                return_process((a, repeat_process(f)))
26            })
27            .into_boxed()
28    };
29    Stream::Cons(y)
30}
31
32/// An empty stream that never returns data.
33pub fn empty_stream<T>() -> Stream<T>
34    where T: 'static
35{
36    let y = never_process().into_boxed();
37    Stream::Cons(y)
38}
39
40/// Return a stream consisting of exactly one element and inifinite tail.
41pub fn singleton_stream<T>(val: T) -> Stream<T>
42    where T: 'static
43{
44    let y = return_process((val, empty_stream())).into_boxed();
45    Stream::Cons(y)
46}
47
48/// An infinite stream of data.
49pub enum Stream<T> {
50
51    /// The cons-cell.
52    Cons(ProcessBox<(T, Stream<T>)>)
53}
54
55impl<T> Stream<T> {
56
57    /// Run the stream computation.
58    pub fn run(self) -> ProcessBox<(T, Self)> {
59        let Stream::Cons(y) = self;
60        y
61    }
62
63    /// Map the stream according the specified function.
64    pub fn map<F, B>(self, f: F) -> Stream<B>
65        where F: Fn(T) -> B + 'static,
66              B: 'static,
67              T: 'static
68    {
69        let y = {
70            let Stream::Cons(comp) = self;
71            comp.and_then(move |(a, xs)| {
72                    let b = f(a);
73                    return_process((b, xs.map(f)))
74                })
75                .into_boxed()
76        };
77        Stream::Cons(y)
78    }
79
80    /// Compose the stream.
81    pub fn mapc<F, M, B>(self, f: F) -> Stream<B>
82        where F: Fn(T) -> M + 'static,
83              M: Event<Item = B> + 'static,
84              B: 'static,
85              T: 'static
86    {
87        let y = {
88            let Stream::Cons(comp) = self;
89            comp.and_then(move |(a, xs)| {
90                    f(a).into_process().and_then(move |b| {
91                        return_process((b, xs.mapc(f)))
92                    })
93                })
94                .into_boxed()
95        };
96        Stream::Cons(y)
97    }
98
99    /// Accumulator that outputs a value determined by the specified function.
100    pub fn accum<F, M, B, Acc>(self, f: F, acc: Acc) -> Stream<B>
101        where F: Fn(Acc, T) -> M + 'static,
102              M: Event<Item = (Acc, B)> + 'static,
103              B: 'static,
104              T: 'static,
105              Acc: 'static
106    {
107        let y = {
108            let Stream::Cons(comp) = self;
109            comp.and_then(move |(a, xs)| {
110                    f(acc, a).into_process().and_then(move |(acc, b)| {
111                        return_process((b, xs.accum(f, acc)))
112                    })
113                })
114                .into_boxed()
115        };
116        Stream::Cons(y)
117    }
118
119    /// Filter only those data values that satisfy to the specified predicate.
120    pub fn filter<F>(self, pred: F) -> Self
121        where F: Fn(&T) -> bool + 'static,
122              T: 'static
123    {
124        let y = {
125            let Stream::Cons(comp) = self;
126            comp.and_then(move |(a, xs)| {
127                    if pred(&a) {
128                        return_process((a, xs.filter(pred))).into_boxed()
129                    } else {
130                        let Stream::Cons(comp) = xs.filter(pred);
131                        comp
132                    }
133                })
134                .into_boxed()
135        };
136        Stream::Cons(y)
137    }
138
139    /// Filter only those data values that satisfy to the specified predicate.
140    pub fn filterc<F, M>(self, pred: F) -> Self
141        where F: Fn(&T) -> M + 'static,
142              M: Event<Item = bool> + 'static,
143              T: 'static
144    {
145        let y = {
146            let Stream::Cons(comp) = self;
147            comp.and_then(move |(a, xs)| {
148                    pred(&a).into_process().and_then(move |b| {
149                        if b {
150                            return_process((a, xs.filterc(pred))).into_boxed()
151                        } else {
152                            let Stream::Cons(comp) = xs.filterc(pred);
153                            comp
154                        }
155                    })
156                })
157                .into_boxed()
158        };
159        Stream::Cons(y)
160    }
161
162    /// Return the prefix of the stream of the specified length.
163    pub fn take(self, n: isize) -> Self
164        where T: 'static
165    {
166        if n <= 0 {
167            empty_stream()
168        } else {
169            let y = {
170                let Stream::Cons(comp) = self;
171                comp.and_then(move |(a, xs)| {
172                        return_process((a, xs.take(n - 1)))
173                    })
174                    .into_boxed()
175            };
176            Stream::Cons(y)
177        }
178    }
179
180    /// Return the longest prefix of the stream of elements that satisfy the predicate.
181    pub fn take_while<F>(self, pred: F) -> Self
182        where F: Fn(&T) -> bool + 'static,
183              T: 'static
184    {
185        let y = {
186            let Stream::Cons(comp) = self;
187            comp.and_then(move |(a, xs)| {
188                    if pred(&a) {
189                        return_process((a, xs.take_while(pred))).into_boxed()
190                    } else {
191                        never_process().into_boxed()
192                    }
193                })
194                .into_boxed()
195        };
196        Stream::Cons(y)
197    }
198
199    /// Return the longest prefix of the stream of elements that satisfy the computations.
200    pub fn take_while_c<F, M>(self, pred: F) -> Self
201        where F: Fn(&T) -> M + 'static,
202              M: Event<Item = bool> + 'static,
203              T: 'static
204    {
205        let y = {
206            let Stream::Cons(comp) = self;
207            comp.and_then(move |(a, xs)| {
208                    pred(&a).into_process().and_then(move |b| {
209                        if b {
210                            return_process((a, xs.take_while_c(pred))).into_boxed()
211                        } else {
212                            never_process().into_boxed()
213                        }
214                    })
215                })
216                .into_boxed()
217        };
218        Stream::Cons(y)
219    }
220
221    /// Return the suffix of the stream after the specified first elements.
222    pub fn drop(self, n: isize) -> Self
223        where T: 'static
224    {
225        if n <= 0 {
226            self
227        } else {
228            let y = {
229                let Stream::Cons(comp) = self;
230                comp.and_then(move |(_, xs)| {
231                        xs.drop(n - 1).run()
232                    })
233                    .into_boxed()
234            };
235            Stream::Cons(y)
236        }
237    }
238
239    /// Return the suffix of the stream of elements remaining after `take_while`.
240    pub fn drop_while<F>(self, pred: F) -> Self
241        where F: Fn(&T) -> bool + 'static,
242              T: 'static
243    {
244        let y = {
245            let Stream::Cons(comp) = self;
246            comp.and_then(move |(a, xs)| {
247                    if pred(&a) {
248                        xs.drop_while(pred).run()
249                    } else {
250                        return_process((a, xs)).into_boxed()
251                    }
252                })
253                .into_boxed()
254        };
255        Stream::Cons(y)
256    }
257
258    /// Return the suffix of the stream of elements remaining after `take_while_c`.
259    pub fn drop_while_c<F, M>(self, pred: F) -> Self
260        where F: Fn(&T) -> M + 'static,
261              M: Event<Item = bool> + 'static,
262              T: 'static
263    {
264        let y = {
265            let Stream::Cons(comp) = self;
266            comp.and_then(move |(a, xs)| {
267                    pred(&a).into_process().and_then(move |b| {
268                        if b {
269                            xs.drop_while_c(pred).run()
270                        } else {
271                            return_process((a, xs)).into_boxed()
272                        }
273                    })
274                })
275                .into_boxed()
276        };
277        Stream::Cons(y)
278    }
279}