rusty_leveldb/
skipmap.rs

1use crate::cmp::{Cmp, MemtableKeyCmp};
2use crate::rand::rngs::StdRng;
3use crate::rand::{RngCore, SeedableRng};
4use crate::types::LdbIterator;
5
6use bytes::Bytes;
7use std::cell::RefCell;
8use std::cmp::Ordering;
9use std::mem::size_of;
10use std::rc::Rc;
11
/// Upper bound on the number of skip levels of any node. The head node is created with exactly
/// this many skip links, and `random_height()` never returns more than this.
const MAX_HEIGHT: usize = 12;
/// Controls how quickly towers thin out: `random_height()` adds another level each time a freshly
/// drawn `u32` is a multiple of this value (roughly a 1-in-4 chance per level, assuming the RNG
/// output is uniform).
const BRANCHING_FACTOR: u32 = 4;
14
/// A node in a skipmap contains links to the next node and others that are further away (skips);
/// `skips[0]` is the immediate element after, that is, the element contained in `next`.
///
/// Ownership runs exclusively through the `next` chain (each node owns its successor via `Box`);
/// the entries in `skips` are non-owning raw pointers into that same chain.
struct Node {
    // Non-owning forward links, one per level this node takes part in; `None` marks the end of
    // the list on that level.
    skips: Vec<Option<*mut Node>>,
    // Owning link to the immediate successor.
    next: Option<Box<Node>>,
    // Entry key; the head node is created with an empty key.
    key: Bytes,
    // Entry value; the head node is created with an empty value.
    value: Bytes,
}
23
/// Implements the backing store for a `MemTable`. The important methods are `insert()` and
/// `contains()`; in order to get full key and value for an entry, use a `SkipMapIter` instance,
/// `seek()` to the key to look up (this is as fast as any lookup in a skip map), and then call
/// `current()`.
struct InnerSkipMap {
    // Sentinel node that precedes all entries; it carries an empty key/value and `MAX_HEIGHT`
    // skip links.
    head: Box<Node>,
    // Random source used by `random_height()` to pick the height of each inserted node.
    rand: StdRng,
    // Number of entries inserted so far (the head node is not counted).
    len: usize,
    // approximation of memory used.
    approx_mem: usize,
    // Comparator that defines the key order of the map.
    cmp: Rc<Box<dyn Cmp>>,
}
36
37impl Drop for InnerSkipMap {
38    // Avoid possible stack overflow caused by dropping many nodes.
39    fn drop(&mut self) {
40        let mut next_node = self.head.next.take();
41        while let Some(mut boxed_node) = next_node {
42            next_node = boxed_node.next.take();
43        }
44    }
45}
46
/// Public handle to a skip map. The actual data lives in a shared `InnerSkipMap`; iterators
/// obtained from `iter()` hold a clone of the same `Rc`.
pub struct SkipMap {
    // Shared, interior-mutable map state.
    map: Rc<RefCell<InnerSkipMap>>,
}
50
51impl SkipMap {
52    /// Returns a SkipMap that wraps the comparator inside a MemtableKeyCmp.
53    pub fn new_memtable_map(cmp: Rc<Box<dyn Cmp>>) -> SkipMap {
54        SkipMap::new(Rc::new(Box::new(MemtableKeyCmp(cmp))))
55    }
56
57    /// Returns a SkipMap that uses the specified comparator.
58    pub fn new(cmp: Rc<Box<dyn Cmp>>) -> SkipMap {
59        let mut s = Vec::new();
60        s.resize(MAX_HEIGHT, None);
61
62        SkipMap {
63            map: Rc::new(RefCell::new(InnerSkipMap {
64                head: Box::new(Node {
65                    skips: s,
66                    next: None,
67                    key: Bytes::new(),
68                    value: Bytes::new(),
69                }),
70                rand: StdRng::seed_from_u64(0xdeadbeef),
71                len: 0,
72                approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(),
73                cmp,
74            })),
75        }
76    }
77
78    pub fn len(&self) -> usize {
79        self.map.borrow().len
80    }
81
82    #[must_use]
83    pub fn is_empty(&self) -> bool {
84        self.len() == 0
85    }
86
87    pub fn approx_memory(&self) -> usize {
88        self.map.borrow().approx_mem
89    }
90
91    pub fn contains(&self, key: &[u8]) -> bool {
92        self.map.borrow().contains(key)
93    }
94
95    /// inserts a key into the table. key may not be empty.
96    pub fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) {
97        assert!(!key.is_empty());
98        self.map.borrow_mut().insert(key, val);
99    }
100
101    pub fn iter(&self) -> SkipMapIter {
102        SkipMapIter {
103            map: self.map.clone(),
104            current: self.map.borrow().head.as_ref() as *const Node,
105        }
106    }
107}
108
109impl InnerSkipMap {
110    fn random_height(&mut self) -> usize {
111        let mut height = 1;
112
113        while height < MAX_HEIGHT && self.rand.next_u32().is_multiple_of(BRANCHING_FACTOR) {
114            height += 1;
115        }
116
117        height
118    }
119
120    fn contains(&self, key: &[u8]) -> bool {
121        if let Some(n) = self.get_greater_or_equal(key) {
122            n.key.starts_with(key)
123        } else {
124            false
125        }
126    }
127
128    /// Returns the node with key or the next greater one
129    /// Returns None if the given key lies past the greatest key in the table.
130    fn get_greater_or_equal<'a>(&'a self, key: &[u8]) -> Option<&'a Node> {
131        // Start at the highest skip link of the head node, and work down from there
132        let mut current = self.head.as_ref() as *const Node;
133        let mut level = self.head.skips.len() - 1;
134
135        loop {
136            let current_node = unsafe { &*current };
137            if let Some(next) = current_node.skips[level] {
138                let next = unsafe { &*next };
139                let ord = self.cmp.cmp(next.key.iter().as_slice(), key);
140
141                match ord {
142                    Ordering::Less => {
143                        current = next;
144                        continue;
145                    }
146                    Ordering::Equal => return Some(next),
147                    Ordering::Greater => {
148                        if level == 0 {
149                            return Some(next);
150                        }
151                    }
152                }
153            }
154            if level == 0 {
155                break;
156            }
157            level -= 1;
158        }
159
160        unsafe {
161            if (current.is_null() || current == self.head.as_ref())
162                || (self.cmp.cmp(&(*current).key, key) == Ordering::Less)
163            {
164                None
165            } else {
166                Some(&(*current))
167            }
168        }
169    }
170
171    /// Finds the node immediately before the node with key.
172    /// Returns None if no smaller key was found.
173    fn get_next_smaller<'a>(&'a self, key: &[u8]) -> Option<&'a Node> {
174        // Start at the highest skip link of the head node, and work down from there
175        let mut current = self.head.as_ref() as *const Node;
176        let mut level = self.head.skips.len() - 1;
177
178        loop {
179            let current_node = unsafe { &*current };
180            if let Some(next) = current_node.skips[level] {
181                let next = unsafe { &*next };
182                let ord = self.cmp.cmp(next.key.iter().as_slice(), key);
183
184                if let Ordering::Less = ord {
185                    current = next;
186                    continue;
187                }
188            }
189
190            if level == 0 {
191                break;
192            }
193            level -= 1;
194        }
195
196        unsafe {
197            if current.is_null() || current == self.head.as_ref() {
198                // If we're past the end for some reason or at the head
199                None
200            } else if self.cmp.cmp(&(*current).key, key) != Ordering::Less {
201                None
202            } else {
203                Some(&(*current))
204            }
205        }
206    }
207
208    fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) {
209        assert!(!key.is_empty());
210
211        // Keeping track of skip entries that will need to be updated
212        let mut prevs: [Option<*mut Node>; MAX_HEIGHT] = [None; MAX_HEIGHT];
213        let new_height = self.random_height();
214        let prevs = &mut prevs[0..new_height];
215
216        let mut level = MAX_HEIGHT - 1;
217        let mut current = self.head.as_mut() as *mut Node;
218        // Set previous node for all levels to current node.
219        (0..prevs.len()).for_each(|i| {
220            prevs[i] = Some(current);
221        });
222
223        // Find the node after which we want to insert the new node; this is the node with the key
224        // immediately smaller than the key to be inserted.
225        loop {
226            let current_node = unsafe { &*current };
227            if let Some(next) = current_node.skips[level] {
228                let next = unsafe { &mut *next };
229                // If the wanted position is after the current node
230                let ord = self.cmp.cmp(next.key.iter().as_slice(), &key);
231
232                assert!(ord != Ordering::Equal, "No duplicates allowed");
233
234                if ord == Ordering::Less {
235                    current = next;
236                    continue;
237                }
238            }
239
240            if level < new_height {
241                prevs[level] = Some(current);
242            }
243
244            if level == 0 {
245                break;
246            } else {
247                level -= 1;
248            }
249        }
250
251        // Construct new node
252        let mut new_skips = Vec::new();
253        new_skips.resize(new_height, None);
254        let mut new = Box::new(Node {
255            skips: new_skips,
256            next: None,
257            key: key.into(),
258            value: val.into(),
259        });
260        let newp = new.as_mut() as *mut Node;
261
262        (0..new_height).for_each(|i| {
263            if let Some(prev) = prevs[i] {
264                let prev = unsafe { &mut *prev };
265                new.skips[i] = prev.skips[i];
266                prev.skips[i] = Some(newp);
267            }
268        });
269
270        let added_mem = size_of::<Node>()
271            + size_of::<Option<*mut Node>>() * new.skips.len()
272            + new.key.len()
273            + new.value.len();
274        self.approx_mem += added_mem;
275        self.len += 1;
276
277        // Insert new node by first replacing the previous element's next field with None and
278        // assigning its value to new.next...
279        new.next = unsafe { (*current).next.take() };
280        // ...and then setting the previous element's next field to the new node
281        unsafe {
282            let _ = (*current).next.replace(new);
283        };
284    }
285    /// Runs through the skipmap and prints everything including addresses
286    fn dbg_print(&self) {
287        let mut current = self.head.as_ref() as *const Node;
288        loop {
289            let current_node = unsafe { &*current };
290            eprintln!(
291                "{:?} {:?}/{:?} - {:?}",
292                current, current_node.key, current_node.value, current_node.skips
293            );
294            if let Some(next) = current_node.skips[0] {
295                current = next;
296            } else {
297                break;
298            }
299        }
300    }
301}
302
/// Iterator over the entries of a `SkipMap`. It shares ownership of the map state, so the map
/// data stays alive for as long as the iterator exists.
pub struct SkipMapIter {
    // Shared map state; holding the `Rc` keeps the nodes behind `current` alive.
    map: Rc<RefCell<InnerSkipMap>>,
    // Node the iterator is positioned at. Pointing at the head node means "not valid".
    current: *const Node,
}
307
308impl LdbIterator for SkipMapIter {
309    fn advance(&mut self) -> bool {
310        // we first go to the next element, then return that -- in order to skip the head node
311        let r = unsafe {
312            (*self.current)
313                .next
314                .as_ref()
315                .map(|next| {
316                    self.current = next.as_ref() as *const Node;
317                    true
318                })
319                .unwrap_or(false)
320        };
321        if !r {
322            self.reset();
323        }
324        r
325    }
326    fn reset(&mut self) {
327        self.current = self.map.borrow().head.as_ref();
328    }
329    fn seek(&mut self, key: &[u8]) {
330        if let Some(node) = self.map.borrow().get_greater_or_equal(key) {
331            self.current = node as *const Node;
332            return;
333        }
334        self.reset();
335    }
336    fn valid(&self) -> bool {
337        self.current != self.map.borrow().head.as_ref()
338    }
339    fn current(&self) -> Option<(Bytes, Bytes)> {
340        if self.valid() {
341            unsafe {
342                Some((
343                    Bytes::copy_from_slice(&(*self.current).key),
344                    Bytes::copy_from_slice(&(*self.current).value),
345                ))
346            }
347        } else {
348            None
349        }
350    }
351    fn prev(&mut self) -> bool {
352        // Going after the original implementation here; we just seek to the node before current().
353        if self.valid() {
354            if let Some(prev) = self
355                .map
356                .borrow()
357                .get_next_smaller(unsafe { &(*self.current).key })
358            {
359                self.current = prev as *const Node;
360                if !prev.key.is_empty() {
361                    return true;
362                }
363            }
364        }
365        self.reset();
366        false
367    }
368}
369
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::cmp::MemtableKeyCmp;
    use crate::options;
    use crate::test_util::{test_iterator_properties, LdbIteratorIter};
    use crate::types::current_key_val;

    /// Builds the shared fixture: a map holding the 26 keys "aba" through "abz" (inserted in
    /// that order), each with the value "def".
    pub fn make_skipmap() -> SkipMap {
        let mut skm = SkipMap::new(options::for_test().cmp);

        for last in b'a'..=b'z' {
            skm.insert(vec![b'a', b'b', last], b"def".to_vec());
        }
        skm
    }

    #[test]
    fn test_insert() {
        let skm = make_skipmap();
        assert_eq!(skm.len(), 26);
        skm.map.borrow().dbg_print();
    }

    // Each duplicate insert gets its own test: a panicking insert ends the test, so a second
    // insert in the same test body would never run. The expected message makes sure the panic
    // comes from the duplicate check and not from anything else (e.g. the fixture).
    #[test]
    #[should_panic(expected = "No duplicates allowed")]
    fn test_no_dupes() {
        let mut skm = make_skipmap();
        // this should panic
        skm.insert(b"abc".to_vec(), b"def".to_vec());
    }

    #[test]
    #[should_panic(expected = "No duplicates allowed")]
    fn test_no_dupes_other_key() {
        let mut skm = make_skipmap();
        // this should panic
        skm.insert(b"abf".to_vec(), b"def".to_vec());
    }

    #[test]
    fn test_contains() {
        let skm = make_skipmap();
        assert!(skm.contains(b"aby"));
        assert!(skm.contains(b"abc"));
        assert!(skm.contains(b"abz"));
        assert!(!skm.contains(b"ab{"));
        assert!(!skm.contains(b"123"));
        assert!(!skm.contains(b"aaa"));
        assert!(!skm.contains(b"456"));
    }

    #[test]
    fn test_find() {
        let skm = make_skipmap();
        let inner = skm.map.borrow();

        // Exact hit.
        assert_eq!(
            &*inner.get_greater_or_equal(b"abf").unwrap().key,
            b"abf"
        );
        // Past the greatest key.
        assert!(inner.get_greater_or_equal(b"ab{").is_none());
        // Before the smallest key: lands on the first entry.
        assert_eq!(
            &*inner.get_greater_or_equal(b"aaa").unwrap().key,
            b"aba"
        );
        assert_eq!(
            &*inner.get_greater_or_equal(b"ab").unwrap().key,
            b"aba"
        );
        assert_eq!(
            &*inner.get_greater_or_equal(b"abc").unwrap().key,
            b"abc"
        );

        // Nothing is smaller than a key that sorts before the first entry.
        assert!(inner.get_next_smaller(b"ab0").is_none());
        assert_eq!(&*inner.get_next_smaller(b"abd").unwrap().key, b"abc");
        assert_eq!(&*inner.get_next_smaller(b"ab{").unwrap().key, b"abz");
    }

    #[test]
    fn test_empty_skipmap_find_memtable_cmp() {
        // Regression test: Make sure comparator isn't called with empty key.
        let cmp: Rc<Box<dyn Cmp>> = Rc::new(Box::new(MemtableKeyCmp(options::for_test().cmp)));
        let skm = SkipMap::new(cmp);

        let mut it = skm.iter();
        it.seek(b"abc");
        assert!(!it.valid());
    }

    #[test]
    fn test_skipmap_iterator_0() {
        let skm = SkipMap::new(options::for_test().cmp);

        // An empty map yields no entries at all.
        let seen = LdbIteratorIter::wrap(&mut skm.iter()).count();

        assert_eq!(seen, 0);
        assert!(!skm.iter().valid());
    }

    #[test]
    fn test_skipmap_iterator_init() {
        let skm = make_skipmap();
        let mut iter = skm.iter();

        // A fresh iterator sits on the head and becomes valid with the first step.
        assert!(!iter.valid());
        iter.next();
        assert!(iter.valid());
        iter.reset();
        assert!(!iter.valid());

        // Stepping back from the first entry makes the iterator invalid again.
        iter.next();
        assert!(iter.valid());
        iter.prev();
        assert!(!iter.valid());
    }

    #[test]
    fn test_skipmap_iterator() {
        let skm = make_skipmap();
        let mut seen = 0;

        for (key, value) in LdbIteratorIter::wrap(&mut skm.iter()) {
            assert!(!key.is_empty());
            assert!(!value.is_empty());
            seen += 1;
        }
        assert_eq!(seen, 26);
    }

    #[test]
    fn test_skipmap_iterator_seek_valid() {
        let skm = make_skipmap();
        let mut iter = skm.iter();

        iter.next();
        assert!(iter.valid());
        assert_eq!(current_key_val(&iter).unwrap().0, b"aba".to_vec());
        iter.seek(b"abz");
        assert_eq!(
            current_key_val(&iter).unwrap(),
            (b"abz".to_vec(), b"def".to_vec())
        );
        // go back to beginning
        iter.seek(b"aba");
        assert_eq!(
            current_key_val(&iter).unwrap(),
            (b"aba".to_vec(), b"def".to_vec())
        );

        // Seeking to the empty key lands on the first entry; there is nothing before it.
        iter.seek(b"");
        assert!(iter.valid());
        iter.prev();
        assert!(!iter.valid());

        // Running off the end resets the iterator.
        while iter.advance() {}
        assert!(!iter.valid());
        assert!(!iter.prev());
        assert_eq!(current_key_val(&iter), None);
    }

    #[test]
    fn test_skipmap_behavior() {
        let mut skm = SkipMap::new(options::for_test().cmp);
        for k in ["aba", "abb", "abc", "abd"] {
            skm.insert(k.as_bytes().to_vec(), b"def".to_vec());
        }
        test_iterator_properties(skm.iter());
    }

    #[test]
    fn test_skipmap_iterator_prev() {
        let skm = make_skipmap();
        let mut iter = skm.iter();

        iter.next();
        assert!(iter.valid());
        iter.prev();
        assert!(!iter.valid());
        iter.seek(b"abc");
        iter.prev();
        assert_eq!(
            current_key_val(&iter).unwrap(),
            (b"abb".to_vec(), b"def".to_vec())
        );
    }

    #[test]
    fn test_skipmap_iterator_concurrent_insert() {
        time_test!();
        // Asserts that the map can be mutated while an iterator exists; this is intentional.
        let mut skm = make_skipmap();
        let mut iter = skm.iter();

        assert!(iter.advance());
        skm.insert(b"abccc".to_vec(), b"defff".to_vec());
        // Assert that value inserted after obtaining iterator is present.
        for (k, _) in LdbIteratorIter::wrap(&mut iter) {
            if k == "abccc".as_bytes() {
                return;
            }
        }
        panic!("abccc not found in map.");
    }
}
580}