Crate callbag

§Rust implementation of the callbag spec for reactive/iterable programming

Basic callbag factories and operators to get started with.

Highlights:

  • Supports reactive stream programming
  • Supports iterable programming (also!)
  • Same operator works for both of the above
  • Extensible

Imagine a hybrid between an Observable and an (Async)Iterable: that’s what callbags are all about. Everything is done with a few simple callbacks that follow the callbag spec.

§Examples

§Reactive programming examples

Pick the first 5 odd numbers from a clock that ticks every second, then start observing them:

use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};

use callbag::{filter, for_each, interval, map, pipe, take};

let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

let actual = Arc::new(SegQueue::new());

pipe!(
    interval(Duration::from_millis(1_000), nursery.clone()),
    map(|x| x + 1),
    filter(|x| x % 2 == 1),
    take(5),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{}", x);
            actual.push(x);
        }
    }),
);

drop(nursery);
async_std::task::block_on(nursery_out);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [1, 3, 5, 7, 9]
);

§Iterable programming examples

From a range of numbers, pick the first 5 and divide each by 4, then start pulling the results one by one:

use crossbeam_queue::SegQueue;
use std::sync::Arc;

use callbag::{for_each, from_iter, map, pipe, take};

#[derive(Clone)]
struct Range {
    i: usize,
    to: usize,
}

impl Range {
    fn new(from: usize, to: usize) -> Self {
        Range { i: from, to }
    }
}

impl Iterator for Range {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        let i = self.i;
        if i <= self.to {
            self.i += 1;
            Some(i)
        } else {
            None
        }
    }
}

let actual = Arc::new(SegQueue::new());

pipe!(
    from_iter(Range::new(40, 99)),
    take(5),
    map(|x| x as f64 / 4.0),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{}", x);
            actual.push(x);
        }
    }),
);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [10.0, 10.25, 10.5, 10.75, 11.0]
);
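For comparison only, here is a minimal sketch of the same steps written with standard-library iterator adapters instead of callbags. It does not use this crate at all, and the function name `first_five_quarters` is hypothetical; it is meant as a cross-check of the values asserted above.

```rust
/// Same steps as the callbag pipeline above, using only `std` iterators:
/// take the first 5 numbers of the inclusive range 40..=99, then divide each by 4.
/// `first_five_quarters` is a hypothetical name used only for this sketch.
fn first_five_quarters() -> Vec<f64> {
    (40usize..=99)
        .take(5)
        .map(|x| x as f64 / 4.0)
        .collect()
}
```

The callbag version differs in that the same `take` and `map` operators also work on listenable sources such as `interval`, which a plain `Iterator` chain cannot consume.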

§API

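The lists below show what’s included.

§Source factories

  • from_iter
  • interval

§Sink factories

  • for_each

§Transformation operators

  • map
  • scan
  • flatten

§Filtering operators

  • take
  • skip
  • filter

§Combination operators

  • merge
  • concat
  • combine

§Utilities

  • share
  • pipe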

§Terminology

  • source: a callbag that delivers data
  • sink: a callbag that receives data
  • puller sink: a sink that actively requests data from the source
  • pullable source: a source that delivers data only on demand (on receiving a request)
  • listener sink: a sink that passively receives data from the source
  • listenable source: a source that sends data to the sink without waiting for requests
  • operator: a callbag based on another callbag which applies some operation
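The difference between the pull and listen styles can be sketched with plain closures. This is a simplified model that uses only the standard library; it is not this crate's API. The `Msg` enum and the functions `pullable_source`, `listenable_source`, `run_puller`, and `run_listener` are hypothetical names introduced for illustration, and the handshake of the real callbag spec is reduced here to returning a "talkback" closure.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

/// Hypothetical message type for this sketch (not the crate's `Message` enum).
enum Msg {
    Data(u32), // source -> sink: one data point
    End,       // source -> sink: no more data
}

/// Pullable source: returns a talkback closure and delivers exactly one
/// message to the sink each time that closure is called.
fn pullable_source(items: Vec<u32>, mut sink: impl FnMut(Msg) + 'static) -> impl FnMut() {
    let mut iter = items.into_iter();
    move || match iter.next() {
        Some(x) => sink(Msg::Data(x)),
        None => sink(Msg::End),
    }
}

/// Listenable source: pushes every item to the sink without being asked.
fn listenable_source(items: Vec<u32>, mut sink: impl FnMut(Msg)) {
    for x in items {
        sink(Msg::Data(x));
    }
    sink(Msg::End);
}

/// Puller sink: actively requests data until the source signals the end.
fn run_puller() -> Vec<u32> {
    let received = Rc::new(RefCell::new(Vec::new()));
    let done = Rc::new(Cell::new(false));
    let mut talkback = pullable_source(vec![1, 2, 3], {
        let received = Rc::clone(&received);
        let done = Rc::clone(&done);
        move |msg| match msg {
            Msg::Data(x) => received.borrow_mut().push(x),
            Msg::End => done.set(true),
        }
    });
    while !done.get() {
        talkback(); // each call is one request to the source
    }
    let out = received.borrow().clone();
    out
}

/// Listener sink: passively records whatever the source sends.
fn run_listener() -> Vec<u32> {
    let mut received = Vec::new();
    listenable_source(vec![10, 20], |msg| {
        if let Msg::Data(x) = msg {
            received.push(x);
        }
    });
    received
}
```

In this model, `run_puller()` drives the source one request at a time, while `run_listener()` only reacts to what arrives. A sink such as `for_each` is described in this crate as handling both kinds of source.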

Macros§

combine
Callbag factory that combines the latest data points from multiple (2 or more) callbag sources.
concat
Callbag factory that concatenates the data from multiple (2 or more) callbag sources.
merge
Callbag factory that merges data from multiple callbag sources.
pipe
Utility macro for plugging callbags together in a chain.

Structs§

Callbag
A Callbag dynamically receives input of type I and dynamically delivers output of type O.

Enums§

Message
A message passed to a Callbag.

Functions§

filter
Callbag operator that conditionally lets data pass through.
flatten
Callbag operator that flattens a higher-order callbag source.
for_each
Callbag sink that consumes both pullable and listenable sources.
from_iter
Converts an iterable or Iterator to a callbag pullable source.
interval
A callbag listenable source that sends incremental numbers every x milliseconds.
map
Callbag operator that applies a transformation on data passing through it.
scan
Callbag operator that combines consecutive values from the same source.
share
Callbag operator that broadcasts a single source to multiple sinks.
skip
Callbag operator that skips the first N data points of a source.
take
Callbag operator that limits the amount of data sent by a source.

Type Aliases§

CallbagFn
Sink
A sink only receives data.
Source
A source only delivers data.