Skip to main content

palimpsest_dataflow/trace/wrappers/
rc.rs

1//! A reference-counted wrapper sharing one owned trace.
2//!
3//! The types in this module, `TraceBox` and `TraceRc` and meant to parallel `RcBox` and `Rc` in `std::rc`.
4//!
5//! The first typee is an owned trace with some information about the cumulative requirements of the shared
6//! handles. This is roughly how much progress has each made, so we know which "read capabilities" they have
7//! collectively dropped, and when it is safe to inform the trace of such progress.
8//!
9//! The second type is a wrapper which presents as a `TraceReader`, but whose methods for advancing its read
10//! capabilities interact with the `TraceBox` rather than directly with the owned trace. Ideally, instances
11//! `TraceRc` should appear indistinguishable from the underlying trace from a reading perspective, with the
12//! exception that the trace may not compact its representation as fast as if it were exclusively owned.
13
14use std::cell::RefCell;
15use std::rc::Rc;
16
17use timely::progress::{
18    frontier::{AntichainRef, MutableAntichain},
19    Antichain,
20};
21
22use crate::trace::TraceReader;
23
24/// A wrapper around a trace which tracks the frontiers of all referees.
25///
26/// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case.
27/// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers
28/// may influence.
29pub struct TraceBox<Tr: TraceReader> {
30    /// accumulated holds on times for advancement.
31    pub logical_compaction: MutableAntichain<Tr::Time>,
32    /// accumulated holds on times for distinction.
33    pub physical_compaction: MutableAntichain<Tr::Time>,
34    /// The wrapped trace.
35    pub trace: Tr,
36}
37
38impl<Tr: TraceReader> TraceBox<Tr> {
39    /// Moves an existing trace into a shareable trace wrapper.
40    ///
41    /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing
42    /// process will fish these out and make sure that they are used for the initial read capabilities.
43    pub fn new(mut trace: Tr) -> Self {
44        let mut logical_compaction = MutableAntichain::new();
45        logical_compaction.update_iter(
46            trace
47                .get_logical_compaction()
48                .iter()
49                .cloned()
50                .map(|t| (t, 1)),
51        );
52        let mut physical_compaction = MutableAntichain::new();
53        physical_compaction.update_iter(
54            trace
55                .get_physical_compaction()
56                .iter()
57                .cloned()
58                .map(|t| (t, 1)),
59        );
60
61        TraceBox {
62            logical_compaction,
63            physical_compaction,
64            trace,
65        }
66    }
67    /// Replaces elements of `lower` with those of `upper`.
68    #[inline]
69    pub fn adjust_logical_compaction(
70        &mut self,
71        lower: AntichainRef<Tr::Time>,
72        upper: AntichainRef<Tr::Time>,
73    ) {
74        self.logical_compaction
75            .update_iter(upper.iter().cloned().map(|t| (t, 1)));
76        self.logical_compaction
77            .update_iter(lower.iter().cloned().map(|t| (t, -1)));
78        self.trace
79            .set_logical_compaction(self.logical_compaction.frontier());
80    }
81    /// Replaces elements of `lower` with those of `upper`.
82    #[inline]
83    pub fn adjust_physical_compaction(
84        &mut self,
85        lower: AntichainRef<Tr::Time>,
86        upper: AntichainRef<Tr::Time>,
87    ) {
88        self.physical_compaction
89            .update_iter(upper.iter().cloned().map(|t| (t, 1)));
90        self.physical_compaction
91            .update_iter(lower.iter().cloned().map(|t| (t, -1)));
92        self.trace
93            .set_physical_compaction(self.physical_compaction.frontier());
94    }
95}
96
97/// A handle to a shared trace.
98///
99/// As long as the handle exists, the wrapped trace should continue to exist and will not advance its
100/// timestamps past the frontier maintained by the handle. The intent is that such a handle appears as
101/// if it is a privately maintained trace, despite being backed by shared resources.
102pub struct TraceRc<Tr: TraceReader> {
103    logical_compaction: Antichain<Tr::Time>,
104    physical_compaction: Antichain<Tr::Time>,
105    /// Wrapped trace. Please be gentle when using.
106    pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
107}
108
109use crate::trace::WithLayout;
110impl<Tr: TraceReader> WithLayout for TraceRc<Tr> {
111    type Layout = Tr::Layout;
112}
113
114impl<Tr: TraceReader> TraceReader for TraceRc<Tr> {
115    type Batch = Tr::Batch;
116    type Storage = Tr::Storage;
117    type Cursor = Tr::Cursor;
118
119    /// Sets frontier to now be elements in `frontier`.
120    ///
121    /// This change may not have immediately observable effects. It informs the shared trace that this
122    /// handle no longer requires access to times other than those in the future of `frontier`, but if
123    /// there are other handles to the same trace, it may not yet be able to compact.
124    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
125        self.wrapper
126            .borrow_mut()
127            .adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
128        self.logical_compaction = frontier.to_owned();
129    }
130    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
131        self.logical_compaction.borrow()
132    }
133    /// Allows the trace to compact batches of times before `frontier`.
134    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
135        self.wrapper
136            .borrow_mut()
137            .adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
138        self.physical_compaction = frontier.to_owned();
139    }
140    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
141        self.physical_compaction.borrow()
142    }
143    /// Creates a new cursor over the wrapped trace.
144    fn cursor_through(
145        &mut self,
146        frontier: AntichainRef<'_, Tr::Time>,
147    ) -> Option<(Tr::Cursor, Tr::Storage)> {
148        ::std::cell::RefCell::borrow_mut(&self.wrapper)
149            .trace
150            .cursor_through(frontier)
151    }
152
153    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
154        ::std::cell::RefCell::borrow(&self.wrapper)
155            .trace
156            .map_batches(f)
157    }
158}
159
160impl<Tr: TraceReader> TraceRc<Tr> {
161    /// Allocates a new handle from an existing wrapped wrapper.
162    pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
163        let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
164
165        let handle = TraceRc {
166            logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
167            physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
168            wrapper: wrapped.clone(),
169        };
170
171        (handle, wrapped)
172    }
173}
174
175impl<Tr: TraceReader> Clone for TraceRc<Tr> {
176    fn clone(&self) -> Self {
177        // increase ref counts for this frontier
178        self.wrapper
179            .borrow_mut()
180            .adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
181        self.wrapper.borrow_mut().adjust_physical_compaction(
182            Antichain::new().borrow(),
183            self.physical_compaction.borrow(),
184        );
185        TraceRc {
186            logical_compaction: self.logical_compaction.clone(),
187            physical_compaction: self.physical_compaction.clone(),
188            wrapper: self.wrapper.clone(),
189        }
190    }
191}
192
193impl<Tr: TraceReader> Drop for TraceRc<Tr> {
194    fn drop(&mut self) {
195        self.wrapper
196            .borrow_mut()
197            .adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
198        self.wrapper.borrow_mut().adjust_physical_compaction(
199            self.physical_compaction.borrow(),
200            Antichain::new().borrow(),
201        );
202        self.logical_compaction = Antichain::new();
203        self.physical_compaction = Antichain::new();
204    }
205}