§Rust implementation of the callbag spec for reactive/iterable programming
Basic callbag factories and operators to get started with.
Highlights:
- Supports reactive stream programming
- Supports iterable programming (also!)
- Same operator works for both of the above
- Extensible
Imagine a hybrid between an Observable and an (Async)Iterable: that’s what callbags are all about. It’s all done with a few simple callbacks, following the callbag spec.
§Examples
§Reactive programming examples
Pick the first 5 odd numbers from a clock that ticks every second, then start observing them:
use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};

use callbag::{filter, for_each, interval, map, pipe, take};

let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

let actual = Arc::new(SegQueue::new());

pipe!(
    interval(Duration::from_millis(1_000), nursery.clone()),
    map(|x| x + 1),
    filter(|x| x % 2 == 1),
    take(5),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{}", x);
            actual.push(x);
        }
    }),
);

drop(nursery);
async_std::task::block_on(nursery_out);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [1, 3, 5, 7, 9]
);
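To see why the assertion expects [1, 3, 5, 7, 9], the same operator chain can be traced with standard-library iterators. This is only an analogy, not the crate's API: it assumes the clock counts up from 0 (which is consistent with the asserted output), it involves no timing or nursery, and `first_odd_ticks` is a hypothetical helper name.

```rust
/// Hypothetical helper: replays the pipeline's data flow with plain iterators.
/// Assumes the clock counts up from 0, which matches the assertion above.
fn first_odd_ticks(n: usize) -> Vec<u64> {
    (0u64..)                    // stand-in for the ticks emitted by `interval`
        .map(|x| x + 1)         // 1, 2, 3, ...
        .filter(|x| x % 2 == 1) // 1, 3, 5, ...
        .take(n)                // stop after n values
        .collect()
}
```

Calling `first_odd_ticks(5)` walks through the same map, filter and take steps as the callbag pipeline, only pulled synchronously instead of pushed once per second.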
§Iterable programming examples
From a range of numbers, pick 5 of them and divide them by 4, then start pulling those one by one:
use crossbeam_queue::SegQueue;
use std::sync::Arc;

use callbag::{for_each, from_iter, map, pipe, take};

#[derive(Clone)]
struct Range {
    i: usize,
    to: usize,
}

impl Range {
    fn new(from: usize, to: usize) -> Self {
        Range { i: from, to }
    }
}

impl Iterator for Range {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        let i = self.i;
        if i <= self.to {
            self.i += 1;
            Some(i)
        } else {
            None
        }
    }
}

let actual = Arc::new(SegQueue::new());

pipe!(
    from_iter(Range::new(40, 99)),
    take(5),
    map(|x| x as f64 / 4.0),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{}", x);
            actual.push(x);
        }
    }),
);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual.len() {
            v.push(actual.pop().unwrap());
        }
        v
    }[..],
    [10.0, 10.25, 10.5, 10.75, 11.0]
);
§API
The list below shows what’s included.
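§Source factories
- from_iter
- interval
§Sink factories
- for_each
§Transformation operators
- map
- scan
- flatten
§Filtering operators
- take
- skip
- filter
§Combination operators
- merge
- concat
- combine
§Utilities
- share
- pipe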
§Terminology
- source: a callbag that delivers data
- sink: a callbag that receives data
- puller sink: a sink that actively requests data from the source
- pullable source: a source that delivers data only on demand (on receiving a request)
- listener sink: a sink that passively receives data from the source
- listenable source: a source that sends data to the sink without waiting for requests
- operator: a callbag based on another callbag which applies some operation
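The difference between the pull and listen roles above can be sketched with the standard library alone. This is a loose model rather than the callbag protocol itself: an `Iterator` stands in for a pullable source, a callback stands in for a listener sink, and the names `run_puller`, `run_listenable` and `doubled` are hypothetical.

```rust
/// Pull model: the sink drives. Each call to `next` is a request,
/// and the source answers with at most one item.
fn run_puller<I: Iterator<Item = u32>>(mut source: I, requests: usize) -> Vec<u32> {
    let mut received = Vec::new();
    for _ in 0..requests {
        match source.next() {
            Some(x) => received.push(x),
            None => break, // source is exhausted
        }
    }
    received
}

/// Push model: the source drives. It calls the sink's callback
/// whenever it has data, without waiting for a request.
fn run_listenable<F: FnMut(u32)>(data: &[u32], mut sink: F) {
    for &x in data {
        sink(x);
    }
}

/// An operator is built on another source and applies some operation
/// (here, doubling) before the data reaches the sink.
fn doubled<I: Iterator<Item = u32>>(source: I) -> impl Iterator<Item = u32> {
    source.map(|x| x * 2)
}
```

In this model a puller that makes three requests receives exactly three items, while a listener receives everything the source decides to send. A callbag uses the same callback shape for both directions, which is why one operator can serve both kinds of source.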
Macros§
- combine
- Callbag factory that combines the latest data points from multiple (2 or more) callbag sources.
- concat
- Callbag factory that concatenates the data from multiple (2 or more) callbag sources.
- merge
- Callbag factory that merges data from multiple callbag sources.
- pipe
- Utility macro for plugging callbags together in a chain.
Structs§
- Callbag
- A Callbag dynamically receives input of type I and dynamically delivers output of type O.
Enums§
Functions§
- filter
- Callbag operator that conditionally lets data pass through.
- flatten
- Callbag operator that flattens a higher-order callbag source.
- for_each
- Callbag sink that consumes both pullable and listenable sources.
- from_iter
- Converts an iterable or Iterator to a callbag pullable source.
- interval
- A callbag listenable source that sends incremental numbers every x milliseconds.
- map
- Callbag operator that applies a transformation on data passing through it.
- scan
- Callbag operator that combines consecutive values from the same source.
- share
- Callbag operator that broadcasts a single source to multiple sinks.
- skip
- Callbag operator that skips the first N data points of a source.
- take
- Callbag operator that limits the amount of data sent by a source.
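For readers who know Rust iterators, the skip, scan and take descriptions above map loosely onto the standard-library adaptors of the same names. The sketch below uses only `std` and is an analogy, not the crate's operators; it assumes that scan delivers each running accumulation downstream, and `running_totals` is a hypothetical helper.

```rust
/// Hypothetical helper: running totals of `data` after skipping `skip_n`
/// items, limited to `take_n` results. Uses std iterators only.
fn running_totals(data: &[i32], skip_n: usize, take_n: usize) -> Vec<i32> {
    data.iter()
        .skip(skip_n) // drop the first skip_n data points
        .scan(0, |acc, &x| {
            *acc += x; // combine the new value with the accumulator
            Some(*acc) // emit the running total
        })
        .take(take_n) // cap how many results are delivered
        .collect()
}
```

With the input `[1, 2, 3, 4, 5]`, skipping one item and taking three results, the values 2, 3, 4 are accumulated into 2, 5, 9.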