#![allow(clippy::all, clippy::nursery, clippy::pedantic)]
use rand::{Rng, SeedableRng, StdRng};
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::mem;
use std::sync::{Arc, Mutex};
use timely::Config;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::operators::Capture;
use timely::dataflow::*;
use palimpsest_dataflow::input::Input;
use palimpsest_dataflow::VecCollection;
use palimpsest_dataflow::lattice::Lattice;
use palimpsest_dataflow::operators::*;
/// Identifier of a graph vertex.
type Node = usize;
/// A pair of vertices; the code in this file destructures it as `(src, dst)`.
type Edge = (Node, Node);
/// Small graph, many rounds: 10 nodes, 20 initial edges, 1000 rounds,
/// run with `Config::process(3)`.
#[test]
fn scc_10_20_1000() {
    let (nodes, edges, rounds) = (10, 20, 1000);
    test_sizes(nodes, edges, rounds, Config::process(3));
}
/// Medium graph, few rounds: 100 nodes, 200 initial edges, 10 rounds,
/// run with `Config::process(3)`.
#[test]
fn scc_100_200_10() {
    let (nodes, edges, rounds) = (100, 200, 10);
    test_sizes(nodes, edges, rounds, Config::process(3));
}
/// Dense graph, single round: 100 nodes, 2000 initial edges, 1 round
/// (so only the initial edges at time 0 are loaded), run with
/// `Config::process(3)`.
#[test]
fn scc_100_2000_1() {
    let (nodes, edges, rounds) = (100, 2000, 1);
    test_sizes(nodes, edges, rounds, Config::process(3));
}
/// Builds a pseudo-random, time-varying edge list and checks that the
/// sequential and dataflow SCC computations report the same changes.
///
/// * `nodes`  - endpoints are drawn from `0..nodes`.
/// * `edges`  - number of edges inserted at time 0.
/// * `rounds` - for every time in `1..rounds`, one edge is inserted and one
///   edge is retracted.
/// * `config` - forwarded unchanged to `scc_differential`.
///
/// Both generators start from the same seed, so the retraction generator
/// replays the insertion generator's sequence from its beginning.
///
/// # Panics
///
/// Panics (via `assert_eq!`) when the two result lists differ, after printing
/// the entries that appear in only one of them.
fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Config) {
    let seed: &[_] = &[1, 2, 3, 4];
    let mut insert_rng: StdRng = SeedableRng::from_seed(seed);
    let mut remove_rng: StdRng = SeedableRng::from_seed(seed);

    let mut edge_list = Vec::new();

    // Initial graph: every edge is inserted at time 0.
    for _ in 0..edges {
        let src = insert_rng.gen_range(0, nodes);
        let dst = insert_rng.gen_range(0, nodes);
        edge_list.push(((src, dst), 0, 1));
    }

    // Each later round inserts one fresh edge and retracts one replayed edge.
    for round in 1..rounds {
        let src = insert_rng.gen_range(0, nodes);
        let dst = insert_rng.gen_range(0, nodes);
        edge_list.push(((src, dst), round, 1));

        let src = remove_rng.gen_range(0, nodes);
        let dst = remove_rng.gen_range(0, nodes);
        edge_list.push(((src, dst), round, -1));
    }

    let mut sequential = scc_sequential(edge_list.clone());
    let mut differential = scc_differential(edge_list, config);

    // Order by time first, then by edge and diff, so the lists compare equal
    // regardless of the order in which each implementation emitted them.
    sequential.sort_by_key(|&(edge, time, diff)| (time, edge, diff));
    differential.sort_by_key(|&(edge, time, diff)| (time, edge, diff));

    if sequential != differential {
        println!("RESULTS INEQUAL!!!");
        for update in sequential.iter().filter(|update| !differential.contains(*update)) {
            println!(" in seq, not diff: {:?}", update);
        }
        for update in differential.iter().filter(|update| !sequential.contains(*update)) {
            println!(" in diff, not seq: {:?}", update);
        }
    }

    assert_eq!(sequential, differential);
}
fn scc_sequential(
edge_list: Vec<((usize, usize), usize, isize)>,
) -> Vec<((usize, usize), usize, isize)> {
let mut rounds = 0;
for &(_, time, _) in &edge_list {
rounds = ::std::cmp::max(rounds, time + 1);
}
let mut output = Vec::new(); let mut results = Vec::new();
for round in 0..rounds {
let mut edges = ::std::collections::HashMap::new();
for &((src, dst), time, diff) in &edge_list {
if time <= round {
*edges.entry((src, dst)).or_insert(0) += diff;
}
}
edges.retain(|_k, v| *v > 0);
let mut forward = ::std::collections::HashMap::new();
let mut reverse = ::std::collections::HashMap::new();
for &(src, dst) in edges.keys() {
forward.entry(src).or_insert(Vec::new()).push(dst);
reverse.entry(dst).or_insert(Vec::new()).push(src);
}
let mut visited = ::std::collections::HashSet::new();
let mut list = Vec::new();
for &node in forward.keys() {
visit(node, &forward, &mut visited, &mut list)
}
let mut component = ::std::collections::HashMap::new();
while let Some(node) = list.pop() {
assign(node, node, &reverse, &mut component);
}
let mut new_output = Vec::new();
for (&(src, dst), &cnt) in edges.iter() {
if component.get(&src) == component.get(&dst) {
new_output.push(((src, dst), cnt));
}
}
let mut changes = HashMap::new();
for &((src, dst), cnt) in new_output.iter() {
*changes.entry((src, dst)).or_insert(0) += cnt;
}
for &((src, dst), cnt) in output.iter() {
*changes.entry((src, dst)).or_insert(0) -= cnt;
}
changes.retain(|_k, v| *v != 0);
for ((src, dst), del) in changes.drain() {
results.push(((src, dst), round, del));
}
output = new_output;
}
results
}
/// Depth-first search along `forward` edges starting at `node`.
///
/// Every node reached for the first time is added to `visited`, and is
/// appended to `list` only after all of its successors have been explored
/// (post-order). Nodes already in `visited` are skipped.
fn visit(
    node: usize,
    forward: &HashMap<usize, Vec<usize>>,
    visited: &mut HashSet<usize>,
    list: &mut Vec<usize>,
) {
    // `insert` returns `false` when the node was already present.
    if !visited.insert(node) {
        return;
    }
    for &successor in forward.get(&node).into_iter().flatten() {
        visit(successor, forward, visited, list);
    }
    list.push(node);
}
/// Labels `node`, and every not-yet-labelled node reachable from it along
/// `reverse` edges, with `root`.
///
/// Nodes that already have an entry in `component` keep their label and are
/// not explored further.
fn assign(
    node: usize,
    root: usize,
    reverse: &HashMap<usize, Vec<usize>>,
    component: &mut HashMap<usize, usize>,
) {
    if component.contains_key(&node) {
        return;
    }
    component.insert(node, root);
    for &predecessor in reverse.get(&node).into_iter().flatten() {
        assign(predecessor, root, reverse, component);
    }
}
/// Runs `_strongly_connected` as a dataflow under `config` and returns every
/// captured `((src, dst), time, diff)` update.
///
/// Each worker builds the same dataflow and captures its consolidated output
/// into a shared channel. Only the worker with index 0 feeds `edges_list`
/// into the input, one timestamp at a time, advancing the input after each
/// round.
///
/// # Panics
///
/// Panics if `timely::execute` returns an error or if the mutex guarding the
/// channel sender is poisoned.
fn scc_differential(
    edges_list: Vec<((usize, usize), usize, isize)>,
    config: Config,
) -> Vec<((usize, usize), usize, isize)> {
    let (send, recv) = ::std::sync::mpsc::channel();
    // The sender is cloned once per worker from behind a mutex.
    let send = Arc::new(Mutex::new(send));

    timely::execute(config, move |worker| {
        let mut pending = edges_list.clone();

        let mut input = worker.dataflow(|scope| {
            let sink = send.lock().unwrap().clone();
            let (handle, graph) = scope.new_collection();
            _strongly_connected(&graph)
                .consolidate()
                .inner
                .capture_into(sink);
            handle
        });

        // Latest timestamps first, so the earliest updates sit at the tail
        // and can be popped off in time order.
        pending.sort_by(|a, b| b.1.cmp(&a.1));

        if worker.index() == 0 {
            let mut round = 0;
            while !pending.is_empty() {
                // Feed every update stamped with the current round.
                while let Some(&(edge, time, diff)) = pending.last() {
                    if time != round {
                        break;
                    }
                    pending.pop();
                    input.update(edge, diff);
                }
                round += 1;
                input.advance_to(round);
            }
        }
    })
    .unwrap();

    // Flatten the captured `(time, batch)` pairs into one list of updates.
    recv.extract()
        .into_iter()
        .flat_map(|(_, batch)| batch)
        .collect()
}
/// Iteratively trims `graph` by alternating `_trim_edges` over the edges and
/// over their transposes until `iterate` stops.
///
/// Each step applies `_trim_edges` to the current candidates using the
/// original edges, then applies it again to that result using the transposed
/// edges. `_trim_edges` swaps the endpoints of what it returns, so two
/// applications restore the original orientation.
fn _strongly_connected<G>(graph: &VecCollection<G, Edge>) -> VecCollection<G, Edge>
where
    G: Scope<Timestamp: Lattice + Ord + Hash>,
{
    graph.iterate(|candidates| {
        let scope = candidates.scope();
        let forward = graph.enter(&scope);
        // Same edges with source and destination exchanged.
        let backward = forward.map_in_place(|edge| mem::swap(&mut edge.0, &mut edge.1));
        let trimmed_once = _trim_edges(candidates, &forward);
        _trim_edges(&trimmed_once, &backward)
    })
}
/// Keeps the edges of `edges` whose two endpoints receive the same label, and
/// returns them with their endpoints swapped.
///
/// Labels come from `_reachability`, which is run over `cycle` and seeded
/// with one `(dst, dst)` record per edge destination in `edges`. Each edge is
/// joined with the labels twice, once keyed by its first component and once
/// by its second, and is kept only when the two labels are equal.
fn _trim_edges<G>(
    cycle: &VecCollection<G, Edge>,
    edges: &VecCollection<G, Edge>,
) -> VecCollection<G, Edge>
where
    G: Scope<Timestamp: Lattice + Ord + Hash>,
{
    // Overwrite the first component with the second: `(src, dst)` -> `(dst, dst)`.
    let seeds = edges.map_in_place(|pair| pair.0 = pair.1).consolidate();
    let labels = _reachability(cycle, &seeds);

    // Attach the label of the first endpoint, re-keying by the second.
    let with_src_label = edges
        .consolidate()
        .join_map(&labels, |&src, &dst, &src_label| (dst, (src, src_label)));

    // Attach the label of the second endpoint.
    let with_both_labels = with_src_label.join_map(
        &labels,
        |&dst, &(src, src_label), &dst_label| ((src, dst), (src_label, dst_label)),
    );

    with_both_labels
        .filter(|&(_, (src_label, dst_label))| src_label == dst_label)
        .map(|((src, dst), _)| (dst, src))
}
/// Propagates labels along `edges`, starting from the `(node, label)` seeds
/// in `nodes`, and yields one `(node, label)` record per labelled node.
///
/// The iteration starts from an empty collection. Each step forwards every
/// current label across `edges` to the destination node, adds the seeds back
/// in, and reduces per node to the label in the first position of the reduce
/// input (NOTE(review): presumably the smallest label, assuming the reduce
/// input is sorted by value; confirm against the library).
///
/// Seeds enter the loop at an inner time of 256 times the bit length of
/// their first component, rather than all at once.
fn _reachability<G>(
    edges: &VecCollection<G, Edge>,
    nodes: &VecCollection<G, (Node, Node)>,
) -> VecCollection<G, Edge>
where
    G: Scope<Timestamp: Lattice + Ord + Hash>,
{
    edges.filter(|_| false).iterate(|labels| {
        let scope = labels.scope();
        let edges = edges.enter(&scope);
        let seeds = nodes.enter_at(&scope, |seed| {
            // Number of significant bits in the seed's first component.
            let bit_length = 64 - (seed.0 as u64).leading_zeros() as u64;
            256 * bit_length
        });

        labels
            .join_map(&edges, |_src, label, dst| (*dst, *label))
            .concat(&seeds)
            .reduce(|_node, input, output| output.push((*input[0].0, 1)))
    })
}