differential_dataflow/trace/wrappers/
rc.rs1use std::rc::Rc;
15use std::cell::RefCell;
16
17use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}};
18
19use crate::lattice::Lattice;
20use crate::trace::TraceReader;
21
22pub struct TraceBox<Tr>
28where
29 Tr::Time: Lattice+Ord+Clone+'static,
30 Tr: TraceReader
31{
32 pub logical_compaction: MutableAntichain<Tr::Time>,
34 pub physical_compaction: MutableAntichain<Tr::Time>,
36 pub trace: Tr,
38}
39
40impl<Tr> TraceBox<Tr>
41where
42 Tr::Time: Lattice+Ord+Clone+'static,
43 Tr: TraceReader,
44{
45 pub fn new(mut trace: Tr) -> Self {
50
51 let mut logical_compaction = MutableAntichain::new();
52 logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
53 let mut physical_compaction = MutableAntichain::new();
54 physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
55
56 TraceBox {
57 logical_compaction,
58 physical_compaction,
59 trace,
60 }
61 }
62 #[inline]
64 pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
65 self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
66 self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
67 self.trace.set_logical_compaction(self.logical_compaction.frontier());
68 }
69 #[inline]
71 pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
72 self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
73 self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
74 self.trace.set_physical_compaction(self.physical_compaction.frontier());
75 }
76}
77
78pub struct TraceRc<Tr>
84where
85 Tr::Time: Lattice+Ord+Clone+'static,
86 Tr: TraceReader,
87{
88 logical_compaction: Antichain<Tr::Time>,
89 physical_compaction: Antichain<Tr::Time>,
90 pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
92}
93
94impl<Tr> TraceReader for TraceRc<Tr>
95where
96 Tr::Time: Lattice+Ord+Clone+'static,
97 Tr: TraceReader,
98{
99 type Key<'a> = Tr::Key<'a>;
100 type KeyOwned = Tr::KeyOwned;
101 type Val<'a> = Tr::Val<'a>;
102 type ValOwned = Tr::ValOwned;
103 type Time = Tr::Time;
104 type Diff = Tr::Diff;
105
106 type Batch = Tr::Batch;
107 type Storage = Tr::Storage;
108 type Cursor = Tr::Cursor;
109
110 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
116 self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
117 self.logical_compaction = frontier.to_owned();
118 }
119 fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
120 fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
122 self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
123 self.physical_compaction = frontier.to_owned();
124 }
125 fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
126 fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
128 ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
129 }
130
131 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
132 ::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f)
133 }
134}
135
136impl<Tr> TraceRc<Tr>
137where
138 Tr::Time: Lattice+Ord+Clone+'static,
139 Tr: TraceReader,
140{
141 pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
143
144 let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
145
146 let handle = TraceRc {
147 logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
148 physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
149 wrapper: wrapped.clone(),
150 };
151
152 (handle, wrapped)
153 }
154}
155
156impl<Tr> Clone for TraceRc<Tr>
157where
158 Tr::Time: Lattice+Ord+Clone,
159 Tr: TraceReader,
160{
161 fn clone(&self) -> Self {
162 self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
164 self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
165 TraceRc {
166 logical_compaction: self.logical_compaction.clone(),
167 physical_compaction: self.physical_compaction.clone(),
168 wrapper: self.wrapper.clone(),
169 }
170 }
171}
172
173impl<Tr> Drop for TraceRc<Tr>
174where
175 Tr::Time: Lattice+Ord+Clone+'static,
176 Tr: TraceReader,
177{
178 fn drop(&mut self) {
179 self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
180 self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
181 self.logical_compaction = Antichain::new();
182 self.physical_compaction = Antichain::new();
183 }
184}