use anyhow::Result;
use clap::Parser;
use dbsp::{OrdIndexedZSet, OutputHandle, Runtime, typed_batch::IndexedZSetReader, utils::Tup2};
/// Identifier of a graph node; edges are fed to the circuit as `Tup2<Node, Node>` (source, destination).
type Node = u64;
// Command-line options for the out-degree example, parsed with clap.
#[derive(Debug, Clone, Parser)]
struct Args {
// Number of edges pushed into the circuit in the first transaction.
#[clap(long, default_value = "100")]
edges: u64,
// Modulus applied to the edge index to choose each edge's source node,
// so generated source ids fall in `0..sources`.
#[clap(long, default_value = "13")]
sources: u64,
// Number of additional edges pushed in the second transaction.
#[clap(long, default_value = "5")]
extra: u64,
// Passed (cast to `usize`) as the first argument of `Runtime::init_circuit`;
// presumably the number of worker threads -- confirm against the dbsp docs.
#[clap(long, default_value = "2")]
threads: u64,
}
/// Prints the updates currently held by the two circuit output handles.
///
/// Each handle is consolidated into a single batch and every
/// `(key, value, weight)` entry is written to stdout on its own line, with the
/// weight shown with an explicit sign. The `degrees` entries are printed
/// first, then the `distribution` entries; each group is followed by a blank
/// line.
fn print_changes(
    degrees: &OutputHandle<OrdIndexedZSet<Node, i64>>,
    distribution: &OutputHandle<OrdIndexedZSet<i64, i64>>,
) {
    // Per-node updates: key is the source node, value is its out-degree.
    let degree_batch = degrees.consolidate();
    for (src, outdegree, weight) in degree_batch.iter() {
        println!(" {weight:+}: Node {src} has out-degree {outdegree}");
    }
    println!();

    // Distribution updates: key is an out-degree, value is how many nodes have it.
    let distribution_batch = distribution.consolidate();
    for (outdegree, count, weight) in distribution_batch.iter() {
        println!(" {weight:+}: {count} nodes have out-degree {outdegree}");
    }
    println!();
}
fn main() -> Result<()> {
let Args {
threads,
edges,
sources,
extra,
} = Args::parse();
let (mut dbsp, (hedges, degrees, distribution)) =
Runtime::init_circuit(threads as usize, |circuit| {
let (edges, hedges) = circuit.add_input_zset::<Tup2<Node, Node>>();
let degrees = edges.map(|Tup2(src, _dst)| *src).weighted_count();
let distribution = degrees.map(|(_src, count)| *count).weighted_count();
Ok((hedges, degrees.output(), distribution.output()))
})
.unwrap();
for i in 0..edges {
hedges.push(Tup2(i % sources, i % 7), 1);
}
dbsp.transaction().unwrap();
println!("Initialization:");
print_changes(°rees, &distribution);
for i in 0..extra {
hedges.push(Tup2(i % sources, i % 9), 1);
}
dbsp.transaction().unwrap();
println!("Changes:");
print_changes(°rees, &distribution);
dbsp.kill().unwrap();
Ok(())
}