use std::{borrow, error::Error, fmt::Display, ops::Deref};
use std::rc::Rc;
use std::cell::RefCell;
use std::fmt::{self, Debug};
use crate::order::PartialOrder;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::operate::PortConnectivity;
use crate::scheduling::Activations;
use crate::dataflow::channels::pullers::counter::ConsumedGuard;
/// Common interface of the capability types in this file.
///
/// Each implementor exposes the timestamp it is held at and can answer whether
/// it may be used with a particular output, identified by that output's shared
/// change batch and its port index.
pub trait CapabilityTrait<T: Timestamp> {
/// Returns a reference to the timestamp this capability is held at.
fn time(&self) -> &T;
/// Returns `true` when this capability is valid for the output whose shared
/// change batch is `query_buffer` and whose index is `port`.
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool;
}
/// Shared references to a capability are themselves capabilities; every call
/// is forwarded to the referenced value.
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
    /// Forwards to the referenced capability's `time`.
    fn time(&self) -> &T {
        C::time(*self)
    }

    /// Forwards to the referenced capability's `valid_for_output`.
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
        C::valid_for_output(*self, query_buffer, port)
    }
}
/// Exclusive references to a capability are themselves capabilities; every
/// call is forwarded to the referenced value.
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
    /// Forwards to the referenced capability's `time`.
    fn time(&self) -> &T {
        C::time(&**self)
    }

    /// Forwards to the referenced capability's `valid_for_output`.
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
        C::valid_for_output(&**self, query_buffer, port)
    }
}
/// A timestamp paired with the shared change batch in which its existence is
/// counted.
///
/// Constructing a capability records `+1` for its time in the shared batch and
/// dropping it records `-1`, so the batch reflects which capabilities are live.
pub struct Capability<T: Timestamp> {
// The timestamp this capability is held at.
time: T,
// Shared change batch that receives the +1 / -1 updates for `time`.
internal: Rc<RefCell<ChangeBatch<T>>>,
}
impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
    /// Returns the timestamp this capability is held at.
    fn time(&self) -> &T {
        &self.time
    }

    /// A capability is valid for an output exactly when it shares that
    /// output's change batch (pointer identity); the port index is not used.
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, _port: usize) -> bool {
        let own_buffer = &self.internal;
        Rc::ptr_eq(own_buffer, query_buffer)
    }
}
impl<T: Timestamp> Capability<T> {
pub(crate) fn new(time: T, internal: Rc<RefCell<ChangeBatch<T>>>) -> Self {
internal.borrow_mut().update(time.clone(), 1);
Self {
time,
internal,
}
}
pub fn time(&self) -> &T {
&self.time
}
pub fn delayed(&self, new_time: &T) -> Capability<T> {
#[cold]
#[inline(never)]
fn delayed_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
panic!(
"Attempted to delay {:?} to {:?}, which is not beyond the capability's time.",
capability,
invalid_time,
)
}
self.try_delayed(new_time)
.unwrap_or_else(|| delayed_panic(self, new_time))
}
pub fn try_delayed(&self, new_time: &T) -> Option<Capability<T>> {
if self.time.less_equal(new_time) {
Some(Self::new(new_time.clone(), Rc::clone(&self.internal)))
} else {
None
}
}
pub fn downgrade(&mut self, new_time: &T) {
#[cold]
#[inline(never)]
fn downgrade_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
panic!(
"Attempted to downgrade {:?} to {:?}, which is not beyond the capability's time.",
capability,
invalid_time,
)
}
self.try_downgrade(new_time)
.unwrap_or_else(|_| downgrade_panic(self, new_time))
}
pub fn try_downgrade(&mut self, new_time: &T) -> Result<(), DowngradeError> {
if let Some(new_capability) = self.try_delayed(new_time) {
*self = new_capability;
Ok(())
} else {
Err(DowngradeError(()))
}
}
}
impl<T: Timestamp> Drop for Capability<T> {
    /// Records `-1` for this capability's time in the shared change batch,
    /// balancing the `+1` recorded at construction.
    fn drop(&mut self) {
        let mut changes = self.internal.borrow_mut();
        changes.update(self.time.clone(), -1);
    }
}
impl<T: Timestamp> Clone for Capability<T> {
fn clone(&self) -> Capability<T> {
Self::new(self.time.clone(), Rc::clone(&self.internal))
}
}
/// Dereferencing a capability yields the timestamp it is held at.
impl<T: Timestamp> Deref for Capability<T> {
    type Target = T;

    fn deref(&self) -> &T {
        self.time()
    }
}
impl<T: Timestamp> Debug for Capability<T> {
    /// Prints the time; the shared change batch is elided as `"..."`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Capability");
        out.field("time", &self.time);
        out.field("internal", &"...");
        out.finish()
    }
}
impl<T: Timestamp> PartialEq for Capability<T> {
    /// Two capabilities are equal when their times are equal and they share
    /// the same change batch (pointer identity).
    fn eq(&self, other: &Self) -> bool {
        if self.time() == other.time() {
            Rc::ptr_eq(&self.internal, &other.internal)
        } else {
            false
        }
    }
}
// Marker impl: declares the `PartialEq` relation on `Capability` (equal times
// and the same shared change batch) to be a full equivalence relation.
impl<T: Timestamp> Eq for Capability<T> { }
impl<T: Timestamp> PartialOrder for Capability<T> {
    /// `self` is less-or-equal to `other` when its time is `less_equal` to
    /// the other's time and both share the same change batch.
    fn less_equal(&self, other: &Self) -> bool {
        if !self.time().less_equal(other.time()) {
            return false;
        }
        Rc::ptr_eq(&self.internal, &other.internal)
    }
}
impl<T: Timestamp + ::std::hash::Hash> ::std::hash::Hash for Capability<T> {
    /// Hashes only the time. Equal capabilities have equal times, so this
    /// stays consistent with `PartialEq` even though the shared change batch
    /// is not hashed.
    fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
        ::std::hash::Hash::hash(&self.time, state);
    }
}
/// Error returned by the fallible downgrade methods when a capability cannot
/// be downgraded to the requested time.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DowngradeError(());

impl Display for DowngradeError {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not downgrade the given capability")
    }
}

impl Error for DowngradeError {}
/// A shared list of shared change batches. `InputCapability` indexes this
/// list by output port.
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
/// A capability whose time is supplied by a `ConsumedGuard` and which can
/// mint owned `Capability` values for individual output ports.
pub struct InputCapability<T: Timestamp> {
// Per-output-port shared change batches, indexed by port.
internal: CapabilityUpdates<T>,
// Per-output-port summaries, consulted when delaying to an output.
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
// Source of this capability's time (see `InputCapability::time`).
consumed_guard: ConsumedGuard<T>,
}
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
    /// Returns the timestamp this capability is held at.
    fn time(&self) -> &T {
        self.time()
    }

    /// Returns `true` when the change batch registered for `port` is
    /// `query_buffer` (pointer identity) and the summary path for `port`
    /// consists of exactly the default summary.
    ///
    /// A `port` with no registered change batch yields `false` rather than an
    /// index panic, matching how a missing summary is treated.
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
        let summaries_borrow = self.summaries.borrow();
        let internal_borrow = self.internal.borrow();
        // Checked lookup: this is a boolean predicate, so an unknown port is
        // simply "not valid".
        let buffer_matches = internal_borrow
            .get(port)
            .map_or(false, |buffer| Rc::ptr_eq(buffer, query_buffer));
        buffer_matches
            && summaries_borrow
                .get(port)
                .map_or(false, |path| path.elements() == [Default::default()])
    }
}
impl<T: Timestamp> InputCapability<T> {
    /// Assembles an input capability from the per-port change batches, the
    /// per-port summaries, and the guard that supplies its time.
    pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
        Self {
            internal,
            summaries,
            consumed_guard: guard,
        }
    }

    /// Returns the timestamp this capability is held at, as reported by the
    /// consumed guard.
    #[inline]
    pub fn time(&self) -> &T {
        self.consumed_guard.time()
    }

    /// Creates an owned capability at `new_time` for output `output_port`.
    ///
    /// # Panics
    ///
    /// Panics if there is no summary for `output_port`, or if none of the
    /// times obtained by applying that port's summaries to this capability's
    /// time is `less_equal` to `new_time`.
    pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability<T> {
        use crate::progress::timestamp::PathSummary;

        let summaries = self.summaries.borrow();
        match summaries.get(output_port) {
            None => panic!("Attempted to delay a capability for a disconnected output"),
            Some(path) => {
                let reachable = path
                    .iter()
                    .flat_map(|summary| summary.results_in(self.time()))
                    .any(|time| time.less_equal(new_time));
                if !reachable {
                    panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
                }
                Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
            }
        }
    }

    /// Creates an owned capability at this capability's own time for output
    /// `output_port`; equivalent to `delayed(self.time(), output_port)`.
    #[inline]
    pub fn retain(&self, output_port: usize) -> Capability<T> {
        let current = self.time();
        self.delayed(current, output_port)
    }
}
/// Dereferencing an input capability yields the timestamp it is held at.
impl<T: Timestamp> Deref for InputCapability<T> {
    type Target = T;

    fn deref(&self) -> &T {
        self.consumed_guard.time()
    }
}
impl<T: Timestamp> Debug for InputCapability<T> {
    /// Prints the time; the shared change batches are elided as `"..."`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("InputCapability");
        out.field("time", self.time());
        out.field("internal", &"...");
        out.finish()
    }
}
/// A `Capability` bundled with an address and a shared `Activations` handle.
///
/// Downgrading or dropping this value calls `activate` on the handle with the
/// stored address.
#[derive(Clone, Debug)]
pub struct ActivateCapability<T: Timestamp> {
// The wrapped capability.
pub(crate) capability: Capability<T>,
// Address passed to `Activations::activate` on downgrade and drop.
pub(crate) address: Rc<[usize]>,
// Shared handle on which activation is requested.
pub(crate) activations: Rc<RefCell<Activations>>,
}
impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
fn time(&self) -> &T { self.capability.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
self.capability.valid_for_output(query_buffer, port)
}
}
impl<T: Timestamp> ActivateCapability<T> {
    /// Wraps `capability` together with the address to activate and the
    /// shared activations handle.
    pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
        ActivateCapability { capability, address, activations }
    }

    /// Returns the wrapped capability's time.
    pub fn time(&self) -> &T {
        self.capability.time()
    }

    /// Creates a new `ActivateCapability` at `time`, sharing this one's
    /// address and activations handle.
    ///
    /// # Panics
    ///
    /// Panics when the wrapped capability cannot be delayed to `time`.
    pub fn delayed(&self, time: &T) -> Self {
        let capability = self.capability.delayed(time);
        let address = Rc::clone(&self.address);
        let activations = Rc::clone(&self.activations);
        Self { capability, address, activations }
    }

    /// Downgrades the wrapped capability to `time`, then requests activation
    /// of the stored address.
    ///
    /// # Panics
    ///
    /// Panics when the wrapped capability cannot be downgraded to `time`.
    pub fn downgrade(&mut self, time: &T) {
        self.capability.downgrade(time);
        let mut activations = self.activations.borrow_mut();
        activations.activate(&self.address);
    }
}
impl<T: Timestamp> Drop for ActivateCapability<T> {
    /// Requests activation of the stored address when the capability is
    /// dropped.
    fn drop(&mut self) {
        let mut activations = self.activations.borrow_mut();
        activations.activate(&self.address);
    }
}
/// A collection of `Capability` values that can be delayed and downgraded
/// together.
#[derive(Clone, Debug)]
pub struct CapabilitySet<T: Timestamp> {
// The held capabilities; `insert` keeps out elements dominated by an existing one.
elements: Vec<Capability<T>>,
}
impl<T: Timestamp> CapabilitySet<T> {
    /// Creates an empty set.
    pub fn new() -> Self {
        Self { elements: Vec::new() }
    }

    /// Creates an empty set with room for `capacity` capabilities.
    pub fn with_capacity(capacity: usize) -> Self {
        Self { elements: Vec::with_capacity(capacity) }
    }

    /// Creates a set holding exactly `cap`.
    pub fn from_elem(cap: Capability<T>) -> Self {
        Self { elements: vec![cap] }
    }

    /// Inserts `capability` unless an existing element is already
    /// `less_equal` to it; on insertion, removes every existing element that
    /// the new capability is `less_equal` to.
    pub fn insert(&mut self, capability: Capability<T>) {
        if !self.elements.iter().any(|c| c.less_equal(&capability)) {
            self.elements.retain(|c| !capability.less_equal(c));
            self.elements.push(capability);
        }
    }

    /// Creates a capability at `time`, delayed from the first element whose
    /// time is `less_equal` to `time`.
    ///
    /// # Panics
    ///
    /// Panics if no element's time is `less_equal` to `time`.
    pub fn delayed(&self, time: &T) -> Capability<T> {
        /// Out-of-line panic so the failure path stays off the hot path.
        #[cold]
        #[inline(never)]
        fn delayed_panic(invalid_time: &dyn Debug) -> ! {
            panic!(
                "failed to create a delayed capability, the current set does not \
                have an element less than or equal to {:?}",
                invalid_time,
            )
        }

        self.try_delayed(time)
            .unwrap_or_else(|| delayed_panic(time))
    }

    /// Fallible form of [`CapabilitySet::delayed`]: returns `None` when no
    /// element's time is `less_equal` to `time`.
    pub fn try_delayed(&self, time: &T) -> Option<Capability<T>> {
        self.elements
            .iter()
            .find(|capability| capability.time().less_equal(time))
            .and_then(|capability| capability.try_delayed(time))
    }

    /// Replaces the set's contents with capabilities at the times in
    /// `frontier`.
    ///
    /// # Panics
    ///
    /// Panics if some element of `frontier` cannot be delayed from any element
    /// of the set.
    pub fn downgrade<B, F>(&mut self, frontier: F)
    where
        B: borrow::Borrow<T>,
        F: IntoIterator<Item = B>,
    {
        /// Out-of-line panic so the failure path stays off the hot path.
        #[cold]
        #[inline(never)]
        fn downgrade_panic() -> ! {
            panic!(
                "Attempted to downgrade a CapabilitySet with a frontier containing an element \
                that was not beyond an element within the set"
            )
        }

        self.try_downgrade(frontier)
            .unwrap_or_else(|_| downgrade_panic())
    }

    /// Fallible form of [`CapabilitySet::downgrade`].
    ///
    /// # Errors
    ///
    /// Returns [`DowngradeError`] if some element of `frontier` cannot be
    /// delayed from any element of the set. In that case the set is left
    /// exactly as it was before the call.
    pub fn try_downgrade<B, F>(&mut self, frontier: F) -> Result<(), DowngradeError>
    where
        B: borrow::Borrow<T>,
        F: IntoIterator<Item = B>,
    {
        // New capabilities are appended after the existing ones, which are
        // only removed once every frontier element has been accepted.
        let count = self.elements.len();
        for time in frontier {
            match self.try_delayed(time.borrow()) {
                Some(capability) => self.elements.push(capability),
                None => {
                    // Roll back the capabilities minted so far so a failed
                    // downgrade does not leave the set holding a mix of old
                    // and partially-applied new elements.
                    self.elements.truncate(count);
                    return Err(DowngradeError(()));
                }
            }
        }
        self.elements.drain(..count);
        Ok(())
    }
}
impl<T> From<Vec<Capability<T>>> for CapabilitySet<T>
where
    T: Timestamp,
{
    /// Builds a set by inserting each capability in order, so elements
    /// dominated by another element are filtered out by `insert`.
    fn from(capabilities: Vec<Capability<T>>) -> Self {
        let mut set = Self::with_capacity(capabilities.len());
        capabilities
            .into_iter()
            .for_each(|capability| set.insert(capability));
        set
    }
}
impl<T: Timestamp> Default for CapabilitySet<T> {
fn default() -> Self {
Self::new()
}
}
/// Dereferencing a set yields a read-only slice of its capabilities.
impl<T: Timestamp> Deref for CapabilitySet<T> {
    type Target = [Capability<T>];

    fn deref(&self) -> &[Capability<T>] {
        self.elements.as_slice()
    }
}