// diamond_types_extended/frontier.rs
1use std::borrow::Borrow;
2use std::fmt::Debug;
3use std::ops::{Index, IndexMut};
4
5use serde::{Deserialize, Serialize};
6use smallvec::{SmallVec, smallvec};
7
8use crate::causalgraph::graph::Graph;
9use crate::dtrange::DTRange;
10use crate::LV;
11
/// A `LocalFrontier` is a set of local Time values which point at the set of changes with no
/// children at this point in time. When there's a single writer this will always just be the last
/// local version we've seen.
///
/// The start of time ("ROOT") is named with an empty list.
///
/// A frontier must always remain sorted (in numerical order) and duplicate-free. Note: This is
/// not checked when deserializing via serde!
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct Frontier(pub SmallVec<LV, 2>);

/// Borrowed view of a frontier: a sorted slice of local versions.
pub type FrontierRef<'a> = &'a [LV];
26
27impl AsRef<[LV]> for Frontier {
28 fn as_ref(&self) -> &[LV] {
29 self.0.as_slice()
30 }
31}
32
33impl<'a> From<FrontierRef<'a>> for Frontier {
34 fn from(f: FrontierRef<'a>) -> Self {
35 // This is a bit dangerous - but we still verify that the data is sorted in debug mode...
36 Frontier::from_sorted(f)
37 }
38}
39
40impl From<SmallVec<LV, 2>> for Frontier {
41 fn from(f: SmallVec<LV, 2>) -> Self {
42 debug_assert_sorted(f.as_slice());
43 Frontier(f)
44 }
45}
46
47impl From<LV> for Frontier {
48 fn from(v: LV) -> Self {
49 Frontier::new_1(v)
50 }
51}
52
53impl Default for Frontier {
54 fn default() -> Self {
55 Self::root()
56 }
57}
58
59impl Index<usize> for Frontier {
60 type Output = LV;
61
62 fn index(&self, index: usize) -> &Self::Output {
63 self.0.index(index)
64 }
65}
66
67impl IndexMut<usize> for Frontier {
68 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
69 self.0.index_mut(index)
70 }
71}
72
// Helper method. Not sure where to put this.
/// Returns true if `iter` yields values in strictly ascending order. (Duplicate neighbours
/// always make this return false; when `EXPECT_UNIQ` is set they additionally trip a debug
/// assertion, since the caller considers them a bug.)
pub(crate) fn is_sorted_iter<const EXPECT_UNIQ: bool, V: Ord + Eq + Debug, I: Iterator<Item = V>>(mut iter: I) -> bool {
    // Empty and single-element sequences are trivially sorted.
    let Some(mut prev) = iter.next() else { return true; };

    for cur in iter {
        if EXPECT_UNIQ {
            debug_assert_ne!(cur, prev);
        }
        if cur <= prev { return false; }
        prev = cur;
    }

    true
}
87pub(crate) fn is_sorted_iter_uniq<V: Ord + Eq + Debug, I: Iterator<Item = V>>(iter: I) -> bool {
88 is_sorted_iter::<true, V, I>(iter)
89}
90
/// Returns true if `slice` is sorted in ascending order. When `EXPECT_UNIQ` is set, equal
/// neighbours also make this return false (and trip a debug assertion, since the caller
/// considers duplicates a bug).
pub(crate) fn is_sorted_slice<const EXPECT_UNIQ: bool, V: Ord + Eq + Debug + Copy>(slice: &[V]) -> bool {
    // windows(2) yields nothing for slices shorter than 2, which are trivially sorted.
    for pair in slice.windows(2) {
        let (a, b) = (pair[0], pair[1]);
        if EXPECT_UNIQ {
            debug_assert!(a != b);
        }
        if a > b || (EXPECT_UNIQ && a == b) { return false; }
    }
    true
}
104
/// Returns true if the frontier is sorted in strictly ascending order (no duplicates).
pub(crate) fn frontier_is_sorted(f: FrontierRef) -> bool {
    // is_sorted_iter(f.iter().copied())
    is_sorted_slice::<true, _>(f)
}
109
/// Debug-build-only check that `frontier` is sorted and duplicate-free. No-op in release builds.
pub(crate) fn debug_assert_sorted(frontier: FrontierRef) {
    debug_assert!(frontier_is_sorted(frontier));
}
113
114pub(crate) fn sort_frontier<const N: usize>(v: &mut SmallVec<LV, N>) {
115 if !frontier_is_sorted(v.as_slice()) {
116 v.sort_unstable();
117 }
118}
119
impl IntoIterator for Frontier {
    type Item = LV;
    type IntoIter = <SmallVec<LV, 2> as IntoIterator>::IntoIter;

    /// Consumes the frontier, yielding its versions in ascending order.
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}
128
impl FromIterator<LV> for Frontier {
    /// Collects versions from an arbitrary (possibly unsorted) iterator, sorting if needed.
    fn from_iter<T: IntoIterator<Item=LV>>(iter: T) -> Self {
        Frontier::from_unsorted_iter(iter.into_iter())
    }
}
134
impl Frontier {
    /// The frontier at the start of time: an empty version list.
    pub fn root() -> Self {
        Self(smallvec![])
    }

    /// A frontier containing a single version. This is the common case when history is linear.
    pub fn new_1(v: LV) -> Self {
        Self(smallvec![v])
    }

    /// Builds a frontier from a slice that may not be sorted, sorting if needed.
    pub fn from_unsorted(data: &[LV]) -> Self {
        let mut arr: SmallVec<LV, 2> = data.into();
        sort_frontier(&mut arr);
        Self(arr)
    }

    /// Builds a frontier by collecting an iterator, sorting the result if needed.
    pub fn from_unsorted_iter<I: Iterator<Item=LV>>(iter: I) -> Self {
        let mut arr: SmallVec<LV, 2> = iter.collect();
        sort_frontier(&mut arr);
        Self(arr)
    }

    /// Builds a frontier from an already-sorted slice. Sortedness is only checked in debug
    /// builds.
    pub fn from_sorted(data: &[LV]) -> Self {
        debug_assert_sorted(data);
        // SmallVec apparently generates worse code using From (/Into) vs using a specialized
        // from_slice() method. However, the code for From is emitted anyway because (I think) its
        // used any time a vec is .collect-ed.
        //
        // As a result, swapping to from_slice here increases code size and it doesn't seem to make
        // any actual performance difference. It would be better - but I'm leaving it alone for now.
        // Self(SmallVec::from_slice(data))
        Self(data.into())
    }

    /// Number of versions in the frontier. (Frontiers should always be sorted smallest to
    /// largest.)
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// True if this is the ROOT frontier (the empty list).
    pub fn is_root(&self) -> bool {
        self.0.is_empty()
    }
    /// Same as is_root(): ROOT is represented by an empty list.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Iterates over the contained versions in ascending order.
    pub fn iter(&self) -> std::slice::Iter<'_, usize> {
        self.0.iter()
    }

    /// Returns the single contained version if len() == 1, else None.
    pub fn try_get_single_entry(&self) -> Option<LV> {
        if self.len() == 1 { Some(self.0[0]) }
        else { None }
    }

    /// Mutable variant of try_get_single_entry(). Callers must not break the sort order.
    pub fn try_get_single_entry_mut(&mut self) -> Option<&mut LV> {
        if self.len() == 1 { Some(&mut self.0[0]) }
        else { None }
    }

    /// Overwrites self with the contents of `with`, reusing the existing allocation.
    pub fn replace(&mut self, with: FrontierRef) {
        // TODO: Is this faster than *self = with.into(); ?
        self.0.resize(with.len(), 0);
        self.0.copy_from_slice(with);
    }

    /// Debug-build-only check that this frontier is sorted.
    pub fn debug_check_sorted(&self) {
        debug_assert_sorted(self.0.borrow());
    }

    /// Advance a frontier by the set of time spans in range
    pub fn advance(&mut self, graph: &Graph, mut range: DTRange) {
        if range.is_empty() { return; }

        // This is a little crass. Might be nicer to use a &T iterator in RLEVec.
        let txn_idx = graph.entries.find_index(range.start).unwrap();

        // Walk forward through the graph entries overlapping `range`, advancing by each
        // (sub-)run in turn.
        for txn in &graph.entries[txn_idx..] {
            debug_assert!(txn.contains(range.start));

            // Clamp the run to the end of this txn (or the end of the requested range).
            let end = txn.span.end.min(range.end);
            txn.with_parents(range.start, |parents| {
                self.advance_by_known_run(parents, (range.start..end).into());
            });

            if end >= range.end { break; }
            range.start = end;
        }
    }

    /// Just like advance_by_known_run, the range MUST be in a single transaction in the graph.
    pub fn advance_sparse_known_run(&mut self, graph: &Graph, parents: &[LV], range: DTRange) {
        // Could copy the other cases from advance_by_known_run... eh.
        if self.as_ref() == parents {
            // Fastest path. We're just extending the span.
            self.replace_with_1(range.last());
        } else {
            // We'll probably still replace the version with range.last(), but there's some edge
            // cases for find_dominators to figure out.
            self.merge_union(&[range.last()], graph);
            // self.0 = graph.find_dominators_2(self.as_ref(), &[range.last()]).0;
        }
    }

    /// advance_sparse is used for "sparse" causal graphs, which contain versions for other CRDTs
    /// and things. In this case, range might not directly follow the current frontier.
    ///
    /// I think this function is equivalent to finding the dominators of self + all txns in range.
    pub fn advance_sparse(&mut self, graph: &Graph, range: DTRange) {
        let txn_idx = graph.entries.find_index(range.start).unwrap();
        let first_txn = &graph.entries[txn_idx];
        if range.end <= first_txn.span.end {
            // Fast path. There's just one transaction to consider.
            first_txn.with_parents(range.start, |parents| {
                self.advance_sparse_known_run(graph, parents, range);
            })
        } else {
            // This is a lot more complicated than I'd like, but I think its the fastest approach
            // here. We'll make a frontier from the transactions within the range, then merge that
            // with the current frontier.
            let mut f2 = Frontier::root();
            f2.advance(graph, range); // This is a bit cheeky, but the result should be correct.
            // And merge that together. This will usually just return f2.
            self.merge_union(f2.as_ref(), graph);
            // self.0 = graph.find_dominators_2(self.as_ref(), f2.as_ref()).0;
        }
    }

    /// Advance branch frontier by a transaction.
    ///
    /// This is ONLY VALID if the range is entirely within a txn.
    pub fn advance_by_known_run(&mut self, parents: &[LV], span: DTRange) {
        // TODO: Check the branch contains everything in txn_parents, but not txn_id:
        // Check the operation fits. The operation should not be in the branch, but
        // all the operation's parents should be.
        // From braid-kernel:
        // assert(!branchContainsVersion(db, order, branch), 'db already contains version')
        // for (const parent of op.parents) {
        //    assert(branchContainsVersion(db, parent, branch), 'operation in the future')
        // }

        if parents.len() == 1 && self.0.len() == 1 && parents[0] == self.0[0] {
            // Short circuit the common case where time is just advancing linearly.
            self.0[0] = span.last();
        } else if self.0.as_slice() == parents {
            // Our frontier is exactly the parents: replace it wholesale with the new version.
            self.replace_with_1(span.last());
        } else {
            assert!(!self.0.contains(&span.start)); // Remove this when branch_contains_version works.
            debug_assert_sorted(self.0.as_slice());

            self.0.retain(|o| !parents.contains(o)); // Usually removes all elements.

            // In order to maintain the order of items in the branch, we want to insert the new item
            // in the appropriate place. This will almost always do self.0.push(), but when changes
            // are concurrent that won't be correct. (Do it and run the tests if you don't believe
            // me).
            // TODO: Check if its faster to try and append it to the end first.
            self.insert_nonoverlapping(span.last());
        }
    }

    /// Replaces self with dominators(self, other).
    pub fn merge_union(&mut self, other: &[LV], graph: &Graph) {
        // Skip the (expensive) dominator computation when `other` is empty, identical to self,
        // or a single version self already contains - in all those cases self is unchanged.
        if !other.is_empty()
            && other != self.as_ref()
            && (other.len() != 1 || !graph.frontier_contains_version(self.as_ref(), other[0]))
        {
            self.0 = graph.find_dominators_2(self.as_ref(), other).0;
        }
    }

    /// The inverse of advance(): rewinds this frontier back before the versions in `range`.
    pub fn retreat(&mut self, graph: &Graph, mut range: DTRange) {
        if range.is_empty() { return; }

        self.debug_check_sorted();

        // Walk backwards through the txns overlapping `range`, from the last one down.
        let mut txn_idx = graph.entries.find_index(range.last()).unwrap();
        loop {
            let last_order = range.last();
            let txn = &graph.entries[txn_idx];
            // debug_assert_eq!(txn_idx, history.0.find_index(range.last()).unwrap());
            debug_assert_eq!(txn, graph.entries.find(last_order).unwrap());
            // let mut idx = frontier.iter().position(|&e| e == last_order).unwrap();

            if self.len() == 1 {
                // Fast case. Just replace frontier's contents with parents.
                if range.start > txn.span.start {
                    // We're retreating to a version still inside this txn.
                    self[0] = range.start - 1;
                    break;
                } else {
                    // self.0 = txn.parents.as_ref().into();
                    *self = txn.parents.clone()
                }
            } else {
                // Remove the old item from frontier and only reinsert parents when they aren't included
                // in the transitive history from this point.
                self.0.retain(|t| *t != last_order);

                txn.with_parents(range.start, |parents| {
                    for parent in parents {
                        // TODO: This is pretty inefficient. We're calling frontier_contains_time in a
                        // loop and each call to frontier_contains_time does a call to history.find() in
                        // turn for each item in branch.
                        debug_assert!(!self.is_root());
                        // TODO: At least check shadow directly.
                        if !graph.frontier_contains_version(self.as_ref(), *parent) {
                            self.insert_nonoverlapping(*parent);
                        }
                    }
                });
            }

            if range.start >= txn.span.start {
                break;
            }

            // Otherwise keep scanning down through the txns.
            range.end = txn.span.start;
            txn_idx -= 1;
        }
        if cfg!(debug_assertions) { self.check(graph); }
        self.debug_check_sorted();
    }

    /// Inserts `new_item` in sorted position. The item must not already be present.
    fn insert_nonoverlapping(&mut self, new_item: LV) {
        // In order to maintain the order of items in the branch, we want to insert the new item in the
        // appropriate place.

        // Binary search might actually be slower here than a linear scan.
        let new_idx = self.0.binary_search(&new_item).unwrap_err();
        self.0.insert(new_idx, new_item);

        // match self.0.last() {
        //     Some(v) if *v < new_item => { self.0.push(new_item); }
        //     None => { self.0.push(new_item); }
        //     _ => {
        //         let new_idx = self.0.binary_search(&new_item).unwrap_err();
        //         self.0.insert(new_idx, new_item);
        //     }
        // }

        self.debug_check_sorted();
    }

    /// Inserts `new_item` in sorted position; a no-op if the item is already present.
    pub fn insert(&mut self, new_item: LV) {
        // And we're returning in the Ok() case here because it means the item is already in the
        // frontier.
        let Err(new_idx) = self.0.binary_search(&new_item) else { return; };
        self.0.insert(new_idx, new_item);
        self.debug_check_sorted();
    }

    /// Consistency check (used in tests / debug builds): asserts the frontier is sorted and
    /// that no contained version dominates another.
    pub(crate) fn check(&self, parents: &Graph) {
        assert!(frontier_is_sorted(&self.0));
        if self.len() >= 2 {
            let dominators = parents.find_dominators(&self.0);
            assert_eq!(&dominators, self);
            // let mut self = self.iter().copied().collect::<Vec<_>>();
            // let mut self = self.0.to_vec();
            // for i in 0..self.len() {
            //     let removed = self.remove(i);
            //     assert!(!history.version_contains_time(&self, removed));
            //     self.insert(i, removed);
            // }
        }
    }

    /// Replaces the entire frontier with a single version.
    pub fn replace_with_1(&mut self, new_val: LV) {
        // I could truncate / etc, but this is faster in benchmarks.
        // replace(&mut self.0, smallvec::smallvec![new_val]);
        self.0 = smallvec::smallvec![new_val];
    }
}
407
408pub fn local_frontier_eq<A: AsRef<[LV]> + ?Sized, B: AsRef<[LV]> + ?Sized>(a: &A, b: &B) -> bool {
409 // Almost all branches only have one element in them.
410 debug_assert_sorted(a.as_ref());
411 debug_assert_sorted(b.as_ref());
412 a.as_ref() == b.as_ref()
413}
414
415#[allow(unused)]
416pub fn local_frontier_is_root(branch: &[LV]) -> bool {
417 branch.is_empty()
418}
419
420
421// // This walks both frontiers and finds how the frontier has changed. There's probably a better way
422// // to implement this.
423// struct FrontierDiff<'a> {
424// a: &'a [LV],
425// b: &'a [LV],
426// }
427//
428// pub(crate) fn diff_frontier_entries<'a>(a: &'a [LV], b: &'a [LV]) -> impl Iterator<Item = (DiffFlag, LV)> + 'a {
429// FrontierDiff { a, b }
430// }
431//
432//
433// fn slice_take_first(slice: &mut &[LV]) -> Option<LV> {
434// if let [first, tail @ ..] = slice {
435// *slice = tail;
436// Some(*first)
437// } else { None }
438// }
439//
440// impl<'a> Iterator for FrontierDiff<'a> {
441// type Item = (DiffFlag, LV);
442//
443// fn next(&mut self) -> Option<Self::Item> {
444// match (self.a.split_first(), self.b.split_first()) {
445// (None, None) => None,
446// (Some((a, rest)), None) => {
447// self.a = rest;
448// Some((DiffFlag::OnlyA, *a))
449// },
450// (None, Some((b, rest))) => {
451// self.b = rest;
452// Some((DiffFlag::OnlyB, *b))
453// },
454// (Some((a, a_rest)), Some((b, b_rest))) => {
455// match a.cmp(b) {
456// Ordering::Equal => {
457// // Take from both.
458// self.a = a_rest;
459// self.b = b_rest;
460// Some((DiffFlag::Shared, *a))
461// }
462// Ordering::Less => {
463// // Take from a.
464// self.a = a_rest;
465// Some((DiffFlag::OnlyA, *a))
466// }
467// Ordering::Greater => {
468// // Take from b.
469// self.b = b_rest;
//                     Some((DiffFlag::OnlyB, *b)) // NOTE(review): was `*a` — copy/paste bug in this dead code
471// }
472// }
473// }
474// }
475// }
476// }
477
478/// This method clones a version or parents vector. Its slightly faster and smaller than just
479/// calling v.clone() directly.
480#[inline]
481pub fn clone_smallvec<T, const LEN: usize>(v: &SmallVec<T, LEN>) -> SmallVec<T, LEN> where T: Clone + Copy {
482 // This is now smaller again as of rust 1.60. Looks like the problem was fixed.
483 v.clone()
484
485 // if v.spilled() { // Unlikely. If only there was a stable rust intrinsic for this..
486 // v.clone()
487 // } else {
488 // unsafe {
489 // // We only need to copy v.len() items, because LEN is small (2, usually) its actually
490 // // faster & less code to just copy the bytes in all cases rather than branch.
491 // // let mut arr: MaybeUninit<[T; LEN]> = MaybeUninit::uninit();
492 // // std::ptr::copy_nonoverlapping(v.as_ptr(), arr.as_mut_ptr().cast(), LEN);
493 // // SmallVec::from_buf_and_len_unchecked(arr, v.len())
494 //
495 // let mut result: MaybeUninit<SmallVec<T, LEN>> = MaybeUninit::uninit();
496 // std::ptr::copy_nonoverlapping(v, result.as_mut_ptr(), 1);
497 // result.assume_init()
498 // }
499 // }
500}
501
#[cfg(test)]
mod test {
    use crate::causalgraph::graph::GraphEntrySimple;
    use crate::Frontier;

    use super::*;

    #[test]
    fn frontier_movement_smoke_tests() {
        // Linear advance: ROOT advanced over 0..10 lands on the last version, 9.
        let mut branch: Frontier = Frontier::root();
        branch.advance_by_known_run(&[], (0..10).into());
        assert_eq!(branch.as_ref(), &[9]);

        let graph = Graph::from_simple_items(&[
            GraphEntrySimple { span: (0..10).into(), parents: Frontier::root() }
        ]);
        graph.dbg_check(true);

        // Retreating undoes the advance, eventually back to ROOT.
        branch.retreat(&graph, (5..10).into());
        assert_eq!(branch.as_ref(), &[4]);

        branch.retreat(&graph, (0..5).into());
        assert!(branch.is_root());
    }

    #[test]
    fn frontier_stays_sorted() {
        // Three concurrent-ish runs: 2..6 and 6..50 both branch off version 0.
        let graph = Graph::from_simple_items(&[
            GraphEntrySimple { span: (0..2).into(), parents: Frontier::root() },
            GraphEntrySimple { span: (2..6).into(), parents: Frontier::new_1(0) },
            GraphEntrySimple { span: (6..50).into(), parents: Frontier::new_1(0) },
        ]);
        graph.dbg_check(true);

        // Advancing / retreating through concurrent spans must keep the frontier sorted.
        let mut branch: Frontier = Frontier::from_sorted(&[1, 10]);
        branch.advance(&graph, (2..4).into());
        assert_eq!(branch.as_ref(), &[1, 3, 10]);

        branch.advance(&graph, (11..12).into());
        assert_eq!(branch.as_ref(), &[1, 3, 11]);

        branch.retreat(&graph, (2..4).into());
        assert_eq!(branch.as_ref(), &[1, 11]);

        branch.retreat(&graph, (11..12).into());
        assert_eq!(branch.as_ref(), &[1, 10]);
    }

    #[test]
    fn advance_sparse() {
        let graph = Graph::from_simple_items(&[
            GraphEntrySimple { span: (0..10).into(), parents: Frontier::root() },
            GraphEntrySimple { span: (10..20).into(), parents: Frontier::new_1(5) },
            // GraphEntrySimple { span: (6..50).into(), parents: Frontier::new_1(0) },
        ]);
        graph.dbg_check(true);

        // This isn't thorough, but should be good enough.
        let mut f = Frontier::root();
        f.advance_sparse(&graph, (0..5).into());
        // Should only include subgraph items
        assert_eq!(f.as_ref(), &[4]);

        // Sparse advance over a range that doesn't directly follow the frontier.
        f.advance_sparse(&graph, (7..8).into());
        assert_eq!(f.as_ref(), &[7]);

        // Range 9..15 spans two txn entries; 14 is concurrent with 9 (10..20 parents on 5).
        f.advance_sparse(&graph, (9..15).into());
        assert_eq!(f.as_ref(), &[9, 14]);
    }

    #[test]
    fn advance_empty_by_known_run() {
        // Regression.
        // let graph = Graph::from_entries(&[
        //     GraphEntrySimple { span: (0..10).into(), parents: Frontier::root(), },
        // ];

        // insert_nonoverlapping into an empty frontier must not panic.
        let mut f = Frontier::root();
        f.insert_nonoverlapping(4);
        assert_eq!(f.as_ref(), &[4]);
    }
}
583}