1use crate::cmp::{Cmp, MemtableKeyCmp};
2use crate::rand::rngs::StdRng;
3use crate::rand::{RngCore, SeedableRng};
4use crate::types::LdbIterator;
5
6use bytes::Bytes;
7use std::cell::RefCell;
8use std::cmp::Ordering;
9use std::mem::size_of;
10use std::rc::Rc;
11
/// Upper bound on the number of skip levels a node may have. The head node is
/// created with exactly this many skip slots, and `random_height` never
/// returns more than this.
const MAX_HEIGHT: usize = 12;
/// A new node grows by one more level only while a freshly drawn random `u32`
/// is a multiple of this value (see `random_height`).
const BRANCHING_FACTOR: u32 = 4;
14
/// A single entry of the skip list.
///
/// Nodes are owned through the `next` chain (a singly linked list built in
/// comparator order by `InnerSkipMap::insert`); `skips` holds only non-owning
/// raw pointers into that chain.
struct Node {
    /// Forward pointers, one slot per level of this node. `None` means there is
    /// no further node on that level. After an insert, `skips[0]` of the
    /// predecessor points at the same node that its `next` field owns.
    skips: Vec<Option<*mut Node>>,
    /// Owning link to the next node in the chain.
    next: Option<Box<Node>>,
    /// Key bytes; empty only for the head node (inserts assert a non-empty key).
    key: Bytes,
    /// Value bytes stored alongside the key.
    value: Bytes,
}
23
/// The actual skip list state; `SkipMap` and `SkipMapIter` share it through
/// `Rc<RefCell<..>>`.
struct InnerSkipMap {
    /// Head node: carries an empty key/value and `MAX_HEIGHT` skip slots, and
    /// owns the whole node chain through its `next` field.
    head: Box<Node>,
    /// Random source used by `random_height`; seeded with a fixed constant in
    /// `SkipMap::new`.
    rand: StdRng,
    /// Number of entries inserted so far.
    len: usize,
    /// Running estimate of the memory used, in bytes; grows on every insert.
    approx_mem: usize,
    /// Comparator that defines the key order of the list.
    cmp: Rc<Box<dyn Cmp>>,
}
36
37impl Drop for InnerSkipMap {
38 fn drop(&mut self) {
40 let mut next_node = self.head.next.take();
41 while let Some(mut boxed_node) = next_node {
42 next_node = boxed_node.next.take();
43 }
44 }
45}
46
/// Public handle to a skip list of byte-string keys and values.
///
/// The state lives in a shared `Rc<RefCell<InnerSkipMap>>`, which is also
/// cloned into every iterator created by `iter()`.
pub struct SkipMap {
    /// Shared, interior-mutable skip list state.
    map: Rc<RefCell<InnerSkipMap>>,
}
50
51impl SkipMap {
52 pub fn new_memtable_map(cmp: Rc<Box<dyn Cmp>>) -> SkipMap {
54 SkipMap::new(Rc::new(Box::new(MemtableKeyCmp(cmp))))
55 }
56
57 pub fn new(cmp: Rc<Box<dyn Cmp>>) -> SkipMap {
59 let mut s = Vec::new();
60 s.resize(MAX_HEIGHT, None);
61
62 SkipMap {
63 map: Rc::new(RefCell::new(InnerSkipMap {
64 head: Box::new(Node {
65 skips: s,
66 next: None,
67 key: Bytes::new(),
68 value: Bytes::new(),
69 }),
70 rand: StdRng::seed_from_u64(0xdeadbeef),
71 len: 0,
72 approx_mem: size_of::<Self>() + MAX_HEIGHT * size_of::<Option<*mut Node>>(),
73 cmp,
74 })),
75 }
76 }
77
78 pub fn len(&self) -> usize {
79 self.map.borrow().len
80 }
81
82 #[must_use]
83 pub fn is_empty(&self) -> bool {
84 self.len() == 0
85 }
86
87 pub fn approx_memory(&self) -> usize {
88 self.map.borrow().approx_mem
89 }
90
91 pub fn contains(&self, key: &[u8]) -> bool {
92 self.map.borrow().contains(key)
93 }
94
95 pub fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) {
97 assert!(!key.is_empty());
98 self.map.borrow_mut().insert(key, val);
99 }
100
101 pub fn iter(&self) -> SkipMapIter {
102 SkipMapIter {
103 map: self.map.clone(),
104 current: self.map.borrow().head.as_ref() as *const Node,
105 }
106 }
107}
108
109impl InnerSkipMap {
110 fn random_height(&mut self) -> usize {
111 let mut height = 1;
112
113 while height < MAX_HEIGHT && self.rand.next_u32().is_multiple_of(BRANCHING_FACTOR) {
114 height += 1;
115 }
116
117 height
118 }
119
120 fn contains(&self, key: &[u8]) -> bool {
121 if let Some(n) = self.get_greater_or_equal(key) {
122 n.key.starts_with(key)
123 } else {
124 false
125 }
126 }
127
128 fn get_greater_or_equal<'a>(&'a self, key: &[u8]) -> Option<&'a Node> {
131 let mut current = self.head.as_ref() as *const Node;
133 let mut level = self.head.skips.len() - 1;
134
135 loop {
136 let current_node = unsafe { &*current };
137 if let Some(next) = current_node.skips[level] {
138 let next = unsafe { &*next };
139 let ord = self.cmp.cmp(next.key.iter().as_slice(), key);
140
141 match ord {
142 Ordering::Less => {
143 current = next;
144 continue;
145 }
146 Ordering::Equal => return Some(next),
147 Ordering::Greater => {
148 if level == 0 {
149 return Some(next);
150 }
151 }
152 }
153 }
154 if level == 0 {
155 break;
156 }
157 level -= 1;
158 }
159
160 unsafe {
161 if (current.is_null() || current == self.head.as_ref())
162 || (self.cmp.cmp(&(*current).key, key) == Ordering::Less)
163 {
164 None
165 } else {
166 Some(&(*current))
167 }
168 }
169 }
170
171 fn get_next_smaller<'a>(&'a self, key: &[u8]) -> Option<&'a Node> {
174 let mut current = self.head.as_ref() as *const Node;
176 let mut level = self.head.skips.len() - 1;
177
178 loop {
179 let current_node = unsafe { &*current };
180 if let Some(next) = current_node.skips[level] {
181 let next = unsafe { &*next };
182 let ord = self.cmp.cmp(next.key.iter().as_slice(), key);
183
184 if let Ordering::Less = ord {
185 current = next;
186 continue;
187 }
188 }
189
190 if level == 0 {
191 break;
192 }
193 level -= 1;
194 }
195
196 unsafe {
197 if current.is_null() || current == self.head.as_ref() {
198 None
200 } else if self.cmp.cmp(&(*current).key, key) != Ordering::Less {
201 None
202 } else {
203 Some(&(*current))
204 }
205 }
206 }
207
208 fn insert(&mut self, key: Vec<u8>, val: Vec<u8>) {
209 assert!(!key.is_empty());
210
211 let mut prevs: [Option<*mut Node>; MAX_HEIGHT] = [None; MAX_HEIGHT];
213 let new_height = self.random_height();
214 let prevs = &mut prevs[0..new_height];
215
216 let mut level = MAX_HEIGHT - 1;
217 let mut current = self.head.as_mut() as *mut Node;
218 (0..prevs.len()).for_each(|i| {
220 prevs[i] = Some(current);
221 });
222
223 loop {
226 let current_node = unsafe { &*current };
227 if let Some(next) = current_node.skips[level] {
228 let next = unsafe { &mut *next };
229 let ord = self.cmp.cmp(next.key.iter().as_slice(), &key);
231
232 assert!(ord != Ordering::Equal, "No duplicates allowed");
233
234 if ord == Ordering::Less {
235 current = next;
236 continue;
237 }
238 }
239
240 if level < new_height {
241 prevs[level] = Some(current);
242 }
243
244 if level == 0 {
245 break;
246 } else {
247 level -= 1;
248 }
249 }
250
251 let mut new_skips = Vec::new();
253 new_skips.resize(new_height, None);
254 let mut new = Box::new(Node {
255 skips: new_skips,
256 next: None,
257 key: key.into(),
258 value: val.into(),
259 });
260 let newp = new.as_mut() as *mut Node;
261
262 (0..new_height).for_each(|i| {
263 if let Some(prev) = prevs[i] {
264 let prev = unsafe { &mut *prev };
265 new.skips[i] = prev.skips[i];
266 prev.skips[i] = Some(newp);
267 }
268 });
269
270 let added_mem = size_of::<Node>()
271 + size_of::<Option<*mut Node>>() * new.skips.len()
272 + new.key.len()
273 + new.value.len();
274 self.approx_mem += added_mem;
275 self.len += 1;
276
277 new.next = unsafe { (*current).next.take() };
280 unsafe {
282 let _ = (*current).next.replace(new);
283 };
284 }
285 fn dbg_print(&self) {
287 let mut current = self.head.as_ref() as *const Node;
288 loop {
289 let current_node = unsafe { &*current };
290 eprintln!(
291 "{:?} {:?}/{:?} - {:?}",
292 current, current_node.key, current_node.value, current_node.skips
293 );
294 if let Some(next) = current_node.skips[0] {
295 current = next;
296 } else {
297 break;
298 }
299 }
300 }
301}
302
/// Cursor over a `SkipMap`.
///
/// Holds its own `Rc` clone of the shared state plus a raw pointer to the node
/// it is positioned on. Pointing at the head node means "not valid".
pub struct SkipMapIter {
    /// Shared skip list state; the same `Rc` the originating `SkipMap` holds.
    map: Rc<RefCell<InnerSkipMap>>,
    /// Node the iterator is currently positioned on (the head when not valid).
    current: *const Node,
}
307
308impl LdbIterator for SkipMapIter {
309 fn advance(&mut self) -> bool {
310 let r = unsafe {
312 (*self.current)
313 .next
314 .as_ref()
315 .map(|next| {
316 self.current = next.as_ref() as *const Node;
317 true
318 })
319 .unwrap_or(false)
320 };
321 if !r {
322 self.reset();
323 }
324 r
325 }
326 fn reset(&mut self) {
327 self.current = self.map.borrow().head.as_ref();
328 }
329 fn seek(&mut self, key: &[u8]) {
330 if let Some(node) = self.map.borrow().get_greater_or_equal(key) {
331 self.current = node as *const Node;
332 return;
333 }
334 self.reset();
335 }
336 fn valid(&self) -> bool {
337 self.current != self.map.borrow().head.as_ref()
338 }
339 fn current(&self) -> Option<(Bytes, Bytes)> {
340 if self.valid() {
341 unsafe {
342 Some((
343 Bytes::copy_from_slice(&(*self.current).key),
344 Bytes::copy_from_slice(&(*self.current).value),
345 ))
346 }
347 } else {
348 None
349 }
350 }
351 fn prev(&mut self) -> bool {
352 if self.valid() {
354 if let Some(prev) = self
355 .map
356 .borrow()
357 .get_next_smaller(unsafe { &(*self.current).key })
358 {
359 self.current = prev as *const Node;
360 if !prev.key.is_empty() {
361 return true;
362 }
363 }
364 }
365 self.reset();
366 false
367 }
368}
369
/// Unit tests for the skip map and its iterator. The module and
/// `make_skipmap` are `pub`, so other test modules of the crate can use them.
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::cmp::MemtableKeyCmp;
    use crate::options;
    use crate::test_util::{test_iterator_properties, LdbIteratorIter};
    use crate::types::current_key_val;

    /// Builds a map with the 26 keys "aba" through "abz", each mapped to "def".
    pub fn make_skipmap() -> SkipMap {
        let mut skm = SkipMap::new(options::for_test().cmp);
        let keys = vec![
            "aba", "abb", "abc", "abd", "abe", "abf", "abg", "abh", "abi", "abj", "abk", "abl",
            "abm", "abn", "abo", "abp", "abq", "abr", "abs", "abt", "abu", "abv", "abw", "abx",
            "aby", "abz",
        ];

        for k in keys {
            skm.insert(k.as_bytes().to_vec(), "def".as_bytes().to_vec());
        }
        skm
    }

    /// All 26 inserts are counted; also dumps the structure to stderr.
    #[test]
    fn test_insert() {
        let skm = make_skipmap();
        assert_eq!(skm.len(), 26);
        skm.map.borrow().dbg_print();
    }

    /// Inserting a key that already exists must panic.
    #[test]
    #[should_panic]
    fn test_no_dupes() {
        let mut skm = make_skipmap();
        // "abc" is already present, so this first insert is expected to panic;
        // the second insert is then never reached.
        skm.insert("abc".as_bytes().to_vec(), "def".as_bytes().to_vec());
        skm.insert("abf".as_bytes().to_vec(), "def".as_bytes().to_vec());
    }

    /// `contains` finds stored keys and rejects keys outside the stored set.
    #[test]
    fn test_contains() {
        let skm = make_skipmap();
        assert!(skm.contains(b"aby"));
        assert!(skm.contains(b"abc"));
        assert!(skm.contains(b"abz"));
        assert!(!skm.contains(b"ab{"));
        assert!(!skm.contains(b"123"));
        assert!(!skm.contains(b"aaa"));
        assert!(!skm.contains(b"456"));
    }

    /// Exercises `get_greater_or_equal` and `get_next_smaller` with exact hits,
    /// keys between entries, and keys before/after the whole range.
    #[test]
    fn test_find() {
        let skm = make_skipmap();
        assert_eq!(
            &*skm.map.borrow().get_greater_or_equal(b"abf").unwrap().key,
            b"abf"
        );
        // "ab{" sorts after "abz", so nothing is greater or equal.
        assert!(skm.map.borrow().get_greater_or_equal(b"ab{").is_none());
        assert_eq!(
            &*skm.map.borrow().get_greater_or_equal(b"aaa").unwrap().key,
            b"aba"
        );
        assert_eq!(
            &*skm.map.borrow().get_greater_or_equal(b"ab").unwrap().key,
            b"aba"
        );
        assert_eq!(
            &*skm.map.borrow().get_greater_or_equal(b"abc").unwrap().key,
            b"abc"
        );
        // "ab0" sorts before "aba", so nothing is smaller.
        assert!(skm.map.borrow().get_next_smaller(b"ab0").is_none());
        assert_eq!(
            &*skm.map.borrow().get_next_smaller(b"abd").unwrap().key,
            b"abc"
        );
        assert_eq!(
            &*skm.map.borrow().get_next_smaller(b"ab{").unwrap().key,
            b"abz"
        );
    }

    /// Seeking in an empty map built with `MemtableKeyCmp` leaves the iterator
    /// not valid.
    #[test]
    fn test_empty_skipmap_find_memtable_cmp() {
        let cmp: Rc<Box<dyn Cmp>> = Rc::new(Box::new(MemtableKeyCmp(options::for_test().cmp)));
        let skm = SkipMap::new(cmp);

        let mut it = skm.iter();
        it.seek(b"abc");
        assert!(!it.valid());
    }

    /// Iterating an empty map yields nothing and the iterator is not valid.
    #[test]
    fn test_skipmap_iterator_0() {
        let skm = SkipMap::new(options::for_test().cmp);
        let mut i = 0;

        for (_, _) in LdbIteratorIter::wrap(&mut skm.iter()) {
            i += 1;
        }

        assert_eq!(i, 0);
        assert!(!skm.iter().valid());
    }

    /// A fresh iterator is not valid until advanced; `reset` and stepping back
    /// from the first entry both make it not valid again.
    #[test]
    fn test_skipmap_iterator_init() {
        let skm = make_skipmap();
        let mut iter = skm.iter();

        assert!(!iter.valid());
        iter.next();
        assert!(iter.valid());
        iter.reset();
        assert!(!iter.valid());

        iter.next();
        assert!(iter.valid());
        iter.prev();
        assert!(!iter.valid());
    }

    /// A full iteration visits all 26 entries with non-empty keys and values.
    #[test]
    fn test_skipmap_iterator() {
        let skm = make_skipmap();
        let mut i = 0;

        for (k, v) in LdbIteratorIter::wrap(&mut skm.iter()) {
            assert!(!k.is_empty());
            assert!(!v.is_empty());
            i += 1;
        }
        assert_eq!(i, 26);
    }

    /// `seek` lands on exact keys, an empty seek key lands on the first entry,
    /// and running off either end leaves the iterator not valid.
    #[test]
    fn test_skipmap_iterator_seek_valid() {
        let skm = make_skipmap();
        let mut iter = skm.iter();

        iter.next();
        assert!(iter.valid());
        assert_eq!(current_key_val(&iter).unwrap().0, "aba".as_bytes().to_vec());
        iter.seek(b"abz");
        assert_eq!(
            current_key_val(&iter).unwrap(),
            ("abz".as_bytes().to_vec(), "def".as_bytes().to_vec())
        );
        iter.seek(b"aba");
        assert_eq!(
            current_key_val(&iter).unwrap(),
            (b"aba".to_vec(), b"def".to_vec())
        );

        // An empty seek key positions the iterator on a valid entry; stepping
        // back from there leaves it not valid.
        iter.seek(b"");
        assert!(iter.valid());
        iter.prev();
        assert!(!iter.valid());

        // Advancing past the last entry resets the iterator.
        while iter.advance() {}
        assert!(!iter.valid());
        assert!(!iter.prev());
        assert_eq!(current_key_val(&iter), None);
    }

    /// Runs the shared iterator property checks against a four-entry map.
    #[test]
    fn test_skipmap_behavior() {
        let mut skm = SkipMap::new(options::for_test().cmp);
        let keys = vec!["aba", "abb", "abc", "abd"];
        for k in keys {
            skm.insert(k.as_bytes().to_vec(), "def".as_bytes().to_vec());
        }
        test_iterator_properties(skm.iter());
    }

    /// `prev` from the first entry leaves the iterator not valid; `prev` from
    /// "abc" lands on "abb".
    #[test]
    fn test_skipmap_iterator_prev() {
        let skm = make_skipmap();
        let mut iter = skm.iter();

        iter.next();
        assert!(iter.valid());
        iter.prev();
        assert!(!iter.valid());
        iter.seek(b"abc");
        iter.prev();
        assert_eq!(
            current_key_val(&iter).unwrap(),
            ("abb".as_bytes().to_vec(), "def".as_bytes().to_vec())
        );
    }

    /// An entry inserted while an iterator already exists is still reached by
    /// that iterator.
    #[test]
    fn test_skipmap_iterator_concurrent_insert() {
        time_test!();
        let mut skm = make_skipmap();
        let mut iter = skm.iter();

        assert!(iter.advance());
        // Insert after the iterator has started moving.
        skm.insert("abccc".as_bytes().to_vec(), "defff".as_bytes().to_vec());
        for (k, _) in LdbIteratorIter::wrap(&mut iter) {
            if k == "abccc".as_bytes() {
                return;
            }
        }
        panic!("abccc not found in map.");
    }
}