differential-dataflow 0.0.3

An incremental data-parallel dataflow platform
Documentation
// extern crate rand;
// // extern crate time;
// extern crate getopts;
// extern crate timely;
// extern crate graph_map;
// extern crate differential_dataflow;
// // extern crate vec_map;

// use std::time::Instant;
// use std::hash::Hash;
// use std::mem;

// use rand::{Rng, SeedableRng, StdRng};

// use timely::dataflow::*;
// use timely::dataflow::operators::*;

// use differential_dataflow::Collection;
// use differential_dataflow::collection::LeastUpperBound;
// use differential_dataflow::operators::*;
// use differential_dataflow::operators::join::JoinUnsigned;
// use differential_dataflow::operators::group::GroupUnsigned;

// use graph_map::GraphMMap;
// // use vec_map::VecMap;

// type Node = u32;
// type Edge = (Node, Node);

// fn main() {

//     // snag a filename to use for the input graph.
//     let filename = std::env::args().nth(1).unwrap();

//     timely::execute_from_args(std::env::args().skip(1), move |computation| {

//         let peers = computation.peers();
//         let index = computation.index();

//         // // What you might do if you used GraphMMap:
//         let graph = GraphMMap::new(&filename);

//         let (mut input, probe) = computation.scoped::<u64,_,_>(|scope| {

//             let (input, stream) = scope.new_input();

//             let roots = Collection::new(vec![(1,1)].into_iter().to_stream(scope));
//             let graph = Collection::new(stream);

//             let probe = _reach(&graph, &roots).inner.probe().0;
//             // let probe = _connected_components(&graph).inner.probe().0;
//             // let probe = _bfs(&graph, &roots).inner.probe().0;

//             (input, probe)
//         });
        

//         let timer = Instant::now();

//         for node in 0..graph.nodes() {
//             if node % peers == index {
//                 for &edge in graph.edges(node) {
//                     input.send(((node as u32, edge), 1));
//                 }
//             }
//         }

//         input.advance_to(1);
//         while probe.lt(input.time()) { computation.step(); }

//         println!("loaded: {:?}", timer.elapsed());

//         let mut latencies = Vec::with_capacity(1100);

//         let seed: &[_] = &[1, 2, 3, 4];
//         let mut rng: StdRng = SeedableRng::from_seed(seed);    // rng for edge additions

//         let mut counts = 0;

//         for count in 0..latencies.capacity() {
//         //graph.nodes() {

//             let mut updates = Vec::with_capacity(1);
//             while updates.len() < updates.capacity() {
//                 let node = rng.gen_range(0, graph.nodes());
//                 updates.push(node);
//             }

//             let start = Instant::now();
//             if count % peers == index {
//                 for &node in &updates {
//                     for &edge in graph.edges(node) {
//                         input.send(((node as u32, edge), -1));
//                         counts += 1;
//                     }
//                 }
//             }
//             input.advance_to((count as u64) + 2);
//             while probe.lt(input.time()) { computation.step(); }
//             latencies.push(start.elapsed());
//         }

//         let mut latencies = latencies.into_iter().skip(100).collect::<Vec<_>>();

//         let mut sum = 0;
//         for &time in &latencies {
//             sum += time.as_secs() * 1000000000 + time.subsec_nanos() as u64;
//         }

//         println!("average latency: {}; edges: {}", sum / latencies.len() as u64, counts);

//         if index == 0 {
//             latencies.sort();

//             for &time in &latencies {
//                 let nanos = time.as_secs() * 1000000000 + time.subsec_nanos() as u64;
//                 println!("{}", nanos as f64 / 1000.0);
//             }
//         }
//     }).unwrap();
// }


// // returns pairs (n, s) indicating node n can be reached from a root in s steps.
// fn _reach<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, Node>
// where G::Timestamp: LeastUpperBound {

//     // initialize roots as reaching themselves at distance 0
//     // repeatedly update minimal distances each node can be reached from each root
//     roots.iterate(|inner| {

//         let edges = edges.enter(&inner.scope());
//         let nodes = roots.enter(&inner.scope());

//         edges.semijoin_u(&inner)
//              .map(|(_,d)| d)
//              .concat(&nodes)
//              .distinct()
//      })
// }


// fn _connected_components<G: Scope>(edges: &Collection<G, Edge>) -> Collection<G, (Node, Node)>
// where G::Timestamp: LeastUpperBound+Hash {

//     // each edge (x,y) means that we need at least a label for the min of x and y.
//     let nodes = edges.map_in_place(|pair| {
//                         let min = std::cmp::min(pair.0, pair.1);
//                         *pair = (min, min);
//                      })
//                      .consolidate_by(|x| x.0);

//     // each edge should exist in both directions.
//     let edges = edges.map_in_place(|x| mem::swap(&mut x.0, &mut x.1))
//                      .concat(&edges);

//     // don't actually use these labels, just grab the type
//     nodes.filter(|_| false)
//          .iterate(|inner| {
//              let edges = edges.enter(&inner.scope());
//              let nodes = nodes.enter_at(&inner.scope(), |r| 256 * (64 - (r.0).0.leading_zeros() as u64));

//             inner.join_map_u(&edges, |_k,l,d| (*d,*l))
//                  .concat(&nodes)
//                  .group_u(|_, mut s, t| { t.push((*s.peek().unwrap().0, 1)); } )
//          })
// }

// // returns pairs (n, s) indicating node n can be reached from a root in s steps.
// fn _bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
// where G::Timestamp: LeastUpperBound {

//     // initialize roots as reaching themselves at distance 0
//     let nodes = roots.map(|x| (x, 0));

//     // repeatedly update minimal distances each node can be reached from each root
//     nodes.iterate(|inner| {

//         let edges = edges.enter(&inner.scope());
//         let nodes = nodes.enter(&inner.scope());

//         inner.join_map_u(&edges, |_k,l,d| (*d, l+1))
//              .concat(&nodes)
//              .group_u(|_, s, t| t.push((*s.peek().unwrap().0, 1)))
//      })
// }