Expand description
Rust implementation of the callbag spec for reactive/iterable programming
Basic callbag factories and operators to get started with.
Highlights:
- Supports reactive stream programming
- Supports iterable programming (also!)
- Same operator works for both of the above
- Extensible
Imagine a hybrid between an Observable and an (Async)Iterable, that’s what callbags are all about. It’s all done with a few simple callbacks, following the callbag spec.
Examples
Reactive programming examples
Pick the first 5 odd numbers from a clock that ticks every second, then start observing them:
use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};
use callbag::{filter, for_each, interval, map, pipe, take};
let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
let actual = Arc::new(SegQueue::new());
pipe!(
interval(Duration::from_millis(1_000), nursery.clone()),
map(|x| x + 1),
filter(|x| x % 2 == 1),
take(5),
for_each({
let actual = Arc::clone(&actual);
move |x| {
println!("{}", x);
actual.push(x);
}
}),
);
drop(nursery);
async_std::task::block_on(nursery_out);
assert_eq!(
&{
let mut v = vec![];
for _i in 0..actual.len() {
v.push(actual.pop().unwrap());
}
v
}[..],
[1, 3, 5, 7, 9]
);
Iterable programming examples
From a range of numbers, pick 5 of them and divide them by 4, then start pulling those one by one:
use crossbeam_queue::SegQueue;
use std::sync::Arc;
use callbag::{for_each, from_iter, map, pipe, take};
#[derive(Clone)]
struct Range {
i: usize,
to: usize,
}
impl Range {
fn new(from: usize, to: usize) -> Self {
Range { i: from, to }
}
}
impl Iterator for Range {
type Item = usize;
fn next(&mut self) -> Option<Self::Item> {
let i = self.i;
if i <= self.to {
self.i += 1;
Some(i)
} else {
None
}
}
}
let actual = Arc::new(SegQueue::new());
pipe!(
from_iter(Range::new(40, 99)),
take(5),
map(|x| x as f64 / 4.0),
for_each({
let actual = Arc::clone(&actual);
move |x| {
println!("{}", x);
actual.push(x);
}
}),
);
assert_eq!(
&{
let mut v = vec![];
for _i in 0..actual.len() {
v.push(actual.pop().unwrap());
}
v
}[..],
[10.0, 10.25, 10.5, 10.75, 11.0]
);
API
The list below shows what’s included.
Source factories
Sink factories
Transformation operators
Filtering operators
Combination operators
Utilities
Terminology
- source: a callbag that delivers data
- sink: a callbag that receives data
- puller sink: a sink that actively requests data from the source
- pullable source: a source that delivers data only on demand (on receiving a request)
- listener sink: a sink that passively receives data from the source
- listenable source: source which sends data to the sink without waiting for requests
- operator: a callbag based on another callbag which applies some operation
Macros
Callbag factory that combines the latest data points from multiple (2 or more) callbag sources.
Callbag factory that concatenates the data from multiple (2 or more) callbag sources.
Callbag factory that merges data from multiple callbag sources.
Utility function for plugging callbags together in chain.
Structs
A Callbag
dynamically receives input of type I
and dynamically delivers output of type O
.
Enums
Functions
Callbag operator that conditionally lets data pass through.
Callbag operator that flattens a higher-order callbag source.
Callbag sink that consumes both pullable and listenable sources.
A callbag listenable source that sends incremental numbers every x milliseconds.
Callbag operator that applies a transformation on data passing through it.
Callbag operator that combines consecutive values from the same source.
Callbag operator that broadcasts a single source to multiple sinks.
Callbag operator that skips the first N data points of a source.
Callbag operator that limits the amount of data sent by a source.