Skip to main content

diamond_types_extended/
frontier.rs

1use std::borrow::Borrow;
2use std::fmt::Debug;
3use std::ops::{Index, IndexMut};
4
5use serde::{Deserialize, Serialize};
6use smallvec::{SmallVec, smallvec};
7
8use crate::causalgraph::graph::Graph;
9use crate::dtrange::DTRange;
10use crate::LV;
11
12/// A `LocalFrontier` is a set of local Time values which point at the set of changes with no
13/// children at this point in time. When there's a single writer this will always just be the last
14/// local version we've seen.
15///
16/// The start of time is named with an empty list.
17///
18/// A frontier must always remain sorted (in numerical order). Note: This is not checked when
19/// deserializing via serde!
20#[derive(Debug, Clone, Eq, PartialEq)]
21#[derive(Serialize, Deserialize)]
22#[serde(transparent)]
23pub struct Frontier(pub SmallVec<LV, 2>);
24
/// A borrowed frontier: a sorted slice of local versions.
pub type FrontierRef<'a> = &'a [LV];
26
27impl AsRef<[LV]> for Frontier {
28    fn as_ref(&self) -> &[LV] {
29        self.0.as_slice()
30    }
31}
32
33impl<'a> From<FrontierRef<'a>> for Frontier {
34    fn from(f: FrontierRef<'a>) -> Self {
35        // This is a bit dangerous - but we still verify that the data is sorted in debug mode...
36        Frontier::from_sorted(f)
37    }
38}
39
40impl From<SmallVec<LV, 2>> for Frontier {
41    fn from(f: SmallVec<LV, 2>) -> Self {
42        debug_assert_sorted(f.as_slice());
43        Frontier(f)
44    }
45}
46
47impl From<LV> for Frontier {
48    fn from(v: LV) -> Self {
49        Frontier::new_1(v)
50    }
51}
52
53impl Default for Frontier {
54    fn default() -> Self {
55        Self::root()
56    }
57}
58
59impl Index<usize> for Frontier {
60    type Output = LV;
61
62    fn index(&self, index: usize) -> &Self::Output {
63        self.0.index(index)
64    }
65}
66
67impl IndexMut<usize> for Frontier {
68    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
69        self.0.index_mut(index)
70    }
71}
72
// Helper method. Not sure where to put this.
/// Returns true if `iter` yields its items in ascending order.
///
/// When `EXPECT_UNIQ` is true, adjacent duplicates are treated as unsorted (and additionally
/// debug-asserted against). When false, duplicates are allowed (non-strict ordering).
pub(crate) fn is_sorted_iter<const EXPECT_UNIQ: bool, V: Ord + Eq + Debug, I: Iterator<Item = V>>(mut iter: I) -> bool {
    let Some(mut last) = iter.next() else { return true; };

    for i in iter {
        if EXPECT_UNIQ {
            debug_assert_ne!(i, last);
        }
        // Bug fix: this previously rejected adjacent equal items (`i <= last`) even when
        // EXPECT_UNIQ was false, disagreeing with is_sorted_slice below. Duplicates are only
        // invalid when EXPECT_UNIQ is set.
        if i < last || (EXPECT_UNIQ && i == last) { return false; }
        last = i;
    }

    true
}
87pub(crate) fn is_sorted_iter_uniq<V: Ord + Eq + Debug, I: Iterator<Item = V>>(iter: I) -> bool {
88    is_sorted_iter::<true, V, I>(iter)
89}
90
/// Returns true if `slice` is sorted in ascending order. With `EXPECT_UNIQ` set, adjacent
/// duplicates make the slice invalid (and are debug-asserted against).
pub(crate) fn is_sorted_slice<const EXPECT_UNIQ: bool, V: Ord + Eq + Debug + Copy>(slice: &[V]) -> bool {
    // Check every adjacent pair. Slices of length 0 or 1 yield no pairs and are trivially
    // sorted; `all` short-circuits on the first violation, just like the explicit loop did.
    slice.windows(2).all(|pair| {
        let (prev, next) = (pair[0], pair[1]);
        if EXPECT_UNIQ {
            debug_assert!(next != prev);
            prev < next
        } else {
            prev <= next
        }
    })
}
104
105pub(crate) fn frontier_is_sorted(f: FrontierRef) -> bool {
106    // is_sorted_iter(f.iter().copied())
107    is_sorted_slice::<true, _>(f)
108}
109
/// Assert (debug builds only - free in release) that `frontier` obeys the sorted invariant.
pub(crate) fn debug_assert_sorted(frontier: FrontierRef) {
    debug_assert!(frontier_is_sorted(frontier));
}
113
114pub(crate) fn sort_frontier<const N: usize>(v: &mut SmallVec<LV, N>) {
115    if !frontier_is_sorted(v.as_slice()) {
116        v.sort_unstable();
117    }
118}
119
impl IntoIterator for Frontier {
    type Item = LV;
    type IntoIter = <SmallVec<LV, 2> as IntoIterator>::IntoIter;

    /// Consume the frontier, yielding its versions smallest to largest.
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}
128
129impl FromIterator<LV> for Frontier {
130    fn from_iter<T: IntoIterator<Item=LV>>(iter: T) -> Self {
131        Frontier::from_unsorted_iter(iter.into_iter())
132    }
133}
134
135impl Frontier {
136    pub fn root() -> Self {
137        Self(smallvec![])
138    }
139
140    pub fn new_1(v: LV) -> Self {
141        Self(smallvec![v])
142    }
143
144    pub fn from_unsorted(data: &[LV]) -> Self {
145        let mut arr: SmallVec<LV, 2> = data.into();
146        sort_frontier(&mut arr);
147        Self(arr)
148    }
149
150    pub fn from_unsorted_iter<I: Iterator<Item=LV>>(iter: I) -> Self {
151        let mut arr: SmallVec<LV, 2> = iter.collect();
152        sort_frontier(&mut arr);
153        Self(arr)
154    }
155
156    pub fn from_sorted(data: &[LV]) -> Self {
157        debug_assert_sorted(data);
158        // SmallVec apparently generates worse code using From (/Into) vs using a specialized
159        // from_slice() method. However, the code for From is emitted anyway because (I think) its
160        // used any time a vec is .collect-ed.
161        //
162        // As a result, swapping to from_slice here increases code size and it doesn't seem to make
163        // any actual performance difference. It would be better - but I'm leaving it alone for now.
164        // Self(SmallVec::from_slice(data))
165        Self(data.into())
166    }
167
168    /// Frontiers should always be sorted smallest to largest.
169    pub fn len(&self) -> usize {
170        self.0.len()
171    }
172
173    pub fn is_root(&self) -> bool {
174        self.0.is_empty()
175    }
176    pub fn is_empty(&self) -> bool {
177        self.0.is_empty()
178    }
179
180    pub fn iter(&self) -> std::slice::Iter<'_, usize> {
181        self.0.iter()
182    }
183
184    pub fn try_get_single_entry(&self) -> Option<LV> {
185        if self.len() == 1 { Some(self.0[0]) }
186        else { None }
187    }
188
189    pub fn try_get_single_entry_mut(&mut self) -> Option<&mut LV> {
190        if self.len() == 1 { Some(&mut self.0[0]) }
191        else { None }
192    }
193
194    pub fn replace(&mut self, with: FrontierRef) {
195        // TODO: Is this faster than *self = with.into(); ?
196        self.0.resize(with.len(), 0);
197        self.0.copy_from_slice(with);
198    }
199
200    pub fn debug_check_sorted(&self) {
201        debug_assert_sorted(self.0.borrow());
202    }
203
204    /// Advance a frontier by the set of time spans in range
205    pub fn advance(&mut self, graph: &Graph, mut range: DTRange) {
206        if range.is_empty() { return; }
207
208        // This is a little crass. Might be nicer to use a &T iterator in RLEVec.
209        let txn_idx = graph.entries.find_index(range.start).unwrap();
210
211        for txn in &graph.entries[txn_idx..] {
212            debug_assert!(txn.contains(range.start));
213
214            let end = txn.span.end.min(range.end);
215            txn.with_parents(range.start, |parents| {
216                self.advance_by_known_run(parents, (range.start..end).into());
217            });
218
219            if end >= range.end { break; }
220            range.start = end;
221        }
222    }
223
224    /// Just like advance_by_known_run, the range MUST be in a single transaction in the graph.
225    pub fn advance_sparse_known_run(&mut self, graph: &Graph, parents: &[LV], range: DTRange) {
226        // Could copy the other cases from advance_by_known_run... eh.
227        if self.as_ref() == parents {
228            // Fastest path. We're just extending the span.
229            self.replace_with_1(range.last());
230        } else {
231            // We'll probably still replace the version with range.last(), but there's some edge
232            // cases for find_dominators to figure out.
233            self.merge_union(&[range.last()], graph);
234            // self.0 = graph.find_dominators_2(self.as_ref(), &[range.last()]).0;
235        }
236    }
237
238    /// advance_sparse is used for "sparse" causal graphs, which contain versions for other CRDTs
239    /// and things. In this case, range might not directly follow the current frontier.
240    ///
241    /// I think this function is equivalent to finding the dominators of self + all txns in range.
242    pub fn advance_sparse(&mut self, graph: &Graph, range: DTRange) {
243        let txn_idx = graph.entries.find_index(range.start).unwrap();
244        let first_txn = &graph.entries[txn_idx];
245        if range.end <= first_txn.span.end {
246            // Fast path. There's just one transaction to consider.
247            first_txn.with_parents(range.start, |parents| {
248                self.advance_sparse_known_run(graph, parents, range);
249            })
250        } else {
251            // This is a lot more complicated than I'd like, but I think its the fastest approach
252            // here. We'll make a frontier from the transactions within the range, then merge that
253            // with the current frontier.
254            let mut f2 = Frontier::root();
255            f2.advance(graph, range); // This is a bit cheeky, but the result should be correct.
256            // And merge that together. This will usually just return f2.
257            self.merge_union(f2.as_ref(), graph);
258            // self.0 = graph.find_dominators_2(self.as_ref(), f2.as_ref()).0;
259        }
260    }
261
262    /// Advance branch frontier by a transaction.
263    ///
264    /// This is ONLY VALID if the range is entirely within a txn.
265    pub fn advance_by_known_run(&mut self, parents: &[LV], span: DTRange) {
266        // TODO: Check the branch contains everything in txn_parents, but not txn_id:
267        // Check the operation fits. The operation should not be in the branch, but
268        // all the operation's parents should be.
269        // From braid-kernel:
270        // assert(!branchContainsVersion(db, order, branch), 'db already contains version')
271        // for (const parent of op.parents) {
272        //    assert(branchContainsVersion(db, parent, branch), 'operation in the future')
273        // }
274
275        if parents.len() == 1 && self.0.len() == 1 && parents[0] == self.0[0] {
276            // Short circuit the common case where time is just advancing linearly.
277            self.0[0] = span.last();
278        } else if self.0.as_slice() == parents {
279            self.replace_with_1(span.last());
280        } else {
281            assert!(!self.0.contains(&span.start)); // Remove this when branch_contains_version works.
282            debug_assert_sorted(self.0.as_slice());
283
284            self.0.retain(|o| !parents.contains(o)); // Usually removes all elements.
285
286            // In order to maintain the order of items in the branch, we want to insert the new item
287            // in the appropriate place. This will almost always do self.0.push(), but when changes
288            // are concurrent that won't be correct. (Do it and run the tests if you don't believe
289            // me).
290            // TODO: Check if its faster to try and append it to the end first.
291            self.insert_nonoverlapping(span.last());
292        }
293    }
294
295    /// Replaces self with dominators(self, other).
296    pub fn merge_union(&mut self, other: &[LV], graph: &Graph) {
297        if !other.is_empty()
298            && other != self.as_ref()
299            && (other.len() != 1 || !graph.frontier_contains_version(self.as_ref(), other[0]))
300        {
301            self.0 = graph.find_dominators_2(self.as_ref(), other).0;
302        }
303    }
304
305    pub fn retreat(&mut self, graph: &Graph, mut range: DTRange) {
306        if range.is_empty() { return; }
307
308        self.debug_check_sorted();
309
310        let mut txn_idx = graph.entries.find_index(range.last()).unwrap();
311        loop {
312            let last_order = range.last();
313            let txn = &graph.entries[txn_idx];
314            // debug_assert_eq!(txn_idx, history.0.find_index(range.last()).unwrap());
315            debug_assert_eq!(txn, graph.entries.find(last_order).unwrap());
316            // let mut idx = frontier.iter().position(|&e| e == last_order).unwrap();
317
318            if self.len() == 1 {
319                // Fast case. Just replace frontier's contents with parents.
320                if range.start > txn.span.start {
321                    self[0] = range.start - 1;
322                    break;
323                } else {
324                    // self.0 = txn.parents.as_ref().into();
325                    *self = txn.parents.clone()
326                }
327            } else {
328                // Remove the old item from frontier and only reinsert parents when they aren't included
329                // in the transitive history from this point.
330                self.0.retain(|t| *t != last_order);
331
332                txn.with_parents(range.start, |parents| {
333                    for parent in parents {
334                        // TODO: This is pretty inefficient. We're calling frontier_contains_time in a
335                        // loop and each call to frontier_contains_time does a call to history.find() in
336                        // turn for each item in branch.
337                        debug_assert!(!self.is_root());
338                        // TODO: At least check shadow directly.
339                        if !graph.frontier_contains_version(self.as_ref(), *parent) {
340                            self.insert_nonoverlapping(*parent);
341                        }
342                    }
343                });
344            }
345
346            if range.start >= txn.span.start {
347                break;
348            }
349
350            // Otherwise keep scanning down through the txns.
351            range.end = txn.span.start;
352            txn_idx -= 1;
353        }
354        if cfg!(debug_assertions) { self.check(graph); }
355        self.debug_check_sorted();
356    }
357
358    fn insert_nonoverlapping(&mut self, new_item: LV) {
359        // In order to maintain the order of items in the branch, we want to insert the new item in the
360        // appropriate place.
361
362        // Binary search might actually be slower here than a linear scan.
363        let new_idx = self.0.binary_search(&new_item).unwrap_err();
364        self.0.insert(new_idx, new_item);
365
366        // match self.0.last() {
367        //     Some(v) if *v < new_item => { self.0.push(new_item); }
368        //     None => { self.0.push(new_item); }
369        //     _ => {
370        //         let new_idx = self.0.binary_search(&new_item).unwrap_err();
371        //         self.0.insert(new_idx, new_item);
372        //     }
373        // }
374
375        self.debug_check_sorted();
376    }
377
378    pub fn insert(&mut self, new_item: LV) {
379        // And we're returning in the Ok() case here because it means the item is already in the
380        // frontier.
381        let Err(new_idx) = self.0.binary_search(&new_item) else { return; };
382        self.0.insert(new_idx, new_item);
383        self.debug_check_sorted();
384    }
385
386    pub(crate) fn check(&self, parents: &Graph) {
387        assert!(frontier_is_sorted(&self.0));
388        if self.len() >= 2 {
389            let dominators = parents.find_dominators(&self.0);
390            assert_eq!(&dominators, self);
391            // let mut self = self.iter().copied().collect::<Vec<_>>();
392            // let mut self = self.0.to_vec();
393            // for i in 0..self.len() {
394            //     let removed = self.remove(i);
395            //     assert!(!history.version_contains_time(&self, removed));
396            //     self.insert(i, removed);
397            // }
398        }
399    }
400
401    pub fn replace_with_1(&mut self, new_val: LV) {
402        // I could truncate / etc, but this is faster in benchmarks.
403        // replace(&mut self.0, smallvec::smallvec![new_val]);
404        self.0 = smallvec::smallvec![new_val];
405    }
406}
407
408pub fn local_frontier_eq<A: AsRef<[LV]> + ?Sized, B: AsRef<[LV]> + ?Sized>(a: &A, b: &B) -> bool {
409    // Almost all branches only have one element in them.
410    debug_assert_sorted(a.as_ref());
411    debug_assert_sorted(b.as_ref());
412    a.as_ref() == b.as_ref()
413}
414
415#[allow(unused)]
416pub fn local_frontier_is_root(branch: &[LV]) -> bool {
417    branch.is_empty()
418}
419
420
421// // This walks both frontiers and finds how the frontier has changed. There's probably a better way
422// // to implement this.
423// struct FrontierDiff<'a> {
424//     a: &'a [LV],
425//     b: &'a [LV],
426// }
427// 
428// pub(crate) fn diff_frontier_entries<'a>(a: &'a [LV], b: &'a [LV]) -> impl Iterator<Item = (DiffFlag, LV)> + 'a {
429//     FrontierDiff { a, b }
430// }
431// 
432// 
433// fn slice_take_first(slice: &mut &[LV]) -> Option<LV> {
434//     if let [first, tail @ ..] = slice {
435//         *slice = tail;
436//         Some(*first)
437//     } else { None }
438// }
439// 
440// impl<'a> Iterator for FrontierDiff<'a> {
441//     type Item = (DiffFlag, LV);
442// 
443//     fn next(&mut self) -> Option<Self::Item> {
444//         match (self.a.split_first(), self.b.split_first()) {
445//             (None, None) => None,
446//             (Some((a, rest)), None) => {
447//                 self.a = rest;
448//                 Some((DiffFlag::OnlyA, *a))
449//             },
450//             (None, Some((b, rest))) => {
451//                 self.b = rest;
452//                 Some((DiffFlag::OnlyB, *b))
453//             },
454//             (Some((a, a_rest)), Some((b, b_rest))) => {
455//                 match a.cmp(b) {
456//                     Ordering::Equal => {
457//                         // Take from both.
458//                         self.a = a_rest;
459//                         self.b = b_rest;
460//                         Some((DiffFlag::Shared, *a))
461//                     }
462//                     Ordering::Less => {
463//                         // Take from a.
464//                         self.a = a_rest;
465//                         Some((DiffFlag::OnlyA, *a))
466//                     }
467//                     Ordering::Greater => {
468//                         // Take from b.
469//                         self.b = b_rest;
//                         Some((DiffFlag::OnlyB, *b))
471//                     }
472//                 }
473//             }
474//         }
475//     }
476// }
477
478/// This method clones a version or parents vector. Its slightly faster and smaller than just
479/// calling v.clone() directly.
480#[inline]
481pub fn clone_smallvec<T, const LEN: usize>(v: &SmallVec<T, LEN>) -> SmallVec<T, LEN> where T: Clone + Copy {
482    // This is now smaller again as of rust 1.60. Looks like the problem was fixed.
483    v.clone()
484
485    // if v.spilled() { // Unlikely. If only there was a stable rust intrinsic for this..
486    //     v.clone()
487    // } else {
488    //     unsafe {
489    //         // We only need to copy v.len() items, because LEN is small (2, usually) its actually
490    //         // faster & less code to just copy the bytes in all cases rather than branch.
491    //         // let mut arr: MaybeUninit<[T; LEN]> = MaybeUninit::uninit();
492    //         // std::ptr::copy_nonoverlapping(v.as_ptr(), arr.as_mut_ptr().cast(), LEN);
493    //         // SmallVec::from_buf_and_len_unchecked(arr, v.len())
494    //
495    //         let mut result: MaybeUninit<SmallVec<T, LEN>> = MaybeUninit::uninit();
496    //         std::ptr::copy_nonoverlapping(v, result.as_mut_ptr(), 1);
497    //         result.assume_init()
498    //     }
499    // }
500}
501
#[cfg(test)]
mod test {
    use crate::causalgraph::graph::GraphEntrySimple;
    use crate::Frontier;

    use super::*;

    #[test]
    fn frontier_movement_smoke_tests() {
        // Advancing from root through one linear run lands on the run's last version.
        let mut frontier = Frontier::root();
        frontier.advance_by_known_run(&[], (0..10).into());
        assert_eq!(frontier.as_ref(), &[9]);

        let graph = Graph::from_simple_items(&[
            GraphEntrySimple { span: (0..10).into(), parents: Frontier::root() }
        ]);
        graph.dbg_check(true);

        // Retreating walks backwards, eventually returning to root.
        frontier.retreat(&graph, (5..10).into());
        assert_eq!(frontier.as_ref(), &[4]);

        frontier.retreat(&graph, (0..5).into());
        assert!(frontier.is_root());
    }

    #[test]
    fn frontier_stays_sorted() {
        let graph = Graph::from_simple_items(&[
            GraphEntrySimple { span: (0..2).into(), parents: Frontier::root() },
            GraphEntrySimple { span: (2..6).into(), parents: Frontier::new_1(0) },
            GraphEntrySimple { span: (6..50).into(), parents: Frontier::new_1(0) },
        ]);
        graph.dbg_check(true);

        // With concurrent versions in play, advance / retreat must keep the list sorted.
        let mut frontier = Frontier::from_sorted(&[1, 10]);
        frontier.advance(&graph, (2..4).into());
        assert_eq!(frontier.as_ref(), &[1, 3, 10]);

        frontier.advance(&graph, (11..12).into());
        assert_eq!(frontier.as_ref(), &[1, 3, 11]);

        frontier.retreat(&graph, (2..4).into());
        assert_eq!(frontier.as_ref(), &[1, 11]);

        frontier.retreat(&graph, (11..12).into());
        assert_eq!(frontier.as_ref(), &[1, 10]);
    }

    #[test]
    fn advance_sparse() {
        let graph = Graph::from_simple_items(&[
            GraphEntrySimple { span: (0..10).into(), parents: Frontier::root() },
            GraphEntrySimple { span: (10..20).into(), parents: Frontier::new_1(5) },
        ]);
        graph.dbg_check(true);

        // Not thorough, but a reasonable smoke test: ranges which don't directly follow the
        // current frontier should still advance it correctly.
        let mut frontier = Frontier::root();
        frontier.advance_sparse(&graph, (0..5).into());
        assert_eq!(frontier.as_ref(), &[4]);

        frontier.advance_sparse(&graph, (7..8).into());
        assert_eq!(frontier.as_ref(), &[7]);

        frontier.advance_sparse(&graph, (9..15).into());
        assert_eq!(frontier.as_ref(), &[9, 14]);
    }

    #[test]
    fn advance_empty_by_known_run() {
        // Regression test: inserting into an empty frontier must work.
        let mut frontier = Frontier::root();
        frontier.insert_nonoverlapping(4);
        assert_eq!(frontier.as_ref(), &[4]);
    }
}