rust_io/
lib.rs

1use std::thread;
2use std::time::Duration;
3
4use futures::{FutureExt};
5use futures::executor::block_on;
6use futures::future::{join_all, LocalBoxFuture};
7use rand::{Rng, thread_rng};
8
9use crate::RustIO::{Empty, Fut, Right, Value, Wrong};
10
11/// Macro implementation for [rust_io] defining several operators to be used emulating
12/// Haskel [do notation]
13/// Work based on original idea of crate [do-notation]
14#[macro_export]
15macro_rules! rust_io {
16  // return
17  (yield $r:expr ;) => {
18    $crate::Lift::lift($r)
19  };
20
21  // let variable bind
22  (let $p:pat = $e:expr ; $($r:tt)*) => {{
23    let $p = $e;
24    rust_io!($($r)*)
25  }};
26
27  // unused variable bind
28  (_ <- $x:expr ; $($r:tt)*) => {
29    $x.flat_map(move |_| { rust_io!($($r)*) })
30  };
31
32  // bind
33  ($bind:ident <- $x:expr ; $($r:tt)*) => {
34    $x.flat_map(move |$bind| { rust_io!($($r)*) })
35  };
36
37  // return type from do-notation
38  ($a:expr) => {
39    $a
40  }
41}
42
43///Specification to be implemented by a monad.
44/// [lift] a value into a default structure.
45/// Operators to create monad:
46/// [of][from_func][from_option_func][from_result_func][from_option][from_result][merge]
47/// Operators to transform monads
48/// [map][fold][map_error]
49/// Operators to compose monads
50/// [flat_map][zip]
51/// Operators to filter monads
52/// [filter]
53/// Operators to filter and transform monad in one transaction
54/// [when][when_rio]
55/// Operators to recover from side-effects
56/// [recover][recover_with][eventually]
57/// To slow the monad execution
58/// [delay]
59/// To unwrap the value from monad.
60/// [get][get_or_else]
61/// Check the state of the monad
62/// [is_ok][is_failed][is_empty]
63/// Async task executions
64/// [parallel][fork]
65pub trait Lift<A, T> {
66    fn lift(a: A) -> Self;
67
68    fn of(a: A) -> Self;
69
70    fn from_func(f: fn() -> A) -> Self;
71
72    fn from_option_func(f: fn() -> Option<A>) -> Self;
73
74    fn from_result_func(f: fn() -> Result<A, T>) -> Self;
75
76    fn from_option(a: Option<A>) -> Self;
77
78    fn from_result(a: Result<A, T>) -> Self;
79
80    fn merge<F: FnOnce(A, A) -> Self>(a: Self, b: Self, op: F) -> Self;
81
82    fn get(self) -> A;
83
84    fn failed(self) -> T;
85
86    fn get_or_else(self, default: A) -> A;
87
88    fn is_ok(&self) -> bool;
89
90    fn is_failed(&self) -> bool;
91
92    fn is_empty(&self) -> bool;
93
94    fn map<F: FnOnce(A) -> A>(self, op: F) -> Self;
95
96    fn map_error<F: FnOnce(T) -> T>(self, op: F) -> Self;
97
98    fn flat_map<F: FnOnce(A) -> Self>(self, op: F) -> Self;
99
100    fn at_some_point<F: FnOnce(A) -> Self>(self, op: F) -> Self where A: Clone, F: Clone;
101
102    fn at_some_point_while<P: FnOnce() -> bool, F: FnOnce(A) -> Self>(self, predicate: P, op: F) -> Self where A: Clone, P: Clone, F: Clone;
103
104    fn at_some_point_until<P: FnOnce() -> bool, F: FnOnce(A) -> Self>(self, predicate: P, op: F) -> Self where A: Clone, P: Clone, F: Clone;
105
106    fn when<P: FnOnce(&A) -> bool, F: FnOnce(A) -> A>(self, predicate: P, op: F) -> Self;
107
108    fn when_rio<P: FnOnce(&A) -> bool, F: FnOnce(A) -> Self>(self, predicate: P, op: F) -> Self;
109
110    fn zip<Z1: FnOnce() -> Self, Z2: FnOnce() -> Self, F: FnOnce(A, A) -> Self>(a: Z1, b: Z2, op: F) -> Self;
111
112    fn filter<F: FnOnce(&A) -> bool>(self, op: F) -> Self;
113
114    fn fold<F: FnOnce(A) -> A>(self, default: A, op: F) -> Self;
115
116    fn recover<F: FnOnce() -> A>(self, op: F) -> Self;
117
118    fn recover_with<F: FnOnce() -> Self>(self, op: F) -> Self;
119
120    fn delay(self, time: Duration) -> Self;
121
122    fn parallel<Task: FnOnce() -> Self, F: FnOnce(Vec<A>) -> Self>(tasks: Vec<Task>, op: F) -> Self;
123
124    /// Provide [A:'static] in the definition it can extend the lifetime of a specific type
125    fn fork<F: FnOnce(A) -> A>(self, op: F) -> Self where A: 'static, F: 'static;
126
127    /// Provide [A:'static] in the definition it can extend the lifetime of a specific type
128    fn join(self) -> Self;
129
130    fn daemon<F: FnOnce(&A) -> ()>(self, op: F) -> Self;
131
132    fn peek<F: FnOnce(&A) -> ()>(self, op: F) -> Self;
133
134    fn on_error<F: FnOnce(&T) -> ()>(self, op: F) -> Self;
135
136    fn on_success<F: FnOnce(&A) -> ()>(self, op: F) -> Self;
137}
138
139///Data structure to be used as the monad to be implemented as [Lift]
140/// RustIO monad can have the list of possible states.
141pub enum RustIO<A, T> {
142    Right(A),
143    Wrong(T),
144    Value(A),
145    Empty(),
146    Fut(LocalBoxFuture<'static, A>),
147}
148
149/// Implementation of the Monad Lift.
150impl<A, T> Lift<A, T> for RustIO<A, T> {
151    fn lift(a: A) -> Self {
152        RustIO::of(a)
153    }
154
155    /// Pure value to create RustIO monad without side-effects.
156    fn of(a: A) -> Self {
157        Value(a)
158    }
159
160    fn from_func(f: fn() -> A) -> Self {
161        Value(f())
162    }
163
164    fn from_option_func(f: fn() -> Option<A>) -> Self {
165        RustIO::from_option(f())
166    }
167
168    fn from_result_func(f: fn() -> Result<A, T>) -> Self {
169        RustIO::from_result(f())
170    }
171
172    fn from_option(a: Option<A>) -> Self {
173        match a {
174            None => Empty(),
175            Some(v) => Value(v)
176        }
177    }
178
179    fn from_result(a: Result<A, T>) -> Self {
180        match a {
181            Ok(v) => Right(v),
182            Err(t) => Wrong(t)
183        }
184    }
185
186    fn merge<F: FnOnce(A, A) -> Self>(a: Self, b: Self, op: F) -> Self {
187        return a.flat_map(|x| b.flat_map(|y| op(x, y)));
188    }
189
190    fn get(self) -> A {
191        match self {
192            Value(v) => v,
193            Right(t) => t,
194            _ => panic!("Error, value not available"),
195        }
196    }
197
198    fn failed(self) -> T {
199        match self {
200            Value(_) | Right(_) | Empty() | Fut(_) => panic!("Error, value not available"),
201            Wrong(e) => e,
202        }
203    }
204
205    fn get_or_else(self, default: A) -> A {
206        match self {
207            Empty() => default,
208            Wrong(_) => default,
209            _ => self.get()
210        }
211    }
212
213    fn is_ok(&self) -> bool {
214        match self {
215            Value(_) => true,
216            Right(_) => true,
217            _ => false,
218        }
219    }
220
221    fn is_failed(&self) -> bool {
222        match self {
223            Value(_) => false,
224            Right(_) => false,
225            _ => true,
226        }
227    }
228
229    fn is_empty(&self) -> bool {
230        match self {
231            Value(_) => false,
232            Right(_) => false,
233            _ => true,
234        }
235    }
236
237    fn map<F: FnOnce(A) -> A>(self, op: F) -> Self {
238        match self {
239            Value(v) => Value(op(v)),
240            Right(v) => Right(op(v)),
241            _ => self,
242        }
243    }
244
245    fn map_error<F: FnOnce(T) -> T>(self, op: F) -> Self {
246        match self {
247            Wrong(e) => Wrong(op(e)),
248            _ => self
249        }
250    }
251
252    fn flat_map<F: FnOnce(A) -> Self>(self, op: F) -> Self {
253        match self {
254            Value(a) | Right(a) => op(a),
255            Empty() => Empty(),
256            Wrong(e) => Wrong(e),
257            _ => self
258        }
259    }
260
261    ///Returns an effect that ignores errors and runs repeatedly until it [at_some_point] succeeds
262    /// We mark A type as Clone since we need a clone of the value for each iteration in the loop.
263    /// In case you need a backoff between iterations, or a escape clause, you can use
264    /// [until] or [while] [at_some_point] operator conditions.
265    fn at_some_point<F: FnOnce(A) -> Self>(self, op: F) -> Self where A: Clone, F: Clone {
266        match self {
267            Value(a) | Right(a) => {
268                loop {
269                    let op_copy = op.clone();
270                    let a_copy = a.clone();
271                    let result = op_copy(a_copy);
272                    if result.is_ok() {
273                        break result;
274                    }
275                }
276            }
277            _ => self
278        }
279    }
280
281    /// Retry pattern of a task while a predicate condition is [false]
282    fn at_some_point_while<P: FnOnce() -> bool, F: FnOnce(A) -> Self>(self, predicate: P, op: F) -> Self where A: Clone, P: Clone, F: Clone {
283        self.at_some_point_cond(false, predicate, op)
284    }
285
286    /// Retry pattern of a task while a predicate condition is [true]
287    fn at_some_point_until<P: FnOnce() -> bool, F: FnOnce(A) -> Self>(self, predicate: P, op: F) -> Self where A: Clone, P: Clone, F: Clone {
288        self.at_some_point_cond(true, predicate, op)
289    }
290
291    fn when<P: FnOnce(&A) -> bool, F: FnOnce(A) -> A>(self, predicate: P, op: F) -> Self {
292        return match self {
293            Value(t) => {
294                let x = t;
295                return if predicate(&x) { Value(op(x)) } else { Empty() };
296            }
297            Empty() => Empty(),
298            Right(a) => {
299                let x = a;
300                return if predicate(&x) { Right(op(x)) } else { Empty() };
301            }
302            Wrong(e) => Wrong(e),
303            _ => self
304        };
305    }
306
307    fn when_rio<P: FnOnce(&A) -> bool, F: FnOnce(A) -> Self>(self, predicate: P, op: F) -> Self {
308        return match self {
309            Value(t) => {
310                let x = t;
311                return if predicate(&x) { op(x) } else { Empty() };
312            }
313            Empty() => Empty(),
314            Right(a) => {
315                let x = a;
316                return if predicate(&x) { op(x) } else { Empty() };
317            }
318            Wrong(e) => Wrong(e),
319            _ => self
320        };
321    }
322
323    fn zip<Z1: FnOnce() -> Self, Z2: FnOnce() -> Self, F: FnOnce(A, A) -> Self>(a: Z1, b: Z2, op: F) -> Self {
324        let empty = Empty();
325        let (zip_1, zip_2) = block_on(empty.run_future_zip_tasks(a, b));
326        if (zip_1.is_ok() || !zip_1.is_empty()) && (zip_2.is_ok() || !zip_2.is_empty()) {
327            return op(zip_1.get(), zip_2.get());
328        }
329        return empty;
330    }
331
332    fn filter<F: FnOnce(&A) -> bool>(self, op: F) -> Self {
333        return match self {
334            Value(t) => {
335                let x = t;
336                return if op(&x) { Value(x) } else { Empty() };
337            }
338            Empty() => Empty(),
339            Right(a) => {
340                let x = a;
341                return if op(&x) { Right(x) } else { Empty() };
342            }
343            Wrong(e) => Wrong(e),
344            _ => self
345        };
346    }
347
348    fn fold<F: FnOnce(A) -> A>(self, default: A, op: F) -> Self {
349        match self {
350            Value(v) => Value(op(v)),
351            Right(v) => Right(op(v)),
352            Empty() => Value(default),
353            _ => self
354        }
355    }
356
357    fn recover<F: FnOnce() -> A>(self, op: F) -> Self {
358        match self {
359            Wrong(_) => Right(op()),
360            Empty() => Value(op()),
361            _ => self
362        }
363    }
364
365    fn recover_with<F: FnOnce() -> Self>(self, op: F) -> Self {
366        match self {
367            Wrong(_) | Empty() => op(),
368            _ => self
369        }
370    }
371
372    fn delay(self, time: Duration) -> Self {
373        match self {
374            Value(_) | Right(_) => {
375                thread::sleep(time);
376                self
377            }
378            _ => self
379        }
380    }
381
382    /// Operator to run every task in the Vector asynchronously using async.
383    /// After we create the list of Futures, we use [join_all] to run all futures in parallel.
384    /// Once all of them are finished, we invoke the passed function with [Vector<A>] as input param
385    fn parallel<Task: FnOnce() -> Self, F: FnOnce(Vec<A>) -> Self>(tasks: Vec<Task>, op: F) -> Self {
386        let empty = Empty();
387        let tasks_done = block_on(empty.run_future_tasks(tasks));
388        let find_error_tasks = &tasks_done;
389        return match find_error_tasks.into_iter().find(|rio| rio.is_empty() || !rio.is_ok()) {
390            Some(_) => {
391                println!("Some of the task failed. Returning Empty value");
392                empty
393            }
394            None => {
395                let rios = tasks_done.into_iter()
396                    .fold(vec!(), |rios, task_done| {
397                        return rios.into_iter().chain(vec![task_done.get()]).collect::<Vec<_>>();
398                    });
399                op(rios)
400            }
401        };
402    }
403
404    /// It run the execution of the task in another green thread
405    /// We use type [Fut] to wrap the [LocalBoxFuture<A>] which it contains the output of the function execution.
406    fn fork<F: FnOnce(A) -> A>(self, op: F) -> Self where A: 'static, F: 'static {
407        match self {
408            Value(v) | Right(v) => {
409                Fut(async { op(v) }.boxed_local())
410            }
411            _ => self,
412        }
413    }
414
415    ///Join the [LocalBoxFuture<A>].
416    fn join(self) -> Self {
417        block_on(self.unbox_fork())
418    }
419
420    /// async consumer function that does not affect the current value of the monad.
421    fn daemon<F: FnOnce(&A) -> ()>(self, op: F) -> Self {
422        return block_on(self.run_daemon(op));
423    }
424
425    fn peek<F: FnOnce(&A) -> ()>(self, op: F) -> Self {
426        return match self {
427            Value(v) => {
428                let x = v;
429                op(&x);
430                Value(x)
431            }
432            Right(v) => {
433                let x = v;
434                op(&x);
435                Right(x)
436            }
437            _ => self
438        };
439    }
440
441    fn on_error<F: FnOnce(&T) -> ()>(self, op: F) -> Self {
442        return match self {
443            Wrong(v) => {
444                let x = v;
445                op(&x);
446                Wrong(x)
447            }
448            _ => self
449        };
450    }
451
452    fn on_success<F: FnOnce(&A) -> ()>(self, op: F) -> Self {
453        return match self {
454            Right(v) => {
455                let x = v;
456                op(&x);
457                Right(x)
458            }
459            _ => self
460        };
461    }
462}
463
464impl<A, T> RustIO<A, T> {
465    async fn run_future_zip_tasks<Z1: FnOnce() -> Self, Z2: FnOnce() -> Self>(&self, a: Z1, b: Z2) -> (RustIO<A, T>, RustIO<A, T>) {
466        let future_zip1 = async {
467            a()
468        };
469        let future_zip2 = async {
470            b()
471        };
472        return futures::join!(future_zip1,future_zip2);
473    }
474
475    async fn run_future_tasks<Task: FnOnce() -> Self>(&self, tasks: Vec<Task>) -> Vec<RustIO<A, T>> {
476        let future_tasks = tasks.into_iter()
477            .fold(vec!(), |futures, task: Task| {
478                let future_task = vec![async { return task(); }];
479                return futures.into_iter().chain(future_task).collect::<Vec<_>>();
480            });
481        return join_all(future_tasks).await;
482    }
483
484    async fn unbox_fork(self) -> RustIO<A, T> {
485        match self {
486            Fut(fut_box) => {
487                println!("Extracting future");
488                Value(fut_box.await)
489            }
490            _ => Empty(),
491        }
492    }
493
494    async fn run_daemon<F: FnOnce(&A) -> ()>(self, op: F) -> RustIO<A, T> {
495        return match self {
496            Value(v) => {
497                let x = v;
498                async { op(&x) }.await;
499                Value(x)
500            }
501            Right(v) => {
502                let x = v;
503                async { op(&x) }.await;
504                Right(x)
505            }
506            _ => self
507        };
508    }
509
510    /// Generic function to cover [at_some_point] [while] and [until]
511    fn at_some_point_cond<P: FnOnce() -> bool, F: FnOnce(A) -> Self>(self, cond: bool, predicate: P, op: F) -> Self where A: Clone, P: Clone, F: Clone {
512        match self {
513            Value(a) | Right(a) => {
514                loop {
515                    let op_copy = op.clone();
516                    let predicate_copy = predicate.clone();
517                    let a_copy = a.clone();
518                    let result = op_copy(a_copy);
519                    if result.is_ok() || predicate_copy() == cond {
520                        break result;
521                    }
522                }
523            }
524            _ => self
525        }
526    }
527}
528
529#[cfg(test)]
530mod tests {
531    use super::*;
532
533    #[test]
534    fn rio() {
535        let rio_program: RustIO<String, String> = rust_io! {
536             _ <- RustIO::of(String::from("1981"));
537             v <- RustIO::from_option(Some(String::from("hello")));
538             t <- RustIO::from_option_func(|| Some(String::from(" pure")));
539             z <- RustIO::from_func(|| String::from(" functional"));
540             x <- RustIO::from_result(Ok(String::from(" world")));
541             i <- RustIO::of(String::from("!!"));
542             y <- RustIO::from_result_func(|| Ok(String::from("!!")));
543
544             yield v + &t + &z + &x + &i + &y;
545        };
546        println!("${:?}", rio_program.is_empty());
547        println!("${:?}", rio_program.is_ok());
548        assert_eq!(rio_program.get(), "hello pure functional world!!!!");
549    }
550
551    #[test]
552    fn rio_map() {
553        let rio_program: RustIO<String, String> = rust_io! {
554             v <- RustIO::from_option(Some(String::from("hello")))
555                        .map(|v| v.to_uppercase());
556             x <- RustIO::from_result(Ok(String::from(" world")))
557                        .map(|v| v.to_uppercase());
558             i <- RustIO::of(String::from("!!"));
559             yield v + &x + &i;
560        };
561        println!("${:?}", rio_program.is_empty());
562        println!("${:?}", rio_program.is_ok());
563        assert_eq!(rio_program.get(), "HELLO WORLD!!");
564    }
565
566    #[test]
567    fn rio_flat_map() {
568        let rio_program: RustIO<String, String> = rust_io! {
569             v <- RustIO::from_option(Some(String::from("hello")))
570                        .flat_map(|v| RustIO::of( v + &String::from(" world")))
571                        .map(|v| v.to_uppercase());
572             i <- RustIO::of(String::from("!!"));
573             yield v + &i;
574        };
575        println!("${:?}", rio_program.is_empty());
576        println!("${:?}", rio_program.is_ok());
577        assert_eq!(rio_program.get(), "HELLO WORLD!!");
578    }
579
580    #[test]
581    fn rio_filter() {
582        let rio_program: RustIO<String, String> = rust_io! {
583             v <- RustIO::from_option(Some(String::from("hello")))
584                        .flat_map(|v| RustIO::of( v + &String::from(" world")))
585                        .filter(|v| v.len() > 5);
586             i <- RustIO::of(String::from("!!"));
587             yield v + &i;
588        };
589        println!("${:?}", rio_program.is_empty());
590        println!("${:?}", rio_program.is_ok());
591        assert_eq!(rio_program.get(), "hello world!!");
592    }
593
594    #[test]
595    fn rio_compose_two_programs() {
596        let rio_program_1: RustIO<String, String> = rust_io! {
597             v <- RustIO::from_option(Some(String::from("hello")));
598             yield v + &" ".to_string();
599        };
600        let rio_program_2: RustIO<String, String> = rust_io! {
601             v <- RustIO::from_option(Some(String::from("world")));
602             yield v + &"!!".to_string();
603        };
604        let rio_program: RustIO<String, String> = rust_io! {
605             v <- rio_program_1;
606             i <- rio_program_2;
607             RustIO::of(v + &i).map(|v| v.to_uppercase())
608        };
609        println!("${:?}", rio_program.is_empty());
610        println!("${:?}", rio_program.is_ok());
611        assert_eq!(rio_program.get(), "HELLO WORLD!!");
612    }
613
614    #[test]
615    fn rio_fold() {
616        let rio_program: RustIO<String, String> = rust_io! {
617             v <- RustIO::from_option(None)
618                        .fold("hello world!!".to_string(), |v| v.to_uppercase());
619             yield v;
620        };
621        println!("${:?}", rio_program.is_empty());
622        println!("${:?}", rio_program.is_ok());
623        assert_eq!(rio_program.get(), "hello world!!");
624    }
625
626    #[test]
627    fn rio_empty_recover() {
628        let rio_program: RustIO<String, String> = rust_io! {
629             v <- RustIO::from_option(None)
630                        .recover(|| "hello world!!".to_string());
631             yield v;
632        };
633        println!("${:?}", rio_program.is_empty());
634        println!("${:?}", rio_program.is_ok());
635        assert_eq!(rio_program.get(), "hello world!!");
636    }
637
638    #[test]
639    fn rio_error_recover() {
640        let rio_program: RustIO<String, String> = rust_io! {
641             v <- RustIO::from_result(Err("".to_string()))
642                        .recover(|| "hello world!!".to_string());
643            yield v;
644        };
645        println!("${:?}", rio_program.is_empty());
646        println!("${:?}", rio_program.is_ok());
647        assert_eq!(rio_program.get(), "hello world!!");
648    }
649
650    #[test]
651    fn rio_option_recover_with() {
652        let rio_program: RustIO<String, String> = rust_io! {
653             v <- RustIO::from_option(None)
654                        .recover_with(|| RustIO::from_option(Some("hello world!!".to_string())));
655             yield v;
656        };
657        println!("${:?}", rio_program.is_empty());
658        println!("${:?}", rio_program.is_ok());
659        assert_eq!(rio_program.get(), "hello world!!");
660    }
661
662    #[test]
663    fn rio_error_recover_with() {
664        let rio_program: RustIO<String, String> = rust_io! {
665             v <- RustIO::from_result(Err("".to_string()))
666                        .recover_with(|| RustIO::from_result(Ok("hello world!!".to_string())));
667             yield v;
668        };
669        println!("${:?}", rio_program.is_empty());
670        println!("${:?}", rio_program.is_ok());
671        assert_eq!(rio_program.get(), "hello world!!");
672    }
673
674    #[test]
675    fn rio_error() {
676        let rio_program: RustIO<String, i32> = rust_io! {
677             i <- RustIO::from_option(Some(String::from("hello")));
678             _ <- RustIO::from_result(Err(503));
679             v <- RustIO::from_option(Some(String::from("world")));
680             yield (i + &v);
681        };
682        println!("${:?}", rio_program.is_empty());
683        println!("${:?}", rio_program.is_ok());
684        assert_eq!(false, rio_program.is_ok());
685    }
686
687    #[test]
688    fn rio_filter_empty() {
689        let rio_program: RustIO<String, String> = rust_io! {
690             v <- RustIO::from_option(Some(String::from("hello")))
691                        .filter(|v| v.len() > 10);
692             i <- RustIO::of(String::from("!!"));
693             yield (v + &i);
694        };
695        println!("${:?}", rio_program.is_empty());
696        println!("${:?}", rio_program.is_ok());
697        assert_eq!(true, rio_program.is_empty());
698    }
699
700    #[test]
701    fn rio_delay() {
702        let rio_program: RustIO<String, String> = rust_io! {
703             v <- RustIO::from_option(Some("hello world!!".to_string()))
704                        .delay(Duration::from_secs(2));
705            yield v;
706        };
707        println!("${:?}", rio_program.is_empty());
708        println!("${:?}", rio_program.is_ok());
709        assert_eq!(rio_program.get(), "hello world!!");
710    }
711
712    #[test]
713    fn rio_get_or_else() {
714        let rio_program: RustIO<String, String> = rust_io! {
715             v <- RustIO::from_result(Err("".to_string()));
716             yield v;
717        };
718        println!("${:?}", rio_program.is_empty());
719        println!("${:?}", rio_program.is_ok());
720        assert_eq!(rio_program.get_or_else("hello world!!".to_string()), "hello world!!");
721    }
722
723    #[test]
724    fn rio_merge() {
725        let rio_program: RustIO<String, String> = rust_io! {
726             v <- RustIO::merge(
727                RustIO::from_option(Some("hello".to_string())), RustIO::from_option(Some(" world!!".to_string())),
728                |a,b| RustIO::from_option(Some(a + &b)));
729             yield v;
730        };
731        println!("${:?}", rio_program.is_empty());
732        println!("${:?}", rio_program.is_ok());
733        assert_eq!(rio_program.get(), "hello world!!");
734    }
735
736    #[test]
737    fn rio_merge_error() {
738        let rio_program: RustIO<String, String> = rust_io! {
739             v <- RustIO::merge(
740                RustIO::from_option(Some("hello".to_string())), RustIO::from_option(None),
741                |a,b| RustIO::from_option(Some(a + &b)));
742             yield v;
743        };
744        println!("${:?}", rio_program.is_empty());
745        println!("${:?}", rio_program.is_ok());
746        assert_eq!(rio_program.is_ok(), false);
747    }
748
749    #[test]
750    fn rio_zip() {
751        let rio_program: RustIO<String, String> = rust_io! {
752             v <- RustIO::zip(
753                || RustIO::from_option(Some("hello".to_string())), || RustIO::from_option(Some(" world!!".to_string())),
754                |a,b| RustIO::from_option(Some(a + &b)));
755            yield v;
756        };
757        println!("${:?}", rio_program.is_empty());
758        println!("${:?}", rio_program.is_ok());
759        assert_eq!(rio_program.get(), "hello world!!");
760    }
761
762    #[test]
763    fn rio_parallel() {
764        let mut parallel_tasks: Vec<fn() -> RustIO<String, String>> = vec!();
765        parallel_tasks.push(|| RustIO::from_option(Some("hello".to_string())));
766        parallel_tasks.push(|| RustIO::from_result(Ok(" world".to_string())));
767        parallel_tasks.push(|| RustIO::of("!!".to_string()));
768
769        let rio_program: RustIO<String, String> = rust_io! {
770             v <- RustIO::parallel(parallel_tasks,|tasks| RustIO::of(tasks.into_iter().collect()));
771             yield v;
772        };
773        println!("${:?}", rio_program.is_empty());
774        println!("${:?}", rio_program.is_ok());
775        assert_eq!(rio_program.get(), "hello world!!");
776    }
777
778    #[test]
779    fn rio_map_error() {
780        let rio_program: RustIO<String, String> = rust_io! {
781             v <- RustIO::from_result(Err(String::from("Error A")))
782                .map_error(|t| String::from("Error B"));
783            yield v;
784        };
785        println!("${:?}", rio_program.is_empty());
786        println!("${:?}", rio_program.is_ok());
787        assert_eq!(rio_program.failed(), "Error B");
788    }
789
790    #[test]
791    fn rio_when() {
792        let rio_program: RustIO<String, String> = rust_io! {
793             v <- RustIO::from_option(Some(String::from("hello")))
794                        .when(|v| v.len() > 3, |v| v + &" world!!".to_string());
795             yield v;
796        };
797        println!("${:?}", rio_program.is_empty());
798        println!("${:?}", rio_program.is_ok());
799        assert_eq!(rio_program.get(), "hello world!!");
800    }
801
802    #[test]
803    fn rio_when_rio() {
804        let rio_program: RustIO<String, String> = rust_io! {
805             v <- RustIO::from_option(Some(String::from("hello")))
806                        .when_rio(|v| v.len() > 3, |v| RustIO::from_option(Some(v + &" world!!".to_string())));
807             yield v;
808        };
809        println!("${:?}", rio_program.is_empty());
810        println!("${:?}", rio_program.is_ok());
811        assert_eq!(rio_program.get(), "hello world!!");
812    }
813
814    #[test]
815    fn rio_peek() {
816        let rio_program: RustIO<String, String> = rust_io! {
817             v <- RustIO::from_option(Some(String::from("hello world!!")))
818                .peek(|v| println!("${}",v));
819             yield v;
820        };
821        println!("${:?}", rio_program.is_empty());
822        println!("${:?}", rio_program.is_ok());
823        assert_eq!(rio_program.get(), "hello world!!");
824    }
825
826    #[test]
827    fn rio_fork() {
828        let rio_program: RustIO<String, String> = rust_io! {
829             v <- RustIO::from_option(Some(String::from("hello")))
830                        .fork(|v| {
831                            println!("Fork. Variable:{} in Thread:{:?}", v, thread::current().id());
832                            return v.to_uppercase();
833                        })
834                        .join();
835             x <- RustIO::from_option(Some(String::from(" world!!")))
836                    .peek(|v| println!("Join. Variable:{} in Thread:{:?}", v, thread::current().id()));
837             yield v + &x;
838        };
839        println!("${:?}", rio_program.is_empty());
840        println!("${:?}", rio_program.is_ok());
841        assert_eq!(rio_program.get(), "HELLO world!!");
842    }
843
844    #[test]
845    fn rio_daemon() {
846        let rio_program: RustIO<String, String> = rust_io! {
847             v <- RustIO::from_option(Some(String::from("hello world!!")))
848                .daemon(|v| println!("${}",v));
849             yield v;
850        };
851        println!("${:?}", rio_program.is_empty());
852        println!("${:?}", rio_program.is_ok());
853        assert_eq!(rio_program.get(), "hello world!!");
854    }
855
856    #[test]
857    fn rio_on_success() {
858        let rio_program: RustIO<String, String> = rust_io! {
859             v <- RustIO::from_result(Ok(String::from("hello world!!")))
860                .on_success(|v| println!("Success program: ${}",v));
861             yield v;
862        };
863        println!("${:?}", rio_program.is_empty());
864        println!("${:?}", rio_program.is_ok());
865        assert_eq!(rio_program.get(), "hello world!!");
866    }
867
868    #[test]
869    fn rio_on_error() {
870        let rio_program: RustIO<String, String> = rust_io! {
871             v <- RustIO::from_result(Err(String::from("burning world!!")))
872                .on_error(|v| println!("Error program: ${}",v));
873             yield v;
874        };
875        println!("${:?}", rio_program.is_empty());
876        println!("${:?}", rio_program.is_ok());
877        assert_eq!(rio_program.is_failed(), true);
878    }
879
880    #[test]
881    fn rio_eventually() {
882        let rio_program: RustIO<String, String> = rust_io! {
883             v <- RustIO::from_result(Ok("hello".to_string()))
884                .at_some_point(|v| get_eventual_result( v));
885             yield v;
886        };
887        println!("${:?}", rio_program.is_empty());
888        println!("${:?}", rio_program.is_ok());
889        assert_eq!(rio_program.is_failed(), false);
890    }
891
892    #[test]
893    fn rio_eventually_while() {
894        let rio_program: RustIO<String, String> = rust_io! {
895             v <- RustIO::from_result(Ok("hello".to_string()))
896                .at_some_point_while(|| true,|v| get_eventual_result( v));
897             yield v;
898        };
899        println!("${:?}", rio_program.is_empty());
900        println!("${:?}", rio_program.is_ok());
901        assert_eq!(rio_program.is_failed(), false);
902    }
903
904    #[test]
905    fn rio_eventually_until() {
906        let rio_program: RustIO<String, String> = rust_io! {
907             v <- RustIO::from_result(Ok("hello".to_string()))
908                .at_some_point_until(|| {
909                    std::thread::sleep(Duration::from_millis(100));
910                    false
911                },|v| get_eventual_result( v));
912             yield v;
913        };
914        println!("${:?}", rio_program.is_empty());
915        println!("${:?}", rio_program.is_ok());
916        assert_eq!(rio_program.is_failed(), false);
917    }
918
919    #[test]
920    fn features() {
921        let rio_program: RustIO<String, String> =
922            RustIO::from_option(Some(String::from("hello")))
923                .when(|v| v.len() > 3, |v| v + &" world!!".to_string())
924                .at_some_point(|v| get_eventual_result(v))
925                .map(|v| v.to_uppercase())
926                .flat_map(|v| RustIO::of(v + &"!!!".to_string()))
927                .filter(|v| v.len() > 10)
928                .delay(Duration::from_secs(1))
929                .on_error(|v| println!("Error program: ${}", v))
930                .map_error(|t| String::from("Error B"))
931                .on_success(|v| println!("Success program: ${}", v))
932                .peek(|v| println!("${}", v));
933
934        println!("${:?}", rio_program.is_empty());
935        println!("${:?}", rio_program.is_ok());
936        assert_eq!(false, rio_program.is_empty());
937        assert_eq!(true, rio_program.is_ok());
938
939    }
940
941    fn get_eventual_result(v: String) -> RustIO<String, String> {
942        let mut rng = thread_rng();
943        let n: i32 = rng.gen_range(0..100);
944        println!("${}", n);
945        if n < 90 {
946            eprintln!("Returning error");
947            RustIO::from_result(Err("Error".to_string()))
948        } else {
949            eprintln!("Returning success");
950            RustIO::from_result(Ok(v + &"world".to_string()))
951        }
952    }
953}