differential_dataflow/trace/wrappers/
rc.rs

1//! A reference-counted wrapper sharing one owned trace.
2//!
//! The types in this module, `TraceBox` and `TraceRc`, are meant to parallel `RcBox` and `Rc` in `std::rc`.
4//!
//! The first type is an owned trace with some information about the cumulative requirements of the shared
//! handles. This is roughly how much progress each has made, so we know which "read capabilities" they have
//! collectively dropped, and when it is safe to inform the trace of such progress.
8//!
//! The second type is a wrapper which presents as a `TraceReader`, but whose methods for advancing its read
//! capabilities interact with the `TraceBox` rather than directly with the owned trace. Ideally, instances
//! of `TraceRc` should appear indistinguishable from the underlying trace from a reading perspective, with the
//! exception that the trace may not compact its representation as fast as if it were exclusively owned.
13
14use std::rc::Rc;
15use std::cell::RefCell;
16
17use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}};
18
19use crate::lattice::Lattice;
20use crate::trace::TraceReader;
21
22/// A wrapper around a trace which tracks the frontiers of all referees.
23///
24/// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case.
25/// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers
26/// may influence.
27pub struct TraceBox<Tr>
28where
29    Tr::Time: Lattice+Ord+Clone+'static,
30    Tr: TraceReader
31{
32    /// accumulated holds on times for advancement.
33    pub logical_compaction: MutableAntichain<Tr::Time>,
34    /// accumulated holds on times for distinction.
35    pub physical_compaction: MutableAntichain<Tr::Time>,
36    /// The wrapped trace.
37    pub trace: Tr,
38}
39
40impl<Tr> TraceBox<Tr>
41where
42    Tr::Time: Lattice+Ord+Clone+'static,
43    Tr: TraceReader,
44{
45    /// Moves an existing trace into a shareable trace wrapper.
46    ///
47    /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing
48    /// process will fish these out and make sure that they are used for the initial read capabilities.
49    pub fn new(mut trace: Tr) -> Self {
50
51        let mut logical_compaction = MutableAntichain::new();
52        logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
53        let mut physical_compaction = MutableAntichain::new();
54        physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
55
56        TraceBox {
57            logical_compaction,
58            physical_compaction,
59            trace,
60        }
61    }
62    /// Replaces elements of `lower` with those of `upper`.
63    #[inline]
64    pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
65        self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
66        self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
67        self.trace.set_logical_compaction(self.logical_compaction.frontier());
68    }
69    /// Replaces elements of `lower` with those of `upper`.
70    #[inline]
71    pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
72        self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
73        self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
74        self.trace.set_physical_compaction(self.physical_compaction.frontier());
75    }
76}
77
78/// A handle to a shared trace.
79///
80/// As long as the handle exists, the wrapped trace should continue to exist and will not advance its
81/// timestamps past the frontier maintained by the handle. The intent is that such a handle appears as
82/// if it is a privately maintained trace, despite being backed by shared resources.
83pub struct TraceRc<Tr>
84where
85    Tr::Time: Lattice+Ord+Clone+'static,
86    Tr: TraceReader,
87{
88    logical_compaction: Antichain<Tr::Time>,
89    physical_compaction: Antichain<Tr::Time>,
90    /// Wrapped trace. Please be gentle when using.
91    pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
92}
93
94impl<Tr> TraceReader for TraceRc<Tr>
95where
96    Tr::Time: Lattice+Ord+Clone+'static,
97    Tr: TraceReader,
98{
99    type Key<'a> = Tr::Key<'a>;
100    type KeyOwned = Tr::KeyOwned;
101    type Val<'a> = Tr::Val<'a>;
102    type ValOwned = Tr::ValOwned;
103    type Time = Tr::Time;
104    type Diff = Tr::Diff;
105
106    type Batch = Tr::Batch;
107    type Storage = Tr::Storage;
108    type Cursor = Tr::Cursor;
109
110    /// Sets frontier to now be elements in `frontier`.
111    ///
112    /// This change may not have immediately observable effects. It informs the shared trace that this
113    /// handle no longer requires access to times other than those in the future of `frontier`, but if
114    /// there are other handles to the same trace, it may not yet be able to compact.
115    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
116        self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
117        self.logical_compaction = frontier.to_owned();
118    }
119    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
120    /// Allows the trace to compact batches of times before `frontier`.
121    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
122        self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
123        self.physical_compaction = frontier.to_owned();
124    }
125    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
126    /// Creates a new cursor over the wrapped trace.
127    fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
128        ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
129    }
130
131    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
132        ::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f)
133    }
134}
135
136impl<Tr> TraceRc<Tr>
137where
138    Tr::Time: Lattice+Ord+Clone+'static,
139    Tr: TraceReader,
140{
141    /// Allocates a new handle from an existing wrapped wrapper.
142    pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
143
144        let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
145
146        let handle = TraceRc {
147            logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
148            physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
149            wrapper: wrapped.clone(),
150        };
151
152        (handle, wrapped)
153    }
154}
155
156impl<Tr> Clone for TraceRc<Tr>
157where
158    Tr::Time: Lattice+Ord+Clone,
159    Tr: TraceReader,
160{
161    fn clone(&self) -> Self {
162        // increase ref counts for this frontier
163        self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
164        self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
165        TraceRc {
166            logical_compaction: self.logical_compaction.clone(),
167            physical_compaction: self.physical_compaction.clone(),
168            wrapper: self.wrapper.clone(),
169        }
170    }
171}
172
173impl<Tr> Drop for TraceRc<Tr>
174where
175    Tr::Time: Lattice+Ord+Clone+'static,
176    Tr: TraceReader,
177{
178    fn drop(&mut self) {
179        self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
180        self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
181        self.logical_compaction = Antichain::new();
182        self.physical_compaction = Antichain::new();
183    }
184}