noir-compute 0.2.0

Network of Operators In Rust
Documentation
use std::time::Instant;

use noir_compute::prelude::*;

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

fn main() {
    let (config, args) = RuntimeConfig::from_args();
    if args.len() != 2 {
        panic!("Pass the number of iterations and the edges dataset as arguments");
    }
    let num_iterations: usize = args[0].parse().expect("Invalid number of iterations");
    let path_edges = &args[1];

    config.spawn_remote_workers();
    let env = StreamContext::new(config);

    let edges_source = CsvSource::<(u64, u64)>::new(path_edges)
        .delimiter(b',')
        .has_headers(false);

    let mut edges = env.stream(edges_source).split(2);

    let (state, result) = edges.pop().unwrap().iterate(
        num_iterations,
        // (old, new) count of paths in the transitive closure
        (0, 0, 0),
        move |s, _| {
            let mut paths = s.split(2);
            paths
                .pop()
                .unwrap()
                .join(edges.pop().unwrap(), |(_z, x)| *x, |(x, _y)| *x)
                // if there are a path z -> x and an edge x -> y, then generate the path z -> y
                .map(|(_, ((z, _), (_, y)))| (z, y))
                .drop_key()
                // concatenate the paths already present in the transitive closure
                .merge(paths.pop().unwrap())
                // delete duplicated paths
                .group_by_reduce(|(x, y)| (*x, *y), |_, _| {})
                .drop_key()
                .filter(|(x, y)| x != y)
        },
        |count: &mut u64, _| *count += 1,
        |(_old, new, _iter), count| *new += count,
        |(old, new, iter)| {
            *iter += 1;
            let condition = old != new;
            *old = *new;
            *new = 0;
            condition
        },
    );

    // we are interested in the stream output
    let result = result.collect_vec();
    state.for_each(|(_, _, iter)| eprintln!("Iterations: {iter}"));

    let start = Instant::now();
    env.execute_blocking();
    let elapsed = start.elapsed();

    if let Some(mut res) = result.get() {
        eprintln!("Number of paths: {:?}", res.len());
        if cfg!(debug_assertions) {
            res.sort_unstable();
            for (x, y) in res {
                eprintln!("{x} -> {y}");
            }
        }
    }
    eprintln!("Elapsed: {elapsed:?}");
}