use std::rc::Rc;
use std::cell::RefCell;
use std::any::Any;
use progress::timestamp::RootTimestamp;
use progress::{Timestamp, Operate, Subgraph};
use timely_communication::{Allocate, Data};
use {Push, Pull};
use super::{ScopeParent, Child};
pub struct Root<A: Allocate> {
allocator: Rc<RefCell<A>>,
identifiers: Rc<RefCell<usize>>,
dataflows: Rc<RefCell<Vec<Wrapper>>>,
dataflow_counter: Rc<RefCell<usize>>,
}
impl<A: Allocate> Root<A> {
pub fn new(c: A) -> Root<A> {
let mut result = Root {
allocator: Rc::new(RefCell::new(c)),
identifiers: Rc::new(RefCell::new(0)),
dataflows: Rc::new(RefCell::new(Vec::new())),
dataflow_counter: Rc::new(RefCell::new(0)),
};
if cfg!(feature = "logging") {
::logging::initialize(&mut result);
}
result
}
pub fn step(&mut self) -> bool {
if cfg!(feature = "logging") {
::logging::flush_logs();
}
let mut active = false;
for dataflow in self.dataflows.borrow_mut().iter_mut() {
let sub_active = dataflow.step();
active = active || sub_active;
}
self.dataflows.borrow_mut().retain(|dataflow| dataflow.active());
active
}
pub fn step_while<F: FnMut()->bool>(&mut self, mut func: F) {
while func() { self.step(); }
}
pub fn index(&self) -> usize { self.allocator.borrow().index() }
pub fn peers(&self) -> usize { self.allocator.borrow().peers() }
pub fn dataflow<T: Timestamp, R, F:FnOnce(&mut Child<Self, T>)->R>(&mut self, func: F) -> R {
self.dataflow_using(Box::new(()), |_, child| func(child))
}
pub fn dataflow_using<T: Timestamp, R, F:FnOnce(&mut V, &mut Child<Self, T>)->R, V: Any+'static>(&mut self, mut resources: V, func: F) -> R {
let addr = vec![self.allocator.borrow().index()];
let dataflow_index = self.allocate_dataflow_index();
let subscope = Subgraph::new_from(&mut (*self.allocator.borrow_mut()), dataflow_index, addr);
let subscope = RefCell::new(subscope);
let result = {
let mut builder = Child {
subgraph: &subscope,
parent: self.clone(),
};
func(&mut resources, &mut builder)
};
let mut operator = subscope.into_inner();
operator.get_internal_summary();
operator.set_external_summary(Vec::new(), &mut []);
let wrapper = Wrapper {
_index: dataflow_index,
operate: Some(Box::new(operator)),
resources: Some(Box::new(resources)),
};
self.dataflows.borrow_mut().push(wrapper);
result
}
fn allocate_dataflow_index(&mut self) -> usize {
*self.dataflow_counter.borrow_mut() += 1;
*self.dataflow_counter.borrow() - 1
}
}
impl<A: Allocate> ScopeParent for Root<A> {
type Timestamp = RootTimestamp;
fn new_identifier(&mut self) -> usize {
*self.identifiers.borrow_mut() += 1;
*self.identifiers.borrow() - 1
}
}
impl<A: Allocate> Allocate for Root<A> {
fn index(&self) -> usize { self.allocator.borrow().index() }
fn peers(&self) -> usize { self.allocator.borrow().peers() }
fn allocate<D: Data>(&mut self) -> (Vec<Box<Push<D>>>, Box<Pull<D>>) {
self.allocator.borrow_mut().allocate()
}
}
impl<A: Allocate> Clone for Root<A> {
fn clone(&self) -> Self {
Root {
allocator: self.allocator.clone(),
identifiers: self.identifiers.clone(),
dataflows: self.dataflows.clone(),
dataflow_counter: self.dataflow_counter.clone(),
}
}
}
struct Wrapper {
_index: usize,
operate: Option<Box<Operate<RootTimestamp>>>,
resources: Option<Box<Any>>,
}
impl Wrapper {
fn step(&mut self) -> bool {
self.operate.as_mut().map(|op| op.pull_internal_progress(&mut [], &mut [], &mut [])).unwrap_or(false)
}
fn active(&self) -> bool { self.operate.is_some() }
}
impl Drop for Wrapper {
fn drop(&mut self) {
self.operate = None;
self.resources = None;
}
}