extern crate rand;
extern crate timely;
extern crate differential_dataflow;
extern crate serde;
extern crate rdkafka;
use rand::{Rng, SeedableRng, StdRng};
use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;
use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
/// Identifier of a graph node.
type Node = u32;
/// A directed edge `(source, destination)`; `bfs` joins on the first component and
/// propagates distances to the second.
type Edge = (Node, Node);
/// Entry point: maintains BFS distances over a randomly generated, continually changing graph
/// and optionally mirrors the output through a Kafka topic.
///
/// Positional arguments: `<nodes> <edges> <batch> <topic>`. If any argument equals `write`, the
/// consolidated output is written to the topic; if any argument equals `read`, the topic is read
/// back and compared against the locally computed output. The complete argument list is also
/// passed on to `timely::execute_from_args`.
///
/// # Panics
///
/// Panics with a usage hint when a positional argument is missing or is not a valid `u32`, and
/// when `timely::execute_from_args` returns an error.
fn main() {
    const USAGE: &str = "usage: <nodes> <edges> <batch> <topic> [write] [read] [timely options]";

    // Returns the positional argument at `index`, or panics naming the missing argument.
    fn positional(index: usize, name: &str) -> String {
        std::env::args()
            .nth(index)
            .unwrap_or_else(|| panic!("missing argument <{}>; {}", name, USAGE))
    }

    // Returns the positional argument at `index` parsed as `u32`, or panics naming the bad value.
    fn count(index: usize, name: &str) -> u32 {
        let raw = positional(index, name);
        raw.parse::<u32>().unwrap_or_else(|error| {
            panic!(
                "argument <{}> must be an unsigned 32-bit integer, got {:?} ({}); {}",
                name, raw, error, USAGE
            )
        })
    }

    let nodes: u32 = count(1, "nodes");
    let edges: u32 = count(2, "edges");
    let batch: u32 = count(3, "batch");
    let topic = positional(4, "topic");

    let write = std::env::args().any(|x| x == "write");
    let read = std::env::args().any(|x| x == "read");

    timely::execute_from_args(std::env::args(), move |worker| {
        let timer = ::std::time::Instant::now();

        // Build the BFS dataflow; keep the input handles and the Kafka tokens.
        let mut probe = Handle::new();
        let (mut roots, mut graph, _write_token, _read_token) = worker.dataflow(|scope| {
            let (root_input, roots) = scope.new_collection();
            let (edge_input, graph) = scope.new_collection();

            let result = bfs(&graph, &roots);

            // Drop the node and keep only the distance, then consolidate.
            let result =
                result.map(|(_, l)| l)
                      .consolidate()
                      .probe_with(&mut probe);

            let write_token = if write {
                Some(kafka::create_sink(&result.inner, "localhost:9092", &topic))
            } else {
                None
            };

            let read_token = if read {
                let (read_token, stream) =
                    kafka::create_source(result.scope(), "localhost:9092", &topic, "group");
                use differential_dataflow::AsCollection;
                // Subtract what was read from Kafka from the local output; any record that
                // survives consolidation is printed and trips `assert_empty`.
                stream
                    .as_collection()
                    .negate()
                    .concat(&result)
                    .consolidate()
                    .inspect(|x| println!("In error: {:?}", x))
                    .probe_with(&mut probe)
                    .assert_empty();
                Some(read_token)
            } else {
                None
            };

            (root_input, edge_input, write_token, read_token)
        });

        // Both generators share a seed: `rng1` drives insertions, while `rng2` replays the same
        // sequence from its start to drive removals.
        let seed: &[_] = &[1, 2, 3, 4];
        let mut rng1: StdRng = SeedableRng::from_seed(seed);
        let mut rng2: StdRng = SeedableRng::from_seed(seed);

        roots.insert(0);
        roots.close();

        println!("performing BFS on {} nodes, {} edges:", nodes, edges);

        // Only worker 0 introduces data.
        if worker.index() == 0 {
            for _ in 0..edges {
                graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
            }
        }

        println!("{:?}\tloaded", timer.elapsed());

        graph.advance_to(1);
        graph.flush();
        worker.step_while(|| probe.less_than(graph.time()));

        println!("{:?}\tstable", timer.elapsed());

        // The loop has no exit condition; each round applies `batch` insert/remove pairs, one
        // per logical timestamp.
        for round in 0.. {
            if write {
                std::thread::sleep(std::time::Duration::from_millis(100));
            }
            for element in 0..batch {
                if worker.index() == 0 {
                    graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
                    graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
                }
                graph.advance_to(2 + round * batch + element);
            }
            graph.flush();

            let timer2 = ::std::time::Instant::now();
            worker.step_while(|| probe.less_than(graph.time()));

            if worker.index() == 0 {
                let elapsed = timer2.elapsed();
                // Columns: total elapsed time, round number, round latency in nanoseconds.
                println!(
                    "{:?}\t{:?}:\t{}",
                    timer.elapsed(),
                    round,
                    elapsed.as_secs() * 1_000_000_000 + u64::from(elapsed.subsec_nanos())
                );
            }
        }
        println!("finished; elapsed: {:?}", timer.elapsed());
    }).unwrap();
}
/// Computes, for each node reachable from `roots`, a pair `(node, distance)`.
///
/// Roots start at distance 0. Inside the iteration every known `(node, distance)` is joined with
/// the edges leaving `node` to propose `distance + 1` for the edge's destination; the proposals
/// are merged with the roots and reduced to one entry per node.
fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
where G::Timestamp: Lattice+Ord {
    // Each root reaches itself in zero steps.
    let seeds = roots.map(|root| (root, 0));

    seeds.iterate(|reached| {
        // Bring the outer collections into the iterative scope.
        let scoped_edges = edges.enter(&reached.scope());
        let scoped_seeds = seeds.enter(&reached.scope());

        let proposals = reached.join_map(&scoped_edges, |_src, dist, dst| (*dst, dist + 1));
        let candidates = proposals.concat(&scoped_seeds);

        // Keep the first value offered for each node; differential presents values in sorted
        // order, so this is presumably the minimum distance.
        candidates.reduce(|_node, dists, output| output.push((*dists[0].0, 1)))
    })
}
pub mod kafka {
use serde::{Serialize, Deserialize};
use timely::scheduling::SyncActivator;
use rdkafka::{ClientContext, config::ClientConfig};
use rdkafka::consumer::{BaseConsumer, ConsumerContext};
use rdkafka::error::{KafkaError, RDKafkaError};
use differential_dataflow::capture::Writer;
use std::hash::Hash;
use timely::progress::Timestamp;
use timely::dataflow::{Scope, Stream};
use differential_dataflow::ExchangeData;
use differential_dataflow::lattice::Lattice;
/// Creates a stream of `(data, time, diff)` updates read from the Kafka `topic` at `addr`,
/// consuming as a member of consumer group `group`.
///
/// Returns the pair produced by `differential_dataflow::capture::source::build`: an opaque
/// token and the stream itself. The token presumably has to be kept alive for as long as the
/// source should run — confirm against the `capture` documentation.
pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
where
    G: Scope<Timestamp = T>,
    D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
    T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
    R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
{
    use differential_dataflow::capture::{source, YieldingIter};

    // Duration handed to `YieldingIter`.
    let yield_after = std::time::Duration::from_millis(10);

    source::build(scope, |activator| {
        let messages = Iter::<D, T, R>::new_from(KafkaSource::new(addr, topic, group, activator));
        YieldingIter::new_from(messages, yield_after)
    })
}
/// Attaches a Kafka sink to `stream`, writing its updates to `topic` on the broker at `addr`.
///
/// The dataflow only receives weak handles to the sink; the returned box owns the sole strong
/// reference, so the sink is dropped when the caller drops the box.
pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
where
    G: Scope<Timestamp = T>,
    D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
    R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
{
    use std::cell::RefCell;
    use std::rc::Rc;
    use differential_dataflow::hashable::Hashable;

    let shared_sink = Rc::new(RefCell::new(KafkaSink::new(addr, topic)));

    // The same sink serves as both the updates writer and the progress writer.
    let updates_sink = Rc::downgrade(&shared_sink);
    let progress_sink = Rc::downgrade(&shared_sink);

    // The sink hash is derived from the (address, topic) pair.
    let sink_hash = (addr.to_string(), topic.to_string()).hashed();

    differential_dataflow::capture::sink::build(stream, sink_hash, updates_sink, progress_sink);

    Box::new(shared_sink)
}
/// A Kafka consumer subscribed to a single topic.
pub struct KafkaSource {
    consumer: BaseConsumer<ActivationConsumerContext>,
}

impl KafkaSource {
    /// Creates a consumer for `topic` on the broker at `addr`, joining consumer group `group`.
    ///
    /// `activator` is wrapped in the consumer's context and is triggered from the context's
    /// `message_queue_nonempty_callback`.
    ///
    /// # Panics
    ///
    /// Panics if the consumer cannot be created from the configuration or if subscribing to
    /// `topic` fails.
    pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
        use rdkafka::consumer::Consumer;

        let mut kafka_config = ClientConfig::new();
        kafka_config
            .set("bootstrap.servers", addr)
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            // 30000 ms = 30 seconds.
            .set("topic.metadata.refresh.interval.ms", "30000")
            // 134217728 bytes = 128 MiB.
            .set("fetch.message.max.bytes", "134217728")
            .set("group.id", group)
            .set("isolation.level", "read_committed");

        let context = ActivationConsumerContext(activator);
        let consumer = kafka_config
            .create_with_context::<_, BaseConsumer<_>>(context)
            .expect("creating kafka consumer for kafka sources failed");
        consumer
            .subscribe(&[topic])
            .expect("subscribing kafka consumer to topic failed");

        Self { consumer }
    }
}
/// Iterator adapter over a [`KafkaSource`]; `D`, `T` and `R` fix the update type that its
/// `Iterator` implementation decodes message payloads into.
pub struct Iter<D, T, R> {
    /// The Kafka consumer the messages are pulled from.
    pub source: KafkaSource,
    // Ties the otherwise unused type parameters to the struct.
    phantom: std::marker::PhantomData<(D, T, R)>,
}

impl<D, T, R> Iter<D, T, R> {
    /// Wraps `source` in an iterator.
    pub fn new_from(source: KafkaSource) -> Self {
        let phantom = std::marker::PhantomData;
        Iter { source, phantom }
    }
}
impl<D, T, R> Iterator for Iter<D, T, R>
where
    D: for<'a>Deserialize<'a>,
    T: for<'a>Deserialize<'a>,
    R: for<'a>Deserialize<'a>,
{
    type Item = differential_dataflow::capture::Message<D, T, R>;

    /// Polls the consumer with a zero timeout and decodes at most one message.
    ///
    /// Returns `None` when the poll yields nothing, when the poll reports an error, when the
    /// message has no payload, or when the payload fails to deserialize with bincode.
    fn next(&mut self) -> Option<Self::Item> {
        use rdkafka::message::Message;

        let polled = self.source.consumer.poll(std::time::Duration::from_millis(0))?;
        let message = polled.ok()?;
        let payload = message.payload()?;
        bincode::deserialize::<Self::Item>(payload).ok()
    }
}
/// Consumer context that triggers a timely `SyncActivator` whenever the consumer's message
/// queue becomes non-empty.
struct ActivationConsumerContext(SyncActivator);

impl ActivationConsumerContext {
    /// Triggers the wrapped activator, panicking if activation reports an error.
    fn activate(&self) {
        let ActivationConsumerContext(activator) = self;
        activator.activate().unwrap();
    }
}

// No client-level callbacks are overridden.
impl ClientContext for ActivationConsumerContext {}

impl ConsumerContext for ActivationConsumerContext {
    /// Forwards the "queue became non-empty" notification to the activator.
    fn message_queue_nonempty_callback(&self) {
        ActivationConsumerContext::activate(self);
    }
}
use std::time::Duration;
use rdkafka::producer::DefaultProducerContext;
use rdkafka::producer::{BaseRecord, ThreadedProducer};
/// A Kafka producer bound to one topic, with a reusable serialization buffer.
pub struct KafkaSink {
    topic: String,
    producer: ThreadedProducer<DefaultProducerContext>,
    buffer: Vec<u8>,
}

impl KafkaSink {
    /// Creates a producer for `topic` on the broker at `addr`.
    ///
    /// # Panics
    ///
    /// Panics if the producer cannot be created from the configuration.
    pub fn new(addr: &str, topic: &str) -> Self {
        // Producer queue limits, passed to the client configuration as decimal strings.
        const QUEUE_MAX_KBYTES: i32 = 16 << 20;
        const QUEUE_MAX_MESSAGES: i32 = 10_000_000;
        const QUEUE_MAX_MS: i32 = 10;

        let mut config = ClientConfig::new();
        config
            .set("bootstrap.servers", addr)
            .set("queue.buffering.max.kbytes", &QUEUE_MAX_KBYTES.to_string())
            .set("queue.buffering.max.messages", &QUEUE_MAX_MESSAGES.to_string())
            .set("queue.buffering.max.ms", &QUEUE_MAX_MS.to_string());

        let producer = config
            .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
            .expect("creating kafka producer for kafka sinks failed");

        KafkaSink {
            topic: topic.to_owned(),
            producer,
            buffer: Vec::new(),
        }
    }
}
impl<T: Serialize> Writer<T> for KafkaSink {
    /// Serializes `item` with bincode and hands it to the producer.
    ///
    /// Returns `None` when the producer accepts the record, and `Some(1s)` when it rejects it
    /// (presumably the caller retries after that delay — confirm against the `Writer` trait
    /// documentation). A full producer queue is treated as routine back-pressure; any other
    /// error is additionally reported on stderr so that a failing sink does not go unnoticed.
    fn poll(&mut self, item: &T) -> Option<Duration> {
        let retry_after = Duration::from_secs(1);

        // Reuse the buffer's allocation across calls.
        self.buffer.clear();
        bincode::serialize_into(&mut self.buffer, item).expect("Writing to a `Vec<u8>` cannot fail");

        let record = BaseRecord::<[u8], _>::to(&self.topic).payload(&self.buffer);
        match self.producer.send(record) {
            Ok(_) => None,
            Err((KafkaError::MessageProduction(RDKafkaError::QueueFull), _)) => Some(retry_after),
            Err((error, _)) => {
                eprintln!(
                    "kafka sink for topic {:?}: failed to enqueue message: {:?}; retrying in {:?}",
                    self.topic, error, retry_after
                );
                Some(retry_after)
            }
        }
    }

    /// Reports whether the producer has no messages in flight.
    fn done(&self) -> bool {
        self.producer.in_flight_count() == 0
    }
}
}