Extra [Sink] adaptors and functions.
Forward building with [SinkBuild]
For an intuitive API that matches the data flow direction, use [SinkBuilder] and the [SinkBuild] trait to chain
adaptors in forward order:
use sinktools::{SinkBuilder, SinkBuild};
use sinktools::sink::SinkExt;
# #[tokio::main(flavor = "current_thread")]
# async fn main() {
let mut pipeline = SinkBuilder::<Vec<i32>>::new()
.flatten::<Vec<i32>>() .filter_map(|x: i32| { if x % 2 == 0 {
Some(x * 2)
} else {
None
}
})
.map(|x| x + 1) .filter(|x| *x < 100) .for_each(|x: i32| { println!("Received: {}", x);
});
pipeline.send(vec![1, 2, 3, 4]).await.unwrap();
pipeline.send(vec![5, 6]).await.unwrap();
pipeline.send(vec![]).await.unwrap();
pipeline.send(vec![7, 8, 9]).await.unwrap();
# }
Direct construction
Alternatively, you can construct sink adaptors directly using their new methods:
use sinktools::{for_each, filter, map, filter_map, flatten};
use sinktools::sink::SinkExt;
# #[tokio::main(flavor = "current_thread")]
# async fn main() {
let sink = for_each(|x: i32| {
println!("Received: {}", x);
});
let filter_sink = filter(|x: &i32| *x < 100, sink);
let map_sink = map(|x: i32| x + 1, filter_sink);
let filter_map_sink = filter_map(|x: i32| {
if x % 2 == 0 {
Some(x * 2)
} else {
None
}
}, map_sink);
let mut complex_sink = flatten::<Vec<i32>, _>(filter_map_sink);
complex_sink.send(vec![1, 2, 3, 4]).await.unwrap();
complex_sink.send(vec![5, 6]).await.unwrap();
complex_sink.send(vec![]).await.unwrap();
complex_sink.send(vec![7, 8, 9]).await.unwrap();
# }
Each adaptor also provides a new_sink method which ensures the construction is correct for Sink to be implemented,
but may require additional type arguments to aid inference.
The forward SinkBuilder API is more intuitive for direct users, as it matches the data flow direction. Direct construction is
better for generated code as it aids compiler type inference.