pub fn take<T: 'static, S>(max: usize) -> Box<dyn Fn(S) -> Source<T>> where
S: Into<Arc<Source<T>>>,
Expand description
Callbag operator that limits the amount of data sent by a source.
Works on both pullable and listenable sources.
See https://github.com/staltz/callbag-take/blob/6ae7755ea5f014306704450a40eb72ffdb21d308/index.js#L1-L30
Examples
On a listenable source:
use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};
use callbag::{for_each, interval, take};
// Build a nursery on the async-std executor; `nursery_out` is blocked on further down.
let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
// Shared queue that records every value handed to the sink closure.
let received = Arc::new(SegQueue::new());
// Wrap a 1000 ms interval source with `take(3)`.
let source = take(3)(interval(Duration::from_millis(1_000), nursery.clone()));
for_each({
    let sink = Arc::clone(&received);
    move |x| {
        println!("{}", x);
        sink.push(x);
    }
})(source);
// NOTE(review): presumably `nursery_out` only finishes once this handle is
// released — confirm against the async_nursery documentation.
drop(nursery);
async_std::task::block_on(nursery_out);
// Pop exactly as many items as the queue reports, keeping them in pop order.
let drained: Vec<_> = (0..received.len())
    .map(|_| received.pop().unwrap())
    .collect();
assert_eq!(&drained[..], [0, 1, 2]);
On a pullable source:
use crossbeam_queue::SegQueue;
use std::sync::Arc;
use callbag::{for_each, from_iter, take};
/// Counter that yields every `usize` from its start value through `to`, inclusive.
#[derive(Clone)]
struct Range {
    /// Next value to hand out.
    i: usize,
    /// Last value that will be yielded.
    to: usize,
}

impl Range {
    /// Creates a range that starts at `from` and ends at `to` (both inclusive).
    fn new(from: usize, to: usize) -> Self {
        Self { i: from, to }
    }
}

impl Iterator for Range {
    type Item = usize;

    /// Returns the current value and advances by one, or `None` once the
    /// cursor has moved past `to`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.i > self.to {
            return None;
        }
        let current = self.i;
        self.i = current + 1;
        Some(current)
    }
}
// Shared queue that records every value handed to the sink closure.
let received = Arc::new(SegQueue::new());
// Wrap an iterator-backed source counting 100..=999 with `take(4)`.
let source = take(4)(from_iter(Range::new(100, 999)));
for_each({
    let sink = Arc::clone(&received);
    move |x| {
        println!("{}", x);
        sink.push(x);
    }
})(source);
// Pop exactly as many items as the queue reports, keeping them in pop order.
let drained: Vec<_> = (0..received.len())
    .map(|_| received.pop().unwrap())
    .collect();
assert_eq!(&drained[..], [100, 101, 102, 103]);