pub fn share<T: 'static, S>(source: S) -> Source<T> where
    T: Clone,
    S: Into<Arc<Source<T>>>, 
Expand description

Callbag operator that broadcasts a single source to multiple sinks.

Reference-counts its sinks and starts the source when the first sink connects, similar to RxJS's `.share()`.

Works on both pullable and listenable sources.

See https://github.com/staltz/callbag-share/blob/d96748edec631800ec5e606018f519ccaeb8f766/index.js#L1-L32

Examples

Share a listenable source between two listeners:

use async_executors::{Timer, TimerExt};
use async_nursery::{NurseExt, Nursery};
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};

use callbag::{for_each, interval, share};

let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

let actual_1 = Arc::new(SegQueue::new());
let actual_2 = Arc::new(SegQueue::new());

let source = Arc::new(share(interval(Duration::from_millis(1_000), nursery.clone())));

for_each({
    let actual_1 = Arc::clone(&actual_1);
    move |x| {
        println!("{}", x);
        actual_1.push(x);
    }
})(Arc::clone(&source));

nursery
    .nurse({
        let nursery = nursery.clone();
        let actual_2 = Arc::clone(&actual_2);
        const DURATION: Duration = Duration::from_millis(3_500);
        async move {
            nursery.sleep(DURATION).await;
            for_each(move |x| {
                println!("{}", x);
                actual_2.push(x);
            })(source);
        }
    })?;

let nursery_out = nursery.timeout(Duration::from_millis(6_500), nursery_out);
drop(nursery);
async_std::task::block_on(nursery_out);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual_1.len() {
            v.push(actual_1.pop().unwrap());
        }
        v
    }[..],
    [0, 1, 2, 3, 4, 5]
);
assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual_2.len() {
            v.push(actual_2.pop().unwrap());
        }
        v
    }[..],
    [3, 4, 5]
);

Share a pullable source between two pullers:

use arc_swap::ArcSwapOption;
use crossbeam_queue::SegQueue;
use std::sync::Arc;

use callbag::{from_iter, share, Message};

let actual_1 = Arc::new(SegQueue::new());
let actual_2 = Arc::new(SegQueue::new());

let source = share(from_iter([10, 20, 30, 40, 50]));

let talkback = Arc::new(ArcSwapOption::from(None));
source(Message::Handshake(Arc::new(
    {
        let actual_1 = Arc::clone(&actual_1);
        let talkback = Arc::clone(&talkback);
        move |message| {
            if let Message::Handshake(source) = message {
                talkback.store(Some(source));
            } else if let Message::Data(data) = message {
                println!("a{}", data);
                actual_1.push(format!("a{}", data));
            }
        }
    }
    .into()
)));

source(Message::Handshake(Arc::new(
    {
        let actual_2 = Arc::clone(&actual_2);
        move |message| {
            if let Message::Data(data) = message {
                println!("b{}", data);
                actual_2.push(format!("b{}", data));
            }
        }
    }
    .into()
)));

let talkback = talkback.load();
let talkback = talkback.as_ref().ok_or("source talkback not set")?;
talkback(Message::Pull);
talkback(Message::Pull);

assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual_1.len() {
            v.push(actual_1.pop().unwrap());
        }
        v
    }[..],
    ["a10", "a20"]);
assert_eq!(
    &{
        let mut v = vec![];
        for _i in 0..actual_2.len() {
            v.push(actual_2.pop().unwrap());
        }
        v
    }[..],
    ["b10", "b20"]
);