use anyhow::Result;
use clap::Parser;
use dbsp::{OrdZSet, OutputHandle, Runtime, Stream, typed_batch::IndexedZSetReader};
use feldera_macros::IsNone;
use rkyv::{Archive, Deserialize, Serialize};
use size_of::SizeOf;
use std::hash::Hash;
type EmployeeID = u64;
#[derive(
Default,
Clone,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Debug,
SizeOf,
Archive,
Serialize,
Deserialize,
IsNone,
)]
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
#[archive(compare(PartialEq, PartialOrd))]
struct Manages {
manager: EmployeeID,
employee: EmployeeID,
}
#[derive(
Default,
Clone,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Debug,
SizeOf,
Archive,
Serialize,
Deserialize,
IsNone,
)]
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
#[archive(compare(PartialEq, PartialOrd))]
struct SkipLevel {
grandmanager: EmployeeID,
manager: EmployeeID,
employee: EmployeeID,
}
type SkipLevels = OrdZSet<SkipLevel>;
fn print_output(output: &OutputHandle<OrdZSet<SkipLevel>>) {
for (key, _value, weight) in output.consolidate().iter() {
println!(
" ({}, {}, {}) {:+}",
key.grandmanager, key.manager, key.employee, weight
);
}
println!();
}
#[derive(Debug, Clone, Parser)]
struct Args {
#[clap(long, default_value = "10")]
size: u64,
#[clap(long, default_value = "2")]
threads: usize,
}
fn main() -> Result<()> {
let Args { threads, size } = Args::parse();
let (mut dbsp, (hmanages, output)) = Runtime::init_circuit(threads, |circuit| {
let (manages, hmanages) = circuit.add_input_zset::<Manages>();
let manages_by_manager = manages.map_index(|m| (m.manager, m.clone()));
let manages_by_employee = manages.map_index(|m| (m.employee, m.clone()));
let skiplevels: Stream<_, SkipLevels> =
manages_by_employee.join(&manages_by_manager, |common, m1, m2| SkipLevel {
grandmanager: m1.manager,
manager: *common,
employee: m2.employee,
});
Ok((hmanages, skiplevels.output()))
})
.unwrap();
for employee in 0..size {
hmanages.push(
Manages {
manager: employee / 2,
employee,
},
1,
);
}
dbsp.transaction().unwrap();
println!("Initialization:");
print_output(&output);
for employee in 1..size {
hmanages.push(
Manages {
manager: employee / 2,
employee,
},
-1,
);
hmanages.push(
Manages {
manager: employee / 3,
employee,
},
1,
);
dbsp.transaction().unwrap();
println!("Changes from adjusting {employee}'s manager:");
print_output(&output);
}
dbsp.kill().unwrap();
Ok(())
}