palimpsest_dataflow/trace/wrappers/
rc.rs1use std::cell::RefCell;
15use std::rc::Rc;
16
17use timely::progress::{
18 frontier::{AntichainRef, MutableAntichain},
19 Antichain,
20};
21
22use crate::trace::TraceReader;
23
24pub struct TraceBox<Tr: TraceReader> {
30 pub logical_compaction: MutableAntichain<Tr::Time>,
32 pub physical_compaction: MutableAntichain<Tr::Time>,
34 pub trace: Tr,
36}
37
38impl<Tr: TraceReader> TraceBox<Tr> {
39 pub fn new(mut trace: Tr) -> Self {
44 let mut logical_compaction = MutableAntichain::new();
45 logical_compaction.update_iter(
46 trace
47 .get_logical_compaction()
48 .iter()
49 .cloned()
50 .map(|t| (t, 1)),
51 );
52 let mut physical_compaction = MutableAntichain::new();
53 physical_compaction.update_iter(
54 trace
55 .get_physical_compaction()
56 .iter()
57 .cloned()
58 .map(|t| (t, 1)),
59 );
60
61 TraceBox {
62 logical_compaction,
63 physical_compaction,
64 trace,
65 }
66 }
67 #[inline]
69 pub fn adjust_logical_compaction(
70 &mut self,
71 lower: AntichainRef<Tr::Time>,
72 upper: AntichainRef<Tr::Time>,
73 ) {
74 self.logical_compaction
75 .update_iter(upper.iter().cloned().map(|t| (t, 1)));
76 self.logical_compaction
77 .update_iter(lower.iter().cloned().map(|t| (t, -1)));
78 self.trace
79 .set_logical_compaction(self.logical_compaction.frontier());
80 }
81 #[inline]
83 pub fn adjust_physical_compaction(
84 &mut self,
85 lower: AntichainRef<Tr::Time>,
86 upper: AntichainRef<Tr::Time>,
87 ) {
88 self.physical_compaction
89 .update_iter(upper.iter().cloned().map(|t| (t, 1)));
90 self.physical_compaction
91 .update_iter(lower.iter().cloned().map(|t| (t, -1)));
92 self.trace
93 .set_physical_compaction(self.physical_compaction.frontier());
94 }
95}
96
97pub struct TraceRc<Tr: TraceReader> {
103 logical_compaction: Antichain<Tr::Time>,
104 physical_compaction: Antichain<Tr::Time>,
105 pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
107}
108
109use crate::trace::WithLayout;
110impl<Tr: TraceReader> WithLayout for TraceRc<Tr> {
111 type Layout = Tr::Layout;
112}
113
114impl<Tr: TraceReader> TraceReader for TraceRc<Tr> {
115 type Batch = Tr::Batch;
116 type Storage = Tr::Storage;
117 type Cursor = Tr::Cursor;
118
119 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
125 self.wrapper
126 .borrow_mut()
127 .adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
128 self.logical_compaction = frontier.to_owned();
129 }
130 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
131 self.logical_compaction.borrow()
132 }
133 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
135 self.wrapper
136 .borrow_mut()
137 .adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
138 self.physical_compaction = frontier.to_owned();
139 }
140 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
141 self.physical_compaction.borrow()
142 }
143 fn cursor_through(
145 &mut self,
146 frontier: AntichainRef<'_, Tr::Time>,
147 ) -> Option<(Tr::Cursor, Tr::Storage)> {
148 ::std::cell::RefCell::borrow_mut(&self.wrapper)
149 .trace
150 .cursor_through(frontier)
151 }
152
153 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
154 ::std::cell::RefCell::borrow(&self.wrapper)
155 .trace
156 .map_batches(f)
157 }
158}
159
160impl<Tr: TraceReader> TraceRc<Tr> {
161 pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
163 let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
164
165 let handle = TraceRc {
166 logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
167 physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
168 wrapper: wrapped.clone(),
169 };
170
171 (handle, wrapped)
172 }
173}
174
175impl<Tr: TraceReader> Clone for TraceRc<Tr> {
176 fn clone(&self) -> Self {
177 self.wrapper
179 .borrow_mut()
180 .adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
181 self.wrapper.borrow_mut().adjust_physical_compaction(
182 Antichain::new().borrow(),
183 self.physical_compaction.borrow(),
184 );
185 TraceRc {
186 logical_compaction: self.logical_compaction.clone(),
187 physical_compaction: self.physical_compaction.clone(),
188 wrapper: self.wrapper.clone(),
189 }
190 }
191}
192
193impl<Tr: TraceReader> Drop for TraceRc<Tr> {
194 fn drop(&mut self) {
195 self.wrapper
196 .borrow_mut()
197 .adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
198 self.wrapper.borrow_mut().adjust_physical_compaction(
199 self.physical_compaction.borrow(),
200 Antichain::new().borrow(),
201 );
202 self.logical_compaction = Antichain::new();
203 self.physical_compaction = Antichain::new();
204 }
205}