pub fn take<T: 'static, S>(max: usize) -> Box<dyn Fn(S) -> Source<T>> where
    S: Into<Arc<Source<T>>>, 
Expand description

Callbag operator that limits the amount of data sent by a source.

Works on either pullable and listenable sources.

See https://github.com/staltz/callbag-take/blob/6ae7755ea5f014306704450a40eb72ffdb21d308/index.js#L1-L30

Examples

On a listenable source:

use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};

use callbag::{for_each, interval, take};

let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

let actual = Arc::new(SegQueue::new());

let source = take(3)(interval(Duration::from_millis(1_000), nursery.clone()));

for_each({
    let actual = Arc::clone(&actual);
    move |x| {
        println!("{}", x);
        actual.push(x);
    }
})(source);

drop(nursery);
async_std::task::block_on(nursery_out);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [0, 1, 2]
);

On a pullable source:

use crossbeam_queue::SegQueue;
use std::sync::Arc;

use callbag::{for_each, from_iter, take};

#[derive(Clone)]
struct Range {
    i: usize,
    to: usize,
}

impl Range {
    fn new(from: usize, to: usize) -> Self {
        Range { i: from, to }
    }
}

impl Iterator for Range {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        let i = self.i;
        if i <= self.to {
            self.i += 1;
            Some(i)
        } else {
            None
        }
    }
}

let actual = Arc::new(SegQueue::new());

let source = take(4)(from_iter(Range::new(100, 999)));

for_each({
    let actual = Arc::clone(&actual);
    move |x| {
        println!("{}", x);
        actual.push(x);
    }
})(source);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [100, 101, 102, 103]
);