sled_overlay/
tree.rs

1/* This file is part of sled-overlay
2 *
3 * Copyright (C) 2023-2024 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    cmp::Ordering,
21    collections::{btree_map::Keys, BTreeMap, BTreeSet},
22    iter::{FusedIterator, Peekable},
23};
24
25use sled::{IVec, Iter};
26
27/// Struct representing [`SledTreeOverlay`] cache state.
28#[derive(Debug, Default, Clone, PartialEq)]
29pub struct SledTreeOverlayState {
30    /// The cache is the actual overlayed data represented as a [`BTreeMap`].
31    pub cache: BTreeMap<IVec, IVec>,
32    /// In `removed`, we keep track of keys that were removed in the overlay.
33    pub removed: BTreeSet<IVec>,
34}
35
36impl SledTreeOverlayState {
37    /// Instantiate a new [`SledTreeOverlayState`].
38    pub fn new() -> Self {
39        Self {
40            cache: BTreeMap::new(),
41            removed: BTreeSet::new(),
42        }
43    }
44
45    /// Aggregate all the current tree overlay state changes into
46    /// a [`sled::Batch`] ready for further operation.
47    /// If there are no changes, return `None`.
48    pub fn aggregate(&self) -> Option<sled::Batch> {
49        if self.cache.is_empty() && self.removed.is_empty() {
50            return None;
51        }
52
53        let mut batch = sled::Batch::default();
54
55        // This kind of first-insert-then-remove operation should be fine
56        // provided it's handled correctly in the above functions.
57        for (k, v) in self.cache.iter() {
58            batch.insert(k, v);
59        }
60
61        for k in self.removed.iter() {
62            batch.remove(k);
63        }
64
65        Some(batch)
66    }
67
68    /// Add provided tree overlay state changes to our own.
69    pub fn add_diff(&mut self, diff: &SledTreeOverlayStateDiff) {
70        // Add all new keys into cache
71        for (k, v) in diff.cache.iter() {
72            self.removed.remove(k);
73            self.cache.insert(k.clone(), v.1.clone());
74        }
75
76        // Remove dropped keys
77        for k in diff.removed.keys() {
78            self.cache.remove(k);
79            self.removed.insert(k.clone());
80        }
81    }
82
83    /// Remove provided tree overlay state changes from our own.
84    pub fn remove_diff(&mut self, diff: &SledTreeOverlayStateDiff) {
85        for (k, v) in diff.cache.iter() {
86            // Skip if its not in cache
87            let Some(value) = self.cache.get(k) else {
88                continue;
89            };
90
91            // Check if its value has been modified again
92            if v.1 != value {
93                continue;
94            }
95
96            self.cache.remove(k);
97        }
98
99        for k in diff.removed.keys() {
100            self.removed.remove(k);
101        }
102    }
103}
104
105impl From<&SledTreeOverlayStateDiff> for SledTreeOverlayState {
106    fn from(diff: &SledTreeOverlayStateDiff) -> Self {
107        let mut cache = BTreeMap::new();
108        let mut removed = BTreeSet::new();
109
110        for (key, value) in diff.cache.iter() {
111            cache.insert(key.clone(), value.1.clone());
112        }
113
114        for key in diff.removed.keys() {
115            removed.insert(key.clone());
116        }
117
118        Self { cache, removed }
119    }
120}
121
122/// Auxilliary struct representing a [`SledTreeOverlayState`] diff log.
123#[derive(Debug, Default, Clone, PartialEq)]
124pub struct SledTreeOverlayStateDiff {
125    /// Inserted data represented as a [`BTreeMap`].
126    /// The value contains both the previous key value(if it existed), along
127    /// with the new one.
128    pub cache: BTreeMap<IVec, (Option<IVec>, IVec)>,
129    /// In `removed`, we keep track of keys that were removed in the overlay,
130    /// along with their value.
131    pub removed: BTreeMap<IVec, IVec>,
132}
133
134impl SledTreeOverlayStateDiff {
135    /// Instantiate a new [`SledTreeOverlayStateDiff`], over the provided
136    /// [`sled::Tree`] that is being overlayed.
137    pub fn new(tree: &sled::Tree, state: &SledTreeOverlayState) -> Result<Self, sled::Error> {
138        let mut cache = BTreeMap::new();
139        let mut removed = BTreeMap::new();
140
141        // Set inserted keys
142        for (key, value) in state.cache.iter() {
143            // Grab each key previous value, if it existed
144            let previous = tree.get::<IVec>(key.into())?;
145            cache.insert(key.into(), (previous, value.into()));
146        }
147
148        // Set removed keys
149        for key in state.removed.iter() {
150            // Grab each key last value, if it existed, otherwise
151            // use an empty value as its previous.
152            let previous = match tree.get(key)? {
153                Some(previous) => previous,
154                None => vec![].into(),
155            };
156            removed.insert(key.into(), previous);
157        }
158
159        Ok(Self { cache, removed })
160    }
161
162    /// Instantiate a new [`SledTreeOverlayStateDiff`], over the provided
163    /// [`sled::Tree`] that is being dropped. The diff will contain all
164    /// existing tree keys in its cache as inserts, representing the last tree state.
165    pub fn new_dropped(tree: &sled::Tree) -> Self {
166        let mut cache = BTreeMap::new();
167
168        // Insert all tree keys
169        for record in tree.iter() {
170            let (key, value) = record.unwrap();
171            cache.insert(key, (None, value));
172        }
173
174        Self {
175            cache,
176            removed: BTreeMap::new(),
177        }
178    }
179
180    /// Aggregate all the tree overlay state changes into
181    /// a [`sled::Batch`] ready for further operation.
182    /// If there are no changes, return `None`.
183    pub fn aggregate(&self) -> Option<sled::Batch> {
184        if self.cache.is_empty() && self.removed.is_empty() {
185            return None;
186        }
187
188        let mut batch = sled::Batch::default();
189
190        // This kind of first-insert-then-remove operation should be fine
191        // provided it's handled correctly in the above functions.
192        for (k, v) in self.cache.iter() {
193            batch.insert(k, v.1.clone());
194        }
195
196        for k in self.removed.keys() {
197            batch.remove(k);
198        }
199
200        Some(batch)
201    }
202
203    /// Aggregate all the current tree overlay state changes inverse
204    /// actions into a [`sled::Batch`] ready for further operation.
205    /// If there are no changes, return `None`.
206    pub fn revert(&self) -> Option<sled::Batch> {
207        if self.cache.is_empty() && self.removed.is_empty() {
208            return None;
209        }
210
211        let mut batch = sled::Batch::default();
212
213        // This kind of first-insert-then-remove operation should be fine
214        // provided it's handled correctly in the above functions.
215        for (k, v) in self.removed.iter() {
216            batch.insert(k, v.clone());
217        }
218
219        for (k, v) in self.cache.iter() {
220            // If key value has been modified, revert to previous one
221            if let Some(value) = &v.0 {
222                batch.insert(k, value.clone());
223                continue;
224            }
225            batch.remove(k);
226        }
227
228        Some(batch)
229    }
230
231    /// Produces a [`SledTreeOverlayStateDiff`] containing the inverse
232    /// changes from our own.
233    pub fn inverse(&self) -> Self {
234        let mut diff = Self::default();
235
236        // This kind of first-insert-then-remove operation should be fine
237        // provided it's handled correctly in the above functions.
238        for (k, v) in self.removed.iter() {
239            diff.cache.insert(k.clone(), (None, v.clone()));
240        }
241
242        for (k, v) in self.cache.iter() {
243            // If its value has been modified, flip it
244            if let Some(previous) = &v.0 {
245                diff.cache
246                    .insert(k.clone(), (Some(v.1.clone()), previous.clone()));
247                continue;
248            }
249            diff.removed.insert(k.clone(), v.1.clone());
250        }
251
252        diff
253    }
254
255    /// Remove provided tree overlay state changes from our own.
256    pub fn remove_diff(&mut self, other: &Self) {
257        for (k, v) in other.cache.iter() {
258            // Set as removed if its not in cache
259            let Some(values) = self.cache.get(k) else {
260                self.removed.insert(k.clone(), v.1.clone());
261                continue;
262            };
263
264            // Check if its value has been modified again
265            if v.1 != values.1 {
266                // Set previous value
267                self.cache
268                    .insert(k.clone(), (Some(v.1.clone()), values.1.clone()));
269                continue;
270            }
271
272            self.cache.remove(k);
273        }
274
275        for k in other.removed.keys() {
276            // Update cache key previous, if it exits
277            if let Some(values) = self.cache.get(k) {
278                self.cache.insert(k.clone(), (None, values.1.clone()));
279                continue;
280            }
281
282            self.removed.remove(k);
283        }
284    }
285
286    /// Update our cache key values to the ones in the provided
287    /// tree overlay state changes.
288    pub fn update_values(&mut self, other: &Self) {
289        for (k, v) in other.cache.iter() {
290            self.cache.insert(k.clone(), v.clone());
291        }
292
293        for k in other.removed.keys() {
294            self.cache.remove(k);
295        }
296    }
297}
298
299/// An overlay on top of a single [`sled::Tree`] instance.
300#[derive(Debug, Clone)]
301pub struct SledTreeOverlay {
302    /// The [`sled::Tree`] that is being overlayed.
303    pub tree: sled::Tree,
304    /// Current overlay cache state.
305    pub state: SledTreeOverlayState,
306    /// Checkpointed cache state to revert to.
307    checkpoint: SledTreeOverlayState,
308}
309
310impl SledTreeOverlay {
311    /// Instantiate a new [`SledTreeOverlay`] on top of a given [`sled::Tree`].
312    pub fn new(tree: &sled::Tree) -> Self {
313        Self {
314            tree: tree.clone(),
315            state: SledTreeOverlayState::new(),
316            checkpoint: SledTreeOverlayState::new(),
317        }
318    }
319
320    /// Returns `true` if the overlay contains a value for a specified key.
321    pub fn contains_key(&self, key: &[u8]) -> Result<bool, sled::Error> {
322        // First check if the key was removed in the overlay
323        if self.state.removed.contains::<IVec>(&key.into()) {
324            return Ok(false);
325        }
326
327        // Then check the cache and the main tree
328        if self.state.cache.contains_key::<IVec>(&key.into()) || self.tree.contains_key(key)? {
329            return Ok(true);
330        }
331
332        Ok(false)
333    }
334
335    /// Returns `true` if the overlay is empty.
336    pub fn is_empty(&self) -> bool {
337        // Keep a counter of all elements
338        let mut counter: i64 = 0;
339
340        // Add existing keys
341        counter += self.tree.len() as i64;
342
343        // Add new keys
344        counter += self.state.cache.len() as i64;
345
346        // Subtract removed keys
347        counter -= self.state.removed.len() as i64;
348
349        counter <= 0
350    }
351
352    /// Returns last key and value from the overlay or `None` if its empty,
353    /// based on the `Ord` implementation for `Vec<u8>`.
354    pub fn last(&self) -> Result<Option<(IVec, IVec)>, sled::Error> {
355        // If both main tree and cache are empty, return None
356        if self.tree.is_empty() && self.state.cache.is_empty() {
357            return Ok(None);
358        }
359
360        // Grab main tree last record
361        let tree_last = self.tree.last()?;
362
363        // If cache has no records, main tree last exists
364        if self.state.cache.is_empty() {
365            // We can safely unwrap here since main tree is not
366            // empty, as we have already checked if both main
367            // tree and cache are empty.
368            let record = tree_last.unwrap();
369
370            // Check if record is removed
371            if self.state.removed.contains(&record.0) {
372                // Find last existing record
373                let mut key = record.0.clone();
374                while let Some(record) = self.tree.get_lt(key)? {
375                    // Check if record is removed
376                    if self.state.removed.contains(&record.0) {
377                        key = record.0;
378                        continue;
379                    }
380                    // Return it
381                    return Ok(Some((record.0.clone(), record.1.clone())));
382                }
383
384                // Return None if no records exist
385                return Ok(None);
386            }
387
388            // Return it
389            return Ok(Some((record.0.clone(), record.1.clone())));
390        }
391
392        // Grab cache last record.
393        // We can safely unwrap here as we checked if the cache is
394        // empty on the previous step.
395        let cache_last = self.state.cache.last_key_value().unwrap();
396
397        // If the main tree has a last record, compare it with the cache
398        // last record, and return it if it's not removed
399        if let Some(tree_last) = tree_last {
400            if cache_last.0 < &tree_last.0 && !self.state.removed.contains(&tree_last.0) {
401                return Ok(Some((tree_last.0.clone(), tree_last.1.clone())));
402            }
403        }
404
405        // Return the cache last record
406        Ok(Some((cache_last.0.clone(), cache_last.1.clone())))
407    }
408
409    /// Retrieve a value from the overlay if it exists.
410    pub fn get(&self, key: &[u8]) -> Result<Option<IVec>, sled::Error> {
411        // First check if the key was removed in the overlay
412        if self.state.removed.contains::<IVec>(&key.into()) {
413            return Ok(None);
414        }
415
416        // Then check the cache
417        if let Some(v) = self.state.cache.get::<IVec>(&key.into()) {
418            return Ok(Some(v.clone()));
419        }
420
421        // And finally the main tree
422        self.tree.get(key)
423    }
424
425    /// Insert a key to a new value, returning the last value if it was set.
426    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<Option<IVec>, sled::Error> {
427        // Insert the value into the cache. We then optionally add the previous value
428        // into `prev`.
429        let mut prev: Option<IVec> = self.state.cache.insert(key.into(), value.into());
430
431        // In case this key was previously removed from the cache, we have to
432        // delete it from the `removed` set.
433        if self.state.removed.contains::<IVec>(&key.into()) {
434            self.state.removed.remove(key);
435            // And in that case, a previous value isn't supposed to exist
436            return Ok(None);
437        }
438
439        // If cache didn't contain this key previously, and it wasn't removed
440        // either, then check if it's in the main tree.
441        if prev.is_none() {
442            prev = self.tree.get::<IVec>(key.into())?;
443        }
444
445        Ok(prev)
446    }
447
448    /// Delete a value, if it exists, returning the old value.
449    pub fn remove(&mut self, key: &[u8]) -> Result<Option<IVec>, sled::Error> {
450        // If it was previously removed, we can just return None
451        if self.state.removed.contains::<IVec>(&key.into()) {
452            return Ok(None);
453        }
454
455        // Attempt to remove from cache, and if it wasn't in the cache before,
456        // we have to get the previous value from the sled tree:
457        let mut prev: Option<IVec> = self.state.cache.remove::<IVec>(&key.into());
458        if prev.is_none() {
459            prev = self.tree.get(key)?;
460        }
461
462        // Previous value must existed
463        if prev.is_none() {
464            return Err(sled::Error::CollectionNotFound(key.into()));
465        }
466
467        // Mark the key as removed
468        self.state.removed.insert(key.into());
469
470        Ok(prev)
471    }
472
473    /// Removes all values from the cache and marks all tree records as
474    /// removed.
475    pub fn clear(&mut self) -> Result<(), sled::Error> {
476        // Clear state
477        self.state.cache = BTreeMap::new();
478
479        // Mark all the db's keys as removed
480        self.state.removed = self
481            .tree
482            .iter()
483            .keys()
484            .collect::<Result<BTreeSet<IVec>, sled::Error>>()?;
485
486        Ok(())
487    }
488
489    /// Aggregate all the current overlay changes into a [`sled::Batch`] ready for
490    /// further operation. If there are no changes, return `None`.
491    pub fn aggregate(&self) -> Option<sled::Batch> {
492        self.state.aggregate()
493    }
494
495    /// Checkpoint current cache state so we can revert to it, if needed.
496    pub fn checkpoint(&mut self) {
497        self.checkpoint = self.state.clone();
498    }
499
500    /// Revert to current cache state checkpoint.
501    pub fn revert_to_checkpoint(&mut self) {
502        self.state = self.checkpoint.clone();
503    }
504
505    /// Calculate differences from provided overlay state changes
506    /// sequence. This can be used when we want to keep track of
507    /// consecutive individual changes performed over the current
508    /// overlay state. If the sequence is empty, current state
509    /// is returned as the diff.
510    pub fn diff(
511        &self,
512        sequence: &[SledTreeOverlayStateDiff],
513    ) -> Result<SledTreeOverlayStateDiff, sled::Error> {
514        // Grab current state
515        let mut current = SledTreeOverlayStateDiff::new(&self.tree, &self.state)?;
516
517        // Remove provided diffs sequence
518        for diff in sequence {
519            current.remove_diff(diff);
520        }
521
522        Ok(current)
523    }
524
525    /// Add provided tree overlay state changes from our own.
526    pub fn add_diff(&mut self, diff: &SledTreeOverlayStateDiff) {
527        self.state.add_diff(diff)
528    }
529
530    /// Remove provided tree overlay state changes from our own.
531    pub fn remove_diff(&mut self, diff: &SledTreeOverlayStateDiff) {
532        self.state.remove_diff(diff)
533    }
534
535    /// Immutably iterate through the tree overlay.
536    pub fn iter(&self) -> SledTreeOverlayIter<'_> {
537        SledTreeOverlayIter::new(self)
538    }
539}
540
541/// Immutable iterator of a [`SledTreeOverlay`].
542pub struct SledTreeOverlayIter<'a> {
543    // Reference to the tree overlay.
544    overlay: &'a SledTreeOverlay,
545    // Iterator over [`sled::Tree`] keys that is being overlayed.
546    tree_iter: Peekable<Iter>,
547    // Iterator over the overlay's chache keys.
548    cache_iter: Peekable<Keys<'a, IVec, IVec>>,
549}
550
551impl<'a> SledTreeOverlayIter<'a> {
552    fn new(overlay: &'a SledTreeOverlay) -> Self {
553        Self {
554            overlay,
555            tree_iter: overlay.tree.iter().peekable(),
556            cache_iter: overlay.state.cache.keys().peekable(),
557        }
558    }
559}
560
561impl Iterator for SledTreeOverlayIter<'_> {
562    type Item = Result<(IVec, IVec), sled::Error>;
563
564    fn next(&mut self) -> Option<Self::Item> {
565        // First we peek over the next tree key
566        let peek1 = self.tree_iter.peek();
567
568        // Check if a sled error occured
569        if let Some(Err(e)) = peek1 {
570            return Some(Err(e.clone()));
571        }
572
573        // Peek over the next cache key
574        let peek2 = self.cache_iter.peek();
575
576        // Find the next key we have to grab,
577        // and which iterator to advance.
578        let mut advance_iter: u8 = 0;
579        let next_key = match (peek1, peek2) {
580            (Some(k1), Some(&k2)) => {
581                // Its safe to unwrap here since we already checked for errors
582                let (k1, _) = k1.as_ref().unwrap();
583                match k1.cmp(k2) {
584                    Ordering::Equal => Some(k1.clone()),
585                    Ordering::Greater => {
586                        advance_iter = 2;
587                        Some(k2.clone())
588                    }
589                    Ordering::Less => {
590                        advance_iter = 1;
591                        Some(k1.clone())
592                    }
593                }
594            }
595            (Some(k1), None) => {
596                // Its safe to unwrap here since we already checked for errors
597                let (k1, _) = k1.as_ref().unwrap();
598                advance_iter = 1;
599                Some(k1.clone())
600            }
601            (None, Some(&k2)) => {
602                advance_iter = 2;
603                Some(k2.clone())
604            }
605            (None, None) => None,
606        };
607
608        // Check if we have a key to grab
609        let next_key = next_key?;
610
611        // Advance the corresponding iterator
612        match advance_iter {
613            1 => {
614                self.tree_iter.next();
615            }
616            2 => {
617                self.cache_iter.next();
618            }
619            _ => {
620                self.tree_iter.next();
621                self.cache_iter.next();
622            }
623        }
624
625        // Grab the next key value from the overlay
626        match self.overlay.get(&next_key) {
627            Ok(next_value) => match next_value {
628                Some(next_value) => Some(Ok((next_key, next_value))),
629                // If the value doesn't exist, it means it's in the
630                // removed set, so we advance the iterator.
631                None => self.next(),
632            },
633            Err(e) => Some(Err(e)),
634        }
635    }
636}
637
638impl FusedIterator for SledTreeOverlayIter<'_> {}
639
640/// Define fusion iteration behavior, allowing
641/// us to use the [`SledTreeOverlayIter`] iterator in
642/// loops directly, without using .iter() method
643/// of [`SledTreeOverlay`].
644impl<'a> IntoIterator for &'a SledTreeOverlay {
645    type Item = Result<(IVec, IVec), sled::Error>;
646
647    type IntoIter = SledTreeOverlayIter<'a>;
648
649    fn into_iter(self) -> Self::IntoIter {
650        self.iter()
651    }
652}