#![deny(missing_docs)]
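//! A persistent, on-disk index from keys to values, backed by memory-mapped
//! lane files that double in size as the index grows.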
use std::cell::UnsafeCell;
use std::fs::OpenOptions;
use std::hash::{Hash, Hasher};
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};

use arrayvec::ArrayVec;
use lazy_static::lazy_static;
use memmap::MmapMut;
use parking_lot::{Mutex, MutexGuard};
use seahash::SeaHasher;

const NUM_LANES: usize = 64;
const NUM_SHARDS: usize = 1024;
const PAGE_SIZE: usize = 4096;
const FIRST_LANE_PAGES: usize = 64;
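/// Unit type stored behind each striped lock; the locks themselves are what
/// guard mutable access to entries.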
struct Shard;

lazy_static! {
    static ref SHARDS: ArrayVec<[Mutex<Shard>; NUM_SHARDS]> = {
        let mut locks = ArrayVec::new();
        for _ in 0..NUM_SHARDS {
            locks.push(Mutex::new(Shard))
        }
        locks
    };
}

#[inline(always)]
fn hash_val<T: Hash>(t: &T) -> u64 {
    let mut hasher = SeaHasher::new();
    t.hash(&mut hasher);
    hasher.finish()
}
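/// Outcome of walking the hash chain for a key: the entry itself, the
/// (lane, page, slot) coordinates at the end of the chain, or the
/// coordinates of an invalid (never written) slot.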
enum Found<'a, K, V> {
    Some(&'a Entry<K, V>),
    None(usize, usize, usize),
    Invalid(usize, usize, usize),
}
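/// Alias for the result of `insert`: `true` means the key was already present.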
pub type AlreadyThere = bool;
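/// A persistent, on-disk map from keys `K` to values `V`, backed by
/// memory-mapped lane files stored under a common directory.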
pub struct Index<K, V> {
    lanes: UnsafeCell<ArrayVec<[MmapMut; NUM_LANES]>>,
    path: PathBuf,
    pages: Mutex<u64>,
    _marker: PhantomData<(K, V)>,
}
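// Safety: entry mutation goes through the sharded locks in `SHARDS` and page
// allocation through the `pages` mutex, so the index can be shared between
// threads when `K` and `V` allow it.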
unsafe impl<K, V> Send for Index<K, V>
where
    K: Send,
    V: Send,
{
}
unsafe impl<K, V> Sync for Index<K, V>
where
    K: Sync,
    V: Sync,
{
}
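/// On-disk entry layout: a key-value pair plus the absolute page number of
/// the next page in the chain (`0` marks the end of the chain). Checksums
/// over both let partially written or unwritten slots be detected.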
#[derive(Debug)]
#[repr(C)]
struct Entry<K, V> {
    key: K,
    val: V,
    next: u64,
    kv_checksum: u64,
    next_checksum: u64,
}
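/// Mutable view of an `Entry` that keeps its shard lock held for as long as
/// the view is alive.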
struct EntryMut<'a, K, V> {
    entry: &'a mut Entry<K, V>,
    _lock: MutexGuard<'a, Shard>,
}

impl<'a, K, V> Deref for EntryMut<'a, K, V> {
    type Target = Entry<K, V>;
    fn deref(&self) -> &Self::Target {
        &self.entry
    }
}

impl<'a, K, V> DerefMut for EntryMut<'a, K, V> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.entry
    }
}
impl<K: Hash, V: Hash> Entry<K, V> {
    fn new(key: K, val: V) -> Self {
        let kv_checksum = hash_val(&key).wrapping_add(hash_val(&val));
        let entry = Entry {
            key,
            val,
            kv_checksum,
            next: 0,
            next_checksum: 0 + 1,
        };
        debug_assert!(entry.valid());
        entry
    }

    fn valid(&self) -> bool {
        hash_val(&self.key).wrapping_add(hash_val(&self.val))
            == self.kv_checksum
            && self.next + 1 == self.next_checksum
    }

    fn set_next<I: Into<u64>>(&mut self, next: I) {
        let next = next.into();
        self.next = next;
        self.next_checksum = next + 1;
    }
}
impl<K, V> Index<K, V>
where
    K: 'static + Hash + Copy + PartialEq,
    V: 'static + Hash + Copy,
{
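    /// Opens the index at `path`, creating it if needed. Existing lane files
    /// are re-mapped, and the number of pages in use is recovered by binary
    /// searching the last lane for valid entries.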
    pub fn new<P: AsRef<Path>>(path: &P) -> io::Result<Self> {
        let mut lanes = ArrayVec::new();

        for n in 0..NUM_LANES {
            let mut pathbuf = PathBuf::from(path.as_ref());
            pathbuf.push(&format!("{:02x}", n));

            if pathbuf.exists() {
                let file =
                    OpenOptions::new().read(true).write(true).open(&pathbuf)?;

                let lane_pages = Self::lane_pages(n);
                let file_len = PAGE_SIZE as u64 * lane_pages as u64;
                file.set_len(file_len)?;
                unsafe { lanes.push(MmapMut::map_mut(&file)?) };
            }
        }

        let mut num_pages = 0;
        if let Some(last) = lanes.last() {
            let last: &MmapMut = last;

            let mut full_pages = 0;
            for n in 0..lanes.len().saturating_sub(1) {
                full_pages += Self::lane_pages(n)
            }

            let mut low_bound = 0;
            let mut high_bound = Self::lane_pages(lanes.len() - 1) - 1;

            while low_bound + 1 != high_bound {
                let check = low_bound + (high_bound - low_bound) / 2;
                let page_ofs = PAGE_SIZE * check;

                for slot in 0..Self::entries_per_page() {
                    let slot_ofs =
                        page_ofs + slot * mem::size_of::<Entry<K, V>>();

                    let ptr = last.as_ptr();

                    let entry: &Entry<K, V> = unsafe {
                        mem::transmute(ptr.offset(slot_ofs as isize))
                    };

                    if entry.valid() {
                        low_bound = check;
                        break;
                    }
                }
                if low_bound != check {
                    high_bound = check
                }
            }

            num_pages = full_pages + high_bound;
        }

        let index = Index {
            lanes: UnsafeCell::new(lanes),
            path: PathBuf::from(path.as_ref()),
            pages: Mutex::new(num_pages as u64),
            _marker: PhantomData,
        };

        if num_pages == 0 {
            assert_eq!(index.new_page()?, 0);
        }
        Ok(index)
    }
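    /// Returns the number of pages currently in use by the index.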
    pub fn pages(&self) -> usize {
        *self.pages.lock() as usize
    }
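    // Lane `n` holds `FIRST_LANE_PAGES * 2^n` pages, so every new lane
    // doubles the capacity of the index.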
    #[inline(always)]
    fn lane_pages(n: usize) -> usize {
        2_usize.pow(n as u32) * FIRST_LANE_PAGES
    }

    #[inline(always)]
    fn entries_per_page() -> usize {
        PAGE_SIZE / mem::size_of::<Entry<K, V>>()
    }
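    // Picks the slot for `key_hash` within a page; the probe depth is mixed
    // into the hash so that every page along a chain uses a different slot.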
    #[inline(always)]
    fn slot(key_hash: u64, depth: usize) -> usize {
        (hash_val(&(key_hash + depth as u64)) % Self::entries_per_page() as u64)
            as usize
    }
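    // Maps an absolute page number to (lane, page-within-lane), using the
    // fact that lanes `0..n` together hold `(2^n - 1) * FIRST_LANE_PAGES`
    // pages.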
    #[inline(always)]
    fn lane_page(page: usize) -> (usize, usize) {
        let usize_bits = mem::size_of::<usize>() * 8;
        let i = page / FIRST_LANE_PAGES + 1;
        let lane = usize_bits - i.leading_zeros() as usize - 1;
        let page = page - (2usize.pow(lane as u32) - 1) * FIRST_LANE_PAGES;
        (lane, page)
    }
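    // Creates and memory-maps the next lane file, named after its lane number.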
    fn new_lane(&self) -> io::Result<()> {
        let lanes_ptr = self.lanes.get();
        let lane_nr = unsafe { (*lanes_ptr).len() };

        let num_pages = Self::lane_pages(lane_nr);

        let mut path = self.path.clone();
        path.push(format!("{:02x}", lane_nr));

        let file_len = PAGE_SIZE as u64 * num_pages as u64;

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)?;

        file.set_len(file_len)?;

        unsafe { (*lanes_ptr).push(MmapMut::map_mut(&file)?) }
        Ok(())
    }
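    // Reserves a fresh page under the `pages` lock, creating the backing lane
    // first whenever the new page falls at the start of a not-yet-mapped lane.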
    fn new_page(&self) -> io::Result<u64> {
        let mut page_nr = self.pages.lock();

        let (_, offset) = Self::lane_page(*page_nr as usize);

        if offset == 0 {
            self.new_lane()?
        }

        let new_page_nr = *page_nr;
        *page_nr += 1;

        Ok(new_page_nr)
    }
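    // Returns a shared reference to the entry at (lane, page, slot), read
    // directly out of the memory map.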
    fn entry(&self, lane: usize, page: usize, slot: usize) -> &Entry<K, V> {
        let page_ofs = PAGE_SIZE * page;
        let slot_ofs = page_ofs + slot * mem::size_of::<Entry<K, V>>();
        unsafe {
            mem::transmute(
                (*self.lanes.get())[lane].as_ptr().offset(slot_ofs as isize),
            )
        }
    }
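    // Returns a mutable reference to the entry at (lane, page, slot), holding
    // the corresponding shard lock so concurrent writers are serialized.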
    fn entry_mut(
        &self,
        lane: usize,
        page: usize,
        slot: usize,
    ) -> EntryMut<K, V> {
        let shard = (page ^ slot) % NUM_SHARDS;
        let lock = SHARDS[shard].lock();

        let page_ofs = PAGE_SIZE * page;
        let slot_ofs = page_ofs + slot * mem::size_of::<Entry<K, V>>();
        EntryMut {
            entry: unsafe {
                mem::transmute(
                    (*self.lanes.get())[lane]
                        .as_ptr()
                        .offset(slot_ofs as isize),
                )
            },
            _lock: lock,
        }
    }
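    // Walks the hash chain for `k`: one slot is probed per page, following
    // `next` links until the key, the end of the chain, or an unwritten slot
    // is found.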
    fn find_key(&self, k: &K) -> io::Result<Found<K, V>> {
        let mut depth = 0;
        let mut abs_page = 0;
        let hash = hash_val(&k);
        loop {
            let slot = Self::slot(hash, depth);

            let (lane, page) = Self::lane_page(abs_page);
            let entry = self.entry(lane, page, slot);

            if !entry.valid() {
                return Ok(Found::Invalid(lane, page, slot));
            }

            if &entry.key == k {
                return Ok(Found::Some(entry));
            } else if entry.next == 0 {
                return Ok(Found::None(lane, page, slot));
            } else {
                abs_page = entry.next as usize;
            }

            depth += 1;
        }
    }
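    /// Inserts `key` mapping to `val`. Returns `Ok(true)` if the key was
    /// already present, `Ok(false)` if it was newly written. Races with
    /// concurrent writers are resolved by retrying.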
    pub fn insert(&self, key: K, val: V) -> io::Result<AlreadyThere> {
        match self.find_key(&key)? {
            Found::Some(_) => Ok(true),
            Found::Invalid(lane, page, slot) => {
                let mut entry = self.entry_mut(lane, page, slot);

                if entry.valid() && entry.next != 0 {
                    // Another writer got here first and already linked a new
                    // page; drop the lock and retry.
                    mem::drop(entry);
                    self.insert(key, val)
                } else {
                    *entry = Entry::new(key, val);
                    Ok(false)
                }
            }
            Found::None(lane, page, slot) => {
                let mut entry = self.entry_mut(lane, page, slot);
                if entry.next == 0 {
                    entry.set_next(self.new_page()?);
                }
                mem::drop(entry);
                self.insert(key, val)
            }
        }
    }
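    /// Looks up `key`, returning a reference to the stored value if present.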
    pub fn get(&self, key: &K) -> io::Result<Option<&V>> {
        match self.find_key(key)? {
            Found::Some(entry) => Ok(Some(&entry.val)),
            _ => Ok(None),
        }
    }
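    /// Flushes all memory-mapped lanes to disk.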
    pub fn flush(&mut self) -> io::Result<()> {
        unsafe {
            let lanes = &mut *self.lanes.get();
            for mmap in lanes {
                mmap.flush()?;
            }
        }
        Ok(())
    }
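    /// Returns the amount of disk space reserved by the index, in bytes.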
    pub fn on_disk_size(&self) -> usize {
        *self.pages.lock() as usize * PAGE_SIZE
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use rand::{seq::SliceRandom, thread_rng};
    use tempfile::tempdir;

    use super::*;

    #[test]
    fn simple() {
        let dir = tempdir().unwrap();
        let index = Index::new(&dir).unwrap();
        index.insert(0, 0).unwrap();
        assert_eq!(index.get(&0).unwrap(), Some(&0));
        assert_eq!(index.on_disk_size(), PAGE_SIZE);
    }

    const N: u64 = 1024 * 256;

    #[test]
    fn multiple() {
        let dir = tempdir().unwrap();
        let index = Index::new(&dir).unwrap();
        for i in 0..N {
            index.insert(i, i).unwrap();
        }
        for i in 0..N {
            assert_eq!(index.get(&i).unwrap(), Some(&i));
        }
    }
    #[test]
    fn reload() {
        let dir = tempdir().unwrap();
        let mut pages;
        {
            {
                let index_a = Index::new(&dir).unwrap();
                for i in 0..N {
                    index_a.insert(i, i).unwrap();
                }
                pages = index_a.pages();
                mem::drop(index_a);
            }

            let index_b = Index::new(&dir).unwrap();

            assert_eq!(pages, index_b.pages());

            for i in 0..N {
                assert_eq!(index_b.get(&i).unwrap(), Some(&i));
            }

            for i in N..N * 2 {
                index_b.insert(i, i).unwrap();
            }
            pages = index_b.pages();
            mem::drop(index_b);
        }

        let index_c = Index::new(&dir).unwrap();

        assert_eq!(pages, index_c.pages());

        for i in 0..N * 2 {
            assert_eq!(index_c.get(&i).unwrap(), Some(&i));
        }
    }
    const N_THREADS: usize = 8;

    #[test]
    fn stress() {
        let dir = tempdir().unwrap();
        let index = Arc::new(Index::new(&dir).unwrap());

        let mut all_indices = vec![];
        for i in 0..N {
            all_indices.push(i);
        }

        let mut rng = thread_rng();

        let mut shuffles_write = vec![];
        for _ in 0..N_THREADS {
            let mut new = all_indices.clone();
            SliceRandom::shuffle(&mut new[..], &mut rng);
            shuffles_write.push(new);
        }

        let mut shuffles_read = vec![];
        for _ in 0..N_THREADS {
            let mut new = all_indices.clone();
            SliceRandom::shuffle(&mut new[..], &mut rng);
            shuffles_read.push(new);
        }

        let mut threads_running = vec![];

        for i in 0..N_THREADS {
            let shuffle_write = mem::replace(&mut shuffles_write[i], vec![]);
            let index_write = index.clone();

            let shuffle_read = mem::replace(&mut shuffles_read[i], vec![]);
            let index_read = index.clone();

            threads_running.push(thread::spawn(move || {
                for write in shuffle_write {
                    index_write.insert(write, write).unwrap();
                }
            }));

            threads_running.push(thread::spawn(move || {
                for read in shuffle_read {
                    match index_read.get(&read).unwrap() {
                        Some(val) => assert_eq!(val, &read),
                        None => (),
                    }
                }
            }));
        }

        for thread in threads_running {
            thread.join().unwrap()
        }

        for i in 0..N {
            assert_eq!(index.get(&i).unwrap(), Some(&i));
        }
    }
}