noir-compute 0.2.0

Network of Operators In Rust
Documentation
use std::convert::Infallible;
use std::fmt::Display;
use std::hash::BuildHasher;
use std::sync::Arc;

use quick_cache::sync::Cache;
use quick_cache::UnitWeighter;

use crate::block::{BlockStructure, GroupHasherBuilder, OperatorStructure};
use crate::operator::{Data, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

use super::DataKey;

#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct MapMemo<
    O: Data + Sync,
    K: DataKey + Sync,
    F,
    Fk,
    Op,
    H: BuildHasher + Clone = GroupHasherBuilder,
> where
    F: Fn(Op::Out) -> O + Send + Clone,
    Fk: Fn(&Op::Out) -> K + Send + Clone,
    Op: Operator,
    O: Data + Sync,
    K: DataKey + Sync,
{
    prev: Op,
    #[derivative(Debug = "ignore")]
    f: F,
    #[derivative(Debug = "ignore")]
    fk: Fk,
    cache: Arc<Cache<K, O, UnitWeighter, H>>,
}

impl<O, K, F, Fk, Op> Display for MapMemo<O, K, F, Fk, Op>
where
    F: Fn(Op::Out) -> O + Send + Clone,
    Fk: Fn(&Op::Out) -> K + Send + Clone,
    Op: Operator,
    O: Data + Sync,
    K: DataKey + Sync,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{} -> MapMemo<{} -> {}>",
            self.prev,
            std::any::type_name::<Op::Out>(),
            std::any::type_name::<O>()
        )
    }
}

impl<O, K, F, Fk, Op> MapMemo<O, K, F, Fk, Op>
where
    F: Fn(Op::Out) -> O + Send + Clone,
    Fk: Fn(&Op::Out) -> K + Send + Clone,
    Op: Operator,
    O: Data + Sync,
    K: DataKey + Sync,
{
    pub(super) fn new(prev: Op, f: F, fk: Fk, capacity: usize) -> Self {
        Self {
            prev,
            f,
            fk,
            cache: Arc::new(Cache::with(
                capacity,
                capacity as u64,
                UnitWeighter,
                Default::default(),
                Default::default(),
            )),
        }
    }
}

impl<O, K, F, Fk, Op> Operator for MapMemo<O, K, F, Fk, Op>
where
    F: Fn(Op::Out) -> O + Send + Clone,
    Fk: Fn(&Op::Out) -> K + Send + Clone,
    Op: Operator,
    O: Data + Sync,
    K: DataKey + Sync,
{
    type Out = O;

    fn setup(&mut self, metadata: &mut ExecutionMetadata) {
        self.prev.setup(metadata);
    }

    #[inline]
    fn next(&mut self) -> StreamElement<O> {
        self.prev.next().map(|v| {
            let k = (self.fk)(&v);
            self.cache
                .get_or_insert_with(&k, || Ok::<O, Infallible>((self.f)(v)))
                .unwrap()
        })
    }

    fn structure(&self) -> BlockStructure {
        self.prev
            .structure()
            .add_operator(OperatorStructure::new::<O, _>("MapMemo"))
    }
}

// #[cfg(test)]
// mod tests {
//     use std::str::FromStr;

//     use crate::operator::map::Map;
//     use crate::operator::{Operator, StreamElement};
//     use crate::test::FakeOperator;

//     #[test]
//     #[cfg(feature = "timestamp")]
//     fn map_stream() {
//         let mut fake_operator = FakeOperator::new(0..10u8);
//         for i in 0..10 {
//             fake_operator.push(StreamElement::Timestamped(i, i as i64));
//         }
//         fake_operator.push(StreamElement::Watermark(100));

//         let map = Map::new(fake_operator, |x| x.to_string());
//         let map = Map::new(map, |x| x + "000");
//         let mut map = Map::new(map, |x| u32::from_str(&x).unwrap());

//         for i in 0..10 {
//             let elem = map.next();
//             assert_eq!(elem, StreamElement::Item(i * 1000));
//         }
//         for i in 0..10 {
//             let elem = map.next();
//             assert_eq!(elem, StreamElement::Timestamped(i * 1000, i as i64));
//         }
//         assert_eq!(map.next(), StreamElement::Watermark(100));
//         assert_eq!(map.next(), StreamElement::Terminate);
//     }
// }