use std::rc::Rc;
use trace::{Batch, BatchReader, Builder, Merger, Cursor, Trace, TraceReader};
use trace::description::Description;
use trace::rc_blanket_impls::RcBatchCursor;
use super::spine_fueled::Spine;
use super::merge_batcher::MergeBatcher;
use timely::progress::nested::product::Product;
use timely::progress::timestamp::RootTimestamp;
type Node = u32;
struct GraphSpine<N> where N: Ord+Clone+'static {
spine: Spine<Node, N, Product<RootTimestamp, ()>, isize, Rc<GraphBatch<N>>>
}
impl<N> TraceReader<Node, N, Product<RootTimestamp, ()>, isize> for GraphSpine<N>
where
N: Ord+Clone+'static,
{
type Batch = Rc<GraphBatch<N>>;
type Cursor = RcBatchCursor<Node, N, Product<RootTimestamp, ()>, isize, GraphBatch<N>>;
fn cursor_through(&mut self, upper: &[Product<RootTimestamp,()>]) -> Option<(Self::Cursor, <Self::Cursor as Cursor<Node, N, Product<RootTimestamp,()>, isize>>::Storage)> {
let mut batch = Vec::new();
self.spine.map_batches(|b| batch.push(b.clone()));
assert!(batch.len() <= 1);
if upper == &[] {
batch.pop().map(|b| (b.cursor(), b))
}
else { None }
}
fn advance_by(&mut self, frontier: &[Product<RootTimestamp,()>]) {
self.spine.advance_by(frontier)
}
fn advance_frontier(&mut self) -> &[Product<RootTimestamp,()>] { self.spine.advance_frontier() }
fn distinguish_since(&mut self, frontier: &[Product<RootTimestamp,()>]) {
self.spine.distinguish_since(frontier)
}
fn distinguish_frontier(&mut self) -> &[Product<RootTimestamp,()>] { &self.spine.distinguish_frontier() }
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F) {
self.spine.map_batches(f)
}
}
impl<N> Trace<Node, N, Product<RootTimestamp,()>, isize> for GraphSpine<N>
where
N: Ord+Clone+'static,
{
fn new() -> Self {
GraphSpine {
spine: Spine::<Node, N, Product<RootTimestamp, ()>, isize, Rc<GraphBatch<N>>>::new()
}
}
fn insert(&mut self, batch: Self::Batch) {
self.spine.insert(batch)
}
fn close(&mut self) {
self.spine.close()
}
}
#[derive(Debug, Abomonation)]
pub struct GraphBatch<N> {
index: usize,
peers: usize,
keys: Vec<Node>,
nodes: Vec<usize>,
edges: Vec<N>,
desc: Description<Product<RootTimestamp,()>>,
}
impl<N> BatchReader<Node, N, Product<RootTimestamp,()>, isize> for GraphBatch<N> where N: Ord+Clone+'static {
type Cursor = GraphCursor;
fn cursor(&self) -> Self::Cursor { GraphCursor { key: self.index as Node, key_pos: 0, val_pos: 0 } }
fn len(&self) -> usize { self.edges.len() }
fn description(&self) -> &Description<Product<RootTimestamp,()>> { &self.desc }
}
impl<N> Batch<Node, N, Product<RootTimestamp,()>, isize> for GraphBatch<N> where N: Ord+Clone+'static {
type Batcher = MergeBatcher<Node, N, Product<RootTimestamp,()>, isize, Self>;
type Builder = GraphBuilder<N>;
type Merger = GraphMerger;
fn begin_merge(&self, other: &Self) -> Self::Merger {
GraphMerger::new(self, other)
}
}
pub struct GraphMerger { }
impl<N> Merger<Node, N, Product<RootTimestamp,()>, isize, GraphBatch<N>> for GraphMerger where N: Ord+Clone+'static {
fn new(_batch1: &GraphBatch<N>, _batch2: &GraphBatch<N>) -> Self {
panic!("Cannot merge GraphBatch; they are static");
}
fn done(self) -> GraphBatch<N> {
panic!("Cannot merge GraphBatch; they are static");
}
fn work(&mut self, _source1: &GraphBatch<N>, _source2: &GraphBatch<N>, _frontier: &Option<Vec<Product<RootTimestamp,()>>>, _fuel: &mut usize) {
panic!("Cannot merge GraphBatch; they are static");
}
}
#[derive(Debug)]
pub struct GraphCursor {
key: Node,
key_pos: usize,
val_pos: usize,
}
impl<N> Cursor<Node, N, Product<RootTimestamp,()>, isize> for GraphCursor where N: Ord+Clone {
type Storage = GraphBatch<N>;
fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Node { &storage.keys[self.key_pos] }
fn val<'a>(&self, storage: &'a Self::Storage) -> &'a N { &storage.edges[self.val_pos] }
fn map_times<L: FnMut(&Product<RootTimestamp,()>, isize)>(&mut self, _storage: &Self::Storage, mut logic: L) {
logic(&Product::new(RootTimestamp, ()), 1);
}
fn key_valid(&self, storage: &Self::Storage) -> bool { (self.key_pos + 1) < storage.nodes.len() }
fn val_valid(&self, storage: &Self::Storage) -> bool {
self.val_pos < storage.nodes[self.key_pos + 1]
}
fn step_key(&mut self, storage: &Self::Storage){
if self.key_valid(storage) {
self.key_pos += 1;
self.key += storage.peers as Node;
}
}
fn seek_key(&mut self, storage: &Self::Storage, key: &Node) {
if self.key_valid(storage) {
self.key_pos = (*key as usize) / storage.peers;
if self.key_pos + 1 >= storage.nodes.len() {
self.key_pos = storage.nodes.len() - 1;
}
self.val_pos = storage.nodes[self.key_pos];
self.key = (storage.peers * self.key_pos + storage.index) as Node;
}
}
fn step_val(&mut self, storage: &Self::Storage) {
if self.val_valid(storage) {
self.val_pos += 1;
}
}
fn seek_val(&mut self, storage: &Self::Storage, val: &N) {
if self.val_valid(storage) {
let lower = self.val_pos;
let upper = storage.nodes[self.key_pos + 1];
self.val_pos += advance(&storage.edges[lower .. upper], |tuple| tuple < val);
}
}
fn rewind_keys(&mut self, storage: &Self::Storage) { self.key_pos = 0; self.key = storage.index as Node; }
fn rewind_vals(&mut self, storage: &Self::Storage) {
if self.key_valid(storage) {
self.val_pos = storage.nodes[self.key_pos];
}
}
}
pub struct GraphBuilder<N: Ord> {
index: usize,
peers: usize,
keys: Vec<Node>,
nodes: Vec<usize>,
edges: Vec<N>,
}
impl<N> Builder<Node, N, Product<RootTimestamp,()>, isize, GraphBatch<N>> for GraphBuilder<N> where N: Ord+Clone+'static {
fn new() -> Self { Self::with_capacity(0) }
fn with_capacity(cap: usize) -> Self {
GraphBuilder {
index: 0,
peers: 1,
keys: Vec::new(),
nodes: Vec::new(),
edges: Vec::with_capacity(cap),
}
}
#[inline(always)]
fn push(&mut self, (key, val, _time, _diff): (Node, N, Product<RootTimestamp,()>, isize)) {
while self.nodes.len() <= (key as usize) / self.peers {
self.keys.push((self.peers * self.nodes.len() + self.index) as Node);
self.nodes.push(self.edges.len());
}
self.edges.push(val);
}
#[inline(never)]
fn done(mut self, lower: &[Product<RootTimestamp,()>], upper: &[Product<RootTimestamp,()>], since: &[Product<RootTimestamp,()>]) -> GraphBatch<N> {
println!("GraphBuilder::done(): {} nodes, {} edges.", self.nodes.len(), self.edges.len());
self.nodes.push(self.edges.len());
GraphBatch {
index: self.index,
peers: self.peers,
keys: self.keys,
nodes: self.nodes,
edges: self.edges,
desc: Description::new(lower, upper, since)
}
}
}
#[inline(never)]
pub fn advance<T, F: Fn(&T)->bool>(slice: &[T], function: F) -> usize {
let mut index = 0;
if index < slice.len() && function(&slice[index]) {
let mut step = 1;
while index + step < slice.len() && function(&slice[index + step]) {
index += step;
step = step << 1;
}
step = step >> 1;
while step > 0 {
if index + step < slice.len() && function(&slice[index + step]) {
index += step;
}
step = step >> 1;
}
index += 1;
}
index
}