nihdb/
lib.rs

1//! NihDB
2//!
3//! ```
4//! use nihdb::Store;
5//! let dir = "./testdir-nihdb/";
6//! Store::create(dir).unwrap();
7//! {
8//!     let mut store: Store = Store::open(dir, 2000000).unwrap();
9//!
10//!     store.put("some_key".as_bytes(), "Some Value".as_bytes()).unwrap();
11//!     match store.get("some_key".as_bytes()).unwrap() {
12//!         None => {
13//!             println!("Not found!");
14//!         },
15//!         Some(v) => {
16//!             println!("The value is {}", String::from_utf8(v).unwrap());
17//!         },
18//!     };
19//! }
20//!
21//! std::fs::remove_dir_all(dir).unwrap();
22//! ```
23
24use std::collections::Bound;
25use std::iter::*;
26
27extern crate owning_ref;
28extern crate rand;
29extern crate libc;
30extern crate fnv;
31
32mod disk;
33use disk::*;
34mod encoding;
35mod error;
36use error::*;
37mod iter;
38use iter::*;
39mod memstore;
40use memstore::*;
41mod toc;
42use toc::*;
43mod util;
44use util::*;
45
/// A key/value store that buffers writes in memory and keeps flushed data in
/// table files listed by a TOC.
///
/// Writes go to the first memstore; lookups consult the memstores in order
/// and then the tables recorded in the TOC.
pub struct Store {
    // Never empty
    // Index 0 is the memstore that receives new mutations.
    memstores: Vec<MemStore>,
    // Size limit: the front memstore is flushed once its `mem_usage` reaches
    // this value, and releveling starts a new table file once the builder's
    // size estimate exceeds it.
    threshold: usize,
    // Directory the table file paths and the TOC are derived from.
    directory: String,
    // Handle to the TOC file; TOC entries are appended through it.
    toc_file: std::fs::File,
    // In-memory TOC: per-table metadata, table ids per level, and the next
    // table id to hand out.
    toc: Toc,
}
54
/// Iteration state created by `Store::range_directed` and advanced with
/// `Store::next`.
pub struct StoreIter<'a> {
    // Key interval the iteration is restricted to.
    interval: Interval<Buf>,
    // Merged view over the memstore and table iterators.
    iters: MergeIterator<'a>,
    // Direction in which keys are visited.
    direction: Direction,
}
60
61impl Store {
62    /// Creates a new store in a new directory.
63    pub fn create(dir: &str) -> Result<()> {
64        // NOTE: We'll want directory locking and such.
65        std::fs::create_dir(dir)?;
66        create_toc(dir)?;
67        return Ok(());
68    }
69
70    /// Opens the store.
71    ///
72    /// `threshold` is an upper bound on the size of unflushed data.
73    pub fn open(dir: &str, threshold: usize) -> Result<Store> {
74        let (toc_file, toc) = read_toc(dir)?;
75        return Ok(Store::make_existing(threshold, dir.to_string(), toc_file, toc, MemStore::new()));
76    }
77
78    fn make_existing(threshold: usize, directory: String, toc_file: std::fs::File, toc: Toc, ms: MemStore) -> Store {
79        return Store{
80            memstores: vec![MemStore::new(), ms],
81            threshold: threshold,
82
83            directory: directory,
84            toc_file: toc_file,
85            toc: toc,
86        }
87    }
88
89    /// Inserts a key/value pair into the store if the key is not already present.
90    /// Returns true if an insertion happened.
91    pub fn insert(&mut self, key: &[u8], val: &[u8]) -> Result<bool> {
92        if !self.exists(key)? {
93            self.put(key, val)?;
94            return Ok(true);
95        }
96        return Ok(false);
97    }
98
99    /// Replaces an existing key/value pair in the store.  If the key is not present,
100    /// does nothing and returns false.
101    pub fn replace(&mut self, key: &[u8], val: &[u8]) -> Result<bool> {
102        if self.exists(key)? {
103            self.put(key, val)?;
104            return Ok(true);
105        }
106        return Ok(false);
107    }
108
109    /// Puts a key/value pair into the store, replacing the value if the key is
110    /// already present.  Compare to `insert` or `replace`.
111    pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
112        self.memstores[0].apply(key.to_vec(), Mutation::Set(val.to_vec()));
113        return self.consider_split();
114    }
115
116    /// Removes a key/value pair from the store. Returns true if the key was present.
117    pub fn remove(&mut self, key: &[u8]) -> Result<bool> {
118        if self.exists(key)? {
119            self.memstores[0].apply(key.to_vec(), Mutation::Delete);
120            self.consider_split()?;
121            return Ok(true);
122        }
123        return Ok(false);
124    }
125
126    /// Ensures that all preceding write operations have been written
127    /// to disk (if you trust your kernel and your disk).
128    pub fn sync(&mut self) -> Result<()> {
129        // NOTE: We could, instead, sync file by file.
130        use libc;
131        self.flush()?;
132        unsafe {
133            libc::sync();
134        }
135        return Ok(());
136    }
137
138    /// Flushes any buffered write operations to disk.
139    pub fn flush(&mut self) -> Result<()> {
140        let ms: MemStore = self.memstores.remove(0);
141
142        // NOTE: Instead of flushing and compacting, we could, you know, do a
143        // flush into the compaction.
144        self.flush_and_record(0, &ms)?;
145        self.rebalance()?;
146
147        self.memstores.insert(0, MemStore::new());
148        return Ok(());
149    }
150
151    fn rebalance(&mut self) -> Result<()> {
152        if self.toc.level_infos.get(&0).map_or(false, |lz| lz.len() > 4) {
153            // Do a releveling with all but the latest (highest numbered) table.
154            let table_ids: Vec<TableId>
155                = self.toc.level_infos.get(&0).unwrap().iter().rev().skip(1).map(|&x| x).collect();
156            self.relevel(0, table_ids)?;
157            // Exit.  Don't do more than one releveling per "rebalance"
158            // operation.  Just to spread the work out, barely.
159            return Ok(());
160        }
161
162        // NOTE: We might want to spread out pending necessary relevelings
163        // instead of doing them all in a row.  We might need to do more than
164        // one releveling at a time, in order to keep up with writes, though.
165        // Basically, expect each releveling at level 0 to kick off a bunch of
166        // relevelings at level 1, 2, 3, 4, ...
167
168        // NOTE: Maybe relevel a batch of N consecutive files at once, instead
169        // of just 1 at a time.  This will minimize overhead of dealing with
170        // edges.  We'd probably have to relevel 4 at a time, no?
171
172        let max_level: LevelNumber
173            = self.toc.level_infos.iter().map(|(&level, _)| level).max().expect("at least one level");
174
175        for level in 1..max_level {
176            let to_relevel: (LevelNumber, TableId);
177            if let Some(table_ids) = self.toc.level_infos.get(&level) {
178                // NOTE: Icky conversion -- change LevelNumber to u32?
179                // NOTE: Should use total file size instead.
180                if table_ids.len() <= 4 * 10usize.pow(level as u32 - 1) {
181                    continue;
182                }
183                // Now what?  We want to kick out one table for this level.  The
184                // one which overlaps the fewest child tables.
185                // NOTE: A data structure for this would be nice.
186                let mut smallest_overlap = usize::max_value();
187                let mut smallest_overlap_table_id: TableId = TableId(0);
188
189                for &id in table_ids.iter() {
190                    // NOTE: Pass a slice to single TableInfo element without cloning.
191                    let infos: [TableInfo; 1]
192                        = [self.toc.table_infos.get(&id).expect("toc valid in rebalance").clone()];
193                    // NOTE: Would be nice not to allocate this vec.  Just count number of overlapping.
194                    let lower_overlapping_ids: Vec<_> = Store::get_overlapping_tables(&self.toc, &infos, level + 1);
195                    let overlap = lower_overlapping_ids.len();
196                    // NOTE: We're biased towards releveling left-most tables given equal overlap.
197                    if overlap < smallest_overlap {
198                        smallest_overlap = overlap;
199                        smallest_overlap_table_id = id;
200                    }
201                }
202
203                assert!(smallest_overlap != usize::max_value());
204                to_relevel = (level, smallest_overlap_table_id);
205            } else {
206                continue;
207            }
208            self.relevel(to_relevel.0, vec![to_relevel.1])?;
209        }
210
211        return Ok(());
212    }
213
214    // 'tables' is in order of precedence, such that frontmost tables supercede
215    // later tables when merged.  (They're in reverse order by table number, if
216    // in level zero.  In other levels, there's only one table, and even if there
217    // was more than one, they'd have non-overlapping key ranges.)
218    fn relevel<'a>(&'a mut self, level: LevelNumber, tables: Vec<TableId>) -> Result<()> {
219        assert!(if level == 0 { tables.len() > 0 } else { tables.len() == 1 });
220
221        // What to do:  Go to the next level, find which tables overlap.
222        let table_infos: Vec<TableInfo>
223            = tables.iter().map(|id| self.toc.table_infos.get(id).expect("toc valid in relevel").clone()).collect();
224        let lower_overlapping_ids: Vec<TableId> = Store::get_overlapping_tables(&self.toc, &table_infos, level + 1);
225
226        // NOTE: When releveling 0 -> 1, it's possible there are no overlapping tables.
227        if lower_overlapping_ids.is_empty() && !Store::self_overlaps(&table_infos) {
228            let additions: Vec<TableInfo>
229                = table_infos.into_iter().map(|x: TableInfo| TableInfo{level: level, .. x}).collect();
230            let entry = Entry{
231                removals: tables,
232                additions: additions,
233            };
234
235            append_toc(&mut self.toc, &mut self.toc_file, entry)?;
236            return Ok(());
237        } else {
238            let mut iters: Vec<Box<MutationIterator + 'a>> = Vec::new();
239            // NOTE: We might want a smarter iterator for the lower level --
240            // open only one table file at a time, instead of generically
241            // merging the non-overlapping tables together.
242
243            // Add upper level's tables in 'tables' existing order (which is in order of precedence).
244            // Order of lower level's tables doesn't matter, since they're non-overlapping.
245            for table_id in tables.iter().chain(lower_overlapping_ids.iter()) {
246                let interval = Interval{lower: Bound::Unbounded, upper: Bound::Unbounded};
247                self.add_table_iter_to_iters(&mut iters, *table_id, &interval, Direction::Forward)?;
248            }
249
250            let mut iter = MergeIterator::make(iters, Direction::Forward)?;
251
252            // Now we've got a store iter.  Iterate the store iter, building a set of tables.
253
254            let mut additions: Vec<TableInfo> = Vec::new();
255
256            'outer: loop {
257                let mut builder = TableBuilder::new();
258                'inner: loop {
259                    // NOTE: It would be nice to avoid cloning the key here.
260                    if let Some(key) = iter.current_key()?.map(|x| x.to_vec()) {
261                        let mutation = iter.current_value()?;
262                        builder.add_mutation(&key, &mutation);
263                        iter.step()?;
264                        if builder.lowerbound_file_size() > self.threshold {
265                            break 'inner;
266                        }
267                    } else {
268                        if builder.is_empty() {
269                            break 'outer;
270                        } else {
271                            break 'inner;
272                        }
273                    }
274                }
275
276                // We've got a non-empty builder.  Flush it to disk.
277                let table_id = TableId(self.toc.next_table_id);
278                self.toc.next_table_id += 1;
279
280                let mut f = std::fs::File::create(table_filepath(&self.directory, table_id))?;
281                let (keys_offset, file_size, smallest, biggest) = builder.finish(&mut f)?;
282                additions.push(TableInfo{
283                    id: table_id,
284                    level: level + 1,
285                    keys_offset: keys_offset,
286                    file_size: file_size,
287                    smallest_key: smallest,
288                    biggest_key: biggest,
289                });
290            }
291
292            let removals: Vec<TableId>
293                = tables.iter().chain(lower_overlapping_ids.iter()).map(|&x| x).collect();
294
295            let entry = Entry{
296                additions: additions,
297                removals: removals,
298            };
299
300            // to_delete will be the same as 'removals' defined above, but this
301            // is more robust against tweaks to our logic (such as fine-grained
302            // treatment of non-overlapping tables in level 0).
303            let to_delete = append_toc(&mut self.toc, &mut self.toc_file, entry)?;
304            for table_id in to_delete {
305                std::fs::remove_file(table_filepath(&self.directory, table_id))?;
306            }
307
308            return Ok(());
309        }
310    }
311
312    fn table_overlaps_interval(x: &TableInfo, y: &Interval<Buf>) -> bool {
313        return !(!above_lower_bound(&x.biggest_key, &y.lower) || !below_upper_bound(&x.smallest_key, &y.upper));
314    }
315
316    fn self_overlaps(xs: &[TableInfo]) -> bool {
317        for i in 0..xs.len() {
318            for j in i+1..xs.len() {
319                if Store::tables_overlap(&xs[i], &xs[j]) {
320                    return true;
321                }
322            }
323        }
324        return false;
325    }
326
327    fn tables_overlap(x: &TableInfo, y: &TableInfo) -> bool {
328        return !(x.biggest_key < y.smallest_key || y.biggest_key < x.smallest_key);
329    }
330
331    // NOTE: We'd like a better data structure for organizing a level's table by keys.
332    fn get_overlapping_tables(toc: &Toc, tables: &[TableInfo], level: LevelNumber) -> Vec<TableId> {
333        if let Some(level_tables) = toc.level_infos.get(&level) {
334            let mut ret: Vec<TableId> = Vec::new();
335            for id in level_tables {
336                for info in tables {
337                    if Store::tables_overlap(toc.table_infos.get(id).expect("toc valid in get_overlapping_tables"), info) {
338                        ret.push(*id);
339                        break;
340                    }
341                }
342            }
343            return ret;
344        } else {
345            return Vec::new();
346        }
347    }
348
349    fn consider_split(&mut self) -> Result<()> {
350        if self.memstores[0].mem_usage >= self.threshold {
351            self.flush()?;
352        }
353        return Ok(());
354    }
355
356    fn flush_and_record(&mut self, level: LevelNumber, ms: &MemStore) -> Result<()> {
357        if ms.entries.is_empty() {
358            return Ok(());
359        }
360        let table_id = TableId(self.toc.next_table_id);
361        self.toc.next_table_id += 1;
362        let (keys_offset, file_size, smallest, biggest) = flush_to_disk(&self.directory, table_id, &ms)?;
363        let ti = TableInfo{
364            id: table_id,
365            level: level,
366            keys_offset: keys_offset,
367            file_size: file_size,
368            smallest_key: smallest,
369            biggest_key: biggest,
370        };
371        append_toc(&mut self.toc, &mut self.toc_file, Entry{additions: vec![ti], removals: vec![]})?;
372        return Ok(());
373    }
374
375    /// Returns true if a key/value pair is present, for the given key.
376    pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
377        for store in self.memstores.iter() {
378            if let Some(m) = store.lookup(key) {
379                return Ok(match m {
380                    &Mutation::Set(_) => true,
381                    &Mutation::Delete => false,
382                });
383            }
384        }
385
386        for (_level, table_ids) in self.toc.level_infos.iter() {
387            // For level zero, we want to iterate tables in reverse order.
388            for table_id in table_ids.iter().rev() {
389                let ti: &TableInfo = self.toc.table_infos.get(table_id).expect("invalid toc");
390                if key >= &ti.smallest_key && key <= &ti.biggest_key {
391                    // NOTE: We'll want to use exists_table.
392                    let opt_mut = lookup_table(&self.directory, ti, key)?;
393                    if let Some(m) = opt_mut {
394                        return Ok(match m {
395                            Mutation::Set(_) => true,
396                            Mutation::Delete => false,
397                        });
398                    }
399                }
400
401            }
402        }
403
404        return Ok(false);
405    }
406
407    /// Gets the value for the specified key/value pair, or `None` if the key
408    /// does not exist.
409    pub fn get(&mut self, key: &[u8]) -> Result<Option<Buf>> {
410        // NOTE: This doesn't need to be &_mut_ self, because using a StoreIter isn't.
411        for store in self.memstores.iter() {
412            if let Some(m) = store.lookup(key) {
413                return Ok(match m {
414                    &Mutation::Set(ref x) => Some(x.clone()),
415                    &Mutation::Delete => None,
416                });
417            }
418        }
419
420        for (_level, table_ids) in self.toc.level_infos.iter() {
421            // For level zero, we want to iterate tables in reverse order.
422            // NOTE: For other levels, we don't want to iterate at all.  Too much CPU.
423            for table_id in table_ids.iter().rev() {
424                let ti: &TableInfo = self.toc.table_infos.get(table_id).expect("invalid toc");
425                if key >= &ti.smallest_key && key <= &ti.biggest_key {
426                    let opt_mut = lookup_table(&self.directory, ti, key)?;
427                    if let Some(m) = opt_mut {
428                        return Ok(match m {
429                            Mutation::Set(x) => Some(x),
430                            Mutation::Delete => None,
431                        });
432                    }
433                }
434            }
435        }
436
437        return Ok(None);
438    }
439
440    fn add_table_iter_to_iters<'a>(
441        &self, iters: &mut Vec<Box<MutationIterator + 'a>>, table_id: TableId, interval: &Interval<Buf>,
442        direction: Direction
443    ) -> Result<()> {
444        let ti: &TableInfo = self.toc.table_infos.get(&table_id).expect("invalid toc");
445        let iter = TableIterator::make(&self.directory, ti, interval, direction)?;
446        iters.push(Box::new(iter));
447        return Ok(());
448    }
449
450    // NOTE: We could also add un-ordered range queries.
451
    /// Produces a store iterator for iterating the store over the given interval,
    /// in the given direction.
    ///
    /// The iterator merges one source per memstore, one per level-0 table, and
    /// one concatenated source per deeper level.  Sources are handed to the
    /// merge in that order.
    pub fn range_directed<'a>(&'a self, interval: &Interval<Buf>, direction: Direction
    ) -> Result<StoreIter<'a>> {
        // NOTE: Could short-circuit for empty/one-key interval.
        let mut iters: Vec<Box<MutationIterator + 'a>> = Vec::new();
        // Memstores come first, in their stored order.
        for store in self.memstores.iter() {
            iters.push(Box::new(MemStoreIterator::<'a>::make(store, interval, direction)));
        }

        for (level, table_ids) in self.toc.level_infos.iter() {
            if *level == 0 {
                // Tables overlap, add them in reverse order.
                for table_id in table_ids.iter().rev() {
                    // NOTE: We could check if the intervals actually overlap.
                    self.add_table_iter_to_iters(&mut iters, *table_id, &interval, direction)?;
                }
            } else {
                // Collect only the tables of this level that intersect the interval.
                let mut table_infos: Vec<&'a TableInfo> = Vec::new();

                // NOTE: Would be nice to have a data structure ordered by key.
                for table_id in table_ids.iter() {
                    let table_info: &TableInfo = self.toc.table_infos.get(table_id).expect("valid toc in range");
                    if Store::table_overlaps_interval(table_info, interval) {
                        table_infos.push(table_info);
                    }
                }

                // Order the tables by smallest key, in the iteration direction.
                table_infos.sort_unstable_by(|x, y| {
                    let res = x.smallest_key.cmp(&y.smallest_key);
                    match direction { Direction::Forward => res, Direction::Backward => res.reverse() }
                });

                // The closure below owns its own copy of the interval and a
                // cursor into `table_infos`; each call opens the next table's
                // iterator, or yields None once every table has been handed out.
                let interval = interval.clone();
                let mut ti_index = 0;
                iters.push(Box::new(ConcatIterator::<'a>::make(Box::new(move || {
                    Ok(if ti_index == table_infos.len() {
                        None
                    } else {
                        let ti: &TableInfo = table_infos[ti_index];
                        ti_index += 1;
                        Some(Box::new(TableIterator::make(&self.directory, ti, &interval, direction)?))
                    })
                }))?));
            }
        }

        return Ok(StoreIter{
            interval: interval.clone(),
            iters: MergeIterator::make(iters, direction)?,
            direction: direction,
        });
    }
505
506    // NOTE: If the StoreIter keeps self borrowed, it should hold a reference to self that we can use
507    // to iterate.
508
509    /// Creates a StoreIter for iterating forwards through the interval.
510    pub fn range<'a>(&'a self, interval: &Interval<Buf>) -> Result<StoreIter<'a>> {
511        return self.range_directed(interval, Direction::Forward);
512    }
513
514    /// Creates a StoreIter for iterating backwards through the interval.
515    pub fn range_descending<'a>(&'a self, interval: &Interval<Buf>) -> Result<StoreIter<'a>> {
516        return self.range_directed(interval, Direction::Backward);
517    }
518
519    /// Produces the next key/value pair from the StoreIter.  Returns None
520    /// to mark the end of iteration.
521    pub fn next(&self, iter: &mut StoreIter) -> Result<Option<(Buf, Buf)>> {
522        loop {
523            let keyvec: Vec<u8>;
524            if let Some(key) = iter.iters.current_key()? {
525                let abandon = match iter.direction {
526                    Direction::Forward => !below_upper_bound(key, &iter.interval.upper),
527                    Direction::Backward => !above_lower_bound(key, &iter.interval.lower),
528                };
529                if abandon {
530                    return Ok(None);
531                }
532                keyvec = key.to_vec();
533            } else {
534                return Ok(None);
535            }
536            let mutation: Mutation = iter.iters.current_value()?;
537            iter.iters.step()?;
538            match mutation {
539                Mutation::Set(value) => {
540                    return Ok(Some((keyvec, value)));
541                },
542                Mutation::Delete => {
543                    continue;
544                }
545            }
546        }
547    }
548}
549
#[cfg(test)]
mod tests {
    use std::collections::Bound;
    use super::*;

    use rand::*;

    /// A `Store` in a throwaway directory; the directory is deleted on drop.
    struct TestStore {
        // In an option so we can drop it before deleting directory.
        store: Option<Store>,
        directory: String,
    }

    /// Builds a directory name: "testdir-" followed by six lowercase letters
    /// derived from a random `u32`.
    fn random_testdir() -> String {
        let mut rng = rand::thread_rng();
        let mut seed: u32 = rng.gen();
        let mut name = String::from("testdir-");
        for _ in 0..6 {
            // 97 is 'a'; peel off one base-26 digit per letter.
            let letter = std::char::from_u32(97 + (seed % 26)).unwrap();
            name.push(letter);
            seed /= 26;
        }
        name
    }

    impl Drop for TestStore {
        fn drop(&mut self) {
            // The Store has to go away before its directory is deleted.
            self.close();
            std::fs::remove_dir_all(&self.directory).expect("remove_dir_all");
        }
    }

    impl TestStore {
        /// Creates a store in a fresh random directory and opens it.
        fn create(threshold: usize) -> TestStore {
            let directory: String = random_testdir();
            Store::create(&directory).unwrap();
            let mut ts = TestStore{store: None, directory};
            ts.open(threshold);
            ts
        }
        /// Opens the store; it must currently be closed.
        fn open(&mut self, threshold: usize) {
            assert!(self.store.is_none());
            self.store = Some(Store::open(&self.directory, threshold).unwrap());
        }
        /// Drops the store, returning `Some(())` if one was open.
        fn close(&mut self) -> Option<()> {
            self.store.take().map(|_| ())
        }
        /// The open store; panics if it is closed.
        fn kv(&mut self) -> &mut Store {
            self.store.as_mut().unwrap()
        }
    }

    /// Shorthand for the bytes of a string literal.
    fn b(s: &str) -> &[u8] {
        s.as_bytes()
    }

    /// The item `Store::next` is expected to yield for a key/value pair.
    fn pair(k: &str, v: &str) -> Option<(Buf, Buf)> {
        Some((b(k).to_vec(), b(v).to_vec()))
    }

    /// Asserts that the iterator yields exactly `expected` as its next items.
    fn expect_items(kv: &Store, it: &mut StoreIter, expected: &[(&str, &str)]) {
        for &(k, v) in expected {
            assert_eq!(pair(k, v), kv.next(it).unwrap());
        }
    }

    #[test]
    fn putget() {
        let mut ts = TestStore::create(100);
        let kv = ts.kv();
        kv.put(b("foo"), b("Hey")).unwrap();

        // A stored key is visible through both get and exists.
        let found: Option<Buf> = kv.get(b("foo")).unwrap();
        assert_eq!(Some(b("Hey").to_vec()), found);
        assert!(kv.exists(b("foo")).unwrap());

        // A key that was never stored is absent.
        let missing: Option<Buf> = kv.get(b("bar")).unwrap();
        assert_eq!(None, missing);
        assert!(!kv.exists(b("bar")).unwrap());
    }

    #[test]
    fn range() {
        let mut ts = TestStore::create(100);
        let kv = ts.kv();
        for &(k, v) in [("a", "alpha"), ("b", "beta"), ("c", "charlie"), ("d", "delta")].iter() {
            kv.put(b(k), b(v)).unwrap();
        }
        // "d" is excluded by the upper bound.
        let interval = Interval::<Buf>{lower: Bound::Unbounded, upper: Bound::Excluded(b("d").to_vec())};
        {
            let mut it: StoreIter = kv.range(&interval).expect("range");
            expect_items(kv, &mut it, &[("a", "alpha"), ("b", "beta"), ("c", "charlie")]);
            assert_eq!(None, kv.next(&mut it).unwrap());
        }
        {
            let mut it: StoreIter = kv.range_descending(&interval).expect("range");
            expect_items(kv, &mut it, &[("c", "charlie"), ("b", "beta"), ("a", "alpha")]);
            assert_eq!(None, kv.next(&mut it).unwrap());
        }
    }

    #[test]
    fn overwrite() {
        let mut ts = TestStore::create(100);
        let kv = ts.kv();

        // A second put wins.
        kv.put(b("a"), b("alpha")).unwrap();
        kv.put(b("a"), b("alpha-2")).unwrap();
        assert_eq!(Some(b("alpha-2").to_vec()), kv.get(b("a")).unwrap());

        // insert refuses to touch an existing key ...
        let inserted: bool = kv.insert(b("a"), b("alpha-3")).unwrap();
        assert!(!inserted);

        // ... while replace overwrites it.
        let overwrote: bool = kv.replace(b("a"), b("alpha-4")).unwrap();
        assert!(overwrote);
        assert_eq!(Some(b("alpha-4").to_vec()), kv.get(b("a")).unwrap());
    }

    /// Writes keys "0".."101" (as decimal strings) in descending order, then
    /// removes "11".
    fn write_basic_kv(ts: &mut TestStore) {
        let kv = ts.kv();
        for i in (0..102).rev() {
            let key = i.to_string();
            let value = format!("value-{}", key);
            kv.put(b(&key), b(&value)).unwrap();
        }
        // Remove one, so that we test Delete entries really do override Set entries.
        let removed: bool = kv.remove(b("11")).unwrap();
        assert!(removed);
        assert!(kv.memstores.len() > 1);
    }

    /// Spot-checks the data written by `write_basic_kv` from both ends.
    fn verify_basic_kv(ts: &mut TestStore) {
        let kv = ts.kv();
        {
            // Ascending from just past "1"; "11" was removed and must be skipped.
            let interval = Interval::<Buf>{lower: Bound::Excluded(b("1").to_vec()), upper: Bound::Unbounded};
            let mut it: StoreIter = kv.range(&interval).expect("range");
            expect_items(kv, &mut it, &[
                ("10", "value-10"),
                ("100", "value-100"),
                ("101", "value-101"),
                ("12", "value-12"),
                ("13", "value-13"),
            ]);
        }
        {
            // Descending from just below "99".
            let interval = Interval::<Buf>{lower: Bound::Unbounded, upper: Bound::Excluded(b("99").to_vec())};
            let mut it: StoreIter = kv.range_descending(&interval).expect("range descending");
            expect_items(kv, &mut it, &[("98", "value-98"), ("97", "value-97")]);
        }
    }

    #[test]
    fn many() {
        let mut ts = TestStore::create(100);
        write_basic_kv(&mut ts);
        verify_basic_kv(&mut ts);
    }

    #[test]
    fn disk() {
        let mut ts = TestStore::create(100);
        write_basic_kv(&mut ts);
        ts.kv().flush().unwrap();
        // Remove (and drop) existing store, then read everything back from disk.
        assert!(ts.close().is_some());
        ts.open(100);
        verify_basic_kv(&mut ts);
    }

    #[test]
    fn disk_missing_key() {
        let mut ts = TestStore::create(100);
        write_basic_kv(&mut ts);
        ts.kv().flush().unwrap();
        // Remove (and drop) existing store.
        assert!(ts.close().is_some());
        ts.open(100);
        // This actually hits the disk, because the key has no reference in the memstores.
        let missing = ts.kv().get(b("bogus")).unwrap();
        assert_eq!(None, missing);
    }

    /// Zero-padded, eight-digit key for `num`.
    fn big_key(num: u64) -> Buf { format!("{:08}", num).into_bytes() }
    /// Value paired with `big_key(num)`.
    fn big_value(num: u64) -> Buf { format!("value-{}", num).into_bytes() }

    /// Writes keys 0..n, then removes every odd key.
    fn write_big_kv(ts: &mut TestStore, n: u64) {
        let kv = ts.kv();
        for i in 0..n {
            kv.put(&big_key(i), &big_value(i)).unwrap();
        }
        for j in 0..n/2 {
            let odd = j * 2 + 1;
            let removed: bool = kv.remove(&big_key(odd)).unwrap();
            assert!(removed);
        }
    }

    /// Checks that iterating `[low, high]` in both directions yields exactly
    /// the even keys of that range, each with its matching value.
    fn verify_big_kv_range(kv: &mut Store, low: u64, high: u64) {
        let interval = Interval::<Buf>{
            lower: Bound::Included(big_key(low)),
            upper: Bound::Included(big_key(high)),
        };
        {
            // Ascending: start at the first even key at or above `low`.
            let mut expected = low + low % 2;
            let mut it: StoreIter = kv.range(&interval).expect("range");
            while let Some((k, v)) = kv.next(&mut it).expect("next") {
                assert_eq!(&big_key(expected), &k);
                assert_eq!(&big_value(expected), &v);
                expected += 2;
            }
            // The cursor must have stopped on the first even key above `high`.
            let past_end = if high % 2 == 0 { high + 2 } else { high + 1 };
            assert_eq!(past_end, expected);
        }
        {
            // Descending: start at the last even key at or below `high`.
            let mut expected = high - high % 2;
            let mut it: StoreIter = kv.range_descending(&interval).expect("range_descending");
            while let Some((k, v)) = kv.next(&mut it).expect("next desc") {
                assert_eq!(&big_key(expected), &k);
                assert_eq!(&big_value(expected), &v);
                expected -= 2;
            }
            // The cursor must have stopped on the last even key below `low`.
            let past_end = if low % 2 == 0 { low - 2 } else { low - 1 };
            assert_eq!(past_end, expected);
        }
    }

    fn verify_big_kv(ts: &mut TestStore) {
        verify_big_kv_range(ts.kv(), 145, 346);
    }

    #[test]
    fn big_many() {
        let mut ts = TestStore::create(100);
        write_big_kv(&mut ts, 1000);
        verify_big_kv(&mut ts);
    }

    #[test]
    fn big_many_disk() {
        let mut ts = TestStore::create(100);
        write_big_kv(&mut ts, 1000);
        ts.kv().flush().unwrap();
        // Remove (and drop) existing store.
        assert!(ts.close().is_some());
        ts.open(100);
        verify_big_kv(&mut ts);
    }

    #[test]
    fn big_many_threshold() {
        // A large threshold while writing, a small one after reopening.
        let mut ts = TestStore::create(2000000);
        write_big_kv(&mut ts, 1000);
        ts.kv().flush().unwrap();
        // Remove (and drop) existing store.
        assert!(ts.close().is_some());
        ts.open(100);
        verify_big_kv(&mut ts);
    }

    #[test]
    fn sync() {
        // Tests that sync generally works.
        let mut ts = TestStore::create(100);
        write_basic_kv(&mut ts);
        ts.kv().sync().expect("sync to succeed");
    }
}