union/lib.rs
1//! # `union!`
2//!
3//! `union!` - one macro to rule them all. Provides useful shortcut combinators, combines sync/async chains, transforms a tuple of results into a result of a tuple, and supports single- and multi-threaded (sync/async) step-by-step execution of branches.
4//!
5//! ## Combinators
6//!
7//! - Map: `|>` expr - `value`.map(`expr`)
8//!
9//! - AndThen: `=>` expr - `value`.and_then(`expr`)
10//!
11//! - Then: `->` expr - `expr`(`value`)
12//!
13//! - Dot: `>.` expr - `value`.`expr`
14//!
15//! - Or: `<|` expr - `value`.or(`expr`)
16//!
17//! - OrElse: `<=` expr - `value`.or_else(`expr`)  
18//!
19//! - MapErr: `!>` expr - `value`.map_err(`expr`)
20//!
21//! - Inspect: `?>` expr - (|`value`| { `expr`(&`value`); `value` })(`value`) for sync chains and (|`value`| `value`.inspect(`expr`))(`value`) for futures
22//!
23//! where `value` is the previous value.
24//!
25//! Every combinator prefixed by `~` will act as a deferred action (at every step all actions wait for each other's completion and only after that move to the next step).
26//!
27//! ## Handler
28//!
29//! The handler might be one of
30//!
31//! - `map` => will act as results.map(|(result0, result1, ..)| handler(result0, result1, ..))
32//!
33//! - `and_then` => will act as results.and_then(|(result0, result1, ..)| handler(result0, result1, ..))
34//!
35//! - `then` => will act as handler(result0, result1, ..)
36//!
37//! or it may be omitted - then `Result<(result0, result1, ..), Error>` or `Option<(result0, result1, ..)>` will be returned.
38//!
39//! ## Custom futures crate path
40//!
41//! You can specify a custom path (`futures_crate_path`) at the beginning of the macro call
42//!
43//! ```rust
44//! use union::union_async;
45//! use futures::future::ok;
46//!
47//! #[tokio::main]
48//! async fn main() {
49//!     let value = union_async! {
50//!         futures_crate_path(::futures)
51//!         ok::<_,u8>(2u16)
52//!     }.await.unwrap();
53//!     
54//!     println!("{}", value);
55//! }
56//! ```
57//!
58//! Using this macro you can write things like
59//!
60//! ```rust
61//! #![recursion_limit = "256"]
62//!
63//! use rand::prelude::*;
64//! use std::sync::Arc;
65//! use union::union_spawn;
66//!
67//! // Problem: generate vecs filled with random numbers in parallel, perform some operations on them in parallel,
68//! // find the max of each vec in parallel and find the final max of the 3 vecs
69//!
70//! // Solution:
71//! fn main() {
72//!     // Branches will be executed in parallel, each in its own thread
73//!     let max = union_spawn! {
74//!         let branch_0 =
75//!             generate_random_vec(1000, 10000000u64)
76//!                 .into_iter()
77//!                 // Multiply every element by itself
78//!                 |> power2
79//!                 >.filter(|value| is_even(*value)).collect::<Vec<_>>()
80//!                 // Use `Arc` to share data with branch 1
81//!                 -> Arc::new
82//!                 // Find max and clone its value
83//!                 ~>.iter().max()
84//!                 |> Clone::clone,
85//!         generate_random_vec(10000, 100000000000000f64)
86//!             .into_iter()
87//!             // Extract sqrt from every element
88//!             |> get_sqrt
89//!             // Add index in order to compare with the values of branch 0
90//!             >.enumerate()
91//!             ~|> {
92//!                 // Get data from branch 0 by cloning arc
93//!                 let branch_0 = branch_0.clone();
94//!                 let len = branch_0.len();
95//!                 // Compare every element of branch 1 with element of branch 0
96//!                 // with the same index and take min
97//!                 move |(index, value)|
98//!                     if index < len && value as u64 > branch_0[index] {
99//!                         branch_0[index]
100//!                     } else {
101//!                         value as u64
102//!                     }
103//!             }
104//!             >.max(),
105//!         generate_random_vec(100000, 100000u32)
106//!             .into_iter()
107//!             ~>.max(),
108//!         map => |max0, max1, max2|
109//!             // Find final max
110//!             *[max0, max1, max2 as u64].into_iter().max().unwrap()
111//!     }
112//!     .unwrap();
113//!     println!("Max: {}", max);
114//! }
115//! 
116//! fn generate_random_vec<T>(size: usize, max: T) -> Vec<T>
117//! where
118//!     T: From<u8>
119//!         + rand::distributions::uniform::SampleUniform
120//!         + rand::distributions::uniform::SampleBorrow<T>
121//!         + Copy,
122//! {
123//!     let mut rng = rand::thread_rng();
124//!     (0..size)
125//!         .map(|_| rng.gen_range(T::from(0u8), max))
126//!         .collect()
127//! }
128//!
129//! fn is_even<T>(value: T) -> bool
130//! where
131//!     T: std::ops::Rem<Output = T> + std::cmp::PartialEq + From<u8>,
132//! {
133//!     value % 2u8.into() == 0u8.into()
134//! }
135//!
136//! fn get_sqrt<T>(value: T) -> T
137//! where
138//!     T: Into<f64>,
139//!     f64: Into<T>,
140//! {
141//!     let value_f64: f64 = value.into();
142//!     value_f64.sqrt().into()
143//! }
144//!
145//! fn power2<T>(value: T) -> T
146//! where
147//!     T: std::ops::Mul<Output = T> + Copy,
148//! {
149//!     value * value
150//! }
151//! ```
152//!
153//! And like this
154//!
155//! ```rust no_run
156//! #![recursion_limit="1024"]
157//!
158//! use union::union_async;
159//! use futures::stream::{iter, Stream};
160//! use reqwest::Client;
161//! use futures::future::{try_join_all, ok, ready};
162//! use failure::{format_err, Error};
163//!
164//! #[tokio::main]
165//! async fn main() {
166//!     println!(
167//!         "{} {}\n{}",
168//!         "Hello.\nThis is the game where the winner is the player whose abs(value) is closest to",
169//!         "the max count of links (starting with `https://`) found on one of random pages.",
170//!         "You play against random generator (0-500)."
171//!     );
172//!
173//!     enum GameResult {
174//!         Won,
175//!         Lost,
176//!         Draw
177//!     }
178//!
179//!     let client = Client::new();
180//!     
181//!     let game = union_async! {
182//!         // Make requests to several sites
183//!         // and calculate count of links starting from `https://`
184//!         get_urls_to_calculate_link_count()
185//!             |> {
186//!                 // If you pass a block statement instead of a fn, it will be placed before the current step,
187//!                 // so it will allow us to capture some variables from context
188//!                 let ref client = client;
189//!                 move |url|
190//!                     // `union_async!` wraps its content into `async move { }`
191//!                     union_async! {
192//!                         client
193//!                             .get(url).send()
194//!                             => |value| value.text()
195//!                             => |body| ok((url, body))
196//!                     }
197//!             }
198//!             >.collect::<Vec<_>>()
199//!             |> Ok
200//!             => try_join_all
201//!             !> |err| format_err!("Error retrieving pages to calculate links: {:#?}", err)
202//!             => |results|
203//!                 ok(
204//!                     results
205//!                         .into_iter()
206//!                         .map(|(url, body)| (url, body.matches("https://").collect::<Vec<_>>().len()))
207//!                         .max_by_key(|(_, link_count)| link_count.clone())
208//!                         .unwrap()
209//!                 )
210//!             // It waits for input from stdin before logging the max link count
211//!             ~?> |result| {
212//!                 result
213//!                     .as_ref()
214//!                     .map(
215//!                         |(url, count)| {
216//!                             let split = url.to_owned().split('/').collect::<Vec<_>>();
217//!                             let domain_name = split.get(2).unwrap_or(&url);
218//!                             println!("Max `https://` link count found on `{}`: {}", domain_name, count)
219//!                         }
220//!                     )
221//!                     .unwrap_or(());
222//!             },
223//!         // In parallel it makes request to the site which generates random number
224//!         get_url_to_get_random_number()
225//!             -> ok
226//!             => {
227//!                 // If you pass a block statement instead of a fn, it will be placed before the current step,
228//!                 // so it will allow us to capture some variables from context
229//!                 let ref client = client;
230//!                 let map_parse_error =
231//!                     |value|
232//!                         move |err|
233//!                             format_err!("Failed to parse random number: {:#?}, value: {}", err, value);
234//!                 move |url|
235//!                     union_async! {
236//!                         client
237//!                             .get(url)
238//!                             .send()
239//!                             => |value| value.text()
240//!                             !> |err| format_err!("Error retrieving random number: {:#?}", err)
241//!                             => |value| ok(value[..value.len() - 1].to_owned()) // remove \n from `154\n`
242//!                             => |value|  
243//!                                 ready(
244//!                                     value
245//!                                         .parse::<u16>()
246//!                                         .map_err(map_parse_error(value))
247//!                                 )
248//!                     }
249//!             }
250//!             // It waits for input from stdin before logging the random value
251//!             ~?> |random| {
252//!                 random
253//!                     .as_ref()
254//!                     .map(|number| println!("Random: {}", number))
255//!                     .unwrap_or(());
256//!             },
257//!         // In parallel it reads value from stdin
258//!         read_number_from_stdin(),
259//!         // Finally, when we have all results, we can decide who the winner is
260//!         map => |(_url, link_count), random_number, number_from_stdin| {
261//!             let random_diff = (link_count as i32 - random_number as i32).abs();
262//!             let stdin_diff = (link_count as i32 - number_from_stdin as i32).abs();
263//!             match () {
264//!                 _ if random_diff > stdin_diff => GameResult::Won,
265//!                 _ if random_diff < stdin_diff => GameResult::Lost,
266//!                 _ => GameResult::Draw
267//!             }
268//!         }    
269//!     };
270//!
271//!     let _ = game.await.map(
272//!         |result|
273//!             println!(
274//!                 "You {}",
275//!                 match result {
276//!                     GameResult::Won => "won!",
277//!                     GameResult::Lost => "lose...",
278//!                     _ => "have the same result as random generator!"
279//!                 }
280//!             )
281//!     ).unwrap();  
282//! }
283//! 
284//! fn get_urls_to_calculate_link_count() -> impl Stream<Item = &'static str> {
285//!     iter(
286//!         vec![
287//!             "https://en.wikipedia.org/w/api.php?format=json&action=query&generator=random&grnnamespace=0&prop=revisions|images&rvprop=content&grnlimit=100",
288//!             "https://github.com/explore",
289//!             "https://twitter.com/search?f=tweets&vertical=news&q=%23news&src=unkn"
290//!         ]
291//!     )   
292//! }
293//!
294//! fn get_url_to_get_random_number() -> &'static str {
295//!     "https://www.random.org/integers/?num=1&min=0&max=500&col=1&base=10&format=plain&rnd=new"
296//! }
297//!
298//! async fn read_number_from_stdin() -> Result<u16, Error> {
299//!     use tokio::*;
300//!     use futures::stream::StreamExt;
301//!     
302//!     let map_parse_error =
303//!         |value|
304//!             move |error|
305//!                 format_err!("Value from stdin isn't a correct `u16`: {:?}, input: {}", error, value);
306//!
307//!     let mut result;
308//!     let mut reader = codec::FramedRead::new(io::BufReader::new(io::stdin()), codec::LinesCodec::new());
309//!
310//!     while {
311//!         println!("Please, enter number (`u16`)");
312//!
313//!         let next = reader.next();
314//!     
315//!         result = union_async! {
316//!             next
317//!                 |> |value| value.ok_or(format_err!("Unexpected end of input"))
318//!                 => |result| ready(result.map_err(|err| format_err!("Failed to apply codec: {:?}", err)))
319//!                 => |value|
320//!                     ready(
321//!                         value
322//!                             .parse()
323//!                             .map_err(map_parse_error(value))
324//!                     )
325//!                 !> |error| { eprintln!("Error: {:#?}", error); error}
326//!         }.await;
327//!
328//!         result.is_err()
329//!     } {}
330//!
331//!     result
332//! }
333//! ```
334//!
335//! ## Single thread combinations
336//!
337//! ### Simple results combination
338//!
339//! Converts input into a series of chained results and joins them step by step.
340//!
341//! ```rust
342//!
343//! use std::error::Error;
344//! use union::union;
345//!
346//! type Result<T> = std::result::Result<T, Box<dyn Error>>;
347//!
348//! fn action_1() -> Result<u16> {
349//!     Ok(1)
350//! }
351//!
352//! fn action_2() -> Result<u8> {
353//!     Ok(2)
354//! }
355//!
356//! fn main() {
357//!     let sum = union! {
358//!         action_1(),
359//!         action_2().map(|v| v as u16),
360//!         action_2().map(|v| v as u16 + 1).and_then(|v| Ok(v * 4)),
361//!         action_1().and_then(|_| Err("5".into())).or(Ok(2)),
362//!         map => |a, b, c, d| a + b + c + d
363//!     }.expect("Failed to calculate sum");
364//!
365//!     println!("Calculated: {}", sum);
366//! }
367//! ```
368//!
369//! ### Futures combination
370//!
371//! Each branch represents a chain of tasks. All branches will be joined using the `::futures::join!` macro, and `union_async!` will return an unpolled future.
372//!
373//! ```rust
374//! #![recursion_limit="256"]
375//!
376//! use std::error::Error;
377//! use union::union_async;
378//! use futures::future::{ok, err};
379//!
380//! type Result<T> = std::result::Result<T, Box<dyn Error>>;
381//!
382//! async fn action_1() -> Result<u16> {
383//!     Ok(1)
384//! }
385//! async fn action_2() -> Result<u8> {
386//!     Ok(2)
387//! }
388//!
389//! #[tokio::main]
390//! async fn main() {
391//!     let sum = union_async! {
392//!         action_1(),
393//!         action_2().and_then(|v| ok(v as u16)),
394//!         action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
395//!         action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
396//!         and_then => |a, b, c, d| ok(a + b + c + d)
397//!     }.await.expect("Failed to calculate sum");
398//!
399//!     println!("Calculated: {}", sum);
400//! }
401//! ```
402//!
403//! ## Multi-thread combinations
404//!
405//! To execute several tasks in parallel you could use `union_spawn!` (`spawn!`) for sync tasks
406//! and `union_async_spawn!` (`async_spawn!`) for futures. Since `union_async!` already provides concurrent execution of futures in one thread, `union_async_spawn!` spawns every branch into the `tokio` executor so they will be evaluated in a multi-threaded executor.
407//!
408//! ### Multi-thread sync branches
409//!
410//! `union_spawn!` spawns one `::std::thread` per step of each branch (the number of branches is the max thread count at any time).
411//!
412//! ```rust
413//!
414//! use std::error::Error;
415//! use union::union_spawn;
416//!
417//! type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
418//!
419//! fn action_1() -> Result<usize> {
420//!     Ok(1)
421//! }
422//!
423//! fn action_2() -> Result<u16> {
424//!     Ok(2)
425//! }
426//!
427//! fn main() {
428//!     // Branches will be executed in parallel
429//!     let sum = union_spawn! {
430//!         action_1(),
431//!         action_2().map(|v| v as usize),
432//!         action_2().map(|v| v as usize + 1).and_then(|v| Ok(v * 4)),
433//!         action_1().and_then(|_| Err("5".into())).or(Ok(2)),
434//!         map => |a, b, c, d| a + b + c + d
435//!     }.expect("Failed to calculate sum");
436//!
437//!     println!("Calculated: {}", sum);
438//! }
439//! ```
440//!
441//! `union_async_spawn!` uses the `::tokio::spawn` function to spawn tasks, so it must be called inside a `tokio` runtime
442//! (the number of branches is the max count of `tokio` tasks at any time).
443//!
444//! ### Multi-thread futures
445//!
446//! ```rust
447//! #![recursion_limit="256"]
448//!
449//! use std::error::Error;
450//! use union::union_async_spawn;
451//! use futures::future::{ok, err};
452//!
453//! type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
454//!
455//! async fn action_1() -> Result<u16> {
456//!     Ok(1)
457//! }
458//!
459//! async fn action_2() -> Result<u8> {
460//!     Ok(2)
461//! }
462//!
463//! #[tokio::main]
464//! async fn main() {
465//!     let sum = union_async_spawn! {
466//!         action_1(),
467//!         action_2().and_then(|v| ok(v as u16)),
468//!         action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
469//!         action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
470//!         and_then => |a, b, c, d| ok(a + b + c + d)
471//!     }.await.expect("Failed to calculate sum");
472//!
473//!     println!("Calculated: {}", sum);
474//! }
475//! ```
476//!
477//! Using combinators we can rewrite the first sync example like this
478//!
479//! ```rust
480//!
481//! use std::error::Error;
482//! use union::union;
483//!
484//! type Result<T> = std::result::Result<T, Box<dyn Error>>;
485//!
486//! fn action_1() -> Result<u16> {
487//!     Ok(1)
488//! }
489//!
490//! fn action_2() -> Result<u8> {
491//!     Ok(2)
492//! }
493//!
494//! fn main() {
495//!     let sum = union! {
496//!         action_1(),
497//!         action_2() |> |v| v as u16,
498//!         action_2() |> |v| v as u16 + 1 => |v| Ok(v * 4),
499//!         action_1() => |_| Err("5".into()) <| Ok(2),
500//!         map => |a, b, c, d| a + b + c + d
501//!     }.expect("Failed to calculate sum");
502//!
503//!     println!("Calculated: {}", sum);
504//! }
505//! ```
506//!
507//! By separating a chain into actions, you make the actions wait for completion of all of them in the current step before going to the next step.
508//!
509//! ```rust
510//! #![recursion_limit="256"]
511//!
512//! use std::error::Error;
513//! use union::union;
514//!
515//! type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
516//!
517//! fn action_1() -> Result<u16> {
518//!     Ok(1)
519//! }
520//!
521//! fn action_2() -> Result<u8> {
522//!     Ok(2)
523//! }
524//!
525//! fn main() {
526//!     let sum = union! {
527//!         action_1(),
528//!         let result_1 = action_2() ~|> |v| v as u16 + 1,
529//!         action_2() ~|> {
530//!             let result_1 = result_1.as_ref().ok().map(Clone::clone);
531//!             move |v| {
532//!                 // `result_1` now is the result of `action_2()` [Ok(2u8)]
533//!                 if result_1.is_some() {
534//!                     v as u16 + 1
535//!                 } else {
536//!                     unreachable!()
537//!                 }
538//!             }
539//!         } ~=> {
540//!             let result_1 = result_1.as_ref().ok().map(Clone::clone);
541//!             move |v| {
542//!                 // `result_1` now is the result of `|v| v as u16 + 1` [Ok(3u16)]
543//!                 if let Some(result_1) = result_1 {
544//!                     Ok(v * 4 + result_1)
545//!                 } else {
546//!                     unreachable!()
547//!                 }
548//!             }
549//!         },
550//!         action_1() ~=> |_| Err("5".into()) <| Ok(2),
551//!         map => |a, b, c, d| a + b + c + d
552//!     }.expect("Failed to calculate sum");
553//!     println!("Calculated: {}", sum);
554//! }
555//! ```
556
557extern crate proc_macro_hack;
558extern crate proc_macro_nested;
559extern crate union_export;
560
561use proc_macro_hack::proc_macro_hack;
562
563///
564/// Combines sync results: chains each branch and joins all branches step by step.
565///
566/// ```rust
567/// extern crate union;
568///
569/// use union::union;
570///
571/// fn main() {
572///     let product = union! {
573///         Ok::<_,u8>(2) |> |v| v + 2,
574///         Ok::<_,u8>(3),
575///         Ok::<_,u8>(4),
576///         map => |a, b, c| a * b * c
577///     }.unwrap();
578///
579///     assert_eq!(product, 48);
580/// }
581/// ```
582///
583#[proc_macro_hack(support_nested)]
584pub use union_export::union;
585
586///
587/// Combines futures: joins all branches and returns an unpolled future which must be `.await`ed.
588///
589/// ```rust
590/// extern crate union;
591/// extern crate futures;
592///
593/// use union::union_async;
594/// use futures::future::ok;
595///
596/// #[tokio::main]
597/// async fn main() {
598///     let product = union_async! {
599///         ok::<_,u8>(2u16) => |v| ok::<_,u8>(v + 2u16),
600///         ok::<_,u8>(3u16),
601///         ok::<_,u8>(4u16),
602///         map => |a, b, c| a * b * c
603///     }.await.unwrap();
604///
605///     assert_eq!(product, 48);
606/// }
607/// ```
608///
609#[proc_macro_hack(support_nested, internal_macro_calls = 20)]
610pub use union_export::union_async;
611
612///
613/// Alias for `union_async!`; see that macro for usage and an example.
614///
615#[proc_macro_hack(support_nested, internal_macro_calls = 20)]
616pub use union_export::asyncion;
617
618///
619/// Spawns one `::std::thread` per step of each branch, so sync branches are executed in parallel.
620///
621/// ```rust
622/// extern crate union;
623///
624/// use union::union_spawn;
625///
626/// fn main() {
627///     let product = union_spawn! {
628///         Ok::<_,u8>(2) |> |v| v + 2 ?> |_| {
629///             println!("Hello from parallel world!");
630///             ::std::thread::sleep(::std::time::Duration::from_secs(1));
631///             println!("I'm done.");
632///         },
633///         Ok::<_,u8>(3) ?> |_| {
634///             println!("Hello from parallel world again!");
635///             ::std::thread::sleep(::std::time::Duration::from_secs(2));
636///             println!("Me too.");
637///         },
638///         Ok::<_,u8>(4),
639///         map => |a, b, c| a * b * c
640///     }.unwrap();
641///
642///     assert_eq!(product, 48);
643/// }
644///```
645#[proc_macro_hack(support_nested)]
646pub use union_export::union_spawn;
647
648///
649/// Alias for `union_spawn!`; see that macro for usage and an example.
650///
651#[proc_macro_hack(support_nested)]
652pub use union_export::spawn;
653
654///
655/// Calls `::tokio::spawn` per step of each branch; must be used inside a `tokio` runtime.
656/// ```rust
657/// #![recursion_limit="512"]
658///
659/// extern crate union;
660/// extern crate futures;
661/// extern crate tokio;
662///
663/// use union::union_async_spawn;
664/// use futures::future::ok;
665///
666/// #[tokio::main]
667/// async fn main() {
668///     let product = union_async_spawn! {
669///         ok::<_,u8>(2u16) |> |v| Ok::<_,u8>(v.unwrap() + 2u16) ?> |_| {
670///             println!("Hello from parallel world!");
671///             // !!! Don't use std::thread::sleep to wait inside future because it will block executor thread !!!
672///             // It's used here only to show that futures are executed on multi thread executor.
673///             ::std::thread::sleep(::std::time::Duration::from_secs(1));
674///             println!("I'm done.");
675///         },
676///         ok::<_,u8>(3u16) ?> |_| {
677///             println!("Hello from parallel world again!");
678///             // !!! Don't use std::thread::sleep to wait inside future because it will block executor thread !!!
679///             // It's used here only to show that futures are executed on multi thread executor.
680///             ::std::thread::sleep(::std::time::Duration::from_secs(2));
681///             println!("Me too.");
682///         },
683///         ok::<_,u8>(4u16),
684///         map => |a, b, c| a * b * c
685///     }.await.unwrap();
686///
687///     assert_eq!(product, 48);
688/// }
689///```
690#[proc_macro_hack(support_nested, internal_macro_calls = 20)]
691pub use union_export::union_async_spawn;
692
693///
694/// Alias for `union_async_spawn!`; see that macro for usage and an example.
695///
696#[proc_macro_hack(support_nested, internal_macro_calls = 20)]
697pub use union_export::async_spawn;