pub mod pointstamp;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};
use timely::dataflow::Scope;
use timely::order::Product;
use timely::progress::Antichain;
use timely::progress::Timestamp;
use crate::collection::AsCollection;
use crate::difference::Semigroup;
use crate::dynamic::pointstamp::PointStamp;
use crate::dynamic::pointstamp::PointStampSummary;
use crate::{Data, VecCollection};
impl<G, D, R, T, TOuter> VecCollection<G, D, R>
where
G: Scope<Timestamp = Product<TOuter, PointStamp<T>>>,
D: Data,
R: Semigroup + 'static,
T: Timestamp + Default,
TOuter: Timestamp,
{
pub fn enter_dynamic(&self, _level: usize) -> Self {
(*self).clone()
}
pub fn leave_dynamic(&self, level: usize) -> Self {
let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
let (output, stream) = builder.new_output();
let mut output = OutputBuilder::from(output);
let mut input = builder.new_input_connection(
&self.inner,
Pipeline,
[(
0,
Antichain::from_elem(Product {
outer: Default::default(),
inner: PointStampSummary {
retain: Some(level - 1),
actions: Vec::new(),
},
}),
)],
);
builder.build(move |_capability| {
move |_frontier| {
let mut output = output.activate();
input.for_each(|cap, data| {
let mut new_time = cap.time().clone();
let mut vec = std::mem::take(&mut new_time.inner).into_vec();
vec.truncate(level - 1);
new_time.inner = PointStamp::new(vec);
let new_cap = cap.delayed(&new_time);
for (_data, time, _diff) in data.iter_mut() {
let mut vec = std::mem::take(&mut time.inner).into_vec();
vec.truncate(level - 1);
time.inner = PointStamp::new(vec);
}
output.session(&new_cap).give_container(data);
});
}
});
stream.as_collection()
}
}
pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary>
where
T: Timestamp + Default,
{
PointStampSummary {
retain: None,
actions: std::iter::repeat(Default::default())
.take(level - 1)
.chain(std::iter::once(summary))
.collect(),
}
}