Skip to main content

palimpsest_dataflow/trace/cursor/
cursor_list.rs

1//! A generic cursor implementation merging multiple cursors.
2
3use super::Cursor;
4
/// Provides a cursor interface over a list of cursors.
///
/// The `CursorList` tracks the indices of cursors with the minimum key, and the indices of cursors with
/// the minimum key and minimum value. It performs no clever management of these sets otherwise.
#[derive(Debug)]
pub struct CursorList<C> {
    /// The wrapped cursors; the index lists below hold positions into this vector.
    cursors: Vec<C>,
    /// Indices of the cursors whose current key equals the least key among all
    /// cursors that currently have a key. Empty when no cursor has a key.
    min_key: Vec<usize>,
    /// Indices, drawn from `min_key`, of the cursors whose current value equals
    /// the least value among them. Empty when none of them has a value.
    min_val: Vec<usize>,
}
15
16impl<C: Cursor> CursorList<C> {
17    /// Creates a new cursor list from pre-existing cursors.
18    pub fn new(cursors: Vec<C>, storage: &[C::Storage]) -> Self {
19        let mut result = CursorList {
20            cursors,
21            min_key: Vec::new(),
22            min_val: Vec::new(),
23        };
24
25        result.minimize_keys(storage);
26        result
27    }
28
29    /// Initialize min_key with the indices of cursors with the minimum key.
30    ///
31    /// This method scans the current keys of each cursor, and tracks the indices
32    /// of cursors whose key equals the minimum valid key seen so far. As it goes,
33    /// if it observes an improved key it clears the current list, updates the
34    /// minimum key, and continues.
35    ///
36    /// Once finished, it invokes `minimize_vals()` to ensure the value cursor is
37    /// in a consistent state as well.
38    fn minimize_keys(&mut self, storage: &[C::Storage]) {
39        self.min_key.clear();
40
41        // We'll visit each non-`None` key, maintaining the indexes of the least keys in `self.min_key`.
42        let mut iter = self
43            .cursors
44            .iter()
45            .enumerate()
46            .flat_map(|(idx, cur)| cur.get_key(&storage[idx]).map(|key| (idx, key)));
47        if let Some((idx, key)) = iter.next() {
48            let mut min_key = key;
49            self.min_key.push(idx);
50            for (idx, key) in iter {
51                match key.cmp(&min_key) {
52                    std::cmp::Ordering::Less => {
53                        self.min_key.clear();
54                        self.min_key.push(idx);
55                        min_key = key;
56                    }
57                    std::cmp::Ordering::Equal => {
58                        self.min_key.push(idx);
59                    }
60                    std::cmp::Ordering::Greater => {}
61                }
62            }
63        }
64
65        self.minimize_vals(storage);
66    }
67
68    /// Initialize min_val with the indices of minimum key cursors with the minimum value.
69    ///
70    /// This method scans the current values of cursor with minimum keys, and tracks the
71    /// indices of cursors whose value equals the minimum valid value seen so far. As it
72    /// goes, if it observes an improved value it clears the current list, updates the minimum
73    /// value, and continues.
74    fn minimize_vals(&mut self, storage: &[C::Storage]) {
75        self.min_val.clear();
76
77        // We'll visit each non-`None` value, maintaining the indexes of the least values in `self.min_val`.
78        let mut iter = self.min_key.iter().cloned().flat_map(|idx| {
79            self.cursors[idx]
80                .get_val(&storage[idx])
81                .map(|val| (idx, val))
82        });
83        if let Some((idx, val)) = iter.next() {
84            let mut min_val = val;
85            self.min_val.push(idx);
86            for (idx, val) in iter {
87                match val.cmp(&min_val) {
88                    std::cmp::Ordering::Less => {
89                        self.min_val.clear();
90                        self.min_val.push(idx);
91                        min_val = val;
92                    }
93                    std::cmp::Ordering::Equal => {
94                        self.min_val.push(idx);
95                    }
96                    std::cmp::Ordering::Greater => {}
97                }
98            }
99        }
100    }
101}
102
103use crate::trace::implementations::WithLayout;
104impl<C: Cursor> WithLayout for CursorList<C> {
105    type Layout = C::Layout;
106}
107
108impl<C: Cursor> Cursor for CursorList<C> {
109    type Storage = Vec<C::Storage>;
110
111    // validation methods
112    #[inline]
113    fn key_valid(&self, _storage: &Vec<C::Storage>) -> bool {
114        !self.min_key.is_empty()
115    }
116    #[inline]
117    fn val_valid(&self, _storage: &Vec<C::Storage>) -> bool {
118        !self.min_val.is_empty()
119    }
120
121    // accessors
122    #[inline]
123    fn key<'a>(&self, storage: &'a Vec<C::Storage>) -> Self::Key<'a> {
124        debug_assert!(self.key_valid(storage));
125        debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]]));
126        self.cursors[self.min_key[0]].key(&storage[self.min_key[0]])
127    }
128    #[inline]
129    fn val<'a>(&self, storage: &'a Vec<C::Storage>) -> Self::Val<'a> {
130        debug_assert!(self.key_valid(storage));
131        debug_assert!(self.val_valid(storage));
132        debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]]));
133        self.cursors[self.min_val[0]].val(&storage[self.min_val[0]])
134    }
135    #[inline]
136    fn get_key<'a>(&self, storage: &'a Vec<C::Storage>) -> Option<Self::Key<'a>> {
137        self.min_key
138            .get(0)
139            .map(|idx| self.cursors[*idx].key(&storage[*idx]))
140    }
141    #[inline]
142    fn get_val<'a>(&self, storage: &'a Vec<C::Storage>) -> Option<Self::Val<'a>> {
143        self.min_val
144            .get(0)
145            .map(|idx| self.cursors[*idx].val(&storage[*idx]))
146    }
147
148    #[inline]
149    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
150        &mut self,
151        storage: &Vec<C::Storage>,
152        mut logic: L,
153    ) {
154        for &index in self.min_val.iter() {
155            self.cursors[index].map_times(&storage[index], |t, d| logic(t, d));
156        }
157    }
158
159    // key methods
160    #[inline]
161    fn step_key(&mut self, storage: &Vec<C::Storage>) {
162        for &index in self.min_key.iter() {
163            self.cursors[index].step_key(&storage[index]);
164        }
165        self.minimize_keys(storage);
166    }
167    #[inline]
168    fn seek_key(&mut self, storage: &Vec<C::Storage>, key: Self::Key<'_>) {
169        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
170            cursor.seek_key(storage, key);
171        }
172        self.minimize_keys(storage);
173    }
174
175    // value methods
176    #[inline]
177    fn step_val(&mut self, storage: &Vec<C::Storage>) {
178        for &index in self.min_val.iter() {
179            self.cursors[index].step_val(&storage[index]);
180        }
181        self.minimize_vals(storage);
182    }
183    #[inline]
184    fn seek_val(&mut self, storage: &Vec<C::Storage>, val: Self::Val<'_>) {
185        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
186            cursor.seek_val(storage, val);
187        }
188        self.minimize_vals(storage);
189    }
190
191    // rewinding methods
192    #[inline]
193    fn rewind_keys(&mut self, storage: &Vec<C::Storage>) {
194        for (cursor, storage) in self.cursors.iter_mut().zip(storage) {
195            cursor.rewind_keys(storage);
196        }
197        self.minimize_keys(storage);
198    }
199    #[inline]
200    fn rewind_vals(&mut self, storage: &Vec<C::Storage>) {
201        for &index in self.min_key.iter() {
202            self.cursors[index].rewind_vals(&storage[index]);
203        }
204        self.minimize_vals(storage);
205    }
206}