appendix/
lib.rs

#![deny(missing_docs)]
//! An append-only, on-disk key-value index with lockless reads
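//!
//! # Example
//!
//! A minimal usage sketch (a sketch only: the `appendix` crate name and
//! the target directory are assumptions, and the directory must already
//! exist):
//!
//! ```no_run
//! use appendix::Index;
//!
//! # fn main() -> std::io::Result<()> {
//! let index: Index<u64, u64> = Index::new(&"/tmp/index")?;
//! assert_eq!(index.insert(7, 42)?, false);
//! assert_eq!(index.get(&7)?, Some(&42));
//! # Ok(())
//! # }
//! ```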

use std::cell::UnsafeCell;
use std::fs::OpenOptions;
use std::hash::{Hash, Hasher};
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};

use arrayvec::ArrayVec;
use lazy_static::lazy_static;
use memmap::MmapMut;
use parking_lot::{Mutex, MutexGuard};
use seahash::SeaHasher;

const NUM_LANES: usize = 64;
const NUM_SHARDS: usize = 1024;
const PAGE_SIZE: usize = 4096;
const FIRST_LANE_PAGES: usize = 64;
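
// Lane `n` holds `FIRST_LANE_PAGES * 2^n` pages (see `lane_pages`
// below), so each new lane roughly doubles the capacity of the index.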

// marker struct for shard-mutexes
struct Shard;

lazy_static! {
    static ref SHARDS: ArrayVec<[Mutex<Shard>; NUM_SHARDS]> = {
        let mut locks = ArrayVec::new();
        for _ in 0..NUM_SHARDS {
            locks.push(Mutex::new(Shard))
        }
        locks
    };
}

#[inline(always)]
fn hash_val<T: Hash>(t: &T) -> u64 {
    let mut hasher = SeaHasher::new();
    t.hash(&mut hasher);
    hasher.finish()
}

// Result of searching for a key: either a reference to the entry,
// or the (lane, page, slot) position where the search ended.
enum Found<'a, K, V> {
    // the key was found
    Some(&'a Entry<K, V>),
    // the key is not present; the chain ends at (lane, page, slot)
    None(usize, usize, usize),
    // an invalid (never written) entry at (lane, page, slot)
    Invalid(usize, usize, usize),
}

/// The `bool` returned from `Index::insert`: `true` means the key was
/// already present and the insert was a no-op
pub type AlreadyThere = bool;

/// On-disk index structure mapping keys to values
pub struct Index<K, V> {
    lanes: UnsafeCell<ArrayVec<[MmapMut; NUM_LANES]>>,
    path: PathBuf,
    pages: Mutex<u64>,
    _marker: PhantomData<(K, V)>,
}

unsafe impl<K, V> Send for Index<K, V>
where
    K: Send,
    V: Send,
{
}
unsafe impl<K, V> Sync for Index<K, V>
where
    K: Sync,
    V: Sync,
{
}

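// Each entry carries two checksums so that partially written or
// never-written slots can be detected: `kv_checksum` covers the
// key-value pair, and `next_checksum` must equal `next + 1`.
// A zeroed slot fails the `next` check, so it reads as invalid.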
#[derive(Debug)]
#[repr(C)]
struct Entry<K, V> {
    key: K,
    val: V,
    next: u64,
    kv_checksum: u64,
    next_checksum: u64,
}

// Wrapper reference for mutating entries, carrying a mutex guard
struct EntryMut<'a, K, V> {
    entry: &'a mut Entry<K, V>,
    _lock: MutexGuard<'a, Shard>,
}

impl<'a, K, V> Deref for EntryMut<'a, K, V> {
    type Target = Entry<K, V>;
    fn deref(&self) -> &Self::Target {
        &*self.entry
    }
}

impl<'a, K, V> DerefMut for EntryMut<'a, K, V> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.entry
    }
}

impl<K: Hash, V: Hash> Entry<K, V> {
    fn new(key: K, val: V) -> Self {
        let kv_checksum = hash_val(&key).wrapping_add(hash_val(&val));
        let entry = Entry {
            key,
            val,
            kv_checksum,
            next: 0,
            next_checksum: 1, // i.e. `next.wrapping_add(1)` with `next == 0`
        };
        debug_assert!(entry.valid());
        entry
    }

    fn valid(&self) -> bool {
        // `wrapping_add` keeps validation of garbage data from
        // overflowing in debug builds
        hash_val(&self.key).wrapping_add(hash_val(&self.val))
            == self.kv_checksum
            && self.next.wrapping_add(1) == self.next_checksum
    }

    fn set_next<I: Into<u64>>(&mut self, next: I) {
        let next = next.into();
        self.next = next;
        self.next_checksum = next.wrapping_add(1);
    }
}

impl<K, V> Index<K, V>
where
    K: 'static + Hash + Copy + PartialEq,
    V: 'static + Hash + Copy,
{
    /// Create or load an index at `path`
    pub fn new<P: AsRef<Path>>(path: &P) -> io::Result<Self> {
        let mut lanes = ArrayVec::new();

        // check for lane files already on disk
        for n in 0..NUM_LANES {
            let mut pathbuf = PathBuf::from(path.as_ref());
            pathbuf.push(&format!("{:02x}", n));

            if pathbuf.exists() {
                let file =
                    OpenOptions::new().read(true).write(true).open(&pathbuf)?;

                let lane_pages = Self::lane_pages(n);
                let file_len = PAGE_SIZE as u64 * lane_pages as u64;
                file.set_len(file_len)?;
                unsafe { lanes.push(MmapMut::map_mut(&file)?) };
            }
        }

        // find the number of already occupied pages
        let mut num_pages = 0;
        if let Some(last) = lanes.last() {
            // help the type inference along a bit
            let last: &MmapMut = last;

            // add up the pages of all but the last lane,
            // since they must all be full
            let mut full_pages = 0;
            for n in 0..lanes.len().saturating_sub(1) {
                full_pages += Self::lane_pages(n)
            }

            // do a binary search to find the last populated page
            // in the last lane
            let mut low_bound = 0;
            let mut high_bound = Self::lane_pages(lanes.len() - 1) - 1;

            while low_bound + 1 != high_bound {
                let check = low_bound + (high_bound - low_bound) / 2;
                let page_ofs = PAGE_SIZE * check;

                // is there a valid entry in this page?
                for slot in 0..Self::entries_per_page() {
                    let slot_ofs =
                        page_ofs + slot * mem::size_of::<Entry<K, V>>();

                    let ptr = last.as_ptr();

                    let entry: &Entry<K, V> = unsafe {
                        &*(ptr.add(slot_ofs) as *const Entry<K, V>)
                    };

                    if entry.valid() {
                        low_bound = check;
                        break;
                    }
                }
                if low_bound != check {
                    high_bound = check
                }
            }

            num_pages = full_pages + high_bound;
        }

        // create the index
        let index = Index {
            lanes: UnsafeCell::new(lanes),
            path: PathBuf::from(path.as_ref()),
            pages: Mutex::new(num_pages as u64),
            _marker: PhantomData,
        };

        // initialize the index with at least one page
        if num_pages == 0 {
            assert_eq!(index.new_page()?, 0);
        }
        Ok(index)
    }

    /// Returns how many pages have been allocated so far
    pub fn pages(&self) -> usize {
        *self.pages.lock() as usize
    }

    /// Returns how many pages fit into lane `n`
    #[inline(always)]
    fn lane_pages(n: usize) -> usize {
        2_usize.pow(n as u32) * FIRST_LANE_PAGES
    }

    #[inline(always)]
    fn entries_per_page() -> usize {
        PAGE_SIZE / mem::size_of::<Entry<K, V>>()
    }

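    // Re-hashing the hash together with the depth gives a key an
    // independent slot candidate in every page of a chain, so two keys
    // that collide in one page are unlikely to collide in the next.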
    // calculates the slot in the page this hashed key would
    // occupy at a certain depth
    #[inline(always)]
    fn slot(key_hash: u64, depth: usize) -> usize {
        // `wrapping_add` avoids overflow for large hash values
        (hash_val(&key_hash.wrapping_add(depth as u64))
            % Self::entries_per_page() as u64) as usize
    }

    // Maps a linear page number to a (lane, page-within-lane) pair.
    // With FIRST_LANE_PAGES = 2 it produces the sequence:
    // (0, 0), (0, 1),
    // (1, 0), (1, 1), (1, 2), (1, 3),
    // (2, 0), (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7),
    // ... and so on and so forth ...
    #[inline(always)]
    fn lane_page(page: usize) -> (usize, usize) {
        let usize_bits = mem::size_of::<usize>() * 8;
        let i = page / FIRST_LANE_PAGES + 1;
        let lane = usize_bits - i.leading_zeros() as usize - 1;
        let page = page - (2usize.pow(lane as u32) - 1) * FIRST_LANE_PAGES;
        (lane, page)
    }

    fn new_lane(&self) -> io::Result<()> {
        let lanes_ptr = self.lanes.get();
        let lane_nr = unsafe { (*lanes_ptr).len() };

        let num_pages = Self::lane_pages(lane_nr);

        let mut path = self.path.clone();
        path.push(format!("{:02x}", lane_nr));

        let file_len = PAGE_SIZE as u64 * num_pages as u64;

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)?;

        file.set_len(file_len)?;

        unsafe { (*lanes_ptr).push(MmapMut::map_mut(&file)?) }
        Ok(())
    }

    fn new_page(&self) -> io::Result<u64> {
        let mut page_nr = self.pages.lock();

        let (_, offset) = Self::lane_page(*page_nr as usize);

        if offset == 0 {
            // create new lane
            self.new_lane()?
        }

        let new_page_nr = *page_nr;
        *page_nr += 1;

        Ok(new_page_nr)
    }

    // Get a shared reference to the `Entry` at (lane, page, slot)
    fn entry(&self, lane: usize, page: usize, slot: usize) -> &Entry<K, V> {
        let page_ofs = PAGE_SIZE * page;
        let slot_ofs = page_ofs + slot * mem::size_of::<Entry<K, V>>();
        unsafe {
            &*((*self.lanes.get())[lane].as_ptr().add(slot_ofs)
                as *const Entry<K, V>)
        }
    }

    // Get a mutable reference to the `Entry`,
    // locking the corresponding shard.
    fn entry_mut(
        &self,
        lane: usize,
        page: usize,
        slot: usize,
    ) -> EntryMut<K, V> {
        // stripe writers over the shard mutexes; readers never lock
        let shard = (page ^ slot) % NUM_SHARDS;
        // Lock the entry for writing
        let lock = SHARDS[shard].lock();

        let page_ofs = PAGE_SIZE * page;
        let slot_ofs = page_ofs + slot * mem::size_of::<Entry<K, V>>();
        EntryMut {
            entry: unsafe {
                &mut *((*self.lanes.get())[lane].as_ptr().add(slot_ofs)
                    as *mut Entry<K, V>)
            },
            _lock: lock,
        }
    }

    // Traverse the tree of pages to find the entry for this key
    fn find_key(&self, k: &K) -> io::Result<Found<K, V>> {
        // the hash of the key is the same at every depth
        let hash = hash_val(k);
        let mut depth = 0;
        let mut abs_page = 0;
        loop {
            let slot = Self::slot(hash, depth);

            let (lane, page) = Self::lane_page(abs_page);
            let entry = self.entry(lane, page, slot);

            if !entry.valid() {
                return Ok(Found::Invalid(lane, page, slot));
            }

            if &entry.key == k {
                return Ok(Found::Some(entry));
            } else if entry.next == 0 {
                // the chain ends here
                return Ok(Found::None(lane, page, slot));
            } else {
                // follow the chain to the next page
                abs_page = entry.next as usize;
            }

            depth += 1;
        }
    }

    /// Inserts a key-value pair into the index. If the key is already
    /// present, this is a no-op and `Ok(true)` is returned.
    pub fn insert(&self, key: K, val: V) -> io::Result<AlreadyThere> {
        match self.find_key(&key)? {
            Found::Some(_) => {
                // no-op
                Ok(true)
            }
            Found::Invalid(lane, page, slot) => {
                let mut entry = self.entry_mut(lane, page, slot);

                if entry.valid() {
                    // Someone already wrote here while we were
                    // searching, recurse! Overwriting would lose their
                    // entry. We accept the performance hit of
                    // re-traversing the whole tree, since this case is
                    // uncommon, and it makes the implementation simpler.
                    mem::drop(entry);
                    self.insert(key, val)
                } else {
                    *entry = Entry::new(key, val);
                    Ok(false)
                }
            }
            Found::None(lane, page, slot) => {
                let mut entry = self.entry_mut(lane, page, slot);
                if entry.next == 0 {
                    entry.set_next(self.new_page()?);
                }
                // if `next` was already set, another thread got here
                // before us; either way, recurse into the next page
                mem::drop(entry);
                self.insert(key, val)
            }
        }
    }

    /// Looks up a value with `key` in the index
    pub fn get(&self, key: &K) -> io::Result<Option<&V>> {
        match self.find_key(key)? {
            Found::Some(entry) => Ok(Some(&entry.val)),
            _ => Ok(None),
        }
    }

    /// Synchronizes and flushes the data to disk
    pub fn flush(&mut self) -> io::Result<()> {
        unsafe {
            let lanes = &mut *self.lanes.get();
            for mmap in lanes {
                mmap.flush()?;
            }
        }
        Ok(())
    }

    /// Returns the approximate on-disk size of the index in bytes
    pub fn on_disk_size(&self) -> usize {
        *self.pages.lock() as usize * PAGE_SIZE
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use rand::{seq::SliceRandom, thread_rng};
    use tempfile::tempdir;

    use super::*;

    #[test]
    fn simple() {
        let dir = tempdir().unwrap();
        let index = Index::new(&dir).unwrap();
        index.insert(0, 0).unwrap();
        assert_eq!(index.get(&0).unwrap(), Some(&0));
        assert_eq!(index.on_disk_size(), PAGE_SIZE);
    }
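
    // A small added sanity check (a sketch) of the `lane_page`
    // geometry: lane 0 holds FIRST_LANE_PAGES pages, and every
    // following lane doubles the page count of the previous one.
    #[test]
    fn lane_page_geometry() {
        // first lane: pages 0..FIRST_LANE_PAGES
        assert_eq!(Index::<u64, u64>::lane_page(0), (0, 0));
        assert_eq!(
            Index::<u64, u64>::lane_page(FIRST_LANE_PAGES - 1),
            (0, FIRST_LANE_PAGES - 1)
        );
        // second lane holds twice as many pages
        assert_eq!(Index::<u64, u64>::lane_page(FIRST_LANE_PAGES), (1, 0));
        assert_eq!(
            Index::<u64, u64>::lane_page(FIRST_LANE_PAGES * 3 - 1),
            (1, FIRST_LANE_PAGES * 2 - 1)
        );
        // third lane starts after FIRST_LANE_PAGES * (1 + 2) pages
        assert_eq!(Index::<u64, u64>::lane_page(FIRST_LANE_PAGES * 3), (2, 0));
    }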

    const N: u64 = 1024 * 256;

    #[test]
    fn multiple() {
        let dir = tempdir().unwrap();
        let index = Index::new(&dir).unwrap();
        for i in 0..N {
            index.insert(i, i).unwrap();
        }
        for i in 0..N {
            assert_eq!(index.get(&i).unwrap(), Some(&i));
        }
    }

    #[test]
    fn reload() {
        let dir = tempdir().unwrap();
        let mut pages;
        {
            {
                let index_a = Index::new(&dir).unwrap();
                for i in 0..N {
                    index_a.insert(i, i).unwrap();
                }
                pages = index_a.pages();
                mem::drop(index_a);
            }

            let index_b = Index::new(&dir).unwrap();

            // make sure the page count matches
            assert_eq!(pages, index_b.pages());

            for i in 0..N {
                assert_eq!(index_b.get(&i).unwrap(), Some(&i));
            }

            for i in N..N * 2 {
                index_b.insert(i, i).unwrap();
            }
            pages = index_b.pages();
            mem::drop(index_b);
        }

        let index_c = Index::new(&dir).unwrap();

        // make sure the page count matches
        assert_eq!(pages, index_c.pages());

        for i in 0..N * 2 {
            assert_eq!(index_c.get(&i).unwrap(), Some(&i));
        }
    }

    const N_THREADS: usize = 8;

    // The stress test creates an index and simultaneously writes
    // entries in random order from `N_THREADS` threads,
    // while at the same time reading from an equal number of threads.
    //
    // When all threads are finished, a final read-through is made to
    // see that all key-value pairs are present.
    #[test]
    fn stress() {
        let dir = tempdir().unwrap();
        let index = Arc::new(Index::new(&dir).unwrap());

        let mut all_indices = vec![];
        for i in 0..N {
            all_indices.push(i);
        }

        let mut rng = thread_rng();

        // shuffle the order of the writes
        let mut shuffles_write = vec![];
        for _ in 0..N_THREADS {
            let mut new = all_indices.clone();
            new.shuffle(&mut rng);
            shuffles_write.push(new);
        }

        // shuffle the order of the reads
        let mut shuffles_read = vec![];
        for _ in 0..N_THREADS {
            let mut new = all_indices.clone();
            new.shuffle(&mut rng);
            shuffles_read.push(new);
        }

        let mut threads_running = vec![];

        for i in 0..N_THREADS {
            // shuffled writes
            let shuffle_write = mem::replace(&mut shuffles_write[i], vec![]);
            let index_write = index.clone();

            // shuffled reads
            let shuffle_read = mem::replace(&mut shuffles_read[i], vec![]);
            let index_read = index.clone();

            // write threads
            threads_running.push(thread::spawn(move || {
                for write in shuffle_write {
                    index_write.insert(write, write).unwrap();
                }
            }));

            // read threads
            threads_running.push(thread::spawn(move || {
                for read in shuffle_read {
                    // a read may race a write, so a missing key is fine,
                    // but a present key must map to the right value
                    if let Some(val) = index_read.get(&read).unwrap() {
                        assert_eq!(val, &read);
                    }
                }
            }));
        }

        // make sure all threads finish successfully
        for thread in threads_running {
            thread.join().unwrap()
        }

        for i in 0..N {
            assert_eq!(index.get(&i).unwrap(), Some(&i));
        }
    }
}