noir-compute 0.2.0

Network of Operators In Rust
Documentation
use std::fmt::Display;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
use crate::operator::sink::Sink;
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

#[cfg(feature = "crossbeam")]
use crossbeam_channel::Sender;
#[cfg(not(feature = "crossbeam"))]
use flume::Sender;

#[derive(Debug, Clone)]
pub struct CollectChannelSink<Out: ExchangeData, PreviousOperators>
where
    PreviousOperators: Operator<Out = Out>,
{
    prev: PreviousOperators,
    tx: Option<Sender<Out>>,
}

impl<Out: ExchangeData, PreviousOperators> CollectChannelSink<Out, PreviousOperators>
where
    PreviousOperators: Operator<Out = Out>,
{
    pub(crate) fn new(prev: PreviousOperators, tx: Sender<Out>) -> Self {
        Self { prev, tx: Some(tx) }
    }
}

impl<Out: ExchangeData, PreviousOperators> Display for CollectChannelSink<Out, PreviousOperators>
where
    PreviousOperators: Operator<Out = Out>,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} -> CollectChannelSink", self.prev)
    }
}

impl<Out: ExchangeData, PreviousOperators> Operator for CollectChannelSink<Out, PreviousOperators>
where
    PreviousOperators: Operator<Out = Out>,
{
    type Out = ();

    fn setup(&mut self, metadata: &mut ExecutionMetadata) {
        self.prev.setup(metadata);
    }

    fn next(&mut self) -> StreamElement<()> {
        match self.prev.next() {
            StreamElement::Item(t) | StreamElement::Timestamped(t, _) => {
                let _ = self.tx.as_ref().map(|tx| tx.send(t));
                StreamElement::Item(())
            }
            StreamElement::Watermark(w) => StreamElement::Watermark(w),
            StreamElement::Terminate => {
                self.tx = None;
                StreamElement::Terminate
            }
            StreamElement::FlushBatch => StreamElement::FlushBatch,
            StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
        }
    }

    fn structure(&self) -> BlockStructure {
        let mut operator = OperatorStructure::new::<Out, _>("CollectChannelSink");
        operator.kind = OperatorKind::Sink;
        self.prev.structure().add_operator(operator)
    }
}

impl<Out: ExchangeData, PreviousOperators> Sink for CollectChannelSink<Out, PreviousOperators> where
    PreviousOperators: Operator<Out = Out>
{
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::config::RuntimeConfig;
    use crate::environment::StreamContext;
    use crate::operator::source;

    #[test]
    fn collect_channel() {
        let env = StreamContext::new(RuntimeConfig::local(4));
        let source = source::IteratorSource::new(0..10u8);
        let rx = env.stream(source).collect_channel();
        env.execute_blocking();
        let mut v = Vec::new();
        while let Ok(x) = rx.recv() {
            v.push(x)
        }
        assert_eq!(v, (0..10).collect_vec());
    }
}