differential_dataflow/dynamic/
mod.rs1pub mod pointstamp;
15
16use timely::dataflow::Scope;
17use timely::order::Product;
18use timely::progress::Timestamp;
19use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
20use timely::dataflow::channels::pact::Pipeline;
21use timely::progress::Antichain;
22
23use crate::difference::Semigroup;
24use crate::{Collection, Data};
25use crate::collection::AsCollection;
26use crate::dynamic::pointstamp::PointStamp;
27use crate::dynamic::pointstamp::PointStampSummary;
28
29impl<G, D, R, T, TOuter> Collection<G, D, R>
30where
31 G: Scope<Timestamp = Product<TOuter, PointStamp<T>>>,
32 D: Data,
33 R: Semigroup,
34 T: Timestamp+Default,
35 TOuter: Timestamp,
36{
37 pub fn enter_dynamic(&self, _level: usize) -> Self {
39 (*self).clone()
40 }
41 pub fn leave_dynamic(&self, level: usize) -> Self {
43 let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
45 let (mut output, stream) = builder.new_output();
46 let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]);
47
48 let mut vector = Default::default();
49 builder.build(move |_capability| move |_frontier| {
50 let mut output = output.activate();
51 input.for_each(|cap, data| {
52 data.swap(&mut vector);
53 let mut new_time = cap.time().clone();
54 new_time.inner.vector.truncate(level - 1);
55 let new_cap = cap.delayed(&new_time);
56 for (_data, time, _diff) in vector.iter_mut() {
57 time.inner.vector.truncate(level - 1);
58 }
59 output.session(&new_cap).give_vec(&mut vector);
60 });
61 });
62
63 stream.as_collection()
64 }
65}
66
67pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary>
69where
70 T: Timestamp+Default,
71{
72 PointStampSummary {
73 retain: None,
74 actions: std::iter::repeat(Default::default()).take(level-1).chain(std::iter::once(summary)).collect(),
75 }
76}