palimpsest_dataflow/dynamic/
mod.rs1pub mod pointstamp;
15
16use timely::dataflow::channels::pact::Pipeline;
17use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};
18use timely::dataflow::Scope;
19use timely::order::Product;
20use timely::progress::Antichain;
21use timely::progress::Timestamp;
22
23use crate::collection::AsCollection;
24use crate::difference::Semigroup;
25use crate::dynamic::pointstamp::PointStamp;
26use crate::dynamic::pointstamp::PointStampSummary;
27use crate::{Data, VecCollection};
28
29impl<G, D, R, T, TOuter> VecCollection<G, D, R>
30where
31 G: Scope<Timestamp = Product<TOuter, PointStamp<T>>>,
32 D: Data,
33 R: Semigroup + 'static,
34 T: Timestamp + Default,
35 TOuter: Timestamp,
36{
37 pub fn enter_dynamic(&self, _level: usize) -> Self {
39 (*self).clone()
40 }
41 pub fn leave_dynamic(&self, level: usize) -> Self {
43 let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
45 let (output, stream) = builder.new_output();
46 let mut output = OutputBuilder::from(output);
47 let mut input = builder.new_input_connection(
48 &self.inner,
49 Pipeline,
50 [(
51 0,
52 Antichain::from_elem(Product {
53 outer: Default::default(),
54 inner: PointStampSummary {
55 retain: Some(level - 1),
56 actions: Vec::new(),
57 },
58 }),
59 )],
60 );
61
62 builder.build(move |_capability| {
63 move |_frontier| {
64 let mut output = output.activate();
65 input.for_each(|cap, data| {
66 let mut new_time = cap.time().clone();
67 let mut vec = std::mem::take(&mut new_time.inner).into_vec();
68 vec.truncate(level - 1);
69 new_time.inner = PointStamp::new(vec);
70 let new_cap = cap.delayed(&new_time);
71 for (_data, time, _diff) in data.iter_mut() {
72 let mut vec = std::mem::take(&mut time.inner).into_vec();
73 vec.truncate(level - 1);
74 time.inner = PointStamp::new(vec);
75 }
76 output.session(&new_cap).give_container(data);
77 });
78 }
79 });
80
81 stream.as_collection()
82 }
83}
84
85pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary>
87where
88 T: Timestamp + Default,
89{
90 PointStampSummary {
91 retain: None,
92 actions: std::iter::repeat(Default::default())
93 .take(level - 1)
94 .chain(std::iter::once(summary))
95 .collect(),
96 }
97}