wingfoil 3.0.1

graph based stream processing framework
Documentation
use crate::types::*;
use derive_new::new;

use std::cell::RefCell;
use std::rc::Rc;

#[derive(new)]
struct CombineNode<T: Element> {
    upstream: Rc<dyn Stream<T>>,
    combined: Rc<RefCell<Burst<T>>>,
}

impl<T: Element> MutableNode for CombineNode<T> {
    fn cycle(&mut self, _: &mut GraphState) -> anyhow::Result<bool> {
        self.combined.borrow_mut().push(self.upstream.peek_value());
        Ok(true)
    }
    fn upstreams(&self) -> UpStreams {
        UpStreams::new(vec![self.upstream.clone().as_node()], vec![])
    }
}

#[derive(new)]
struct CombineStream2<T: Element> {
    upstreams: Vec<Rc<dyn Node>>,
    combined: Rc<RefCell<Burst<T>>>,
    #[new(default)]
    value: Burst<T>,
}

impl<T: Element> MutableNode for CombineStream2<T> {
    fn cycle(&mut self, _: &mut GraphState) -> anyhow::Result<bool> {
        self.value = std::mem::replace(&mut *self.combined.borrow_mut(), Burst::new());
        Ok(true)
    }
    fn upstreams(&self) -> UpStreams {
        UpStreams::new(self.upstreams.clone(), vec![])
    }
}

impl<T: Element> StreamPeekRef<Burst<T>> for CombineStream2<T> {
    fn peek_ref(&self) -> &Burst<T> {
        &self.value
    }
}

#[must_use]
pub fn combine<T: Element>(streams: Vec<Rc<dyn Stream<T>>>) -> Rc<dyn Stream<Burst<T>>> {
    let combined = Rc::new(RefCell::new(Burst::new()));
    let nodes = streams
        .iter()
        .map(|strm| CombineNode::new(strm.clone(), combined.clone()).into_node())
        .collect::<Vec<_>>();
    CombineStream2::new(nodes, combined).into_stream()
}

#[cfg(test)]
mod tests {
    use crate::{NanoTime, NodeOperators, RunFor, RunMode, StreamOperators, combine, ticker};
    use std::time::Duration;
    use tinyvec::tiny_vec;
    #[test]
    fn combine_works() {
        let _ = env_logger::try_init();
        let period = Duration::from_micros(1);
        let run_mode = RunMode::HistoricalFrom(NanoTime::ZERO);
        let run_for = RunFor::Cycles(3);
        let src = ticker(period).count();
        let streams = (0..3)
            .map(|i| src.map(move |x| x * 10_u64.pow(i)))
            .collect::<Vec<_>>();
        combine(streams)
            .logged("output", log::Level::Info)
            .accumulate()
            .finally(|res, _| {
                let expected = vec![
                    tiny_vec![1, 10, 100],
                    tiny_vec![2, 20, 200],
                    tiny_vec![3, 30, 300],
                ];
                println!("{:?}", res);
                assert_eq!(res, expected);
                Ok(())
            })
            .run(run_mode, run_for)
            .unwrap();
    }
}