noir-compute 0.2.0

Network of Operators In Rust
Documentation
use std::fmt::Display;

use crate::block::{BlockStructure, OperatorStructure};
use crate::operator::DataKey;
use crate::operator::{Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct KeyBy<Key: DataKey, Keyer, Op>
where
    Keyer: Fn(&Op::Out) -> Key + Send + Clone,
    Op: Operator,
{
    prev: Op,
    #[derivative(Debug = "ignore")]
    keyer: Keyer,
}

impl<Key: DataKey, Keyer: Clone, Op: Clone> Clone for KeyBy<Key, Keyer, Op>
where
    Keyer: Fn(&Op::Out) -> Key + Send + Clone,
    Op: Operator,
{
    fn clone(&self) -> Self {
        Self {
            prev: self.prev.clone(),
            keyer: self.keyer.clone(),
        }
    }
}

impl<Key: DataKey, Keyer, Op> Display for KeyBy<Key, Keyer, Op>
where
    Keyer: Fn(&Op::Out) -> Key + Send + Clone,
    Op: Operator,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{} -> KeyBy<{}>",
            self.prev,
            std::any::type_name::<Key>(),
        )
    }
}

impl<Key: DataKey, Keyer, Op> KeyBy<Key, Keyer, Op>
where
    Keyer: Fn(&Op::Out) -> Key + Send + Clone,
    Op: Operator,
{
    pub fn new(prev: Op, keyer: Keyer) -> Self {
        Self { prev, keyer }
    }
}

impl<Key: DataKey, Keyer, Op> Operator for KeyBy<Key, Keyer, Op>
where
    Keyer: Fn(&Op::Out) -> Key + Send + Clone,
    Op: Operator,
{
    type Out = (Key, Op::Out);

    fn setup(&mut self, metadata: &mut ExecutionMetadata) {
        self.prev.setup(metadata);
    }

    #[inline]
    fn next(&mut self) -> StreamElement<Self::Out> {
        match self.prev.next() {
            StreamElement::Item(t) => StreamElement::Item(((self.keyer)(&t), t)),
            StreamElement::Timestamped(t, ts) => {
                StreamElement::Timestamped(((self.keyer)(&t), t), ts)
            }
            StreamElement::Watermark(w) => StreamElement::Watermark(w),
            StreamElement::Terminate => StreamElement::Terminate,
            StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
            StreamElement::FlushBatch => StreamElement::FlushBatch,
        }
    }

    fn structure(&self) -> BlockStructure {
        self.prev
            .structure()
            .add_operator(OperatorStructure::new::<Self::Out, _>("KeyBy"))
    }
}

#[cfg(test)]
mod tests {
    use crate::operator::key_by::KeyBy;
    use crate::operator::{Operator, StreamElement};
    use crate::test::FakeOperator;

    #[test]
    fn test_key_by() {
        let fake_operator = FakeOperator::new(0..10u8);
        let mut key_by = KeyBy::new(fake_operator, |&n| n);

        for i in 0..10u8 {
            match key_by.next() {
                StreamElement::Item((a, b)) => {
                    assert_eq!(a, i);
                    assert_eq!(b, i);
                }
                item => panic!("Expected StreamElement::Item, got {}", item.variant()),
            }
        }
        assert_eq!(key_by.next(), StreamElement::Terminate);
    }
}