mod always;
#[cfg(feature = "async")]
mod async_io;
mod average;
mod bimap;
mod buffer;
mod callback;
#[cfg(feature = "async")]
mod channel;
mod combine;
mod constant;
mod consumer;
mod delay;
mod delay_with_reset;
mod demux;
mod difference;
mod distinct;
mod feedback;
mod filter;
mod finally;
mod fold;
#[cfg(feature = "async")]
mod graph_node;
mod graph_state;
mod inspect;
mod limit;
mod map;
mod map_filter;
mod merge;
mod never;
mod node_flow;
mod print;
mod producer;
mod sample;
mod throttle;
mod tick;
mod trimap;
mod try_bimap;
mod try_map;
mod try_trimap;
mod window;
pub use always::*;
#[cfg(feature = "async")]
pub use async_io::*;
pub use callback::CallBackStream;
pub use demux::*;
pub use feedback::{FeedbackSink, feedback, feedback_node};
#[cfg(feature = "async")]
pub use graph_node::*;
pub use never::*;
use average::*;
use bimap::*;
use buffer::BufferStream;
use constant::*;
use consumer::*;
use delay::*;
use delay_with_reset::*;
use difference::*;
use distinct::*;
use filter::*;
use finally::*;
use fold::*;
use graph_state::*;
use inspect::*;
use limit::*;
use map::*;
use map_filter::*;
use merge::*;
use node_flow::*;
use print::*;
use producer::*;
use sample::*;
use throttle::*;
use tick::*;
use trimap::*;
use try_bimap::*;
use try_map::*;
use try_trimap::*;
use window::WindowStream;
use crate::graph::*;
use crate::queue::ValueAt;
use crate::types::*;
use log::Level;
use log::log;
use num_traits::ToPrimitive;
use std::cmp::Eq;
#[cfg(feature = "async")]
use std::future::Future;
use std::hash::Hash;
use std::ops::Add;
#[cfg(feature = "async")]
use std::pin::Pin;
use std::rc::Rc;
use std::time::Duration;
/// Builds a stream that applies `+` to the values of two streams of the same type.
///
/// Both upstreams are wired in as `Dep::Active` dependencies of a `BiMapStream`.
#[must_use]
pub fn add<T>(upstream1: &Rc<dyn Stream<T>>, upstream2: &Rc<dyn Stream<T>>) -> Rc<dyn Stream<T>>
where
    T: Element + Add<Output = T>,
{
    let lhs = Dep::Active(Rc::clone(upstream1));
    let rhs = Dep::Active(Rc::clone(upstream2));
    // `Add<Output = T>` already yields a `T`, so no cast is needed on the sum.
    BiMapStream::new(lhs, rhs, Box::new(|a: T, b: T| a + b)).into_stream()
}
/// Builds a stream whose values are produced by calling `func` with the values of two
/// upstream dependencies.
///
/// The caller decides, per input, which `Dep` variant each upstream is wrapped in.
#[must_use]
pub fn bimap<IN1, IN2, OUT>(
    upstream1: Dep<IN1>,
    upstream2: Dep<IN2>,
    func: impl Fn(IN1, IN2) -> OUT + 'static,
) -> Rc<dyn Stream<OUT>>
where
    IN1: Element,
    IN2: Element,
    OUT: Element,
{
    let combiner = Box::new(func);
    BiMapStream::new(upstream1, upstream2, combiner).into_stream()
}
/// Builds a stream whose values are produced by calling `func` with the values of three
/// upstream dependencies.
#[must_use]
pub fn trimap<IN1, IN2, IN3, OUT>(
    upstream1: Dep<IN1>,
    upstream2: Dep<IN2>,
    upstream3: Dep<IN3>,
    func: impl Fn(IN1, IN2, IN3) -> OUT + 'static,
) -> Rc<dyn Stream<OUT>>
where
    IN1: Element,
    IN2: Element,
    IN3: Element,
    OUT: Element,
{
    let combiner = Box::new(func);
    TriMapStream::new(upstream1, upstream2, upstream3, combiner).into_stream()
}
/// Fallible counterpart of `bimap`: `func` returns an `anyhow::Result`, and the stream is
/// built from a `TryBiMapStream`.
///
/// NOTE(review): how an `Err` from `func` is surfaced is decided inside `TryBiMapStream`;
/// it is not visible from this wrapper.
#[must_use]
pub fn try_bimap<IN1, IN2, OUT>(
    upstream1: Dep<IN1>,
    upstream2: Dep<IN2>,
    func: impl Fn(IN1, IN2) -> anyhow::Result<OUT> + 'static,
) -> Rc<dyn Stream<OUT>>
where
    IN1: Element,
    IN2: Element,
    OUT: Element,
{
    let combiner = Box::new(func);
    TryBiMapStream::new(upstream1, upstream2, combiner).into_stream()
}
/// Fallible counterpart of `trimap`: `func` returns an `anyhow::Result`, and the stream is
/// built from a `TryTriMapStream`.
///
/// NOTE(review): how an `Err` from `func` is surfaced is decided inside `TryTriMapStream`;
/// it is not visible from this wrapper.
#[must_use]
pub fn try_trimap<IN1, IN2, IN3, OUT>(
    upstream1: Dep<IN1>,
    upstream2: Dep<IN2>,
    upstream3: Dep<IN3>,
    func: impl Fn(IN1, IN2, IN3) -> anyhow::Result<OUT> + 'static,
) -> Rc<dyn Stream<OUT>>
where
    IN1: Element,
    IN2: Element,
    IN3: Element,
    OUT: Element,
{
    let combiner = Box::new(func);
    TryTriMapStream::new(upstream1, upstream2, upstream3, combiner).into_stream()
}
/// Builds a single stream from several streams of the same element type, using
/// `MergeStream`.
///
/// NOTE(review): the policy applied when more than one source has a value at once lives
/// in `MergeStream` and is not visible here.
#[must_use]
pub fn merge<T: Element>(sources: Vec<Rc<dyn Stream<T>>>) -> Rc<dyn Stream<T>> {
    let merged = MergeStream::new(sources);
    merged.into_stream()
}
/// Builds a stream from a `ConstantStream` holding `value`.
#[must_use]
pub fn constant<T>(value: T) -> Rc<dyn Stream<T>>
where
    T: Element,
{
    let source = ConstantStream::new(value);
    source.into_stream()
}
/// Combines several streams of `T` into one stream of `Burst<T>`.
///
/// Thin public entry point that forwards to `combine::combine` in the private `combine`
/// submodule.
#[must_use]
pub fn combine<T: Element + 'static>(streams: Vec<Rc<dyn Stream<T>>>) -> Rc<dyn Stream<Burst<T>>> {
    combine::combine(streams)
}
/// Builds a node from a `TickNode` configured with `period`.
///
/// The period is converted to whole nanoseconds. A duration whose nanosecond count does
/// not fit in a `u64` (more than roughly 584 years, e.g. `Duration::MAX`) is clamped to
/// `u64::MAX` instead of being wrapped modulo 2^64 by a bare `as` cast, which would
/// silently turn a huge period into an arbitrary short one.
#[must_use]
pub fn ticker(period: Duration) -> Rc<dyn Node> {
    // `as_nanos` returns a `u128`; clamping first makes the narrowing cast lossless.
    let nanos = period.as_nanos().min(u128::from(u64::MAX)) as u64;
    TickNode::new(NanoTime::new(nanos)).into_node()
}
/// Operators available on any graph node.
///
/// Implemented in this module for `dyn Node`, and for `dyn Stream<T>` by converting the
/// stream to a node and delegating.
pub trait NodeOperators {
    /// Returns a `u64` stream counting this node.
    ///
    /// The `dyn Node` implementation samples a constant `1` on this node and sums the
    /// sampled values.
    #[must_use]
    fn count(self: &Rc<Self>) -> Rc<dyn Stream<u64>>;
    /// Returns a stream, driven by this node, whose value is read from
    /// `GraphState::time()`.
    #[must_use]
    fn ticked_at(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>>;
    /// Returns a stream, driven by this node, whose value is read from
    /// `GraphState::elapsed()`.
    #[must_use]
    fn ticked_at_elapsed(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>>;
    /// Returns a stream, driven by this node, whose values come from calling `func`
    /// (via `ProducerStream`).
    #[must_use]
    fn produce<T: Element>(self: &Rc<Self>, func: impl Fn() -> T + 'static) -> Rc<dyn Stream<T>>;
    /// Builds a `Graph` with this node as its only root and runs it.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Graph::run` reports.
    fn run(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> anyhow::Result<()>;
    /// Builds a `Graph` with this node as its only root without running it.
    fn into_graph(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> Graph;
}
impl NodeOperators for dyn Node {
    /// Samples a constant `1` whenever this node triggers and keeps a running sum.
    fn count(self: &Rc<Self>) -> Rc<dyn Stream<u64>> {
        let ones: Rc<dyn Stream<u64>> = constant(1);
        ones.sample(Rc::clone(self)).sum()
    }

    /// Emits `GraphState::time()` through a `GraphStateStream` driven by this node.
    fn ticked_at(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>> {
        let read_time = |state: &mut GraphState| state.time();
        GraphStateStream::new(Rc::clone(self), Box::new(read_time)).into_stream()
    }

    /// Emits `GraphState::elapsed()` through a `GraphStateStream` driven by this node.
    fn ticked_at_elapsed(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>> {
        let read_elapsed = |state: &mut GraphState| state.elapsed();
        GraphStateStream::new(Rc::clone(self), Box::new(read_elapsed)).into_stream()
    }

    /// Wraps `func` in a `ProducerStream` driven by this node.
    fn produce<T: Element>(self: &Rc<Self>, func: impl Fn() -> T + 'static) -> Rc<dyn Stream<T>> {
        let producer = Box::new(func);
        ProducerStream::new(Rc::clone(self), producer).into_stream()
    }

    /// Builds a single-root graph from this node and runs it immediately.
    fn run(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> anyhow::Result<()> {
        let roots = vec![Rc::clone(self)];
        Graph::new(roots, run_mode, run_for).run()
    }

    /// Builds a single-root graph from this node without running it.
    fn into_graph(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> Graph {
        let roots = vec![Rc::clone(self)];
        Graph::new(roots, run_mode, run_for)
    }
}
// Every operator here converts the stream to its node view with `as_node()` and then
// delegates to the `dyn Node` implementation of the same operator.
impl<T> NodeOperators for dyn Stream<T> {
    fn count(self: &Rc<Self>) -> Rc<dyn Stream<u64>> {
        let node = Rc::clone(self).as_node();
        node.count()
    }

    fn ticked_at(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>> {
        let node = Rc::clone(self).as_node();
        node.ticked_at()
    }

    fn ticked_at_elapsed(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>> {
        let node = Rc::clone(self).as_node();
        node.ticked_at_elapsed()
    }

    fn produce<OUT: Element>(
        self: &Rc<Self>,
        func: impl Fn() -> OUT + 'static,
    ) -> Rc<dyn Stream<OUT>> {
        let node = Rc::clone(self).as_node();
        node.produce(func)
    }

    fn run(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> anyhow::Result<()> {
        let node = Rc::clone(self).as_node();
        node.run(run_mode, run_for)
    }

    fn into_graph(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> Graph {
        let node = Rc::clone(self).as_node();
        node.into_graph(run_mode, run_for)
    }
}
/// Flow-control operators that take a node and return a new node.
///
/// Implemented in this module for `dyn Node`; each method wraps the receiver in the
/// corresponding `*Node` type.
pub trait NodeFlowOperators {
/// Wraps this node in a `ThrottleNode` configured with `interval` (converted to nanoseconds).
/// NOTE(review): presumably passes at most one tick per `interval` - confirm in `throttle`.
#[must_use]
fn throttle(self: &Rc<Self>, interval: Duration) -> Rc<dyn Node>;
/// Wraps this node in a `DelayNode` configured with `delay` (converted to nanoseconds).
#[must_use]
fn delay(self: &Rc<Self>, delay: Duration) -> Rc<dyn Node>;
/// Wraps this node in a `LimitNode` configured with `limit`.
/// NOTE(review): presumably stops after `limit` ticks - confirm in `limit`.
#[must_use]
fn limit(self: &Rc<Self>, limit: u32) -> Rc<dyn Node>;
/// Wraps this node in a `FilterNode` gated by the boolean `condition` stream.
#[must_use]
fn filter(self: &Rc<Self>, condition: Rc<dyn Stream<bool>>) -> Rc<dyn Node>;
/// Wraps this node in a `FeedbackSendNode` that targets the unit-valued `sink`.
#[must_use]
fn feedback(self: &Rc<Self>, sink: FeedbackSink<()>) -> Rc<dyn Node>;
}
impl NodeFlowOperators for dyn Node {
    /// Wraps this node in a `ThrottleNode` with `interval` expressed in nanoseconds.
    ///
    /// An interval whose nanosecond count exceeds `u64::MAX` is clamped to `u64::MAX`
    /// rather than wrapped by the narrowing cast.
    fn throttle(self: &Rc<Self>, interval: Duration) -> Rc<dyn Node> {
        // `as_nanos` returns a `u128`; clamping first makes the narrowing cast lossless.
        let nanos = interval.as_nanos().min(u128::from(u64::MAX)) as u64;
        ThrottleNode::new(self.clone(), NanoTime::new(nanos)).into_node()
    }

    /// Wraps this node in a `DelayNode` with `delay` expressed in nanoseconds.
    ///
    /// A delay whose nanosecond count exceeds `u64::MAX` is clamped to `u64::MAX`
    /// rather than wrapped by the narrowing cast.
    fn delay(self: &Rc<Self>, delay: Duration) -> Rc<dyn Node> {
        // `as_nanos` returns a `u128`; clamping first makes the narrowing cast lossless.
        let nanos = delay.as_nanos().min(u128::from(u64::MAX)) as u64;
        DelayNode::new(self.clone(), NanoTime::new(nanos)).into_node()
    }

    /// Wraps this node in a `LimitNode` configured with `limit`.
    fn limit(self: &Rc<Self>, limit: u32) -> Rc<dyn Node> {
        LimitNode::new(self.clone(), limit).into_node()
    }

    /// Wraps this node in a `FilterNode` gated by the boolean `condition` stream.
    fn filter(self: &Rc<Self>, condition: Rc<dyn Stream<bool>>) -> Rc<dyn Node> {
        FilterNode::new(self.clone(), condition).into_node()
    }

    /// Wraps this node in a `FeedbackSendNode` that targets `sink`.
    fn feedback(self: &Rc<Self>, sink: FeedbackSink<()>) -> Rc<dyn Node> {
        FeedbackSendNode::new(self.clone(), sink).into_node()
    }
}
/// Operators available on a reference-counted stream of `T`.
///
/// Implemented in this module for `dyn Stream<T>`. Most methods wrap the receiver in a
/// dedicated `*Stream` / `*Node` type; the notes below describe only what that wiring shows.
pub trait StreamOperators<T: Element> {
/// Returns a stream of `Vec<T>`; the implementation folds this stream, pushing each value
/// onto the vector.
#[must_use]
fn accumulate(self: &Rc<Self>) -> Rc<dyn Stream<Vec<T>>>;
/// Returns an `f64` stream built from `AverageStream` over this stream.
/// NOTE(review): presumably the running mean of the values seen so far - confirm in `average`.
#[must_use]
fn average(self: &Rc<Self>) -> Rc<dyn Stream<f64>>
where
T: ToPrimitive;
/// Returns a stream of `Vec<T>` built from `BufferStream` with the given `capacity`.
/// NOTE(review): presumably emits batches of up to `capacity` values - confirm in `buffer`.
#[must_use]
fn buffer(self: &Rc<Self>, capacity: usize) -> Rc<dyn Stream<Vec<T>>>;
/// Returns a stream of `Vec<T>` built from `WindowStream` with `interval` converted to nanoseconds.
/// NOTE(review): presumably groups values per time window - confirm in `window`.
#[must_use]
fn window(self: &Rc<Self>, interval: Duration) -> Rc<dyn Stream<Vec<T>>>;
/// Returns a stream of `Vec<ValueAt<T>>`; the implementation pairs each value with
/// `ticked_at()` through `ValueAt::new` and folds the pairs into a vector.
#[must_use]
fn collect(self: &Rc<Self>) -> Rc<dyn Stream<Vec<ValueAt<T>>>>;
/// For a stream of iterable values, yields the last item of each value.
/// For an empty value the implementation hands `MapFilterStream` a default item flagged `false`.
/// NOTE(review): the `false` flag presumably suppresses the output - confirm in `map_filter`.
#[must_use]
fn collapse<OUT>(self: &Rc<Self>) -> Rc<dyn Stream<OUT>>
where
T: std::iter::IntoIterator<Item = OUT>,
OUT: Element;
/// Builds an `AsyncConsumerNode` from this stream and `func`, which receives the values as a
/// pinned `FutStream<T>` and returns a future. Only available with the `async` feature.
#[cfg(feature = "async")]
#[must_use]
fn consume_async<FUT>(
self: &Rc<Self>,
func: Box<dyn FnOnce(Pin<Box<dyn FutStream<T>>>) -> FUT + Send>,
) -> Rc<dyn Node>
where
T: Element + Send,
FUT: Future<Output = anyhow::Result<()>> + Send + 'static;
/// Builds a `FinallyNode` from this stream and the one-shot `func`.
/// NOTE(review): presumably `func` runs once with the final value when the graph stops - confirm in `finally`.
#[must_use]
fn finally<F: FnOnce(T, &GraphState) -> anyhow::Result<()> + 'static>(
self: &Rc<Self>,
func: F,
) -> Rc<dyn Node>;
/// Builds a `ConsumerNode` that passes values and a `NanoTime` to `func`.
#[must_use]
fn for_each(self: &Rc<Self>, func: impl Fn(T, NanoTime) + 'static) -> Rc<dyn Node>;
/// Returns a node that, when driven by this stream, sends `peek_value()` of this stream to `sink`.
#[must_use]
fn feedback(self: &Rc<Self>, sink: FeedbackSink<T>) -> Rc<dyn Node>
where
T: Hash + Eq;
/// Fallible variant of `for_each`: builds a `TryConsumerNode` whose `func` returns `anyhow::Result<()>`.
#[must_use]
fn try_for_each(
self: &Rc<Self>,
func: impl Fn(T, NanoTime) -> anyhow::Result<()> + 'static,
) -> Rc<dyn Node>;
/// Builds a `FoldStream`; `func` updates the accumulator of type `OUT` in place with each value.
/// NOTE(review): the accumulator's initial value is chosen inside `FoldStream` - confirm in `fold`.
#[must_use]
fn fold<OUT: Element>(
self: &Rc<Self>,
func: impl Fn(&mut OUT, T) + 'static,
) -> Rc<dyn Stream<OUT>>;
/// Returns a stream built from `DifferenceStream` over this stream.
/// NOTE(review): presumably the difference between consecutive values - confirm in `difference`.
#[must_use]
fn difference(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: std::ops::Sub<Output = T>;
/// Returns a stream built from `DelayStream` with `delay` converted to nanoseconds.
#[must_use]
fn delay(self: &Rc<Self>, delay: Duration) -> Rc<dyn Stream<T>>
where
T: Hash + Eq;
/// Returns a stream built from `DelayWithResetStream` with `delay` converted to nanoseconds
/// and `trigger` as the reset node.
/// NOTE(review): the exact reset semantics live in `delay_with_reset` - confirm there.
#[must_use]
fn delay_with_reset(
self: &Rc<Self>,
delay: Duration,
trigger: Rc<dyn Node>,
) -> Rc<dyn Stream<T>>
where
T: Hash + Eq;
/// Splits this stream into several output streams plus an `Overflow<T>`, using `func` to derive
/// a key and a `DemuxEvent` from each value. The implementation creates a `DemuxMap` from `capacity`.
fn demux<K, F>(
self: &Rc<Self>,
capacity: usize,
func: F,
) -> (Vec<Rc<dyn Stream<T>>>, Overflow<T>)
where
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&T) -> (K, DemuxEvent) + 'static;
/// Like `demux`, but for a stream of iterable values: `func` is applied to the items (`U`) and
/// the outputs carry `Burst<U>`. The implementation forwards to `demux_it_with_map` with a
/// `DemuxMap` created from `capacity`.
fn demux_it<K, F, U>(
self: &Rc<Self>,
capacity: usize,
func: F,
) -> (Vec<Rc<dyn Stream<Burst<U>>>>, Overflow<Burst<U>>)
where
T: IntoIterator<Item = U>,
U: Element,
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&U) -> (K, DemuxEvent) + 'static;
/// Like `demux_it`, but the caller supplies the `DemuxMap` instead of a capacity.
fn demux_it_with_map<K, F, U>(
self: &Rc<Self>,
map: DemuxMap<K>,
func: F,
) -> (Vec<Rc<dyn Stream<Burst<U>>>>, Overflow<Burst<U>>)
where
T: IntoIterator<Item = U>,
U: Element,
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&U) -> (K, DemuxEvent) + 'static;
/// Returns a stream built from `DistinctStream` over this stream.
/// NOTE(review): presumably drops values equal to the previous one - confirm in `distinct`.
#[must_use]
fn distinct(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: PartialEq;
/// Returns a stream built from `FilterStream`, gated by the boolean `condition` stream.
#[must_use]
fn filter(self: &Rc<Self>, condition: Rc<dyn Stream<bool>>) -> Rc<dyn Stream<T>>;
/// Filters by value: the implementation maps `predicate` over this stream to obtain a boolean
/// stream and feeds it to `FilterStream` as the condition.
#[must_use]
fn filter_value(self: &Rc<Self>, predicate: impl Fn(&T) -> bool + 'static)
-> Rc<dyn Stream<T>>;
/// Returns a stream built from `InspectStream`; `func` receives each value by reference.
#[must_use]
fn inspect(self: &Rc<Self>, func: impl Fn(&T) + 'static) -> Rc<dyn Stream<T>>;
/// Returns a stream built from `LimitStream` configured with `limit`.
#[must_use]
fn limit(self: &Rc<Self>, limit: u32) -> Rc<dyn Stream<T>>;
/// Logs each value at `level` (target `"wingfoil"`) together with `label` and the elapsed time,
/// passing the value through unchanged. If `level` is not enabled when this method is called,
/// the implementation returns a clone of the receiver without adding any logging.
#[must_use]
fn logged(self: &Rc<Self>, label: &str, level: Level) -> Rc<dyn Stream<T>>;
/// Returns a stream built from `MapStream`, applying `func` to each value.
#[must_use]
fn map<OUT: Element>(self: &Rc<Self>, func: impl Fn(T) -> OUT + 'static)
-> Rc<dyn Stream<OUT>>;
/// Fallible variant of `map`: builds a `TryMapStream` whose `func` returns `anyhow::Result<OUT>`.
#[must_use]
fn try_map<OUT: Element>(
self: &Rc<Self>,
func: impl Fn(T) -> anyhow::Result<OUT> + 'static,
) -> Rc<dyn Stream<OUT>>;
/// Builds a `GraphMapStream` from this stream and `func`, which turns a `Burst<T>` stream into
/// an `OUT` stream. Only available with the `async` feature.
/// NOTE(review): presumably `func` builds a sub-graph run elsewhere - confirm in `graph_node`.
#[cfg(feature = "async")]
#[must_use]
fn mapper<FUNC, OUT>(self: &Rc<Self>, func: FUNC) -> Rc<dyn Stream<Burst<OUT>>>
where
T: Element + Send,
OUT: Element + Send + Hash + Eq,
FUNC: FnOnce(Rc<dyn Stream<Burst<T>>>) -> Rc<dyn Stream<OUT>> + Send + 'static;
/// Maps each value through the `!` operator.
#[must_use]
fn not(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: std::ops::Not<Output = T>;
/// Folds this stream with `func(accumulator, value)`, replacing the accumulator with the result.
/// The implementation is built on `fold`, so the first call sees the fold's initial accumulator.
#[must_use]
fn reduce(self: &Rc<Self>, func: impl Fn(T, T) -> T + 'static) -> Rc<dyn Stream<T>>;
/// Returns a stream built from `SampleStream` over this stream and the `trigger` node.
#[must_use]
fn sample(self: &Rc<Self>, trigger: Rc<dyn Node>) -> Rc<dyn Stream<T>>;
/// Returns a stream built from `PrintStream` over this stream.
#[must_use]
fn print(self: &Rc<Self>) -> Rc<dyn Stream<T>>;
/// Running sum; the implementation is `reduce` with `+`.
#[must_use]
fn sum(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: Add<T, Output = T>;
/// Returns a stream built from `ThrottleStream` with `interval` converted to nanoseconds.
#[must_use]
fn throttle(self: &Rc<Self>, interval: Duration) -> Rc<dyn Stream<T>>;
}
impl<T> StreamOperators<T> for dyn Stream<T>
where
    T: Element + 'static,
{
    /// Folds every upstream value into a growing `Vec<T>`.
    fn accumulate(self: &Rc<Self>) -> Rc<dyn Stream<Vec<T>>> {
        self.fold(|acc: &mut Vec<T>, value| {
            acc.push(value);
        })
    }

    /// Wraps this stream in an `AverageStream`.
    fn average(self: &Rc<Self>) -> Rc<dyn Stream<f64>>
    where
        T: ToPrimitive,
    {
        AverageStream::new(self.clone()).into_stream()
    }

    /// Wraps this stream in a `BufferStream` with the given `capacity`.
    fn buffer(self: &Rc<Self>, capacity: usize) -> Rc<dyn Stream<Vec<T>>> {
        BufferStream::new(self.clone(), capacity).into_stream()
    }

    /// Wraps this stream in a `WindowStream`; `interval` is clamped to `u64::MAX` nanoseconds.
    fn window(self: &Rc<Self>, interval: Duration) -> Rc<dyn Stream<Vec<T>>> {
        WindowStream::new(self.clone(), NanoTime::new(saturating_nanos(interval))).into_stream()
    }

    /// Pairs each value with the time it ticked at and folds the pairs into a vector.
    fn collect(self: &Rc<Self>) -> Rc<dyn Stream<Vec<ValueAt<T>>>> {
        bimap(
            Dep::Active(self.clone()),
            Dep::Active(self.clone().as_node().ticked_at()),
            ValueAt::new,
        )
        .fold(|acc: &mut Vec<ValueAt<T>>, value| {
            acc.push(value);
        })
    }

    /// Yields the last item of each iterable value.
    fn collapse<OUT>(self: &Rc<Self>) -> Rc<dyn Stream<OUT>>
    where
        T: std::iter::IntoIterator<Item = OUT>,
        OUT: Element,
    {
        // The boolean tells `MapFilterStream` whether the item is real (`true`) or a
        // placeholder produced for an empty iterable (`false`).
        let last_item = |items: T| match items.into_iter().last() {
            Some(item) => (item, true),
            None => (Default::default(), false),
        };
        MapFilterStream::new(self.clone(), Box::new(last_item)).into_stream()
    }

    /// Wraps this stream and the async consumer `func` in an `AsyncConsumerNode`.
    #[cfg(feature = "async")]
    fn consume_async<FUT>(
        self: &Rc<Self>,
        func: Box<dyn FnOnce(Pin<Box<dyn FutStream<T>>>) -> FUT + Send>,
    ) -> Rc<dyn Node>
    where
        T: Element + Send,
        FUT: Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        AsyncConsumerNode::new(self.clone(), func).into_node()
    }

    /// Demultiplexes this stream using a fresh `DemuxMap` of the given `capacity`.
    fn demux<K, F>(
        self: &Rc<Self>,
        capacity: usize,
        func: F,
    ) -> (Vec<Rc<dyn Stream<T>>>, Overflow<T>)
    where
        T: Element,
        K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
        F: Fn(&T) -> (K, DemuxEvent) + 'static,
    {
        demux::demux(self.clone(), demux::DemuxMap::new(capacity), func)
    }

    /// Demultiplexes the items of each iterable value using a fresh `DemuxMap`.
    fn demux_it<K, F, U>(
        self: &Rc<Self>,
        capacity: usize,
        func: F,
    ) -> (Vec<Rc<dyn Stream<Burst<U>>>>, Overflow<Burst<U>>)
    where
        T: IntoIterator<Item = U>,
        U: Element,
        K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
        F: Fn(&U) -> (K, DemuxEvent) + 'static,
    {
        self.demux_it_with_map(DemuxMap::new(capacity), func)
    }

    /// Demultiplexes the items of each iterable value using the caller's `DemuxMap`.
    fn demux_it_with_map<K, F, U>(
        self: &Rc<Self>,
        map: DemuxMap<K>,
        func: F,
    ) -> (Vec<Rc<dyn Stream<Burst<U>>>>, Overflow<Burst<U>>)
    where
        T: IntoIterator<Item = U>,
        U: Element,
        K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
        F: Fn(&U) -> (K, DemuxEvent) + 'static,
    {
        demux_it(self.clone(), map, func)
    }

    /// Wraps this stream and `func` in a `ConsumerNode`.
    fn for_each(self: &Rc<Self>, func: impl Fn(T, NanoTime) + 'static) -> Rc<dyn Node> {
        ConsumerNode::new(self.clone(), Box::new(func)).into_node()
    }

    /// Returns a node that forwards this stream's current value to `sink`.
    fn feedback(self: &Rc<Self>, sink: FeedbackSink<T>) -> Rc<dyn Node>
    where
        T: Hash + Eq,
    {
        let upstream = self.clone();
        GraphStateStream::new(
            self.clone().as_node(),
            Box::new(move |state: &mut GraphState| {
                sink.send(upstream.peek_value(), state);
            }),
        )
        .into_stream()
        .as_node()
    }

    /// Wraps this stream and the fallible `func` in a `TryConsumerNode`.
    fn try_for_each(
        self: &Rc<Self>,
        func: impl Fn(T, NanoTime) -> anyhow::Result<()> + 'static,
    ) -> Rc<dyn Node> {
        TryConsumerNode::new(self.clone(), Box::new(func)).into_node()
    }

    /// Wraps this stream in a `DelayStream`; `delay` is clamped to `u64::MAX` nanoseconds.
    fn delay(self: &Rc<Self>, delay: Duration) -> Rc<dyn Stream<T>>
    where
        T: Hash + Eq,
    {
        DelayStream::new(self.clone(), NanoTime::new(saturating_nanos(delay))).into_stream()
    }

    /// Wraps this stream in a `DelayWithResetStream` reset by `trigger`; `delay` is clamped
    /// to `u64::MAX` nanoseconds.
    fn delay_with_reset(
        self: &Rc<Self>,
        delay: Duration,
        trigger: Rc<dyn Node>,
    ) -> Rc<dyn Stream<T>>
    where
        T: Hash + Eq,
    {
        DelayWithResetStream::new(
            self.clone(),
            trigger,
            NanoTime::new(saturating_nanos(delay)),
        )
        .into_stream()
    }

    /// Wraps this stream in a `DifferenceStream`.
    fn difference(self: &Rc<Self>) -> Rc<dyn Stream<T>>
    where
        T: std::ops::Sub<Output = T>,
    {
        DifferenceStream::new(self.clone()).into_stream()
    }

    /// Wraps this stream in a `DistinctStream`.
    fn distinct(self: &Rc<Self>) -> Rc<dyn Stream<T>>
    where
        T: PartialEq,
    {
        DistinctStream::new(self.clone()).into_stream()
    }

    /// Wraps this stream in a `FilterStream` gated by `condition`.
    fn filter(self: &Rc<Self>, condition: Rc<dyn Stream<bool>>) -> Rc<dyn Stream<T>> {
        FilterStream::new(self.clone(), condition).into_stream()
    }

    /// Filters by value: derives the boolean condition stream by mapping `predicate` over
    /// this stream, then gates this stream on it.
    fn filter_value(
        self: &Rc<Self>,
        predicate: impl Fn(&T) -> bool + 'static,
    ) -> Rc<dyn Stream<T>> {
        // `map` takes `&Rc<Self>`, so no extra clone of the `Rc` is needed here.
        let condition = self.map(move |val| predicate(&val));
        FilterStream::new(self.clone(), condition).into_stream()
    }

    /// Wraps this stream and the one-shot `func` in a `FinallyNode`.
    fn finally<F: FnOnce(T, &GraphState) -> anyhow::Result<()> + 'static>(
        self: &Rc<Self>,
        func: F,
    ) -> Rc<dyn Node> {
        FinallyNode::new(self.clone(), Some(func)).into_node()
    }

    /// Wraps this stream and the in-place accumulator update `func` in a `FoldStream`.
    fn fold<OUT: Element>(
        self: &Rc<Self>,
        func: impl Fn(&mut OUT, T) + 'static,
    ) -> Rc<dyn Stream<OUT>> {
        FoldStream::new(self.clone(), Box::new(func)).into_stream()
    }

    /// Wraps this stream and the by-reference observer `func` in an `InspectStream`.
    fn inspect(self: &Rc<Self>, func: impl Fn(&T) + 'static) -> Rc<dyn Stream<T>> {
        InspectStream::new(self.clone(), Box::new(func)).into_stream()
    }

    /// Wraps this stream in a `LimitStream` configured with `limit`.
    fn limit(self: &Rc<Self>, limit: u32) -> Rc<dyn Stream<T>> {
        LimitStream::new(self.clone(), limit).into_stream()
    }

    /// Logs each value with `label` and the elapsed time, passing the value through.
    ///
    /// The level check happens once, when this method is called: if `level` is not enabled
    /// at that point the receiver is returned unchanged and nothing is ever logged.
    fn logged(self: &Rc<Self>, label: &str, level: Level) -> Rc<dyn Stream<T>> {
        if log::log_enabled!(level) {
            let lbl = label.to_string();
            let func = move |value, time: NanoTime| {
                log!(target:"wingfoil", level, "{:} {:} {:?}", time.pretty(), lbl, value);
                value
            };
            bimap(
                Dep::Active(self.clone()),
                Dep::Active(self.clone().as_node().ticked_at_elapsed()),
                func,
            )
        } else {
            self.clone()
        }
    }

    /// Wraps this stream and `func` in a `MapStream`.
    fn map<OUT: Element>(
        self: &Rc<Self>,
        func: impl Fn(T) -> OUT + 'static,
    ) -> Rc<dyn Stream<OUT>> {
        MapStream::new(self.clone(), Box::new(func)).into_stream()
    }

    /// Wraps this stream and the fallible `func` in a `TryMapStream`.
    fn try_map<OUT: Element>(
        self: &Rc<Self>,
        func: impl Fn(T) -> anyhow::Result<OUT> + 'static,
    ) -> Rc<dyn Stream<OUT>> {
        TryMapStream::new(self.clone(), Box::new(func)).into_stream()
    }

    /// Wraps this stream and the stream-building `func` in a `GraphMapStream`.
    #[cfg(feature = "async")]
    fn mapper<FUNC, OUT>(self: &Rc<Self>, func: FUNC) -> Rc<dyn Stream<Burst<OUT>>>
    where
        T: Element + Send,
        OUT: Element + Send + Hash + Eq,
        FUNC: FnOnce(Rc<dyn Stream<Burst<T>>>) -> Rc<dyn Stream<OUT>> + Send + 'static,
    {
        GraphMapStream::new(self.clone(), func).into_stream()
    }

    /// Maps each value through the `!` operator.
    fn not(self: &Rc<Self>) -> Rc<dyn Stream<T>>
    where
        T: std::ops::Not<Output = T>,
    {
        self.map(|value| !value)
    }

    /// Wraps this stream in a `PrintStream`.
    fn print(self: &Rc<Self>) -> Rc<dyn Stream<T>> {
        PrintStream::new(self.clone()).into_stream()
    }

    /// Folds with `func(accumulator, value)`, storing the result back into the accumulator.
    fn reduce(self: &Rc<Self>, func: impl Fn(T, T) -> T + 'static) -> Rc<dyn Stream<T>> {
        // `fold` only lends the accumulator mutably, so it is cloned to hand `func` an owned `T`.
        let f = move |acc: &mut T, val: T| {
            *acc = func((*acc).clone(), val);
        };
        self.fold(f)
    }

    /// Wraps this stream and the `trigger` node in a `SampleStream`.
    fn sample(self: &Rc<Self>, trigger: Rc<dyn Node>) -> Rc<dyn Stream<T>> {
        SampleStream::new(self.clone(), trigger).into_stream()
    }

    /// Running sum, implemented as `reduce` with `+`.
    fn sum(self: &Rc<Self>) -> Rc<dyn Stream<T>>
    where
        T: Add<T, Output = T>,
    {
        self.reduce(|acc, val| acc + val)
    }

    /// Wraps this stream in a `ThrottleStream`; `interval` is clamped to `u64::MAX` nanoseconds.
    fn throttle(self: &Rc<Self>, interval: Duration) -> Rc<dyn Stream<T>> {
        ThrottleStream::new(self.clone(), NanoTime::new(saturating_nanos(interval))).into_stream()
    }
}

/// Returns `duration` as whole nanoseconds, clamped to `u64::MAX`.
///
/// `Duration::as_nanos` yields a `u128`; a bare `as u64` would wrap values above `u64::MAX`
/// (about 584 years) modulo 2^64 and turn a huge duration into an arbitrary short one.
fn saturating_nanos(duration: Duration) -> u64 {
    // After the clamp the value is guaranteed to fit, so the cast is lossless.
    duration.as_nanos().min(u128::from(u64::MAX)) as u64
}
/// Operators for streams whose values are 2-tuples `(A, B)`.
///
/// Implemented in this module for `dyn Stream<(A, B)>`.
#[doc(hidden)]
pub trait TupleStreamOperators<A, B>
where
A: Element + 'static,
B: Element + 'static,
{
/// Splits a stream of pairs into two streams: one carrying the first component of each
/// pair, the other carrying the second.
fn split(self: &Rc<Self>) -> (Rc<dyn Stream<A>>, Rc<dyn Stream<B>>);
}
impl<A, B> TupleStreamOperators<A, B> for dyn Stream<(A, B)>
where
    A: Element + 'static,
    B: Element + 'static,
{
    /// Projects each pair onto its components, yielding one mapped stream per component.
    fn split(self: &Rc<Self>) -> (Rc<dyn Stream<A>>, Rc<dyn Stream<B>>) {
        // Tuple fields are evaluated left to right, so the first-component stream is
        // still created before the second-component stream.
        (
            self.map(|(first, _): (A, B)| first),
            self.map(|(_, second): (A, B)| second),
        )
    }
}