use crate::channels::operator_io::{Input, Output};
use crate::stream::{OperatorBuilder, StreamBuilder};
use crate::types::{Data, DataMessage, Key, MaybeKey, MaybeTime, Message};
/// Re-keying of a stream: every data message gets a new key of type `K`.
///
/// Type parameters:
/// * `X` - key type of the incoming data messages
/// * `K` - key type of the outgoing data messages
/// * `V` - value type, unchanged by the operation
/// * `T` - timestamp type, unchanged by the operation
pub trait KeyLocal<X, K, V, T>
where
    K: Key,
{
    /// Consumes the stream and returns one whose messages are keyed by `K`.
    ///
    /// * `name` - name for the operator that performs the re-keying
    /// * `key_func` - computes the new key from a borrowed incoming data
    ///   message; it must be `'static` because it is stored in the operator
    fn key_local(
        self,
        name: &str,
        key_func: impl Fn(&DataMessage<X, V, T>) -> K + 'static,
    ) -> StreamBuilder<K, V, T>;
}
impl<X, K, V, T> KeyLocal<X, K, V, T> for StreamBuilder<X, V, T>
where
X: MaybeKey,
K: Key,
V: Data,
T: MaybeTime,
{
fn key_local(
self,
name: &str,
key_func: impl Fn(&DataMessage<X, V, T>) -> K + 'static,
) -> StreamBuilder<K, V, T> {
let op = OperatorBuilder::direct(
name,
move |input: &mut Input<X, V, T>, output: &mut Output<K, V, T>, _ctx| {
match input.recv() {
Some(Message::Data(d)) => {
let new_key = key_func(&d);
let new_msg = DataMessage {
timestamp: d.timestamp,
key: new_key,
value: d.value,
};
output.send(Message::Data(new_msg))
}
Some(Message::Interrogate(_)) => (),
Some(Message::Collect(_)) => (),
Some(Message::Acquire(_)) => (),
Some(Message::AbsBarrier(b)) => output.send(Message::AbsBarrier(b)),
Some(Message::Rescale(x)) => output.send(Message::Rescale(x)),
Some(Message::SuspendMarker(x)) => output.send(Message::SuspendMarker(x)),
Some(Message::Epoch(x)) => output.send(Message::Epoch(x)),
None => (),
}
},
);
self.then(op)
}
}