union/lib.rs
1//! # `union!`
2//!
3//! `union!` - one macro to rule them all. Provides useful shortcut combinators, combines sync/async chains, transforms a tuple of results into a result of a tuple, and supports single- and multi-threaded (sync/async) step-by-step execution of branches.
4//!
5//! ## Combinators
6//!
7//! - Map: `|>` expr - `value`.map(`expr`)
8//!
9//! - AndThen: `=>` expr - `value`.and_then(`expr`),
10//!
11//! - Then: `->` expr - `expr`(`value`)
12//!
13//! - Dot: `>.` expr - `value`.`expr`
14//!
15//! - Or: `<|` expr - `value`.or(`expr`)
16//!
17//! - OrElse: `<=` expr - `value`.or_else(`expr`)
18//!
19//! - MapErr: `!>` expr - `value`.map_err(`expr`)
20//!
21//! - Inspect: `?>` expr - (|`value`| { `expr`(&`value`); `value` })(`value`) for sync chains and (|`value`| `value`.inspect(`expr`))(`value`) for futures
22//!
23//! where `value` is the previous value.
24//!
25//! Every combinator prefixed by `~` will act as a deferred action (all actions will wait for completion of the current step and only then move on to the next one).
26//!
27//! ## Handler
28//!
29//! might be one of
30//!
31//! - `map` => will act as results.map(|(result0, result1, ..)| handler(result0, result1, ..))
32//!
33//! - `and_then` => will act as results.and_then(|(result0, result1, ..)| handler(result0, result1, ..))
34//!
35//! - `then` => will act as handler(result0, result1, ..)
36//!
37//! or not specified - then Result<(result0, result1, ..), Error> or Option<(result0, result1, ..)> will be returned.
38//!
39//! ## Custom futures crate path
40//!
41//! You can specify custom path (`futures_crate_path`) at the beginning of macro call
42//!
43//! ```rust
44//! use union::union_async;
45//! use futures::future::ok;
46//!
47//! #[tokio::main]
48//! async fn main() {
49//! let value = union_async! {
50//! futures_crate_path(::futures)
51//! ok::<_,u8>(2u16)
52//! }.await.unwrap();
53//!
54//! println!("{}", value);
55//! }
56//! ```
57//!
58//! Using this macro you can write things like
59//!
60//! ```rust
61//! #![recursion_limit = "256"]
62//!
63//! use rand::prelude::*;
64//! use std::sync::Arc;
65//! use union::union_spawn;
66//!
67//! // Problem: generate vecs filled by random numbers in parallel, make some operations on them in parallel,
68//! // find max of each vec in parallel and find final max of 3 vecs
69//!
70//! // Solution:
71//! fn main() {
72//! // Branches will be executed in parallel, each in its own thread
73//! let max = union_spawn! {
74//! let branch_0 =
75//! generate_random_vec(1000, 10000000u64)
76//! .into_iter()
77//! // Multiply every element by itself
78//! |> power2
79//! >.filter(|value| is_even(*value)).collect::<Vec<_>>()
80//! // Use `Arc` to share data with branch 1
81//! -> Arc::new
82//! // Find max and clone its value
83//! ~>.iter().max()
84//! |> Clone::clone,
85//! generate_random_vec(10000, 100000000000000f64)
86//! .into_iter()
87//! // Extract sqrt from every element
88//! |> get_sqrt
89//! // Add index in order to compare with the values of branch 0
90//! >.enumerate()
91//! ~|> {
92//! // Get data from branch 0 by cloning arc
93//! let branch_0 = branch_0.clone();
94//! let len = branch_0.len();
95//! // Compare every element of branch 1 with element of branch 0
96//! // with the same index and take min
97//! move |(index, value)|
98//! if index < len && value as u64 > branch_0[index] {
99//! branch_0[index]
100//! } else {
101//! value as u64
102//! }
103//! }
104//! >.max(),
105//! generate_random_vec(100000, 100000u32)
106//! .into_iter()
107//! ~>.max(),
108//! map => |max0, max1, max2|
109//! // Find final max
110//! *[max0, max1, max2 as u64].into_iter().max().unwrap()
111//! }
112//! .unwrap();
113//! println!("Max: {}", max);
114//! }
115//!
116//! fn generate_random_vec<T>(size: usize, max: T) -> Vec<T>
117//! where
118//! T: From<u8>
119//! + rand::distributions::uniform::SampleUniform
120//! + rand::distributions::uniform::SampleBorrow<T>
121//! + Copy,
122//! {
123//! let mut rng = rand::thread_rng();
124//! (0..size)
125//! .map(|_| rng.gen_range(T::from(0u8), max))
126//! .collect()
127//! }
128//!
129//! fn is_even<T>(value: T) -> bool
130//! where
131//! T: std::ops::Rem<Output = T> + std::cmp::PartialEq + From<u8>,
132//! {
133//! value % 2u8.into() == 0u8.into()
134//! }
135//!
136//! fn get_sqrt<T>(value: T) -> T
137//! where
138//! T: Into<f64>,
139//! f64: Into<T>,
140//! {
141//! let value_f64: f64 = value.into();
142//! value_f64.sqrt().into()
143//! }
144//!
145//! fn power2<T>(value: T) -> T
146//! where
147//! T: std::ops::Mul<Output = T> + Copy,
148//! {
149//! value * value
150//! }
151//! ```
152//!
153//! And like this
154//!
155//! ```rust no_run
156//! #![recursion_limit="1024"]
157//!
158//! use union::union_async;
159//! use futures::stream::{iter, Stream};
160//! use reqwest::Client;
161//! use futures::future::{try_join_all, ok, ready};
162//! use failure::{format_err, Error};
163//!
164//! #[tokio::main]
165//! async fn main() {
166//! println!(
167//! "{} {}\n{}",
168//! "Hello.\nThis's is the game where winner is player, which abs(value) is closest to",
169//! "the max count of links (starting with `https://`) found on one of random pages.",
170//! "You play against random generator (0-500)."
171//! );
172//!
173//! enum GameResult {
174//! Won,
175//! Lost,
176//! Draw
177//! }
178//!
179//! let client = Client::new();
180//!
181//! let game = union_async! {
182//! // Make requests to several sites
183//! // and calculate count of links starting from `https://`
184//! get_urls_to_calculate_link_count()
185//! |> {
186//! // If pass block statement instead of fn, it will be placed before current step,
187//! // so it will allow us to capture some variables from context
188//! let ref client = client;
189//! move |url|
190//! // `union_async!` wraps its content into `async move { }`
191//! union_async! {
192//! client
193//! .get(url).send()
194//! => |value| value.text()
195//! => |body| ok((url, body))
196//! }
197//! }
198//! >.collect::<Vec<_>>()
199//! |> Ok
200//! => try_join_all
201//! !> |err| format_err!("Error retrieving pages to calculate links: {:#?}", err)
202//! => |results|
203//! ok(
204//! results
205//! .into_iter()
206//! .map(|(url, body)| (url, body.matches("https://").collect::<Vec<_>>().len()))
207//! .max_by_key(|(_, link_count)| link_count.clone())
208//! .unwrap()
209//! )
210//! // It waits for input in stdin before logging the max link count
211//! ~?> |result| {
212//! result
213//! .as_ref()
214//! .map(
215//! |(url, count)| {
216//! let split = url.to_owned().split('/').collect::<Vec<_>>();
217//! let domain_name = split.get(2).unwrap_or(&url);
218//! println!("Max `https://` link count found on `{}`: {}", domain_name, count)
219//! }
220//! )
221//! .unwrap_or(());
222//! },
223//! // In parallel it makes request to the site which generates random number
224//! get_url_to_get_random_number()
225//! -> ok
226//! => {
227//! // If pass block statement instead of fn, it will be placed before current step,
228//! // so it will allow us to capture some variables from context
229//! let ref client = client;
230//! let map_parse_error =
231//! |value|
232//! move |err|
233//! format_err!("Failed to parse random number: {:#?}, value: {}", err, value);
234//! move |url|
235//! union_async! {
236//! client
237//! .get(url)
238//! .send()
239//! => |value| value.text()
240//! !> |err| format_err!("Error retrieving random number: {:#?}", err)
241//! => |value| ok(value[..value.len() - 1].to_owned()) // remove \n from `154\n`
242//! => |value|
243//! ready(
244//! value
245//! .parse::<u16>()
246//! .map_err(map_parse_error(value))
247//! )
248//! }
249//! }
250//! // It waits for input in stdin before logging the random value
251//! ~?> |random| {
252//! random
253//! .as_ref()
254//! .map(|number| println!("Random: {}", number))
255//! .unwrap_or(());
256//! },
257//! // In parallel it reads value from stdin
258//! read_number_from_stdin(),
259//! // Finally, when we have all the results, we can decide who the winner is
260//! map => |(_url, link_count), random_number, number_from_stdin| {
261//! let random_diff = (link_count as i32 - random_number as i32).abs();
262//! let stdin_diff = (link_count as i32 - number_from_stdin as i32).abs();
263//! match () {
264//! _ if random_diff > stdin_diff => GameResult::Won,
265//! _ if random_diff < stdin_diff => GameResult::Lost,
266//! _ => GameResult::Draw
267//! }
268//! }
269//! };
270//!
271//! let _ = game.await.map(
272//! |result|
273//! println!(
274//! "You {}",
275//! match result {
276//! GameResult::Won => "won!",
277//! GameResult::Lost => "lose...",
278//! _ => "have the same result as random generator!"
279//! }
280//! )
281//! ).unwrap();
282//! }
283//!
284//! fn get_urls_to_calculate_link_count() -> impl Stream<Item = &'static str> {
285//! iter(
286//! vec![
287//! "https://en.wikipedia.org/w/api.php?format=json&action=query&generator=random&grnnamespace=0&prop=revisions|images&rvprop=content&grnlimit=100",
288//! "https://github.com/explore",
289//! "https://twitter.com/search?f=tweets&vertical=news&q=%23news&src=unkn"
290//! ]
291//! )
292//! }
293//!
294//! fn get_url_to_get_random_number() -> &'static str {
295//! "https://www.random.org/integers/?num=1&min=0&max=500&col=1&base=10&format=plain&rnd=new"
296//! }
297//!
298//! async fn read_number_from_stdin() -> Result<u16, Error> {
299//! use tokio::*;
300//! use futures::stream::StreamExt;
301//!
302//! let map_parse_error =
303//! |value|
304//! move |error|
305//! format_err!("Value from stdin isn't a correct `u16`: {:?}, input: {}", error, value);
306//!
307//! let mut result;
308//! let mut reader = codec::FramedRead::new(io::BufReader::new(io::stdin()), codec::LinesCodec::new());
309//!
310//! while {
311//! println!("Please, enter number (`u16`)");
312//!
313//! let next = reader.next();
314//!
315//! result = union_async! {
316//! next
317//! |> |value| value.ok_or(format_err!("Unexpected end of input"))
318//! => |result| ready(result.map_err(|err| format_err!("Failed to apply codec: {:?}", err)))
319//! => |value|
320//! ready(
321//! value
322//! .parse()
323//! .map_err(map_parse_error(value))
324//! )
325//! !> |error| { eprintln!("Error: {:#?}", error); error}
326//! }.await;
327//!
328//! result.is_err()
329//! } {}
330//!
331//! result
332//! }
333//! ```
334//!
335//! ## Single thread combinations
336//!
337//! ### Simple results combination
338//!
339//! Converts input into a series of chained results and joins them step by step.
340//!
341//! ```rust
342//!
343//! use std::error::Error;
344//! use union::union;
345//!
346//! type Result<T> = std::result::Result<T, Box<dyn Error>>;
347//!
348//! fn action_1() -> Result<u16> {
349//! Ok(1)
350//! }
351//!
352//! fn action_2() -> Result<u8> {
353//! Ok(2)
354//! }
355//!
356//! fn main() {
357//! let sum = union! {
358//! action_1(),
359//! action_2().map(|v| v as u16),
360//! action_2().map(|v| v as u16 + 1).and_then(|v| Ok(v * 4)),
361//! action_1().and_then(|_| Err("5".into())).or(Ok(2)),
362//! map => |a, b, c, d| a + b + c + d
363//! }.expect("Failed to calculate sum");
364//!
365//! println!("Calculated: {}", sum);
366//! }
367//! ```
368//!
369//! ### Futures combination
370//!
371//! Each branch represents a chain of tasks. All branches will be joined using the `::futures::join!` macro, and `union_async!` will return an `unpolled` future.
372//!
373//! ```rust
374//! #![recursion_limit="256"]
375//!
376//! use std::error::Error;
377//! use union::union_async;
378//! use futures::future::{ok, err};
379//!
380//! type Result<T> = std::result::Result<T, Box<dyn Error>>;
381//!
382//! async fn action_1() -> Result<u16> {
383//! Ok(1)
384//! }
385//! async fn action_2() -> Result<u8> {
386//! Ok(2)
387//! }
388//!
389//! #[tokio::main]
390//! async fn main() {
391//! let sum = union_async! {
392//! action_1(),
393//! action_2().and_then(|v| ok(v as u16)),
394//! action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
395//! action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
396//! and_then => |a, b, c, d| ok(a + b + c + d)
397//! }.await.expect("Failed to calculate sum");
398//!
399//! println!("Calculated: {}", sum);
400//! }
401//! ```
402//!
403//! ## Multi-thread combinations
404//!
405//! To execute several tasks in parallel you could use `union_spawn!` (`spawn!`) for sync tasks
406//! and `union_async_spawn!` (`async_spawn!`) for futures. Since `union_async!` already provides concurrent execution of futures in one thread, `union_async_spawn!` spawns every branch into the `tokio` executor so they will be evaluated in a multi-threaded executor.
407//!
408//! ### Multi-thread sync branches
409//!
410//! `union_spawn!` spawns one `::std::thread` per step of each branch (the number of branches is the max thread count at any time).
411//!
412//! ```rust
413//!
414//! use std::error::Error;
415//! use union::union_spawn;
416//!
417//! type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
418//!
419//! fn action_1() -> Result<usize> {
420//! Ok(1)
421//! }
422//!
423//! fn action_2() -> Result<u16> {
424//! Ok(2)
425//! }
426//!
427//! fn main() {
428//! // Branches will be executed in parallel
429//! let sum = union_spawn! {
430//! action_1(),
431//! action_2().map(|v| v as usize),
432//! action_2().map(|v| v as usize + 1).and_then(|v| Ok(v * 4)),
433//! action_1().and_then(|_| Err("5".into())).or(Ok(2)),
434//! map => |a, b, c, d| a + b + c + d
435//! }.expect("Failed to calculate sum");
436//!
437//! println!("Calculated: {}", sum);
438//! }
439//! ```
440//!
441//! `union_async_spawn!` uses `::tokio::spawn` function to spawn tasks so it should be done inside `tokio` runtime
442//! (number of branches is the max count of `tokio` tasks at the time).
443//!
444//! ### Multi-thread futures
445//!
446//! ```rust
447//! #![recursion_limit="256"]
448//!
449//! use std::error::Error;
450//! use union::union_async_spawn;
451//! use futures::future::{ok, err};
452//!
453//! type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
454//!
455//! async fn action_1() -> Result<u16> {
456//! Ok(1)
457//! }
458//!
459//! async fn action_2() -> Result<u8> {
460//! Ok(2)
461//! }
462//!
463//! #[tokio::main]
464//! async fn main() {
465//! let sum = union_async_spawn! {
466//! action_1(),
467//! action_2().and_then(|v| ok(v as u16)),
468//! action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
469//! action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
470//! and_then => |a, b, c, d| ok(a + b + c + d)
471//! }.await.expect("Failed to calculate sum");
472//!
473//! println!("Calculated: {}", sum);
474//! }
475//! ```
476//!
477//! Using combinators we can rewrite first sync example like
478//!
479//! ```rust
480//!
481//! use std::error::Error;
482//! use union::union;
483//!
484//! type Result<T> = std::result::Result<T, Box<dyn Error>>;
485//!
486//! fn action_1() -> Result<u16> {
487//! Ok(1)
488//! }
489//!
490//! fn action_2() -> Result<u8> {
491//! Ok(2)
492//! }
493//!
494//! fn main() {
495//! let sum = union! {
496//! action_1(),
497//! action_2() |> |v| v as u16,
498//! action_2() |> |v| v as u16 + 1 => |v| Ok(v * 4),
499//! action_1() => |_| Err("5".into()) <| Ok(2),
500//! map => |a, b, c, d| a + b + c + d
501//! }.expect("Failed to calculate sum");
502//!
503//! println!("Calculated: {}", sum);
504//! }
505//! ```
506//!
507//! By separating a chain into actions, you make the actions wait for completion of all of them in the current step before going on to the next step.
508//!
509//! ```rust
510//! #![recursion_limit="256"]
511//!
512//! use std::error::Error;
513//! use union::union;
514//!
515//! type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
516//!
517//! fn action_1() -> Result<u16> {
518//! Ok(1)
519//! }
520//!
521//! fn action_2() -> Result<u8> {
522//! Ok(2)
523//! }
524//!
525//! fn main() {
526//! let sum = union! {
527//! action_1(),
528//! let result_1 = action_2() ~|> |v| v as u16 + 1,
529//! action_2() ~|> {
530//! let result_1 = result_1.as_ref().ok().map(Clone::clone);
531//! move |v| {
532//! // `result_1` now is the result of `action_2()` [Ok(2u8)]
533//! if result_1.is_some() {
534//! v as u16 + 1
535//! } else {
536//! unreachable!()
537//! }
538//! }
539//! } ~=> {
540//! let result_1 = result_1.as_ref().ok().map(Clone::clone);
541//! move |v| {
542//! // `result_1` now is the result of `|v| v as u16 + 1` [Ok(3u16)]
543//! if let Some(result_1) = result_1 {
544//! Ok(v * 4 + result_1)
545//! } else {
546//! unreachable!()
547//! }
548//! }
549//! },
550//! action_1() ~=> |_| Err("5".into()) <| Ok(2),
551//! map => |a, b, c, d| a + b + c + d
552//! }.expect("Failed to calculate sum");
553//! println!("Calculated: {}", sum);
554//! }
555//! ```
556
557extern crate proc_macro_hack;
558extern crate proc_macro_nested;
559extern crate union_export;
560
561use proc_macro_hack::proc_macro_hack;
562
563///
564/// Use to combine sync results.
565///
566/// ```rust
567/// extern crate union;
568///
569/// use union::union;
570///
571/// fn main() {
572/// let product = union! {
573/// Ok::<_,u8>(2) |> |v| v + 2,
574/// Ok::<_,u8>(3),
575/// Ok::<_,u8>(4),
576/// map => |a, b, c| a * b * c
577/// }.unwrap();
578///
579/// assert_eq!(product, 48);
580/// }
581/// ```
582///
583#[proc_macro_hack(support_nested)]
584pub use union_export::union;
585
586///
587/// Use to combine futures.
588///
589/// ```rust
590/// extern crate union;
591/// extern crate futures;
592///
593/// use union::union_async;
594/// use futures::future::ok;
595///
596/// #[tokio::main]
597/// async fn main() {
598/// let product = union_async! {
599/// ok::<_,u8>(2u16) => |v| ok::<_,u8>(v + 2u16),
600/// ok::<_,u8>(3u16),
601/// ok::<_,u8>(4u16),
602/// map => |a, b, c| a * b * c
603/// }.await.unwrap();
604///
605/// assert_eq!(product, 48);
606/// }
607/// ```
608///
609#[proc_macro_hack(support_nested, internal_macro_calls = 20)]
610pub use union_export::union_async;
611
612///
613/// Alias for `union_async!`.
614///
615#[proc_macro_hack(support_nested, internal_macro_calls = 20)]
616pub use union_export::asyncion;
617
618///
619/// Use to spawn a `::std::thread` per step of each branch.
620///
621/// ```rust
622/// extern crate union;
623///
624/// use union::union_spawn;
625///
626/// fn main() {
627/// let product = union_spawn! {
628/// Ok::<_,u8>(2) |> |v| v + 2 ?> |_| {
629/// println!("Hello from parallel world!");
630/// ::std::thread::sleep(::std::time::Duration::from_secs(1));
631/// println!("I'm done.");
632/// },
633/// Ok::<_,u8>(3) ?> |_| {
634/// println!("Hello from parallel world again!");
635/// ::std::thread::sleep(::std::time::Duration::from_secs(2));
636/// println!("Me too.");
637/// },
638/// Ok::<_,u8>(4),
639/// map => |a, b, c| a * b * c
640/// }.unwrap();
641///
642/// assert_eq!(product, 48);
643/// }
644///```
645#[proc_macro_hack(support_nested)]
646pub use union_export::union_spawn;
647
648///
649/// Alias for `union_spawn!`.
650///
651#[proc_macro_hack(support_nested)]
652pub use union_export::spawn;
653
654///
655/// Use to spawn a `tokio` task (via `::tokio::spawn`) per step of each branch.
656/// ```rust
657/// #![recursion_limit="512"]
658///
659/// extern crate union;
660/// extern crate futures;
661/// extern crate tokio;
662///
663/// use union::union_async_spawn;
664/// use futures::future::ok;
665///
666/// #[tokio::main]
667/// async fn main() {
668/// let product = union_async_spawn! {
669/// ok::<_,u8>(2u16) |> |v| Ok::<_,u8>(v.unwrap() + 2u16) ?> |_| {
670/// println!("Hello from parallel world!");
671/// // !!! Don't use std::thread::sleep to wait inside future because it will block executor thread !!!
672/// // It's used here only to show that futures are executed on multi thread executor.
673/// ::std::thread::sleep(::std::time::Duration::from_secs(1));
674/// println!("I'm done.");
675/// },
676/// ok::<_,u8>(3u16) ?> |_| {
677/// println!("Hello from parallel world again!");
678/// // !!! Don't use std::thread::sleep to wait inside future because it will block executor thread !!!
679/// // It's used here only to show that futures are executed on multi thread executor.
680/// ::std::thread::sleep(::std::time::Duration::from_secs(2));
681/// println!("Me too.");
682/// },
683/// ok::<_,u8>(4u16),
684/// map => |a, b, c| a * b * c
685/// }.await.unwrap();
686///
687/// assert_eq!(product, 48);
688/// }
689///```
690#[proc_macro_hack(support_nested, internal_macro_calls = 20)]
691pub use union_export::union_async_spawn;
692
693///
694/// Alias for `union_async_spawn!`.
695///
696#[proc_macro_hack(support_nested, internal_macro_calls = 20)]
697pub use union_export::async_spawn;