[−][src]Crate union
union!
union! - one macro to rule them all. Provides useful shortcut combinators, combines sync/async chains, transforms tuple of results in result of tuple, provides single and multi thread (sync/async) step by step execution of branches.
Using this macro you could do things like
#![recursion_limit="1024"] extern crate futures; extern crate tokio; extern crate union; extern crate reqwest; extern crate failure; use union::{union_async, union}; use futures::stream::{iter, Stream}; use reqwest::Client; use futures::future::{try_join_all, ok, ready}; use failure::{format_err, Error}; fn get_urls_to_calculate_link_count() -> impl Stream<Item = &'static str> { iter( vec![ "https://en.wikipedia.org/w/api.php?format=json&action=query&generator=random&grnnamespace=0&prop=revisions|images&rvprop=content&grnlimit=100", "https://github.com/explore", "https://twitter.com/search?f=tweets&vertical=news&q=%23news&src=unkn" ] ) } fn get_url_to_get_random_number() -> String { "https://www.random.org/integers/?num=1&min=0&max=500&col=1&base=10&format=plain&rnd=new".to_owned() } async fn read_number_from_stdin() -> Result<u16, Error> { use tokio::*; use futures::stream::StreamExt; let map_parse_error = |value| move |error| format_err!("Value from stdin isn't a correct `u16`: {:?}, input: {}", error, value); let mut result; let mut reader = codec::FramedRead::new(io::BufReader::new(io::stdin()), codec::LinesCodec::new()); while { println!("Please, enter number (`u16`)"); let next = reader.next(); result = union_async! { next |> |value| value.ok_or(format_err!("Unexpected end of input")) => |result| ready(result.map_err(|err| format_err!("Failed to apply codec: {:?}", err))) => |value| ready( value .parse() .map_err(map_parse_error(value)) ) !> |error| { eprintln!("Error: {:#?}", error); error} }.await; result.is_err() } {} result } fn main() { let rt = ::tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { println!( "{} {}\n{}", "Hello.\nThis's is the game where winner is player, which abs(value) is closest to", "the max count of links (starting with `https://`) found on one of random pages.", "You play against random generator (0-500)." ); enum GameResult { Won, Lost, Draw } let client = Client::new(); let task = union_async! { // This programs makes requests to several sites // and calculates count of links starting from `https://` get_urls_to_calculate_link_count() |> { // If pass block statement instead of fn, it will be placed before current step, // so it will us allow to capture some variables from context let ref client = client; move |url| { let client = client.clone(); async move { client .get(url) .send() .and_then(|value| value.text()) .await .and_then(|body| Ok((url, body))) } } } >.collect::<Vec<_>>() |> Ok => try_join_all !> |err| format_err!("Error retrieving pages to calculate links: {:#?}", err) => |results| ok( results .into_iter() .map(|(url, body)| (url.to_owned(), body.matches("https://").collect::<Vec<_>>().len())) .max_by_key(|(_, link_count)| link_count.clone()) .unwrap() ) // It waits for input in stdin before log max links count ~?> |result| { result .as_ref() .map( |(url, count)| println!("Max `https://` link count found on `{}`: {}", url, count) ) .unwrap_or(()); }, // In parallel it makes request to the site which generates random number get_url_to_get_random_number() -> ok => { // If pass block statement instead of fn, it will be placed before current step, // so it will allow us to capture some variables from context let client = client.clone(); let map_parse_error = |value| move |err| format_err!("Failed to parse random number: {:#?}, value: {}", err, value); move |url| { union_async! { client .get(&url) .send() => |value| value.text() !> |err| format_err!("Error retrieving random number: {:#?}", err) => |value| ok(value[..value.len() - 1].to_owned()) // remove \n from `154\n` => |value| ready( value .parse::<u16>() .map_err(map_parse_error(value)) ) } } } // It waits for input in stdin before log random value ~?> |random| { random .as_ref() .map(|number| println!("Random: {:?}", number)) .unwrap_or(()); }, // In parallel it reads value from stdin read_number_from_stdin(), // Finally, when we will have all results, we can decide, who is winner map => |(_url, link_count), random_number, number_from_stdin| { let random_diff = (link_count as i32 - random_number as i32).abs(); let stdin_diff = (link_count as i32 - number_from_stdin as i32).abs(); match () { _ if random_diff > stdin_diff => GameResult::Won, _ if random_diff < stdin_diff => GameResult::Lost, _ => GameResult::Draw } } }; // Use sync union because we don't need parallel execution and sync map // is more convenient let _ = union! { task .await |> |result| println!( "You {}", match result { GameResult::Won => "won!", GameResult::Lost => "lose...", _ => "have the same result as random generator!" } ) }.unwrap(); }); }
Combinators
-
Map:
|>expr -value.map(expr) -
AndThen:
=>expr -value.and_then(expr), -
Then:
->expr -expr(value) -
Dot:
>.expr -value.expr -
Or:
<|expr -value.or(expr) -
OrElse:
<=expr -value.or_else(expr) -
MapErr:
!>expr -value.map_err(expr) -
Inspect:
?>expr - (|value| {expr(&value);value})(value) for sync chains and (|value|value.inspect(expr))(value) for futures
where value is the previous value.
Every combinator prefixed by ~ will act as deferred action (all actions will wait until completion in every step and only after move to the next one).
Handler
might be one of
-
map=> will act as results.map(|(result0, result1, ..)| handler(result0, result1, ..)) -
and_then=> will act as results.and_then(|(result0, result1, ..)| handler(result0, result1, ..)) -
then=> will act as handler(result0, result1, ..)
or not specified - then Result<(result0, result1, ..), Error> or Option<(result0, result1, ..)> will be returned.
Single thread combinations
Simple results combination
Converts input in series of chained results and joins them step by step.
extern crate union; use std::error::Error; use union::union; type Result<T> = std::result::Result<T, Box<dyn Error>>; fn action_1() -> Result<u16> { Ok(1) } fn action_2() -> Result<u8> { Ok(2) } fn main() { let sum = union! { action_1(), action_2().map(|v| v as u16), action_2().map(|v| v as u16 + 1).and_then(|v| Ok(v * 4)), action_1().and_then(|_| Err("5".into())).or(Ok(2)), map => |a, b, c, d| a + b + c + d }.expect("Failed to calculate sum"); println!("Calculated: {}", sum); }
Futures combination
Each branch will represent chain of tasks. All branches will be joined using join! macro and macro will return unpolled future.
#![recursion_limit="256"] extern crate union; extern crate futures; extern crate tokio; use std::error::Error; use union::union_async; use futures::future::{ok, err}; type Result<T> = std::result::Result<T, Box<dyn Error>>; async fn action_1() -> Result<u16> { Ok(1) } async fn action_2() -> Result<u8> { Ok(2) } #[tokio::main] async fn main() { let sum = union_async! { action_1(), action_2().and_then(|v| ok(v as u16)), action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)), action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)), and_then => |a, b, c, d| ok(a + b + c + d) }.await.expect("Failed to calculate sum"); println!("Calculated: {}", sum); }
Multi-thread combinations
To execute several tasks in parallel you could use union_spawn! (spawnion!) for sync tasks
and union_async_spawn! (async_spawn!) for futures. Since union_async already provides parallel futures execution in one thread, union_async_spawn! spawns every branch into tokio executor so they will be evaluated in multi-threaded executor.
Multi-thread sync branches
union_spawn spawns one ::std::thread per each step of each branch (number of branches is the max thread count at the time).
extern crate union; use std::error::Error; use union::union_spawn; type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>; fn action_1() -> Result<usize> { Ok(1) } fn action_2() -> Result<u16> { Ok(2) } fn main() { // Branches will be executed in parallel let sum = union_spawn! { action_1(), action_2().map(|v| v as usize), action_2().map(|v| v as usize + 1).and_then(|v| Ok(v * 4)), action_1().and_then(|_| Err("5".into())).or(Ok(2)), map => |a, b, c, d| a + b + c + d }.expect("Failed to calculate sum"); println!("Calculated: {}", sum); }
union_async_spawn! uses ::tokio::spawn function to spawn tasks so it should be done inside tokio runtime
(number of branches is the max count of tokio tasks at the time).
Multi-thread futures
#![recursion_limit="256"] extern crate union; extern crate futures; extern crate tokio; use std::error::Error; use union::union_async_spawn; use futures::future::{ok, err}; type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>; async fn action_1() -> Result<u16> { Ok(1) } async fn action_2() -> Result<u8> { Ok(2) } #[tokio::main] async fn main() { let sum = union_async_spawn! { action_1(), action_2().and_then(|v| ok(v as u16)), action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)), action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)), and_then => |a, b, c, d| ok(a + b + c + d) }.await.expect("Failed to calculate sum"); println!("Calculated: {}", sum); }
Using combinators we can rewrite first example like
extern crate union; use std::error::Error; use union::union; type Result<T> = std::result::Result<T, Box<dyn Error>>; fn action_1() -> Result<u16> { Ok(1) } fn action_2() -> Result<u8> { Ok(2) } fn main() { let sum = union! { action_1(), action_2() |> |v| v as u16, action_2() |> |v| v as u16 + 1 => |v| Ok(v * 4), action_1() => |_| Err("5".into()) <| Ok(2), map => |a, b, c, d| a + b + c + d }.expect("Failed to calculate sum"); println!("Calculated: {}", sum); }
By separating chain in actions, you will make actions wait for completion of all of them in current step before go to the next step.
#![recursion_limit="256"] extern crate union; use std::error::Error; use union::union; type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>; fn action_1() -> Result<u16> { Ok(1) } fn action_2() -> Result<u8> { Ok(2) } fn main() { let sum = union! { action_1(), let result_1 = action_2() ~|> |v| v as u16 + 1, action_2() ~|> { let result_1 = result_1.as_ref().ok().map(Clone::clone); move |v| { // `result_1` now is the result of `action_2()` [Ok(1u8)] if result_1.is_some() { v as u16 + 1 } else { unreachable!() } } } ~=> { let result_1 = result_1.as_ref().ok().map(Clone::clone); move |v| { // `result_1` now is the result of `|v| v as u16 + 1` [Ok(2u16)] if let Some(result_1) = result_1 { Ok(v * 4 + result_1) } else { unreachable!() } } }, action_1() ~=> |_| Err("5".into()) <| Ok(2), map => |a, b, c, d| a + b + c + d }.expect("Failed to calculate sum"); println!("Calculated: {}", sum); }
Macros
| async_spawn | Alias for |
| asyncion | Alias for |
| spawn | Alias for |
| union | Use to combine sync results. |
| union_async | Use to combine futures. |
| union_async_spawn | Use to spawn |
| union_spawn | Use to spawn |