use std::ops::Deref;
use std::rc::Rc;
use std::cell::RefCell;
use std::fmt::{self, Debug};
use crate::order::PartialOrder;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::scheduling::Activations;
/// Common interface implemented by the capability types in this file.
pub trait CapabilityTrait<T: Timestamp> {
/// The timestamp associated with the capability.
fn time(&self) -> &T;
/// Reports whether the capability is valid for the output identified by `query_buffer`.
///
/// The implementations in this file answer by pointer identity (`Rc::ptr_eq`) between
/// `query_buffer` and the change batch(es) the capability holds, or by delegating to
/// a wrapped capability.
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool;
}
/// Forwards `CapabilityTrait` through a shared reference to the referenced capability.
impl<'a, T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &'a C {
    fn time(&self) -> &T {
        <C as CapabilityTrait<T>>::time(&**self)
    }
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
        <C as CapabilityTrait<T>>::valid_for_output(&**self, query_buffer)
    }
}
/// Forwards `CapabilityTrait` through a mutable reference to the referenced capability.
impl<'a, T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &'a mut C {
    fn time(&self) -> &T {
        <C as CapabilityTrait<T>>::time(&**self)
    }
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
        <C as CapabilityTrait<T>>::valid_for_output(&**self, query_buffer)
    }
}
/// A timestamp paired with a handle to a shared `ChangeBatch`.
///
/// In this file, `mint` records `+1` for the time in the batch when a capability is
/// created, and the `Drop` implementation records `-1` when it goes away.
pub struct Capability<T: Timestamp> {
/// The timestamp this capability is for.
time: T,
/// Shared change batch that receives the `+1`/`-1` updates for `time`.
internal: Rc<RefCell<ChangeBatch<T>>>,
}
impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
    /// The timestamp held by this capability.
    fn time(&self) -> &T {
        &self.time
    }
    /// True exactly when `query_buffer` is the same allocation as this capability's
    /// internal change batch.
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
        let own_buffer = &self.internal;
        Rc::ptr_eq(own_buffer, query_buffer)
    }
}
impl<T: Timestamp> Capability<T> {
#[inline]
pub fn time(&self) -> &T {
&self.time
}
#[inline]
pub fn delayed(&self, new_time: &T) -> Capability<T> {
if !self.time.less_equal(new_time) {
panic!("Attempted to delay {:?} to {:?}, which is not `less_equal` the capability's time.", self, new_time);
}
mint(new_time.clone(), self.internal.clone())
}
#[inline]
pub fn downgrade(&mut self, new_time: &T) {
let new_cap = self.delayed(new_time);
*self = new_cap;
}
}
/// Creates a `Capability` for `time` backed by `internal`, first recording a `+1`
/// update for `time` in that change batch.
#[inline]
pub fn mint<T: Timestamp>(time: T, internal: Rc<RefCell<ChangeBatch<T>>>) -> Capability<T> {
    {
        // Scope the mutable borrow so it is released before `internal` is moved.
        let mut batch = internal.borrow_mut();
        batch.update(time.clone(), 1);
    }
    Capability { time, internal }
}
/// Dropping a capability records a `-1` update for its time in the shared change batch.
impl<T: Timestamp> Drop for Capability<T> {
    #[inline]
    fn drop(&mut self) {
        let mut batch = self.internal.borrow_mut();
        batch.update(self.time.clone(), -1);
    }
}
impl<T: Timestamp> Clone for Capability<T> {
#[inline]
fn clone(&self) -> Capability<T> {
mint(self.time.clone(), self.internal.clone())
}
}
impl<T: Timestamp> Deref for Capability<T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
&self.time
}
}
/// Debug output shows the time only; the internal change batch is elided as `...`.
impl<T: Timestamp> Debug for Capability<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("Capability {{ time: {:?}, internal: ... }}", self.time))
    }
}
/// Two capabilities are equal when their times are equal and they share the same
/// internal change batch (compared by pointer).
impl<T: Timestamp> PartialEq for Capability<T> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        let same_time = self.time == other.time;
        same_time && Rc::ptr_eq(&self.internal, &other.internal)
    }
}
/// Marker impl: equality on `Capability` is declared to be a full equivalence relation.
impl<T: Timestamp> Eq for Capability<T> { }
/// Capabilities are ordered by their times, and only compare as `less_equal` when they
/// share the same internal change batch (compared by pointer).
impl<T: Timestamp> PartialOrder for Capability<T> {
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        let time_ordered = self.time.less_equal(&other.time);
        time_ordered && Rc::ptr_eq(&self.internal, &other.internal)
    }
}
/// Hashes the time only; the internal change batch does not contribute to the hash.
impl<T: Timestamp> ::std::hash::Hash for Capability<T> {
    #[inline]
    fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
        ::std::hash::Hash::hash(&self.time, state)
    }
}
/// A borrowed timestamp together with a shared list of change batches.
///
/// The methods on this type index the list by output port when minting an owned
/// `Capability`.
pub struct CapabilityRef<'cap, T: Timestamp+'cap> {
/// The borrowed timestamp.
time: &'cap T,
/// Shared list of change batches; the methods in this file index it by output port.
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
}
impl<'cap, T: Timestamp+'cap> CapabilityTrait<T> for CapabilityRef<'cap, T> {
    /// The borrowed timestamp.
    fn time(&self) -> &T {
        self.time
    }
    /// True when `query_buffer` is the same allocation as any change batch in the
    /// internal list.
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
        let buffers = self.internal.borrow();
        for buffer in buffers.iter() {
            if Rc::ptr_eq(buffer, query_buffer) {
                return true;
            }
        }
        false
    }
}
impl<'cap, T: Timestamp+'cap> CapabilityRef<'cap, T> {
    /// The timestamp associated with this capability.
    #[inline]
    pub fn time(&self) -> &T {
        self.time
    }

    /// Mints an owned capability for `new_time` on output port 0.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`CapabilityRef::delayed_for_output`].
    #[inline]
    pub fn delayed(&self, new_time: &T) -> Capability<T> {
        self.delayed_for_output(new_time, 0)
    }

    /// Mints an owned capability for `new_time`, backed by the change batch at index
    /// `output_port` of the internal list.
    ///
    /// # Panics
    ///
    /// Panics if this capability's time is not `less_equal` to `new_time`, or if
    /// `output_port` is not a valid index into the internal list.
    pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
        assert!(
            self.time.less_equal(new_time),
            "Attempted to delay {:?} to {:?}, which is not `less_equal` the capability's time.",
            self,
            new_time
        );
        // One checked lookup stands in for the separate length test and index.
        let buffer = self.internal.borrow().get(output_port).cloned();
        match buffer {
            Some(internal) => mint(new_time.clone(), internal),
            None => panic!("Attempted to acquire a capability for a non-existent output port."),
        }
    }

    /// Consumes the reference and mints an owned capability at the same time on
    /// output port 0.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`CapabilityRef::retain_for_output`].
    #[inline]
    pub fn retain(self) -> Capability<T> {
        self.retain_for_output(0)
    }

    /// Consumes the reference and mints an owned capability at the same time, backed
    /// by the change batch at index `output_port` of the internal list.
    ///
    /// # Panics
    ///
    /// Panics if `output_port` is not a valid index into the internal list.
    pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
        let buffer = self.internal.borrow().get(output_port).cloned();
        match buffer {
            Some(internal) => mint(self.time.clone(), internal),
            None => panic!("Attempted to acquire a capability for a non-existent output port."),
        }
    }
}
impl<'cap, T: Timestamp> Deref for CapabilityRef<'cap, T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
self.time
}
}
/// Debug output shows the time only; the internal list is elided as `...`.
impl<'cap, T: Timestamp> Debug for CapabilityRef<'cap, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("CapabilityRef {{ time: {:?}, internal: ... }}", self.time))
    }
}
/// Wraps a borrowed `time` and the shared list of change batches in a `CapabilityRef`.
///
/// Unlike `mint`, this performs no update on any change batch.
#[inline]
pub fn mint_ref<'cap, T: Timestamp>(time: &'cap T, internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>) -> CapabilityRef<'cap, T> {
    CapabilityRef { time, internal }
}
/// A `Capability` bundled with an address and a shared `Activations` handle.
///
/// In this file, `downgrade` and the `Drop` implementation both call `activate` on the
/// handle with the stored address.
#[derive(Clone)]
pub struct ActivateCapability<T: Timestamp> {
/// The wrapped capability.
pub(crate) capability: Capability<T>,
/// Address passed to `Activations::activate`.
pub(crate) address: Rc<Vec<usize>>,
/// Shared activations handle notified on downgrade and drop.
pub(crate) activations: Rc<RefCell<Activations>>,
}
impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
fn time(&self) -> &T { self.capability.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
self.capability.valid_for_output(query_buffer)
}
}
impl<T: Timestamp> ActivateCapability<T> {
pub fn new(capability: Capability<T>, address: &[usize], activations: Rc<RefCell<Activations>>) -> Self {
Self {
capability,
address: Rc::new(address.to_vec()),
activations,
}
}
pub fn time(&self) -> &T {
self.capability.time()
}
pub fn delayed(&self, time: &T) -> Self {
ActivateCapability {
capability: self.capability.delayed(time),
address: self.address.clone(),
activations: self.activations.clone(),
}
}
pub fn downgrade(&mut self, time: &T) {
self.capability.downgrade(time);
self.activations.borrow_mut().activate(&self.address[..]);
}
}
/// Dropping an `ActivateCapability` calls `activate` with the stored address.
impl<T: Timestamp> Drop for ActivateCapability<T> {
    fn drop(&mut self) {
        let address = self.address.as_slice();
        self.activations.borrow_mut().activate(address);
    }
}
/// A collection of `Capability` values stored in a `Vec`.
///
/// `insert` keeps out capabilities that are already covered by an existing element and
/// evicts elements the new capability covers.
#[derive(Clone, Debug)]
pub struct CapabilitySet<T: Timestamp> {
/// The capabilities currently held.
elements: Vec<Capability<T>>,
}
impl<T: Timestamp> CapabilitySet<T> {
    /// Creates an empty set of capabilities.
    pub fn new() -> Self {
        CapabilitySet { elements: Vec::new() }
    }

    /// Creates a set containing exactly `cap`.
    pub fn from_elem(cap: Capability<T>) -> Self {
        CapabilitySet { elements: vec![cap] }
    }

    /// Inserts `capability` unless some element already in the set is `less_equal` to it.
    ///
    /// When the capability is inserted, every element it is `less_equal` to is removed
    /// first. The comparison is the one on `Capability`, which also requires both
    /// sides to share the same internal change batch.
    pub fn insert(&mut self, capability: Capability<T>) {
        if !self.elements.iter().any(|c| c.less_equal(&capability)) {
            self.elements.retain(|c| !capability.less_equal(c));
            self.elements.push(capability);
        }
    }

    /// Mints a capability for `time` from the first element whose time is `less_equal`
    /// to `time`.
    ///
    /// # Panics
    ///
    /// Panics if no element of the set has a time `less_equal` to `time`.
    pub fn delayed(&self, time: &T) -> Capability<T> {
        match self.elements.iter().find(|c| c.time().less_equal(time)) {
            Some(capability) => capability.delayed(time),
            // Name the offending time and the held capabilities instead of the bare
            // `Option::unwrap` failure message.
            None => panic!(
                "Attempted to delay to {:?}, but no capability in the set is `less_equal` to it; set contains {:?}.",
                time, self.elements
            ),
        }
    }

    /// Replaces the contents of the set with capabilities for the times in `frontier`.
    ///
    /// Each new capability is minted from the elements held on entry. The replacements
    /// are collected in full before the set is modified, so a panic for an uncovered
    /// time leaves the set exactly as it was. The previous elements are dropped only
    /// after every replacement has been minted.
    ///
    /// # Panics
    ///
    /// Panics if some time in `frontier` is not covered by any element of the set
    /// (see [`CapabilitySet::delayed`]).
    pub fn downgrade<B, F>(&mut self, frontier: F)
    where
        B: std::borrow::Borrow<T>,
        F: IntoIterator<Item=B>,
    {
        let downgraded: Vec<Capability<T>> = frontier
            .into_iter()
            .map(|time| self.delayed(time.borrow()))
            .collect();
        self.elements = downgraded;
    }
}
/// A capability set dereferences to a slice of the capabilities it holds.
impl<T: Timestamp> Deref for CapabilitySet<T> {
    type Target = [Capability<T>];

    fn deref(&self) -> &Self::Target {
        self.elements.as_slice()
    }
}