use columnar::Columnar;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
/// A sequence of timestamp coordinates of type `T`.
///
/// The comparison impls in this file treat a missing trailing coordinate as `T::minimum()`,
/// so sequences of different lengths can be compared. `PointStamp::new` strips trailing
/// `T::minimum()` entries so that values built through it have no redundant padding.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Columnar)]
#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
pub struct PointStamp<T> {
/// The coordinates. Values produced by `PointStamp::new` never end in `T::minimum()`.
vector: SmallVec<[T; 1]>,
}
impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
    /// Compares the coordinates with `other`, treating the shorter side as if it were
    /// padded with `T::minimum()`.
    ///
    /// Returns `true` only when every position agrees after padding. Surplus coordinates
    /// on either side must therefore all equal `T::minimum()`.
    fn eq(&self, other: &[T]) -> bool {
        let minimum = T::minimum();
        let shared = std::cmp::min(self.vector.len(), other.len());
        let (head, tail) = self.vector.split_at(shared);
        let (other_head, other_tail) = other.split_at(shared);
        // At most one of the two tails is non-empty. Checking both keeps the comparison
        // symmetric: a plain `zip` would stop at the shorter left side and silently
        // accept any surplus coordinates in `other`.
        head == other_head && tail.iter().chain(other_tail).all(|extra| *extra == minimum)
    }
}
impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
    /// Compares this slice with the coordinates of `other`, treating the shorter side as
    /// if it were padded with `T::minimum()`.
    ///
    /// Returns `true` only when every position agrees after padding. Surplus coordinates
    /// on either side must therefore all equal `T::minimum()`.
    fn eq(&self, other: &PointStamp<T>) -> bool {
        let minimum = T::minimum();
        let shared = std::cmp::min(self.len(), other.vector.len());
        let (head, tail) = self.split_at(shared);
        let (other_head, other_tail) = other.vector.split_at(shared);
        // At most one of the two tails is non-empty. Checking both keeps the comparison
        // symmetric: a plain `zip` would stop at the shorter left side and silently
        // accept any surplus coordinates in `other`.
        head == other_head && tail.iter().chain(other_tail).all(|extra| *extra == minimum)
    }
}
impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
    /// Coordinate-wise `less_equal` against a slice.
    ///
    /// Each coordinate of `self` is compared with the coordinate of `other` in the same
    /// position, using `T::minimum()` once `other` runs out. Coordinates of `other`
    /// beyond the length of `self` are not inspected.
    fn less_equal(&self, other: &[T]) -> bool {
        let minimum = T::minimum();
        let padded = other.iter().chain(std::iter::repeat(&minimum));
        self.vector
            .iter()
            .zip(padded)
            .all(|(lhs, rhs)| lhs.less_equal(rhs))
    }
}
impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
    /// Coordinate-wise `less_equal` of a slice against a `PointStamp`.
    ///
    /// Each element of `self` is compared with the coordinate of `other` in the same
    /// position, using `T::minimum()` where `other` has no such coordinate. Coordinates
    /// of `other` beyond the length of `self` are not inspected.
    fn less_equal(&self, other: &PointStamp<T>) -> bool {
        let minimum = T::minimum();
        self.iter().enumerate().all(|(index, lhs)| {
            let rhs = other.vector.get(index).unwrap_or(&minimum);
            lhs.less_equal(rhs)
        })
    }
}
impl<T: Timestamp> PointStamp<T> {
    /// Builds a `PointStamp` from `vector`, first removing every trailing coordinate
    /// equal to `T::minimum()`.
    pub fn new(mut vector: SmallVec<[T; 1]>) -> Self {
        // Trailing minimal coordinates carry no information, so pop them until the
        // vector is empty or ends in a non-minimal coordinate.
        while matches!(vector.last(), Some(last) if *last == T::minimum()) {
            vector.pop();
        }
        Self { vector }
    }

    /// Consumes the `PointStamp` and returns the underlying coordinate vector.
    pub fn into_inner(self) -> SmallVec<[T; 1]> {
        let PointStamp { vector } = self;
        vector
    }
}
impl<T> std::ops::Deref for PointStamp<T> {
    type Target = [T];

    /// Exposes the coordinates as a read-only slice.
    fn deref(&self) -> &[T] {
        &self.vector[..]
    }
}
use timely::order::PartialOrder;
impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
    /// Coordinate-wise `less_equal` between two `PointStamp`s.
    ///
    /// Each coordinate of `self` is compared with the coordinate of `other` in the same
    /// position, using `T::minimum()` once `other` runs out. Coordinates of `other`
    /// beyond the length of `self` are not inspected.
    fn less_equal(&self, other: &Self) -> bool {
        let minimum = T::minimum();
        let mut theirs = other.vector.iter();
        for ours in self.vector.iter() {
            let bound = theirs.next().unwrap_or(&minimum);
            if !ours.less_equal(bound) {
                return false;
            }
        }
        true
    }
}
use timely::progress::timestamp::Refines;
impl<T: Timestamp> Refines<()> for PointStamp<T> {
fn to_inner(_outer: ()) -> Self {
Self { vector: Default::default() }
}
fn to_outer(self) -> () {
()
}
fn summarize(_summary: <Self>::Summary) -> () {
()
}
}
use timely::progress::PathSummary;
/// Describes an action on a `PointStamp`: truncation to at most `retain` leading
/// coordinates, followed by applying `actions` coordinate by coordinate.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStampSummary<TS> {
/// Number of leading coordinates to keep; `None` keeps all of them.
pub retain: Option<usize>,
/// Per-coordinate summaries, applied by position after truncation. `results_in`
/// applies any action past the last retained coordinate to `T::minimum()`.
pub actions: Vec<TS>,
}
impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
    /// Applies this summary to `timestamp`.
    ///
    /// The timestamp is first truncated to at most `retain` leading coordinates (all of
    /// them when `retain` is `None`). Each action is then applied to the coordinate in
    /// the same position. Coordinates past the last action are copied unchanged, and
    /// actions past the last coordinate are applied to `T::minimum()`.
    ///
    /// Returns `None` as soon as any per-coordinate action returns `None`.
    fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
        // Truncation only happens when `retain` is strictly shorter than the timestamp.
        let timestamps = match self.retain {
            Some(retain) if retain < timestamp.vector.len() => &timestamp.vector[..retain],
            _ => &timestamp.vector[..],
        };
        let min_len = std::cmp::min(timestamps.len(), self.actions.len());
        let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
        // Positions that have both a coordinate and an action.
        for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
            vector.push(action.results_in(timestamp)?);
        }
        // At most one of the two tails below is non-empty.
        vector.extend(timestamps[min_len..].iter().cloned());
        for action in &self.actions[min_len..] {
            vector.push(action.results_in(&T::minimum())?);
        }
        Some(PointStamp::new(vector.into()))
    }

    /// Composes two summaries: the result describes applying `self` and then `other`.
    ///
    /// The combined `retain` is the smaller of the two where both are set, otherwise
    /// whichever one is set. The actions of `self` are truncated to `other.retain`
    /// before being composed position by position with `other.actions`; unpaired
    /// actions from either side are carried over unchanged.
    ///
    /// Returns `None` as soon as any per-coordinate composition returns `None`.
    fn followed_by(&self, other: &Self) -> Option<Self> {
        let retain = match (self.retain, other.retain) {
            (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
            (x, y) => x.or(y),
        };
        // Actions of `self` beyond `other.retain` act on coordinates that `other`
        // discards, so they do not contribute to the composition.
        let self_actions = match other.retain {
            Some(retain) if retain < self.actions.len() => &self.actions[..retain],
            _ => &self.actions[..],
        };
        let min_len = std::cmp::min(self_actions.len(), other.actions.len());
        let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
        for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
            actions.push(action1.followed_by(action2)?);
        }
        // At most one of the two tails below is non-empty.
        actions.extend(self_actions[min_len..].iter().cloned());
        actions.extend(other.actions[min_len..].iter().cloned());
        Some(Self { retain, actions })
    }
}
impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
    /// Orders summaries with identical `retain` by their actions.
    ///
    /// `self` is less-or-equal when both `retain` values are equal, `self` has no more
    /// actions than `other`, and each action of `self` is `less_equal` to the action of
    /// `other` in the same position.
    fn less_equal(&self, other: &Self) -> bool {
        if self.retain != other.retain {
            return false;
        }
        if self.actions.len() > other.actions.len() {
            return false;
        }
        self.actions
            .iter()
            .zip(&other.actions)
            .all(|(ours, theirs)| ours.less_equal(theirs))
    }
}
use timely::progress::Timestamp;
impl<T: Timestamp> Timestamp for PointStamp<T> {
fn minimum() -> Self {
Self::new(Default::default())
}
type Summary = PointStampSummary<T::Summary>;
}
use crate::lattice::Lattice;
impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
#[inline(always)]
fn join(&self, other: &Self) -> Self {
let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
let mut vector = SmallVec::with_capacity(max_len);
for index in 0..min_len {
vector.push(self.vector[index].join(&other.vector[index]));
}
for time in &self.vector[min_len..] {
vector.push(time.clone());
}
for time in &other.vector[min_len..] {
vector.push(time.clone());
}
Self::new(vector)
}
#[inline(always)]
fn meet(&self, other: &Self) -> Self {
let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
let mut vector = SmallVec::with_capacity(min_len);
for index in 0..min_len {
vector.push(self.vector[index].meet(&other.vector[index]));
}
Self::new(vector)
}
}
mod columnation {
    use columnation::{Columnation, Region};
    use smallvec::SmallVec;

    use crate::dynamic::pointstamp::PointStamp;

    impl<T: Columnation + Clone> Columnation for PointStamp<T> {
        type InnerRegion = PointStampStack<T::InnerRegion>;
    }

    /// Region for `PointStamp` values.
    ///
    /// It wraps the region of the underlying `SmallVec<[R::Item; 1]>` and forwards every
    /// operation to it.
    pub struct PointStampStack<R: Region<Item: Columnation + Clone>>(
        <SmallVec<[R::Item; 1]> as Columnation>::InnerRegion,
    );

    impl<R: Region<Item: Columnation + Clone>> Default for PointStampStack<R> {
        #[inline]
        fn default() -> Self {
            PointStampStack(Default::default())
        }
    }

    impl<R: Region<Item: Columnation + Clone>> Region for PointStampStack<R> {
        type Item = PointStamp<R::Item>;

        /// Copies `item` by copying its coordinate vector into the wrapped region.
        ///
        /// # Safety
        ///
        /// This forwards to the wrapped region's `copy`; callers are presumably bound by
        /// the same contract as for that method.
        #[inline]
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            let vector = self.0.copy(&item.vector);
            PointStamp { vector }
        }

        /// Clears the wrapped region.
        fn clear(&mut self) {
            self.0.clear();
        }

        /// Reserves space in the wrapped region for the coordinate vectors of `items`.
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.0.reserve_items(items.map(|stamp| &stamp.vector));
        }

        /// Reserves space in the wrapped region based on the wrapped regions of `regions`.
        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.0.reserve_regions(regions.map(|stack| &stack.0));
        }

        /// Forwards heap size reporting to the wrapped region.
        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.0.heap_size(callback);
        }
    }
}