Expand description
Callbag operator that broadcasts a single source to multiple sinks.
Does reference counting on sinks and starts the source when the first sink gets connected,
similar to RxJS `.share()`.
Works on either pullable or listenable sources.
Examples
Share a listenable source to two listeners:
use async_executors::{Timer, TimerExt};
use async_nursery::{NurseExt, Nursery};
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};
use callbag::{for_each, interval, share};
// Example: one shared listenable source observed by two listeners, the second
// of which subscribes late and therefore only records the later values.
let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

// Queues recording what each listener receives; checked by the assertions below.
let actual_1 = Arc::new(SegQueue::new());
let actual_2 = Arc::new(SegQueue::new());

// The shared source is wrapped in an `Arc` so both listeners can hold it.
let source = Arc::new(share(interval(Duration::from_millis(1_000), nursery.clone())));

// First listener: subscribes right away.
for_each({
    let sink_1 = Arc::clone(&actual_1);
    move |value| {
        println!("{}", value);
        sink_1.push(value);
    }
})(Arc::clone(&source));

// Second listener: sleeps for 3.5 s inside a nursed task, then subscribes.
nursery.nurse({
    let timer = nursery.clone();
    let sink_2 = Arc::clone(&actual_2);
    async move {
        timer.sleep(Duration::from_millis(3_500)).await;
        for_each(move |value| {
            println!("{}", value);
            sink_2.push(value);
        })(source);
    }
})?;

// Bound the whole run to 6.5 s, release our nursery handle, and block until done.
let bounded_out = nursery.timeout(Duration::from_millis(6_500), nursery_out);
drop(nursery);
async_std::task::block_on(bounded_out);

// Drain each queue (length sampled once, then that many pops) and compare.
let seen_1: Vec<_> = (0..actual_1.len())
    .map(|_| actual_1.pop().unwrap())
    .collect();
assert_eq!(&seen_1[..], [0, 1, 2, 3, 4, 5]);

let seen_2: Vec<_> = (0..actual_2.len())
    .map(|_| actual_2.pop().unwrap())
    .collect();
assert_eq!(&seen_2[..], [3, 4, 5]);
Share a pullable source to two pullers:
use arc_swap::ArcSwapOption;
use crossbeam_queue::SegQueue;
use std::sync::Arc;
use callbag::{from_iter, share, Message};
// Example: one shared pullable source with two sinks; pulls issued through the
// first sink's talkback are recorded by both sinks.
let actual_1 = Arc::new(SegQueue::new());
let actual_2 = Arc::new(SegQueue::new());

let source = share(from_iter([10, 20, 30, 40, 50]));

// Slot that the first sink fills with the talkback it receives on handshake.
let talkback = Arc::new(ArcSwapOption::from(None));

// Sink "a": stores the handshake talkback and records data with an `a` prefix.
source(Message::Handshake(Arc::new(
    {
        let sink_a = Arc::clone(&actual_1);
        let slot = Arc::clone(&talkback);
        move |message| match message {
            Message::Handshake(upstream) => {
                slot.store(Some(upstream));
            }
            Message::Data(data) => {
                println!("a{}", data);
                sink_a.push(format!("a{}", data));
            }
            _ => {}
        }
    }
    .into(),
)));

// Sink "b": only records data, with a `b` prefix.
source(Message::Handshake(Arc::new(
    {
        let sink_b = Arc::clone(&actual_2);
        move |message| match message {
            Message::Data(data) => {
                println!("b{}", data);
                sink_b.push(format!("b{}", data));
            }
            _ => {}
        }
    }
    .into(),
)));

// Load the stored talkback and pull twice through it.
let loaded = talkback.load();
let pull_handle = loaded.as_ref().ok_or("source talkback not set")?;
for _ in 0..2 {
    pull_handle(Message::Pull);
}

// Drain each queue (length sampled once, then that many pops) and compare.
let seen_a: Vec<_> = (0..actual_1.len())
    .map(|_| actual_1.pop().unwrap())
    .collect();
assert_eq!(&seen_a[..], ["a10", "a20"]);

let seen_b: Vec<_> = (0..actual_2.len())
    .map(|_| actual_2.pop().unwrap())
    .collect();
assert_eq!(&seen_b[..], ["b10", "b20"]);