dvcompute/simulation/stream/
mod.rs1use crate::simulation::event::*;
8use crate::simulation::process::*;
9
10pub mod random;
12
13pub mod ops;
15
16pub fn repeat_process<T, F, M>(f: F) -> Stream<T>
18 where F: Fn() -> M + 'static,
19 M: Process<Item = T> + 'static,
20 T: 'static
21{
22 let y = {
23 let comp = f();
24 comp.and_then(move |a| {
25 return_process((a, repeat_process(f)))
26 })
27 .into_boxed()
28 };
29 Stream::Cons(y)
30}
31
32pub fn empty_stream<T>() -> Stream<T>
34 where T: 'static
35{
36 let y = never_process().into_boxed();
37 Stream::Cons(y)
38}
39
40pub fn singleton_stream<T>(val: T) -> Stream<T>
42 where T: 'static
43{
44 let y = return_process((val, empty_stream())).into_boxed();
45 Stream::Cons(y)
46}
47
48pub enum Stream<T> {
50
51 Cons(ProcessBox<(T, Stream<T>)>)
53}
54
55impl<T> Stream<T> {
56
57 pub fn run(self) -> ProcessBox<(T, Self)> {
59 let Stream::Cons(y) = self;
60 y
61 }
62
63 pub fn map<F, B>(self, f: F) -> Stream<B>
65 where F: Fn(T) -> B + 'static,
66 B: 'static,
67 T: 'static
68 {
69 let y = {
70 let Stream::Cons(comp) = self;
71 comp.and_then(move |(a, xs)| {
72 let b = f(a);
73 return_process((b, xs.map(f)))
74 })
75 .into_boxed()
76 };
77 Stream::Cons(y)
78 }
79
80 pub fn mapc<F, M, B>(self, f: F) -> Stream<B>
82 where F: Fn(T) -> M + 'static,
83 M: Event<Item = B> + 'static,
84 B: 'static,
85 T: 'static
86 {
87 let y = {
88 let Stream::Cons(comp) = self;
89 comp.and_then(move |(a, xs)| {
90 f(a).into_process().and_then(move |b| {
91 return_process((b, xs.mapc(f)))
92 })
93 })
94 .into_boxed()
95 };
96 Stream::Cons(y)
97 }
98
99 pub fn accum<F, M, B, Acc>(self, f: F, acc: Acc) -> Stream<B>
101 where F: Fn(Acc, T) -> M + 'static,
102 M: Event<Item = (Acc, B)> + 'static,
103 B: 'static,
104 T: 'static,
105 Acc: 'static
106 {
107 let y = {
108 let Stream::Cons(comp) = self;
109 comp.and_then(move |(a, xs)| {
110 f(acc, a).into_process().and_then(move |(acc, b)| {
111 return_process((b, xs.accum(f, acc)))
112 })
113 })
114 .into_boxed()
115 };
116 Stream::Cons(y)
117 }
118
119 pub fn filter<F>(self, pred: F) -> Self
121 where F: Fn(&T) -> bool + 'static,
122 T: 'static
123 {
124 let y = {
125 let Stream::Cons(comp) = self;
126 comp.and_then(move |(a, xs)| {
127 if pred(&a) {
128 return_process((a, xs.filter(pred))).into_boxed()
129 } else {
130 let Stream::Cons(comp) = xs.filter(pred);
131 comp
132 }
133 })
134 .into_boxed()
135 };
136 Stream::Cons(y)
137 }
138
139 pub fn filterc<F, M>(self, pred: F) -> Self
141 where F: Fn(&T) -> M + 'static,
142 M: Event<Item = bool> + 'static,
143 T: 'static
144 {
145 let y = {
146 let Stream::Cons(comp) = self;
147 comp.and_then(move |(a, xs)| {
148 pred(&a).into_process().and_then(move |b| {
149 if b {
150 return_process((a, xs.filterc(pred))).into_boxed()
151 } else {
152 let Stream::Cons(comp) = xs.filterc(pred);
153 comp
154 }
155 })
156 })
157 .into_boxed()
158 };
159 Stream::Cons(y)
160 }
161
162 pub fn take(self, n: isize) -> Self
164 where T: 'static
165 {
166 if n <= 0 {
167 empty_stream()
168 } else {
169 let y = {
170 let Stream::Cons(comp) = self;
171 comp.and_then(move |(a, xs)| {
172 return_process((a, xs.take(n - 1)))
173 })
174 .into_boxed()
175 };
176 Stream::Cons(y)
177 }
178 }
179
180 pub fn take_while<F>(self, pred: F) -> Self
182 where F: Fn(&T) -> bool + 'static,
183 T: 'static
184 {
185 let y = {
186 let Stream::Cons(comp) = self;
187 comp.and_then(move |(a, xs)| {
188 if pred(&a) {
189 return_process((a, xs.take_while(pred))).into_boxed()
190 } else {
191 never_process().into_boxed()
192 }
193 })
194 .into_boxed()
195 };
196 Stream::Cons(y)
197 }
198
199 pub fn take_while_c<F, M>(self, pred: F) -> Self
201 where F: Fn(&T) -> M + 'static,
202 M: Event<Item = bool> + 'static,
203 T: 'static
204 {
205 let y = {
206 let Stream::Cons(comp) = self;
207 comp.and_then(move |(a, xs)| {
208 pred(&a).into_process().and_then(move |b| {
209 if b {
210 return_process((a, xs.take_while_c(pred))).into_boxed()
211 } else {
212 never_process().into_boxed()
213 }
214 })
215 })
216 .into_boxed()
217 };
218 Stream::Cons(y)
219 }
220
221 pub fn drop(self, n: isize) -> Self
223 where T: 'static
224 {
225 if n <= 0 {
226 self
227 } else {
228 let y = {
229 let Stream::Cons(comp) = self;
230 comp.and_then(move |(_, xs)| {
231 xs.drop(n - 1).run()
232 })
233 .into_boxed()
234 };
235 Stream::Cons(y)
236 }
237 }
238
239 pub fn drop_while<F>(self, pred: F) -> Self
241 where F: Fn(&T) -> bool + 'static,
242 T: 'static
243 {
244 let y = {
245 let Stream::Cons(comp) = self;
246 comp.and_then(move |(a, xs)| {
247 if pred(&a) {
248 xs.drop_while(pred).run()
249 } else {
250 return_process((a, xs)).into_boxed()
251 }
252 })
253 .into_boxed()
254 };
255 Stream::Cons(y)
256 }
257
258 pub fn drop_while_c<F, M>(self, pred: F) -> Self
260 where F: Fn(&T) -> M + 'static,
261 M: Event<Item = bool> + 'static,
262 T: 'static
263 {
264 let y = {
265 let Stream::Cons(comp) = self;
266 comp.and_then(move |(a, xs)| {
267 pred(&a).into_process().and_then(move |b| {
268 if b {
269 xs.drop_while_c(pred).run()
270 } else {
271 return_process((a, xs)).into_boxed()
272 }
273 })
274 })
275 .into_boxed()
276 };
277 Stream::Cons(y)
278 }
279}