use std::rc::Rc;
use std::cell::RefCell;
use crate::scheduling::activate::Activations;
use crate::progress::{Timestamp, Operate, Subgraph, SubgraphBuilder};
use crate::progress::{Source, Target};
use crate::progress::timestamp::Refines;
use crate::order::Product;
use crate::worker::Worker;
pub type Iterative<'scope, TOuter, TInner> = Scope<'scope, Product<TOuter, TInner>>;
pub struct Scope<'scope, T: Timestamp> {
pub(crate) subgraph: &'scope RefCell<SubgraphBuilder<T>>,
pub(crate) worker: &'scope Worker,
}
impl<'scope, T: Timestamp> Scope<'scope, T> {
pub fn worker(&self) -> &'scope Worker { self.worker }
pub fn index(&self) -> usize { self.worker.index() }
pub fn peers(&self) -> usize { self.worker.peers() }
pub fn activations(&self) -> Rc<RefCell<Activations>> { self.worker.activations() }
pub fn activator_for(&self, path: Rc<[usize]>) -> crate::scheduling::Activator { self.worker.activator_for(path) }
pub fn name(&self) -> String { self.subgraph.borrow().name.clone() }
pub fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) }
pub fn add_edge(&self, source: Source, target: Target) {
self.subgraph.borrow_mut().connect(source, target);
}
pub fn reserve_operator(&self) -> OperatorSlot<'scope, T> {
let index = self.subgraph.borrow_mut().allocate_child_id();
let identifier = self.worker().new_identifier();
OperatorSlot {
scope: *self,
index,
identifier,
installed: false,
}
}
#[inline]
pub fn scoped<T2, R, F>(&self, name: &str, func: F) -> R
where
T2: Timestamp + Refines<T>,
F: FnOnce(Scope<T2>) -> R,
{
let (result, subgraph, slot) = self.scoped_raw(name, func);
slot.install(Box::new(subgraph));
result
}
pub fn scoped_raw<T2, R, F>(&self, name: &str, func: F) -> (R, Subgraph<T, T2>, OperatorSlot<'scope, T>)
where
T2: Timestamp + Refines<T>,
F: FnOnce(Scope<T2>) -> R,
{
let slot = self.reserve_operator();
let path = slot.addr();
let identifier = slot.identifier();
let subgraph = RefCell::new(SubgraphBuilder::new_from(path, identifier, name));
let child = Scope { subgraph: &subgraph, worker: self.worker };
let result = func(child);
let subgraph = subgraph.into_inner().build(self.worker);
(result, subgraph, slot)
}
pub fn iterative<T2, R, F>(&self, func: F) -> R
where
T2: Timestamp,
F: FnOnce(Scope<Product<T, T2>>) -> R,
{
self.scoped::<Product<T, T2>, R, F>("Iterative", func)
}
pub fn region<R, F>(&self, func: F) -> R
where
F: FnOnce(Scope<T>) -> R,
{
self.region_named("Region", func)
}
pub fn region_named<R, F>(&self, name: &str, func: F) -> R
where
F: FnOnce(Scope<T>) -> R,
{
self.scoped::<T, R, F>(name, func)
}
}
impl<'scope, T: Timestamp> Copy for Scope<'scope, T> {}
impl<'scope, T: Timestamp> Clone for Scope<'scope, T> {
fn clone(&self) -> Self { *self }
}
impl<'scope, T: Timestamp> std::fmt::Debug for Scope<'scope, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Scope")
.field("name", &self.subgraph.borrow().name)
.field("path", &self.subgraph.borrow().path)
.finish_non_exhaustive()
}
}
#[derive(Debug)]
pub struct OperatorSlot<'scope, T: Timestamp> {
scope: Scope<'scope, T>,
index: usize,
identifier: usize,
installed: bool,
}
impl<'scope, T: Timestamp> OperatorSlot<'scope, T> {
pub fn index(&self) -> usize { self.index }
pub fn identifier(&self) -> usize { self.identifier }
pub fn addr(&self) -> Rc<[usize]> {
let scope_path = &self.scope.subgraph.borrow().path[..];
let mut addr = Vec::with_capacity(scope_path.len() + 1);
addr.extend_from_slice(scope_path);
addr.push(self.index);
addr.into()
}
pub fn install(mut self, operator: Box<dyn Operate<T>>) {
self.scope.subgraph.borrow_mut().add_child(operator, self.index, self.identifier);
self.installed = true;
}
}
impl<'scope, T: Timestamp> Drop for OperatorSlot<'scope, T> {
fn drop(&mut self) {
if !self.installed && !std::thread::panicking() {
panic!(
"OperatorSlot for index {} dropped without `install` being called. \
Every reserved operator slot must be filled.",
self.index,
);
}
}
}