Expand description

Rust implementation of the callbag spec for reactive/iterable programming

Basic callbag factories and operators to get started with.

Highlights:

  • Supports reactive stream programming
  • Supports iterable programming (also!)
  • Same operator works for both of the above
  • Extensible

Imagine a hybrid between an Observable and an (Async)Iterable, that’s what callbags are all about. It’s all done with a few simple callbacks, following the callbag spec.

Examples

Reactive programming examples

Pick the first 5 odd numbers from a clock that ticks every second, then start observing them:

use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};

use callbag::{filter, for_each, interval, map, pipe, take};

let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

let actual = Arc::new(SegQueue::new());

pipe!(
    interval(Duration::from_millis(1_000), nursery.clone()),
    map(|x| x + 1),
    filter(|x| x % 2 == 1),
    take(5),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{}", x);
            actual.push(x);
        }
    }),
);

drop(nursery);
async_std::task::block_on(nursery_out);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [1, 3, 5, 7, 9]
);

Iterable programming examples

From a range of numbers, pick 5 of them and divide them by 4, then start pulling those one by one:

use crossbeam_queue::SegQueue;
use std::sync::Arc;

use callbag::{for_each, from_iter, map, pipe, take};

#[derive(Clone)]
struct Range {
    i: usize,
    to: usize,
}

impl Range {
    fn new(from: usize, to: usize) -> Self {
        Range { i: from, to }
    }
}

impl Iterator for Range {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        let i = self.i;
        if i <= self.to {
            self.i += 1;
            Some(i)
        } else {
            None
        }
    }
}

let actual = Arc::new(SegQueue::new());

pipe!(
    from_iter(Range::new(40, 99)),
    take(5),
    map(|x| x as f64 / 4.0),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{}", x);
            actual.push(x);
        }
    }),
);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [10.0, 10.25, 10.5, 10.75, 11.0]
);

API

The list below shows what’s included.

Source factories

Sink factories

Transformation operators

Filtering operators

Combination operators

Utilities

Terminology

  • source: a callbag that delivers data
  • sink: a callbag that receives data
  • puller sink: a sink that actively requests data from the source
  • pullable source: a source that delivers data only on demand (on receiving a request)
  • listener sink: a sink that passively receives data from the source
  • listenable source: source which sends data to the sink without waiting for requests
  • operator: a callbag based on another callbag which applies some operation

Macros

Callbag factory that combines the latest data points from multiple (2 or more) callbag sources.

Callbag factory that concatenates the data from multiple (2 or more) callbag sources.

Callbag factory that merges data from multiple callbag sources.

Utility function for plugging callbags together in chain.

Structs

A Callbag dynamically receives input of type I and dynamically delivers output of type O.

Enums

A message passed to a Callbag.

Functions

Callbag operator that conditionally lets data pass through.

Callbag operator that flattens a higher-order callbag source.

Callbag sink that consumes both pullable and listenable sources.

Converts an iterable or Iterator to a callbag pullable source.

A callbag listenable source that sends incremental numbers every x milliseconds.

Callbag operator that applies a transformation on data passing through it.

Callbag operator that combines consecutive values from the same source.

Callbag operator that broadcasts a single source to multiple sinks.

Callbag operator that skips the first N data points of a source.

Callbag operator that limits the amount of data sent by a source.

Type Definitions

A sink only receives data.

A source only delivers data.