jujutsu_lib/stacked_table.rs

1// Copyright 2021 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A persistent table of fixed-size keys to variable-size values. The keys are
16//! stored in sorted order, with each key followed by an integer offset into the
17//! list of values. The values are concatenated after the keys. A file may have
18//! a parent file, and the parent may have its own parent, and so on. The child
19//! file then represents the union of the entries.
20
21use std::cmp::Ordering;
22use std::collections::{BTreeMap, HashMap};
23use std::fs::File;
24use std::io;
25use std::io::{Cursor, Read, Write};
26use std::path::PathBuf;
27use std::sync::{Arc, RwLock};
28
29use blake2::{Blake2b512, Digest};
30use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
31use tempfile::NamedTempFile;
32use thiserror::Error;
33
34use crate::file_util::persist_content_addressed_temp_file;
35use crate::lock::FileLock;
36
/// A segment in a stack of tables: either an immutable on-disk file
/// (`ReadonlyTable`) or the in-memory table being built (`MutableTable`).
pub trait TableSegment {
    /// Number of entries in this segment alone, not counting ancestors.
    fn segment_num_entries(&self) -> usize;
    /// The parent file this segment is stacked on, if any.
    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>>;
    /// Looks up `key` in this segment only (ancestors are not consulted).
    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
    /// Copies all of this segment's entries into `mut_table`.
    fn segment_add_entries_to(&self, mut_table: &mut MutableTable);

    /// Total number of entries in this segment and all its ancestors.
    fn num_entries(&self) -> usize {
        if let Some(parent_file) = self.segment_parent_file() {
            parent_file.num_entries() + self.segment_num_entries()
        } else {
            self.segment_num_entries()
        }
    }

    /// Looks up `key` in this segment first, then in each ancestor in turn,
    /// so an entry in a child shadows one with the same key in an ancestor.
    fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> {
        if let Some(value) = self.segment_get_value(key) {
            return Some(value);
        }
        let parent_file = self.segment_parent_file()?;
        let parent_file: &ReadonlyTable = parent_file.as_ref();
        // SAFETY: the parent ReadonlyTable is owned by an `Arc` reachable from
        // `self` (that is what `segment_parent_file()` borrowed from), so it
        // lives at least as long as `self` and extending the borrow to `'a`
        // keeps the returned slice valid.
        let parent_file: &'a ReadonlyTable = unsafe { std::mem::transmute(parent_file) };
        parent_file.get_value(key)
    }
}
62
/// An immutable table segment loaded from disk. May reference a parent file,
/// in which case the table represents the union of both files' entries.
pub struct ReadonlyTable {
    // Size in bytes of every key in the table
    key_size: usize,
    parent_file: Option<Arc<ReadonlyTable>>,
    // File name within the store directory (hex hash of the file's content)
    name: String,
    // Number of entries not counting the parent file
    num_local_entries: usize,
    // The file's entries in the raw format they're stored in on disk.
    index: Vec<u8>,
    // Concatenated values; the index stores each entry's offset into this
    values: Vec<u8>,
}
73
74impl ReadonlyTable {
75    fn load_from(
76        file: &mut dyn Read,
77        store: &TableStore,
78        name: String,
79        key_size: usize,
80    ) -> TableStoreResult<Arc<ReadonlyTable>> {
81        let parent_filename_len = file.read_u32::<LittleEndian>()?;
82        let maybe_parent_file = if parent_filename_len > 0 {
83            let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
84            file.read_exact(&mut parent_filename_bytes)?;
85            let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
86            let parent_file = store.load_table(parent_filename)?;
87            Some(parent_file)
88        } else {
89            None
90        };
91        let num_local_entries = file.read_u32::<LittleEndian>()? as usize;
92        let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size);
93        let mut data = vec![];
94        file.read_to_end(&mut data)?;
95        let values = data.split_off(index_size);
96        let index = data;
97        Ok(Arc::new(ReadonlyTable {
98            key_size,
99            parent_file: maybe_parent_file,
100            name,
101            num_local_entries,
102            index,
103            values,
104        }))
105    }
106
107    pub fn start_mutation(self: &Arc<Self>) -> MutableTable {
108        MutableTable::incremental(self.clone())
109    }
110
111    fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
112        if pos == self.num_local_entries {
113            self.values.len()
114        } else {
115            ReadonlyTableIndexEntry::new(self, pos).value_offset()
116        }
117    }
118
119    fn segment_value_by_pos(&self, pos: usize) -> &[u8] {
120        &self.values
121            [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)]
122    }
123}
124
125impl TableSegment for ReadonlyTable {
126    fn segment_num_entries(&self) -> usize {
127        self.num_local_entries
128    }
129
130    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
131        self.parent_file.as_ref()
132    }
133
134    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
135        let mut low_pos = 0;
136        let mut high_pos = self.num_local_entries;
137        loop {
138            if high_pos == low_pos {
139                return None;
140            }
141            let mid_pos = (low_pos + high_pos) / 2;
142            let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos);
143            match key.cmp(mid_entry.key()) {
144                Ordering::Less => {
145                    high_pos = mid_pos;
146                }
147                Ordering::Equal => {
148                    return Some(self.segment_value_by_pos(mid_pos));
149                }
150                Ordering::Greater => {
151                    low_pos = mid_pos + 1;
152                }
153            }
154        }
155    }
156
157    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
158        for pos in 0..self.num_local_entries {
159            let entry = ReadonlyTableIndexEntry::new(self, pos);
160            mut_table.add_entry(
161                entry.key().to_vec(),
162                self.segment_value_by_pos(pos).to_vec(),
163            );
164        }
165    }
166}
167
/// Borrowed view of one `(key, value offset)` entry in a `ReadonlyTable`'s
/// raw index.
struct ReadonlyTableIndexEntry<'table> {
    data: &'table [u8],
}
171
172impl<'table> ReadonlyTableIndexEntry<'table> {
173    fn new(table: &'table ReadonlyTable, pos: usize) -> Self {
174        let entry_size = ReadonlyTableIndexEntry::size(table.key_size);
175        let offset = entry_size * pos;
176        let data = &table.index[offset..offset + entry_size];
177        ReadonlyTableIndexEntry { data }
178    }
179
180    fn size(key_size: usize) -> usize {
181        key_size + 4
182    }
183
184    fn key(&self) -> &'table [u8] {
185        &self.data[0..self.data.len() - 4]
186    }
187
188    fn value_offset(&self) -> usize {
189        (&self.data[self.data.len() - 4..self.data.len()])
190            .read_u32::<LittleEndian>()
191            .unwrap() as usize
192    }
193}
194
/// An in-memory, mutable table segment, possibly stacked on top of a
/// `ReadonlyTable`.
pub struct MutableTable {
    // Size in bytes of every key; matches the parent file's key size
    key_size: usize,
    parent_file: Option<Arc<ReadonlyTable>>,
    // New entries, kept sorted by key (the on-disk index must be sorted)
    entries: BTreeMap<Vec<u8>, Vec<u8>>,
}
200
201impl MutableTable {
202    fn full(key_size: usize) -> Self {
203        Self {
204            key_size,
205            parent_file: None,
206            entries: BTreeMap::new(),
207        }
208    }
209
210    fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
211        let key_size = parent_file.key_size;
212        Self {
213            key_size,
214            parent_file: Some(parent_file),
215            entries: BTreeMap::new(),
216        }
217    }
218
219    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>) {
220        assert_eq!(key.len(), self.key_size);
221        self.entries.insert(key, value);
222    }
223
224    fn add_entries_from(&mut self, other: &dyn TableSegment) {
225        other.segment_add_entries_to(self);
226    }
227
228    fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
229        let mut maybe_own_ancestor = self.parent_file.clone();
230        let mut maybe_other_ancestor = Some(other.clone());
231        let mut files_to_add = vec![];
232        loop {
233            if maybe_other_ancestor.is_none() {
234                break;
235            }
236            let other_ancestor = maybe_other_ancestor.as_ref().unwrap();
237            if maybe_own_ancestor.is_none() {
238                files_to_add.push(other_ancestor.clone());
239                maybe_other_ancestor = other_ancestor.parent_file.clone();
240                continue;
241            }
242            let own_ancestor = maybe_own_ancestor.as_ref().unwrap();
243            if own_ancestor.name == other_ancestor.name {
244                break;
245            }
246            if own_ancestor.num_entries() < other_ancestor.num_entries() {
247                files_to_add.push(other_ancestor.clone());
248                maybe_other_ancestor = other_ancestor.parent_file.clone();
249            } else {
250                maybe_own_ancestor = own_ancestor.parent_file.clone();
251            }
252        }
253
254        for file in files_to_add.iter().rev() {
255            self.add_entries_from(file.as_ref());
256        }
257    }
258
259    fn serialize(self) -> Vec<u8> {
260        let mut buf = vec![];
261
262        if let Some(parent_file) = &self.parent_file {
263            buf.write_u32::<LittleEndian>(parent_file.name.len() as u32)
264                .unwrap();
265            buf.write_all(parent_file.name.as_bytes()).unwrap();
266        } else {
267            buf.write_u32::<LittleEndian>(0).unwrap();
268        }
269
270        buf.write_u32::<LittleEndian>(self.entries.len() as u32)
271            .unwrap();
272
273        let mut value_offset = 0;
274        for (key, value) in &self.entries {
275            buf.write_all(key).unwrap();
276            buf.write_u32::<LittleEndian>(value_offset).unwrap();
277            value_offset += value.len() as u32;
278        }
279        for value in self.entries.values() {
280            buf.write_all(value).unwrap();
281        }
282        buf
283    }
284
285    /// If the MutableTable has more than half the entries of its parent
286    /// ReadonlyTable, return MutableTable with the commits from both. This
287    /// is done recursively, so the stack of index files has O(log n) files.
288    fn maybe_squash_with_ancestors(self) -> MutableTable {
289        let mut num_new_entries = self.entries.len();
290        let mut files_to_squash = vec![];
291        let mut maybe_parent_file = self.parent_file.clone();
292        let mut squashed;
293        loop {
294            match maybe_parent_file {
295                Some(parent_file) => {
296                    // TODO: We should probably also squash if the parent file has less than N
297                    // commits, regardless of how many (few) are in `self`.
298                    if 2 * num_new_entries < parent_file.num_local_entries {
299                        squashed = MutableTable::incremental(parent_file);
300                        break;
301                    }
302                    num_new_entries += parent_file.num_local_entries;
303                    files_to_squash.push(parent_file.clone());
304                    maybe_parent_file = parent_file.parent_file.clone();
305                }
306                None => {
307                    squashed = MutableTable::full(self.key_size);
308                    break;
309                }
310            }
311        }
312
313        if files_to_squash.is_empty() {
314            return self;
315        }
316
317        for parent_file in files_to_squash.iter().rev() {
318            squashed.add_entries_from(parent_file.as_ref());
319        }
320        squashed.add_entries_from(&self);
321        squashed
322    }
323
324    fn save_in(self, store: &TableStore) -> TableStoreResult<Arc<ReadonlyTable>> {
325        if self.entries.is_empty() && self.parent_file.is_some() {
326            return Ok(self.parent_file.unwrap());
327        }
328
329        let buf = self.maybe_squash_with_ancestors().serialize();
330        let mut hasher = Blake2b512::new();
331        hasher.update(&buf);
332        let file_id_hex = hex::encode(hasher.finalize());
333        let file_path = store.dir.join(&file_id_hex);
334
335        let mut temp_file = NamedTempFile::new_in(&store.dir)?;
336        let file = temp_file.as_file_mut();
337        file.write_all(&buf)?;
338        persist_content_addressed_temp_file(temp_file, file_path)?;
339
340        let mut cursor = Cursor::new(&buf);
341        ReadonlyTable::load_from(&mut cursor, store, file_id_hex, store.key_size)
342    }
343}
344
345impl TableSegment for MutableTable {
346    fn segment_num_entries(&self) -> usize {
347        self.entries.len()
348    }
349
350    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
351        self.parent_file.as_ref()
352    }
353
354    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
355        self.entries.get(key).map(Vec::as_slice)
356    }
357
358    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
359        for (key, value) in &self.entries {
360            mut_table.add_entry(key.clone(), value.clone());
361        }
362    }
363}
364
/// Error type for `TableStore` operations.
#[derive(Debug, Error)]
#[error(transparent)]
pub enum TableStoreError {
    /// Failed to read or write a table file or the heads directory.
    IoError(#[from] io::Error),
    /// Failed to persist a newly written table's temporary file.
    PersistError(#[from] tempfile::PersistError),
}

pub type TableStoreResult<T> = Result<T, TableStoreError>;
373
/// Disk-backed store of stacked tables. The current head table(s) are
/// tracked as empty marker files in a "heads" subdirectory of `dir`.
pub struct TableStore {
    dir: PathBuf,
    // Size in bytes of the keys in all of this store's tables
    key_size: usize,
    // Cache of tables already loaded from disk, keyed by file name
    cached_tables: RwLock<HashMap<String, Arc<ReadonlyTable>>>,
}
379
impl TableStore {
    /// Creates a new store in `dir` (which must already exist) for tables
    /// with `key_size`-byte keys.
    ///
    /// Panics if the "heads" subdirectory cannot be created.
    pub fn init(dir: PathBuf, key_size: usize) -> Self {
        std::fs::create_dir(dir.join("heads")).unwrap();
        TableStore {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Forgets all heads by deleting and recreating the "heads" directory.
    ///
    /// NOTE(review): the table files themselves are not deleted, and this
    /// store's in-memory cache is not cleared — confirm that's intended.
    pub fn reinit(&self) {
        std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
        TableStore::init(self.dir.clone(), self.key_size);
    }

    /// Size in bytes of the keys in this store's tables.
    pub fn key_size(&self) -> usize {
        self.key_size
    }

    /// Opens an existing store in `dir` without touching the filesystem.
    pub fn load(dir: PathBuf, key_size: usize) -> Self {
        TableStore {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Saves `mut_table` to disk, records it as a head (replacing its parent
    /// as head, if any), caches it, and returns it as a `ReadonlyTable`.
    pub fn save_table(&self, mut_table: MutableTable) -> TableStoreResult<Arc<ReadonlyTable>> {
        let maybe_parent_table = mut_table.parent_file.clone();
        let table = mut_table.save_in(self)?;
        // Add the new head before removing the old one so there is never a
        // moment without a head on disk.
        self.add_head(&table)?;
        if let Some(parent_table) = maybe_parent_table {
            self.remove_head(&parent_table);
        }
        {
            let mut locked_cache = self.cached_tables.write().unwrap();
            locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    /// Marks `table` as a current head by creating an empty marker file.
    fn add_head(&self, table: &Arc<ReadonlyTable>) -> std::io::Result<()> {
        std::fs::write(self.dir.join("heads").join(&table.name), "")
    }

    /// Removes `table`'s head marker file, if present.
    fn remove_head(&self, table: &Arc<ReadonlyTable>) {
        // It's fine if the old head was not found. It probably means
        // that we're on a distributed file system where the locking
        // doesn't work. We'll probably end up with two current
        // heads. We'll detect that next time we load the table.
        std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
    }

    /// Takes the store-wide file lock (used while merging divergent heads).
    fn lock(&self) -> FileLock {
        FileLock::lock(self.dir.join("lock"))
    }

    /// Loads the table file named `name`, consulting and populating the
    /// in-memory cache.
    fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
        {
            let read_locked_cached = self.cached_tables.read().unwrap();
            if let Some(table) = read_locked_cached.get(&name).cloned() {
                return Ok(table);
            }
        }
        let table_file_path = self.dir.join(&name);
        let mut table_file = File::open(table_file_path)?;
        let table = ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)?;
        {
            let mut write_locked_cache = self.cached_tables.write().unwrap();
            write_locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    /// Loads every table currently marked as a head.
    fn get_head_tables(&self) -> TableStoreResult<Vec<Arc<ReadonlyTable>>> {
        let mut tables = vec![];
        for head_entry in std::fs::read_dir(self.dir.join("heads"))? {
            let head_file_name = head_entry?.file_name();
            let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
            tables.push(table);
        }
        Ok(tables)
    }

    /// Returns the single current head table, creating an empty one if the
    /// store has none, and merging them into one if there are several.
    pub fn get_head(&self) -> TableStoreResult<Arc<ReadonlyTable>> {
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            self.save_table(empty_table)
        } else if tables.len() == 1 {
            Ok(tables.pop().unwrap())
        } else {
            // There are multiple heads. We take a lock, then check if there are still
            // multiple heads (it's likely that another process was in the process of
            // deleting one of them). If there are still multiple heads, we attempt to
            // merge all the tables into one. We then save that table and record the new
            // head. Note that the locking isn't necessary for correctness; we
            // take the lock only to avoid other concurrent processes from doing
            // the same work (and producing another set of divergent heads).
            let _lock = self.lock();
            let mut tables = self.get_head_tables()?;

            if tables.is_empty() {
                let empty_table = MutableTable::full(self.key_size);
                return self.save_table(empty_table);
            }

            if tables.len() == 1 {
                // Return early so we don't write a table with no changes compared to its parent
                return Ok(tables.pop().unwrap());
            }

            let mut merged_table = MutableTable::incremental(tables[0].clone());
            for other in &tables[1..] {
                merged_table.merge_in(other);
            }
            // save_table() already removed tables[0]'s head marker (it's the
            // merged table's parent); remove the rest here.
            let merged_table = self.save_table(merged_table)?;
            for table in &tables[1..] {
                self.remove_head(table);
            }
            Ok(merged_table)
        }
    }
}
505
#[cfg(test)]
mod tests {
    use test_case::test_case;

    use super::*;

    // An empty table contains no keys, whether queried in memory or after
    // being saved to disk.
    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_empty(on_disk: bool) {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut_table = store.get_head().unwrap().start_mutation();
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Cannot find any keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"aaa"), None);
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    // A single entry round-trips, and lookups of absent keys on either side
    // of it miss.
    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_single_key(on_disk: bool) {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Can find expected keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    // Entries added out of order are found by key, including after the
    // on-disk binary search.
    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_multiple_keys(on_disk: bool) {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Can find expected keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abb"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(table.get_value(b"abe"), None);
        assert_eq!(table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    // Builds a stack of parent files (saving after each round) and checks
    // that lookups traverse the whole chain.
    #[test]
    fn stacked_table_multiple_keys_with_parent_file() {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        for round in 0..10 {
            for i in 0..10 {
                mut_table.add_entry(
                    format!("x{i}{round}").into_bytes(),
                    format!("value {i}{round}").into_bytes(),
                );
            }
            let saved_table = store.save_table(mut_table).unwrap();
            mut_table = MutableTable::incremental(saved_table);
        }

        // Can find expected keys
        assert_eq!(mut_table.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table.get_value(b"x.."), None);
        assert_eq!(mut_table.get_value(b"x14"), Some(b"value 14".as_slice()));
        assert_eq!(mut_table.get_value(b"x41"), Some(b"value 41".as_slice()));
        assert_eq!(mut_table.get_value(b"x49"), Some(b"value 49".as_slice()));
        assert_eq!(mut_table.get_value(b"x94"), Some(b"value 94".as_slice()));
        assert_eq!(mut_table.get_value(b"xAA"), None);
        assert_eq!(mut_table.get_value(b"\xff\xff\xff"), None);
    }

    // Explicitly merges one saved table into another mutable table that
    // shares a common base.
    #[test]
    fn stacked_table_merge() {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        let table1 = store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        mut_table2.merge_in(&table1);

        // Can find expected keys
        assert_eq!(mut_table2.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table2.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(mut_table2.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(mut_table2.get_value(b"abe"), Some(b"value 4".as_slice()));
        // The caller shouldn't write two values for the same key, so it's undefined
        // which wins, but let's test how it currently behaves.
        assert_eq!(mut_table2.get_value(b"mmm"), Some(b"side 1".as_slice()));
        assert_eq!(mut_table2.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None);
    }

    // Creates two divergent heads and relies on get_head() to merge them.
    #[test]
    fn stacked_table_automatic_merge() {
        // Same test as above, but here we let the store do the merging on load
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        let table2 = store.save_table(mut_table2).unwrap();

        // The saved table does not have the keys from table1
        assert_eq!(table2.get_value(b"abd"), None);

        // Can find expected keys in the merged table we get from get_head()
        let merged_table = store.get_head().unwrap();
        assert_eq!(merged_table.get_value(b"\0\0\0"), None);
        assert_eq!(merged_table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(merged_table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(merged_table.get_value(b"abe"), Some(b"value 4".as_slice()));
        // The caller shouldn't write two values for the same key, so it's undefined
        // which wins.
        let value_mmm = merged_table.get_value(b"mmm");
        assert!(value_mmm == Some(b"side 1".as_slice()) || value_mmm == Some(b"side 2".as_slice()));
        assert_eq!(merged_table.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(merged_table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(merged_table.get_value(b"\xff\xff\xff"), None);
    }
}