rusty_leveldb_arc/
skipmap.rs

1use crate::cmp::{Cmp, MemtableKeyCmp};
2use crate::rand::rngs::StdRng;
3use crate::rand::{RngCore, SeedableRng};
4use crate::types::LdbIterator;
5
6use std::cell::RefCell;
7use std::cmp::Ordering;
8use std::mem::{replace, size_of};
9use std::sync::Arc;
10
11const MAX_HEIGHT: usize = 12;
12const BRANCHING_FACTOR: u32 = 4;
13
14/// A node in a skipmap contains links to the next node and others that are further away (skips);
15/// `skips[0]` is the immediate element after, that is, the element contained in `next`.
16struct Node {
17    skips: Vec<Option<*mut Node>>,
18    next: Option<Box<Node>>,
19    key: Vec<u8>,
20    value: Vec<u8>,
21}
22
23/// Implements the backing store for a `MemTable`. The important methods are `insert()` and
24/// `contains()`; in order to get full key and value for an entry, use a `SkipMapIter` instance,
25/// `seek()` to the key to look up (this is as fast as any lookup in a skip map), and then call
26/// `current()`.
27struct InnerSkipMap {
28    head: Box<Node>,
29    rand: StdRng,
30    len: usize,
31    // approximation of memory used.
32    approx_mem: usize,
33    cmp: Arc<Box<dyn Cmp>>,
34}
35
36impl Drop for InnerSkipMap {
37    // Avoid possible stack overflow caused by dropping many nodes.
38    fn drop(&mut self) {
39        let mut next_node = self.head.next.take();
40        while let Some(mut boxed_node) = next_node {
41            next_node = boxed_node.next.take();
42        }
43    }
44}
45
46pub struct SkipMap {
47    map: Arc<RefCell<InnerSkipMap>>,
48}
49
50impl SkipMap {
51    /// Returns a SkipMap that wraps the comparator inside a MemtableKeyCmp.
52    pub fn new_memtable_map(cmp: Arc<Box<dyn Cmp>>) -> SkipMap {
53        SkipMap::new(Arc::new(Box::new(MemtableKeyCmp(cmp))))
54    }
55
56    /// Returns a SkipMap that uses the specified comparator.
57    pub fn new(cmp: Arc<Box<dyn Cmp>>) -> SkipMap {
58        let mut s = Vec::new();
59        s.resize(MAX_HEIGHT, None);
60
61        SkipMap {
62            map: Arc::new(RefCell::new(InnerSkipMap {
63                head: Box::new(Node {
64                    skips: s,
65                    next: None,
66                    key: Vec::new(),
67                    value: Vec::new(),
68                }),
69                rand: StdRng::seed_from_u64(0xdeadbeef),
70                len: 0,
71                approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(),
72                cmp,
73            })),
74        }
75    }
76
77    pub fn len(&self) -> usize {
78        self.map.borrow().len
79    }
80    pub fn approx_memory(&self) -> usize {
81        self.map.borrow().approx_mem
82    }
83    pub fn contains(&self, key: &[u8]) -> bool {
84        self.map.borrow().contains(key)
85    }
86
87    /// inserts a key into the table. key may not be empty.
88    pub fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) {
89        assert!(!key.is_empty());
90        self.map.borrow_mut().insert(key, val);
91    }
92
93    pub fn iter(&self) -> SkipMapIter {
94        SkipMapIter {
95            map: self.map.clone(),
96            current: self.map.borrow().head.as_ref() as *const Node,
97        }
98    }
99}
100
101impl InnerSkipMap {
102    fn random_height(&mut self) -> usize {
103        let mut height = 1;
104
105        while height < MAX_HEIGHT && self.rand.next_u32() % BRANCHING_FACTOR == 0 {
106            height += 1;
107        }
108
109        height
110    }
111
112    fn contains(&self, key: &[u8]) -> bool {
113        if let Some(n) = self.get_greater_or_equal(key) {
114            n.key.starts_with(&key)
115        } else {
116            false
117        }
118    }
119
120    /// Returns the node with key or the next greater one
121    /// Returns None if the given key lies past the greatest key in the table.
122    fn get_greater_or_equal<'a>(&'a self, key: &[u8]) -> Option<&'a Node> {
123        // Start at the highest skip link of the head node, and work down from there
124        let mut current = self.head.as_ref() as *const Node;
125        let mut level = self.head.skips.len() - 1;
126
127        loop {
128            unsafe {
129                if let Some(next) = (*current).skips[level] {
130                    let ord = self.cmp.cmp((*next).key.as_slice(), key);
131
132                    match ord {
133                        Ordering::Less => {
134                            current = next;
135                            continue;
136                        }
137                        Ordering::Equal => return Some(&(*next)),
138                        Ordering::Greater => {
139                            if level == 0 {
140                                return Some(&(*next));
141                            }
142                        }
143                    }
144                }
145            }
146            if level == 0 {
147                break;
148            }
149            level -= 1;
150        }
151
152        unsafe {
153            if current.is_null() || current == self.head.as_ref() {
154                None
155            } else if self.cmp.cmp(&(*current).key, key) == Ordering::Less {
156                None
157            } else {
158                Some(&(*current))
159            }
160        }
161    }
162
163    /// Finds the node immediately before the node with key.
164    /// Returns None if no smaller key was found.
165    fn get_next_smaller<'a>(&'a self, key: &[u8]) -> Option<&'a Node> {
166        // Start at the highest skip link of the head node, and work down from there
167        let mut current = self.head.as_ref() as *const Node;
168        let mut level = self.head.skips.len() - 1;
169
170        loop {
171            unsafe {
172                if let Some(next) = (*current).skips[level] {
173                    let ord = self.cmp.cmp((*next).key.as_slice(), key);
174
175                    match ord {
176                        Ordering::Less => {
177                            current = next;
178                            continue;
179                        }
180                        _ => (),
181                    }
182                }
183            }
184            if level == 0 {
185                break;
186            }
187            level -= 1;
188        }
189
190        unsafe {
191            if current.is_null() || current == self.head.as_ref() {
192                // If we're past the end for some reason or at the head
193                None
194            } else if self.cmp.cmp(&(*current).key, key) != Ordering::Less {
195                None
196            } else {
197                Some(&(*current))
198            }
199        }
200    }
201
202    fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) {
203        assert!(!key.is_empty());
204
205        // Keeping track of skip entries that will need to be updated
206        let mut prevs: [Option<*mut Node>; MAX_HEIGHT] = [None; MAX_HEIGHT];
207        let new_height = self.random_height();
208        let prevs = &mut prevs[0..new_height];
209
210        let mut level = MAX_HEIGHT - 1;
211        let mut current = self.head.as_mut() as *mut Node;
212        // Set previous node for all levels to current node.
213        for i in 0..prevs.len() {
214            prevs[i] = Some(current);
215        }
216
217        // Find the node after which we want to insert the new node; this is the node with the key
218        // immediately smaller than the key to be inserted.
219        loop {
220            unsafe {
221                if let Some(next) = (*current).skips[level] {
222                    // If the wanted position is after the current node
223                    let ord = self.cmp.cmp(&(*next).key, &key);
224
225                    assert!(ord != Ordering::Equal, "No duplicates allowed");
226
227                    if ord == Ordering::Less {
228                        current = next;
229                        continue;
230                    }
231                }
232            }
233
234            if level < new_height {
235                prevs[level] = Some(current);
236            }
237
238            if level == 0 {
239                break;
240            } else {
241                level -= 1;
242            }
243        }
244
245        // Construct new node
246        let mut new_skips = Vec::new();
247        new_skips.resize(new_height, None);
248        let mut new = Box::new(Node {
249            skips: new_skips,
250            next: None,
251            key,
252            value: val,
253        });
254        let newp = new.as_mut() as *mut Node;
255
256        for i in 0..new_height {
257            if let Some(prev) = prevs[i] {
258                unsafe {
259                    new.skips[i] = (*prev).skips[i];
260                    (*prev).skips[i] = Some(newp);
261                }
262            }
263        }
264
265        let added_mem = size_of::<Node>()
266            + size_of::<Option<*mut Node>>() * new.skips.len()
267            + new.key.len()
268            + new.value.len();
269        self.approx_mem += added_mem;
270        self.len += 1;
271
272        // Insert new node by first replacing the previous element's next field with None and
273        // assigning its value to new.next...
274        new.next = unsafe { replace(&mut (*current).next, None) };
275        // ...and then setting the previous element's next field to the new node
276        unsafe { replace(&mut (*current).next, Some(new)) };
277    }
278    /// Runs through the skipmap and prints everything including addresses
279    fn dbg_print(&self) {
280        let mut current = self.head.as_ref() as *const Node;
281        loop {
282            unsafe {
283                eprintln!(
284                    "{:?} {:?}/{:?} - {:?}",
285                    current,
286                    (*current).key,
287                    (*current).value,
288                    (*current).skips
289                );
290                if let Some(next) = (*current).skips[0].clone() {
291                    current = next;
292                } else {
293                    break;
294                }
295            }
296        }
297    }
298}
299
300pub struct SkipMapIter {
301    map: Arc<RefCell<InnerSkipMap>>,
302    current: *const Node,
303}
304
305impl LdbIterator for SkipMapIter {
306    fn advance(&mut self) -> bool {
307        // we first go to the next element, then return that -- in order to skip the head node
308        let r = unsafe {
309            (*self.current)
310                .next
311                .as_ref()
312                .map(|next| {
313                    self.current = next.as_ref() as *const Node;
314                    true
315                })
316                .unwrap_or(false)
317        };
318        if !r {
319            self.reset();
320        }
321        r
322    }
323    fn reset(&mut self) {
324        self.current = self.map.borrow().head.as_ref();
325    }
326    fn seek(&mut self, key: &[u8]) {
327        if let Some(node) = self.map.borrow().get_greater_or_equal(key) {
328            self.current = node as *const Node;
329            return;
330        }
331        self.reset();
332    }
333    fn valid(&self) -> bool {
334        self.current != self.map.borrow().head.as_ref()
335    }
336    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
337        if self.valid() {
338            key.clear();
339            val.clear();
340            unsafe {
341                key.extend_from_slice(&(*self.current).key);
342                val.extend_from_slice(&(*self.current).value);
343            }
344            true
345        } else {
346            false
347        }
348    }
349    fn prev(&mut self) -> bool {
350        // Going after the original implementation here; we just seek to the node before current().
351        if self.valid() {
352            if let Some(prev) = self
353                .map
354                .borrow()
355                .get_next_smaller(unsafe { &(*self.current).key })
356            {
357                self.current = prev as *const Node;
358                if !prev.key.is_empty() {
359                    return true;
360                }
361            }
362        }
363        self.reset();
364        false
365    }
366}
367
368#[cfg(test)]
369pub mod tests {
370    use super::*;
371    use crate::cmp::MemtableKeyCmp;
372    use crate::options;
373    use crate::test_util::{test_iterator_properties, LdbIteratorIter};
374    use crate::types::current_key_val;
375
376    pub fn make_skipmap() -> SkipMap {
377        let mut skm = SkipMap::new(options::for_test().cmp);
378        let keys = vec![
379            "aba", "abb", "abc", "abd", "abe", "abf", "abg", "abh", "abi", "abj", "abk", "abl",
380            "abm", "abn", "abo", "abp", "abq", "abr", "abs", "abt", "abu", "abv", "abw", "abx",
381            "aby", "abz",
382        ];
383
384        for k in keys {
385            skm.insert(k.as_bytes().to_vec(), "def".as_bytes().to_vec());
386        }
387        skm
388    }
389
390    #[test]
391    fn test_insert() {
392        let skm = make_skipmap();
393        assert_eq!(skm.len(), 26);
394        skm.map.borrow().dbg_print();
395    }
396
397    #[test]
398    #[should_panic]
399    fn test_no_dupes() {
400        let mut skm = make_skipmap();
401        // this should panic
402        skm.insert("abc".as_bytes().to_vec(), "def".as_bytes().to_vec());
403        skm.insert("abf".as_bytes().to_vec(), "def".as_bytes().to_vec());
404    }
405
406    #[test]
407    fn test_contains() {
408        let skm = make_skipmap();
409        assert!(skm.contains(&"aby".as_bytes().to_vec()));
410        assert!(skm.contains(&"abc".as_bytes().to_vec()));
411        assert!(skm.contains(&"abz".as_bytes().to_vec()));
412        assert!(!skm.contains(&"ab{".as_bytes().to_vec()));
413        assert!(!skm.contains(&"123".as_bytes().to_vec()));
414        assert!(!skm.contains(&"aaa".as_bytes().to_vec()));
415        assert!(!skm.contains(&"456".as_bytes().to_vec()));
416    }
417
418    #[test]
419    fn test_find() {
420        let skm = make_skipmap();
421        assert_eq!(
422            skm.map
423                .borrow()
424                .get_greater_or_equal(&"abf".as_bytes().to_vec())
425                .unwrap()
426                .key,
427            "abf".as_bytes().to_vec()
428        );
429        assert!(skm
430            .map
431            .borrow()
432            .get_greater_or_equal(&"ab{".as_bytes().to_vec())
433            .is_none());
434        assert_eq!(
435            skm.map
436                .borrow()
437                .get_greater_or_equal(&"aaa".as_bytes().to_vec())
438                .unwrap()
439                .key,
440            "aba".as_bytes().to_vec()
441        );
442        assert_eq!(
443            skm.map
444                .borrow()
445                .get_greater_or_equal(&"ab".as_bytes())
446                .unwrap()
447                .key
448                .as_slice(),
449            "aba".as_bytes()
450        );
451        assert_eq!(
452            skm.map
453                .borrow()
454                .get_greater_or_equal(&"abc".as_bytes())
455                .unwrap()
456                .key
457                .as_slice(),
458            "abc".as_bytes()
459        );
460        assert!(skm
461            .map
462            .borrow()
463            .get_next_smaller(&"ab0".as_bytes())
464            .is_none());
465        assert_eq!(
466            skm.map
467                .borrow()
468                .get_next_smaller(&"abd".as_bytes())
469                .unwrap()
470                .key
471                .as_slice(),
472            "abc".as_bytes()
473        );
474        assert_eq!(
475            skm.map
476                .borrow()
477                .get_next_smaller(&"ab{".as_bytes())
478                .unwrap()
479                .key
480                .as_slice(),
481            "abz".as_bytes()
482        );
483    }
484
485    #[test]
486    fn test_empty_skipmap_find_memtable_cmp() {
487        // Regression test: Make sure comparator isn't called with empty key.
488        let cmp: Arc<Box<dyn Cmp>> = Arc::new(Box::new(MemtableKeyCmp(options::for_test().cmp)));
489        let skm = SkipMap::new(cmp);
490
491        let mut it = skm.iter();
492        it.seek("abc".as_bytes());
493        assert!(!it.valid());
494    }
495
496    #[test]
497    fn test_skipmap_iterator_0() {
498        let skm = SkipMap::new(options::for_test().cmp);
499        let mut i = 0;
500
501        for (_, _) in LdbIteratorIter::wrap(&mut skm.iter()) {
502            i += 1;
503        }
504
505        assert_eq!(i, 0);
506        assert!(!skm.iter().valid());
507    }
508
509    #[test]
510    fn test_skipmap_iterator_init() {
511        let skm = make_skipmap();
512        let mut iter = skm.iter();
513
514        assert!(!iter.valid());
515        iter.next();
516        assert!(iter.valid());
517        iter.reset();
518        assert!(!iter.valid());
519
520        iter.next();
521        assert!(iter.valid());
522        iter.prev();
523        assert!(!iter.valid());
524    }
525
526    #[test]
527    fn test_skipmap_iterator() {
528        let skm = make_skipmap();
529        let mut i = 0;
530
531        for (k, v) in LdbIteratorIter::wrap(&mut skm.iter()) {
532            assert!(!k.is_empty());
533            assert!(!v.is_empty());
534            i += 1;
535        }
536        assert_eq!(i, 26);
537    }
538
539    #[test]
540    fn test_skipmap_iterator_seek_valid() {
541        let skm = make_skipmap();
542        let mut iter = skm.iter();
543
544        iter.next();
545        assert!(iter.valid());
546        assert_eq!(current_key_val(&iter).unwrap().0, "aba".as_bytes().to_vec());
547        iter.seek(&"abz".as_bytes().to_vec());
548        assert_eq!(
549            current_key_val(&iter).unwrap(),
550            ("abz".as_bytes().to_vec(), "def".as_bytes().to_vec())
551        );
552        // go back to beginning
553        iter.seek(&"aba".as_bytes().to_vec());
554        assert_eq!(
555            current_key_val(&iter).unwrap(),
556            ("aba".as_bytes().to_vec(), "def".as_bytes().to_vec())
557        );
558
559        iter.seek(&"".as_bytes().to_vec());
560        assert!(iter.valid());
561        iter.prev();
562        assert!(!iter.valid());
563
564        while iter.advance() {}
565        assert!(!iter.valid());
566        assert!(!iter.prev());
567        assert_eq!(current_key_val(&iter), None);
568    }
569
570    #[test]
571    fn test_skipmap_behavior() {
572        let mut skm = SkipMap::new(options::for_test().cmp);
573        let keys = vec!["aba", "abb", "abc", "abd"];
574        for k in keys {
575            skm.insert(k.as_bytes().to_vec(), "def".as_bytes().to_vec());
576        }
577        test_iterator_properties(skm.iter());
578    }
579
580    #[test]
581    fn test_skipmap_iterator_prev() {
582        let skm = make_skipmap();
583        let mut iter = skm.iter();
584
585        iter.next();
586        assert!(iter.valid());
587        iter.prev();
588        assert!(!iter.valid());
589        iter.seek(&"abc".as_bytes());
590        iter.prev();
591        assert_eq!(
592            current_key_val(&iter).unwrap(),
593            ("abb".as_bytes().to_vec(), "def".as_bytes().to_vec())
594        );
595    }
596
597    #[test]
598    fn test_skipmap_iterator_concurrent_insert() {
599        time_test!();
600        // Asserts that the map can be mutated while an iterator exists; this is intentional.
601        let mut skm = make_skipmap();
602        let mut iter = skm.iter();
603
604        assert!(iter.advance());
605        skm.insert("abccc".as_bytes().to_vec(), "defff".as_bytes().to_vec());
606        // Assert that value inserted after obtaining iterator is present.
607        for (k, _) in LdbIteratorIter::wrap(&mut iter) {
608            if k == "abccc".as_bytes() {
609                return;
610            }
611        }
612        panic!("abccc not found in map.");
613    }
614}