use crate::channels::operator_io::{link, Input, Output};
use crate::stream::{AppendableOperator, OperatorBuilder, StreamBuilder};
use crate::types::{DataMessage, MaybeData, MaybeKey, MaybeTime};
use std::rc::Rc;
pub trait Split<K, V, T>: super::sealed::Sealed {
fn const_split<const N: usize>(
self,
name: &str,
partitioner: impl Fn(&DataMessage<K, V, T>, &mut [bool; N]) + 'static,
) -> [StreamBuilder<K, V, T>; N];
fn split(
self,
name: &str,
partitioner: impl Fn(&DataMessage<K, V, T>, &mut [bool]) + 'static,
outputs: usize,
) -> Vec<StreamBuilder<K, V, T>>;
}
impl<K, V, T> Split<K, V, T> for StreamBuilder<K, V, T>
where
K: MaybeKey,
V: MaybeData,
T: MaybeTime,
{
fn const_split<const N: usize>(
self,
name: &str,
partitioner: impl Fn(&DataMessage<K, V, T>, &mut [bool; N]) + 'static,
) -> [StreamBuilder<K, V, T>; N] {
let partitioner = move |msg: &DataMessage<K, V, T>, outputs: &mut [bool]| {
let outputs: &mut [bool; N] = outputs
.try_into()
.expect("Expected array size to match. This is a bug.");
partitioner(msg, outputs)
};
let streams = self.split(name, partitioner, N);
assert_eq!(streams.len(), N);
unsafe { streams.try_into().unwrap_unchecked() }
}
fn split(
self,
name: &str,
partitioner: impl Fn(&DataMessage<K, V, T>, &mut [bool]) + 'static,
outputs: usize,
) -> Vec<StreamBuilder<K, V, T>> {
let rt = self.get_runtime();
let mut stream_receiver = self.finish_pop_tail();
let mut downstream_receivers: Vec<Input<K, V, T>> =
(0..outputs).map(|_| Input::new_unlinked()).collect();
let output = Output::new_unlinked(partitioner);
let mut partition_op = OperatorBuilder::new_with_output(
name,
|_| {
|input, output, _ctx| {
if let Some(x) = input.recv() {
output.send(x)
}
}
},
output,
);
std::mem::swap(partition_op.get_input_mut(), &mut stream_receiver);
for dr in downstream_receivers.iter_mut() {
link(partition_op.get_output_mut(), dr);
}
#[allow(clippy::unwrap_used)]
rt.lock()
.unwrap()
.add_operators([Box::new(partition_op).into_buildable()]);
downstream_receivers
.into_iter()
.map(|x| StreamBuilder::from_receiver(x, Rc::clone(&rt)))
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
operators::*,
sinks::StatelessSink,
sources::{SingleIteratorSource, StatelessSource},
testing::{get_test_rt, VecSink},
};
#[test]
fn const_split() {
let even_sink = VecSink::new();
let odd_sink = VecSink::new();
let rt = get_test_rt(|provider| {
let stream = provider.new_stream().source(
"source",
StatelessSource::new(SingleIteratorSource::new(0..10u64)),
);
let [even, odd] = stream.const_split("const-split", |msg, outputs| {
let is_even = msg.value & 1 == 0;
*outputs = [is_even, !is_even];
});
even.sink("sink-even", StatelessSink::new(even_sink.clone()));
odd.sink("sink-odd", StatelessSink::new(odd_sink.clone()));
});
rt.execute().unwrap();
let even_expected = vec![0, 2, 4, 6, 8];
let even_result: Vec<u64> = even_sink.into_iter().map(|x| x.value).collect();
assert_eq!(even_expected, even_result);
let odd_expected = vec![1, 3, 5, 7, 9];
let odd_result: Vec<u64> = odd_sink.into_iter().map(|x| x.value).collect();
assert_eq!(odd_expected, odd_result);
}
#[test]
fn split() {
let even_sink = VecSink::new();
let odd_sink = VecSink::new();
let rt = get_test_rt(|provider| {
let stream = provider.new_stream().source(
"source",
StatelessSource::new(SingleIteratorSource::new(0..10u64)),
);
let mut streams = stream.split(
"split",
|msg, outputs| {
if msg.value & 1 == 0 {
outputs[0] = true;
} else {
outputs[1] = true;
}
},
2,
);
let odd = streams.pop().unwrap();
let even = streams.pop().unwrap();
even.sink("sink-even", StatelessSink::new(even_sink.clone()));
odd.sink("sink-odd", StatelessSink::new(odd_sink.clone()));
});
rt.execute().unwrap();
let even_expected = vec![0, 2, 4, 6, 8];
let even_result: Vec<u64> = even_sink.into_iter().map(|x| x.value).collect();
assert_eq!(even_expected, even_result);
let odd_expected = vec![1, 3, 5, 7, 9];
let odd_result: Vec<u64> = odd_sink.into_iter().map(|x| x.value).collect();
assert_eq!(odd_expected, odd_result);
}
}