Skip to main content

dag/
verlink.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::cmp;
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::atomic;
12use std::sync::atomic::AtomicU32;
13use std::sync::Arc;
14use std::sync::OnceLock;
15use std::sync::RwLock;
16
17/// A linked list tracking a logic "version" with compatibility rules:
18/// - Append-only changes bump the version, the new version is backwards
19///   compatible.
20/// - Non-append-only changes create a new version that is incompatible with
21///   all other versions (in the current process).
22/// - Clones (cheaply) preserve the version.
23///
24/// Supported operations:
25/// - `new() -> x`: Create a new version that is not comparable (compatible)
26///   with other versions.
27/// - `clone(x) -> y`: Clone `x` to `y`. `x == y`. `x` and `y` are compatible.
28/// - `bump(x) -> y`: Bump `x` to `y`. `y > x`. `y` is backwards-compatible with
29///   `x`. Note: `y` is not comparable (compatible) with other `bump(x)`.
30/// - `x > y`: `true` if `x` is backwards compatible with `y`.
31///
32/// The linked list can be shared in other linked lists. So they form a tree
33/// effectively. Comparing to a DAG, there is no "merge" operation.
34/// Compatibility questions become reachability questions.
35#[derive(Clone)]
36pub struct VerLink {
37    inner: Arc<Inner>,
38}
39
40#[derive(Clone)]
41struct Inner {
42    parent: Option<VerLink>,
43
44    /// The base number. Two `VerLink`s with different base numbers are incompatible.
45    /// Used as an optimization to exit `compatible` early.
46    base: u32,
47
48    /// The "generation number", distance to root (the parentless `VerLink`).
49    /// If `x.compatible(y)` returns `x`, then `x.gen` must be >= `y.gen`.
50    /// Used as an optimization to exit `compatible` early.
51    gen: u32,
52}
53
54impl VerLink {
55    /// Creates a new `VerLink` that is incompatible with all other `VerLink`s
56    /// in the process.
57    pub fn new() -> Self {
58        let inner = Inner {
59            parent: None,
60            base: next_id(),
61            gen: 0,
62        };
63        Self {
64            inner: Arc::new(inner),
65        }
66    }
67
68    /// Bumps the `VerLink` for backward-compatible (ex. append-only) changes.
69    /// Note the "append-only" means only adding commits without "stripping"
70    /// or "rewriting" commits. It is different from the "append-only" concept
71    /// from the storage layer, because the "stripping" or "rewriting" might
72    /// be implemented as "appending" special data on the storage layer.
73    pub fn bump(&mut self) {
74        match Arc::get_mut(&mut self.inner) {
75            // This is an optimization to avoid increasing the length of the
76            // linked list (which slows down "compatible" calculation) if
77            // possible.
78            Some(_inner) => {
79                // Increasing gen is not necessary for correctness.
80                // _inner.gen += 1;
81            }
82            None => {
83                let next_inner = Inner {
84                    parent: Some(self.clone()),
85                    base: self.inner.base,
86                    gen: self.inner.gen + 1,
87                };
88                let next = Self {
89                    inner: Arc::new(next_inner),
90                };
91                *self = next;
92            }
93        }
94    }
95}
96
97impl PartialOrd for VerLink {
98    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
99        if self.inner.base != other.inner.base {
100            // Fast path: self and other have different root nodes and are not
101            // reachable from each other.
102            return None;
103        }
104        if self.inner.gen < other.inner.gen {
105            other.partial_cmp(self).map(|o| o.reverse())
106        } else {
107            debug_assert!(self.inner.gen >= other.inner.gen);
108            if Arc::ptr_eq(&self.inner, &other.inner) {
109                return Some(cmp::Ordering::Equal);
110            }
111            let mut cur = self.inner.parent.as_ref();
112            while let Some(this) = cur {
113                if this.inner.gen < other.inner.gen {
114                    // Fast path: not possible to reach other from here.
115                    return None;
116                }
117                if Arc::ptr_eq(&this.inner, &other.inner) {
118                    return Some(cmp::Ordering::Greater);
119                }
120                cur = this.inner.parent.as_ref();
121            }
122            None
123        }
124    }
125}
126
127impl PartialEq for VerLink {
128    fn eq(&self, other: &Self) -> bool {
129        Arc::ptr_eq(&self.inner, &other.inner)
130    }
131}
132
133impl fmt::Debug for VerLink {
134    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
135        let mut cur = Some(self);
136        while let Some(this) = cur {
137            write!(f, "{:p}", this.inner.as_ref())?;
138            cur = this.inner.parent.as_ref();
139            f.write_str("->")?;
140            if cur.is_none() {
141                write!(f, "{}", this.inner.base)?;
142            }
143        }
144        Ok(())
145    }
146}
147
148/// A small cache that associate `storage_version` with `VerLink`.
149/// Useful to "restore" `VerLink` after reading the same content from disk.
150/// For each filesystem path we only cache its latest version to reduce memory
151/// "leak" caused by a static state.
152static CACHE: OnceLock<RwLock<HashMap<String, ((u64, u64), VerLink)>>> = OnceLock::new();
153
154fn storage_version_cache() -> &'static RwLock<HashMap<String, ((u64, u64), VerLink)>> {
155    CACHE.get_or_init(Default::default)
156}
157
158// Cache related.
159impl VerLink {
160    /// Clear the cache that maps storage version to VerLink.
161    pub fn clear_storage_version_cache() {
162        let mut cache = storage_version_cache().write().unwrap();
163        cache.clear();
164    }
165
166    /// Lookup a `VerLink` from a given storage version.
167    pub fn from_storage_version(str_id: &str, version: (u64, u64)) -> Option<VerLink> {
168        let cache = storage_version_cache().read().unwrap();
169        let (cached_version, verlink) = cache.get(str_id)?;
170        if cached_version == &version {
171            Some(verlink.clone())
172        } else {
173            None
174        }
175    }
176
177    /// Associate the `VerLink` with a storage version.
178    pub fn associate_storage_version(&self, str_id: String, version: (u64, u64)) {
179        let mut cache = storage_version_cache().write().unwrap();
180        cache.insert(str_id, (version, self.clone()));
181    }
182
183    /// Lookup a `VerLink` from a given storage version, or create a new `VerLink`
184    /// and remember it in cache.
185    pub fn from_storage_version_or_new(str_id: &str, version: (u64, u64)) -> VerLink {
186        match Self::from_storage_version(str_id, version) {
187            Some(v) => v,
188            None => {
189                let v = Self::new();
190                v.associate_storage_version(str_id.to_string(), version);
191                v
192            }
193        }
194    }
195}
196
/// Hands out the next base number from a process-wide counter that starts
/// at 0 and is advanced by one on every call.
fn next_id() -> u32 {
    static NEXT: AtomicU32 = AtomicU32::new(0);
    // `fetch_add` returns the value held before the increment.
    NEXT.fetch_add(1, atomic::Ordering::AcqRel)
}
201
/// Unit tests for version compatibility and the storage version cache.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_individual_news_are_incompatible() {
        let (first, second) = (VerLink::new(), VerLink::new());
        assert!(compatible(&first, &second).is_none());
    }

    #[test]
    fn test_clone_compatible() {
        let original = VerLink::new();
        let copy = original.clone();
        assert_eq!(&original, &copy);
        assert_eq!(compatible(&original, &copy), Some(&copy));
    }

    #[test]
    fn test_bump_is_different_and_backwards_compatible() {
        let base = VerLink::new();
        let mut bumped = base.clone();
        bumped.bump();
        assert_ne!(&bumped, &base);
        assert_eq!(compatible(&base, &bumped), Some(&bumped));

        // Further bumps keep the result backwards compatible with `base`.
        for _ in 0..2 {
            bumped.bump();
        }
        assert_eq!(compatible(&base, &bumped), Some(&bumped));
    }

    #[test]
    fn test_clone_bump_twice() {
        let root = VerLink::new();
        let mut middle = root.clone();
        middle.bump();
        let mut tip = middle.clone();
        tip.bump();
        assert_eq!(compatible(&root, &tip), Some(&tip));
        assert_eq!(compatible(&middle, &tip), Some(&tip));
    }

    #[test]
    fn test_bump_independently_become_incompatible() {
        let mut left = VerLink::new();
        let mut right = left.clone();
        right.bump();
        left.bump();
        assert_eq!(compatible(&left, &right), None);
    }

    #[test]
    fn test_bump_avoid_increase_len_if_possible() {
        let shared = VerLink::new();
        let mut bumped = shared.clone();
        assert_eq!(shared.chain_len(), 1);
        assert_eq!(bumped.chain_len(), 1);
        // The node is shared with `shared`, so this bump adds a node.
        bumped.bump();
        assert_eq!(bumped.chain_len(), 2);
        // The new node is uniquely owned: more bumps do not grow the chain.
        bumped.bump();
        bumped.bump();
        assert_eq!(bumped.chain_len(), 2);
    }

    #[test]
    fn test_storage_version_cache() {
        let stored = VerLink::new();
        let version = (100, 200);
        assert!(VerLink::from_storage_version("x", version).is_none());
        stored.associate_storage_version("x".to_string(), version);
        assert_eq!(VerLink::from_storage_version("x", version).unwrap(), stored);

        let created = VerLink::from_storage_version_or_new("y", version);
        assert_ne!(&created, &stored);
        let looked_up = VerLink::from_storage_version_or_new("y", version);
        assert_eq!(&created, &looked_up);
    }

    /// Find the more compatible version.
    ///
    /// Every branch also checks that all comparison operators agree with the
    /// branch taken.
    // The negated comparisons are deliberate: each operator is exercised on
    // its own rather than being rewritten through another operator.
    #[allow(clippy::neg_cmp_op_on_partial_ord)]
    fn compatible<'a>(a: &'a VerLink, b: &'a VerLink) -> Option<&'a VerLink> {
        if a == b {
            assert!(!(a != b));
            assert!(!(a < b));
            assert!(!(b < a));
            assert!(!(a > b));
            assert!(!(b > a));
            return Some(a);
        }

        if a < b {
            assert!(!(a == b));
            assert!(a != b);
            assert!(!(b < a));
            assert!(!(a > b));
            assert!(b > a);
            return Some(b);
        }

        if a > b {
            assert!(!(a == b));
            assert!(a != b);
            assert!(!(a < b));
            assert!(b < a);
            assert!(!(b > a));
            return Some(a);
        }

        // Not comparable in either direction.
        assert!(!(a == b));
        assert!(a != b);
        assert!(!(a < b));
        assert!(!(a > b));
        assert!(!(b < a));
        assert!(!(b > a));
        None
    }

    impl VerLink {
        /// Length of the linked list.
        fn chain_len(&self) -> usize {
            // Count this node, then one more for every parent link followed.
            let mut len = 1;
            let mut node = self;
            while let Some(parent) = node.inner.parent.as_ref() {
                len += 1;
                node = parent;
            }
            len
        }
    }
}