Skip to main content

differential_dataflow/trace/cursor/
mod.rs

//! Traits and types for navigating ordered sequences of update tuples.
//!
//! The `Cursor` trait contains several methods for efficiently navigating ordered collections
//! of tuples of the form `(key, val, time, diff)`. A cursor differs from an iterator both
//! because it allows navigation on multiple levels (key and val) and because it supports
//! efficient seeking (via the `seek_key` and `seek_val` methods).
7
8pub mod cursor_list;
9
10pub use self::cursor_list::CursorList;
11
12use crate::trace::implementations::LayoutExt;
13
14/// A cursor for navigating ordered `(key, val, time, diff)` updates.
15pub trait Cursor : LayoutExt {
16
17    /// Storage required by the cursor.
18    type Storage;
19
20    /// Indicates if the current key is valid.
21    ///
22    /// A value of `false` indicates that the cursor has exhausted all keys.
23    fn key_valid(&self, storage: &Self::Storage) -> bool;
24    /// Indicates if the current value is valid.
25    ///
26    /// A value of `false` indicates that the cursor has exhausted all values for this key.
27    fn val_valid(&self, storage: &Self::Storage) -> bool;
28
29    /// A reference to the current key. Asserts if invalid.
30    fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a>;
31    /// A reference to the current value. Asserts if invalid.
32    fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a>;
33
34    /// Returns a reference to the current key, if valid.
35    fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>>;
36    /// Returns a reference to the current value, if valid.
37    fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>>;
38
39    /// Applies `logic` to each pair of time and difference. Intended for mutation of the
40    /// closure's scope.
41    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L);
42
43    /// Advances the cursor to the next key.
44    fn step_key(&mut self, storage: &Self::Storage);
45    /// Advances the cursor to the specified key.
46    fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>);
47
48    /// Advances the cursor to the next value.
49    fn step_val(&mut self, storage: &Self::Storage);
50    /// Advances the cursor to the specified value.
51    fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>);
52
53    /// Rewinds the cursor to the first key.
54    fn rewind_keys(&mut self, storage: &Self::Storage);
55    /// Rewinds the cursor to the first value for current key.
56    fn rewind_vals(&mut self, storage: &Self::Storage);
57
58    /// Loads `target` with all updates associated with the supplied `key`.
59    ///
60    /// First `target` is cleared, and then if we find `key` we populated it with each of its `(val, time, diff)` updates.
61    /// If `meet` is supplied, the time is joined with each `time` in the updates, to advance the times before consolidation.
62    fn populate_key<'a>(&mut self, storage: &'a Self::Storage, key: Self::Key<'a>, meet: Option<&Self::Time>, target: &mut crate::operators::EditList<Self::Val<'a>, Self::Time, Self::Diff>) {
63        target.clear();
64        self.seek_key(storage, key);
65        if self.get_key(storage) == Some(key) {
66            self.rewind_vals(storage);
67            while let Some(val) = self.get_val(storage) {
68                self.map_times(storage, |time, diff| {
69                    use crate::lattice::Lattice;
70                    let mut time = Self::owned_time(time);
71                    if let Some(meet) = meet { time.join_assign(meet); }
72                    target.push(time, Self::owned_diff(diff))
73                });
74                target.seal(val);
75                self.step_val(storage);
76            }
77        }
78    }
79
80    /// Rewinds the cursor and outputs its contents to a Vec
81    fn to_vec<K, IK, V, IV>(&mut self, storage: &Self::Storage, into_key: IK, into_val: IV) -> Vec<((K, V), Vec<(Self::Time, Self::Diff)>)>
82    where
83        IK: for<'a> Fn(Self::Key<'a>) -> K,
84        IV: for<'a> Fn(Self::Val<'a>) -> V,
85    {
86        let mut out = Vec::new();
87        self.rewind_keys(storage);
88        while let Some(key) = self.get_key(storage) {
89            self.rewind_vals(storage);
90            while let Some(val) = self.get_val(storage) {
91                let mut kv_out = Vec::new();
92                self.map_times(storage, |ts, r| {
93                    kv_out.push((Self::owned_time(ts), Self::owned_diff(r)));
94                });
95                out.push(((into_key(key), into_val(val)), kv_out));
96                self.step_val(storage);
97            }
98            self.step_key(storage);
99        }
100        out
101    }
102}