1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
//! A reference-counted wrapper sharing one owned trace.
//!
//! The types in this module, `TraceBox` and `TraceRc`, are meant to parallel `RcBox` and `Rc` in `std::rc`.
//!
//! The first type is an owned trace with some information about the cumulative requirements of the shared
//! handles. This is roughly how much progress each has made, so we know which "read capabilities" they have
//! collectively dropped, and when it is safe to inform the trace of such progress.
//!
//! The second type is a wrapper which presents as a `TraceReader`, but whose methods for advancing its read
//! capabilities interact with the `TraceBox` rather than directly with the owned trace. Ideally, instances of
//! `TraceRc` should appear indistinguishable from the underlying trace from a reading perspective, with the
//! exception that the trace may not compact its representation as fast as if it were exclusively owned.

use std::rc::Rc;
use std::cell::RefCell;

use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}};

use lattice::Lattice;
use trace::TraceReader;
use trace::cursor::Cursor;

/// A wrapper around a trace which tracks the frontiers of all referees.
///
/// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case.
/// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers
/// may influence.
pub struct TraceBox<Tr>
where
    Tr::Time: Lattice+Ord+Clone+'static,
    Tr: TraceReader
{
    /// accumulated holds on times for advancement.
    pub logical_compaction: MutableAntichain<Tr::Time>,
    /// accumulated holds on times for distinction.
    pub physical_compaction: MutableAntichain<Tr::Time>,
    /// The wrapped trace.
    pub trace: Tr,
}

impl<Tr> TraceBox<Tr>
where
    Tr::Time: Lattice+Ord+Clone+'static,
    Tr: TraceReader,
{
    /// Moves an existing trace into a shareable trace wrapper.
    ///
    /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing
    /// process will fish these out and make sure that they are used for the initial read capabilities.
    pub fn new(mut trace: Tr) -> Self {

        let mut logical_compaction = MutableAntichain::new();
        logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
        let mut physical_compaction = MutableAntichain::new();
        physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));

        TraceBox {
            logical_compaction,
            physical_compaction,
            trace: trace,
        }
    }
    /// Replaces elements of `lower` with those of `upper`.
    pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
        self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
        self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
        self.trace.set_logical_compaction(self.logical_compaction.frontier());
    }
    /// Replaces elements of `lower` with those of `upper`.
    pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
        self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
        self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
        self.trace.set_physical_compaction(self.physical_compaction.frontier());
    }
}

/// A handle to a shared trace.
///
/// As long as the handle exists, the wrapped trace should continue to exist and will not advance its
/// timestamps past the frontier maintained by the handle. The intent is that such a handle appears as
/// if it is a privately maintained trace, despite being backed by shared resources.
pub struct TraceRc<Tr>
where
    Tr::Time: Lattice+Ord+Clone+'static,
    Tr: TraceReader,
{
    logical_compaction: Antichain<Tr::Time>,
    physical_compaction: Antichain<Tr::Time>,
    /// Wrapped trace. Please be gentle when using.
    pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
}

impl<Tr> TraceReader for TraceRc<Tr>
where
    Tr::Time: Lattice+Ord+Clone+'static,
    Tr: TraceReader,
{
    type Key = Tr::Key;
    type Val = Tr::Val;
    type Time = Tr::Time;
    type R = Tr::R;

    type Batch = Tr::Batch;
    type Cursor = Tr::Cursor;

    /// Sets frontier to now be elements in `frontier`.
    ///
    /// This change may not have immediately observable effects. It informs the shared trace that this
    /// handle no longer requires access to times other than those in the future of `frontier`, but if
    /// there are other handles to the same trace, it may not yet be able to compact.
    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
        self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
        self.logical_compaction = frontier.to_owned();
    }
    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
    /// Allows the trace to compact batches of times before `frontier`.
    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
        self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
        self.physical_compaction = frontier.to_owned();
    }
    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
    /// Creates a new cursor over the wrapped trace.
    fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
        ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
    }

    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
        ::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f)
    }
}

impl<Tr> TraceRc<Tr>
where
    Tr::Time: Lattice+Ord+Clone+'static,
    Tr: TraceReader,
{
    /// Allocates a new handle from an existing wrapped wrapper.
    pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {

        let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));

        let handle = TraceRc {
            logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
            physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
            wrapper: wrapped.clone(),
        };

        (handle, wrapped)
    }
}

impl<Tr> Clone for TraceRc<Tr>
where
    Tr::Time: Lattice+Ord+Clone,
    Tr: TraceReader,
{
    fn clone(&self) -> Self {
        // increase ref counts for this frontier
        self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
        self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
        TraceRc {
            logical_compaction: self.logical_compaction.clone(),
            physical_compaction: self.physical_compaction.clone(),
            wrapper: self.wrapper.clone(),
        }
    }
}

impl<Tr> Drop for TraceRc<Tr>
where
    Tr::Time: Lattice+Ord+Clone+'static,
    Tr: TraceReader,
{
    fn drop(&mut self) {
        self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
        self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
        self.logical_compaction = Antichain::new();
        self.physical_compaction = Antichain::new();
    }
}