macro_rules! pipe {
($a : expr, $b : expr $(,) ?) => { ... };
($a : expr, $b : expr, $($rest : expr), * $(,) ?) => { ... };
}
Expand description
Utility macro for plugging callbags together in a chain.
This utility doesn’t actually rely on Callbag specifics, and is very similar to Ramda’s `pipe` or lodash’s `flow`.
It exists to play nicely with the ecosystem, and to make the macro easy to import.
Examples
Create a source with `pipe!`, then pass it to a `for_each`:
use async_executors::TimerExt;
use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};
use callbag::{combine, for_each, interval, map, pipe, take};
let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
let actual = Arc::new(SegQueue::new());
let source = pipe!(
combine!(
interval(Duration::from_millis(100), nursery.clone()),
interval(Duration::from_millis(350), nursery.clone()),
),
map(|(x, y)| format!("X{},Y{}", x, y)),
take(10),
);
for_each({
let actual = Arc::clone(&actual);
move |x: String| {
println!("{:?}", x);
actual.push(x.clone());
}
})(source);
let nursery_out = nursery.timeout(Duration::from_millis(1_100), nursery_out);
drop(nursery);
async_std::task::block_on(nursery_out);
assert_eq!(
&{
let mut v = vec![];
for _i in 0..actual.len() {
v.push(actual.pop().unwrap());
}
v
}[..],
[
"X2,Y0",
"X3,Y0",
"X4,Y0",
"X5,Y0",
"X6,Y0",
"X6,Y1",
"X7,Y1",
"X8,Y1",
"X9,Y1",
"X9,Y2",
]
);
Or use `pipe!` to go all the way from source to sink:
use async_executors::TimerExt;
use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};
use callbag::{combine, for_each, interval, map, pipe, take};
let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
let actual = Arc::new(SegQueue::new());
let source = pipe!(
combine!(
interval(Duration::from_millis(100), nursery.clone()),
interval(Duration::from_millis(350), nursery.clone()),
),
map(|(x, y)| format!("X{},Y{}", x, y)),
take(10),
for_each({
let actual = Arc::clone(&actual);
move |x: String| {
println!("{:?}", x);
actual.push(x.clone());
}
}),
);
let nursery_out = nursery.timeout(Duration::from_millis(1_100), nursery_out);
drop(nursery);
async_std::task::block_on(nursery_out);
assert_eq!(
&{
let mut v = vec![];
for _i in 0..actual.len() {
v.push(actual.pop().unwrap());
}
v
}[..],
[
"X2,Y0",
"X3,Y0",
"X4,Y0",
"X5,Y0",
"X6,Y0",
"X6,Y1",
"X7,Y1",
"X8,Y1",
"X9,Y1",
"X9,Y2",
]
);
Nesting
To use `pipe!` inside another `pipe!`, you need to give the inner `pipe!` an argument, e.g. `|s| pipe!(s, ...)`:
use async_executors::TimerExt;
use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};
use callbag::{combine, for_each, interval, map, pipe, take};
let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
let actual = Arc::new(SegQueue::new());
let source = pipe!(
combine!(
interval(Duration::from_millis(100), nursery.clone()),
interval(Duration::from_millis(350), nursery.clone()),
),
|s| pipe!(
s,
map(|(x, y)| format!("X{},Y{}", x, y)),
take(10),
),
for_each({
let actual = Arc::clone(&actual);
move |x: String| {
println!("{:?}", x);
actual.push(x.clone());
}
}),
);
let nursery_out = nursery.timeout(Duration::from_millis(1_100), nursery_out);
drop(nursery);
async_std::task::block_on(nursery_out);
assert_eq!(
&{
let mut v = vec![];
for _i in 0..actual.len() {
v.push(actual.pop().unwrap());
}
v
}[..],
[
"X2,Y0",
"X3,Y0",
"X4,Y0",
"X5,Y0",
"X6,Y0",
"X6,Y1",
"X7,Y1",
"X8,Y1",
"X9,Y1",
"X9,Y2",
]
);
This means you can use `pipe!` to create a new operator:
use async_executors::TimerExt;
use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};
use callbag::{combine, for_each, interval, map, pipe, take};
let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
let actual = Arc::new(SegQueue::new());
let map_then_take = |f, amount| {
move |s| pipe!(s, map(f), take(amount))
};
let source = pipe!(
combine!(
interval(Duration::from_millis(100), nursery.clone()),
interval(Duration::from_millis(350), nursery.clone()),
),
|s| pipe!(
s,
map_then_take(|(x, y)| format!("X{},Y{}", x, y), 10),
),
for_each({
let actual = Arc::clone(&actual);
move |x: String| {
println!("{:?}", x);
actual.push(x.clone());
}
}),
);
let nursery_out = nursery.timeout(Duration::from_millis(1_100), nursery_out);
drop(nursery);
async_std::task::block_on(nursery_out);
assert_eq!(
&{
let mut v = vec![];
for _i in 0..actual.len() {
v.push(actual.pop().unwrap());
}
v
}[..],
[
"X2,Y0",
"X3,Y0",
"X4,Y0",
"X5,Y0",
"X6,Y0",
"X6,Y1",
"X7,Y1",
"X8,Y1",
"X9,Y1",
"X9,Y2",
]
);