jj_lib/stacked_table.rs

1// Copyright 2021 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A persistent table of fixed-size keys to variable-size values.
16//!
17//! The keys are stored in sorted order, with each key followed by an
18//! integer offset into the list of values. The values are
19//! concatenated after the keys. A file may have a parent file, and
20//! the parent may have its own parent, and so on. The child file then
21//! represents the union of the entries.
22
23#![allow(missing_docs)]
24
25use std::cmp::Ordering;
26use std::collections::BTreeMap;
27use std::collections::HashMap;
28use std::fs::File;
29use std::io;
30use std::io::Read;
31use std::io::Write as _;
32use std::path::PathBuf;
33use std::sync::Arc;
34use std::sync::RwLock;
35
36use blake2::Blake2b512;
37use blake2::Digest as _;
38use tempfile::NamedTempFile;
39use thiserror::Error;
40
41use crate::file_util::persist_content_addressed_temp_file;
42use crate::hex_util;
43use crate::lock::FileLock;
44use crate::lock::FileLockError;
45
46pub trait TableSegment {
47    fn segment_num_entries(&self) -> usize;
48    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>>;
49    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
50    fn segment_add_entries_to(&self, mut_table: &mut MutableTable);
51
52    fn num_entries(&self) -> usize {
53        if let Some(parent_file) = self.segment_parent_file() {
54            parent_file.num_entries() + self.segment_num_entries()
55        } else {
56            self.segment_num_entries()
57        }
58    }
59
60    fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> {
61        self.segment_get_value(key)
62            .or_else(|| self.segment_parent_file()?.get_value(key))
63    }
64}
65
/// An immutable table segment loaded from (or saved to) disk, optionally
/// stacked on top of a parent file.
pub struct ReadonlyTable {
    // Size in bytes of every key in this table (and its ancestors).
    key_size: usize,
    parent_file: Option<Arc<ReadonlyTable>>,
    // File name within the store's directory (a hex hash of the content).
    name: String,
    // Number of entries not counting the parent file
    num_local_entries: usize,
    // The file's entries in the raw format they're stored in on disk.
    index: Vec<u8>,
    // Concatenated values; per-entry offsets live in `index`.
    values: Vec<u8>,
}
76
impl ReadonlyTable {
    /// Reads a serialized table from `file` in the format written by
    /// `MutableTable::serialize()`: parent-name length (u32 LE), parent
    /// file name, local entry count (u32 LE), the fixed-size sorted
    /// index, then the concatenated values.
    fn load_from(
        file: &mut dyn Read,
        store: &TableStore,
        name: String,
        key_size: usize,
    ) -> TableStoreResult<Arc<ReadonlyTable>> {
        let to_load_err = |err| TableStoreError::LoadSegment {
            name: name.clone(),
            err,
        };
        let read_u32 = |file: &mut dyn Read| -> TableStoreResult<u32> {
            let mut buf = [0; 4];
            file.read_exact(&mut buf).map_err(to_load_err)?;
            Ok(u32::from_le_bytes(buf))
        };
        let parent_filename_len = read_u32(file)?;
        let maybe_parent_file = if parent_filename_len > 0 {
            let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
            file.read_exact(&mut parent_filename_bytes)
                .map_err(to_load_err)?;
            // File names are hex digests written by `MutableTable::save_in()`,
            // so they are valid UTF-8.
            let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
            // Recursively loads the whole ancestor chain (cached by the store).
            let parent_file = store.load_table(parent_filename)?;
            Some(parent_file)
        } else {
            None
        };
        let num_local_entries = read_u32(file)? as usize;
        // Index entries are fixed-size, so the index's total length is known
        // up front; everything after it is the value area.
        let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size);
        let mut data = vec![];
        file.read_to_end(&mut data).map_err(to_load_err)?;
        let values = data.split_off(index_size);
        let index = data;
        Ok(Arc::new(ReadonlyTable {
            key_size,
            parent_file: maybe_parent_file,
            name,
            num_local_entries,
            index,
            values,
        }))
    }

    /// Returns a new empty `MutableTable` stacked on top of this table.
    pub fn start_mutation(self: &Arc<Self>) -> MutableTable {
        MutableTable::incremental(self.clone())
    }

    /// Byte offset into `values` where entry `pos`'s value starts. `pos`
    /// may be one past the last entry, in which case the end of the value
    /// area is returned (used to delimit the last value).
    fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
        if pos == self.num_local_entries {
            self.values.len()
        } else {
            ReadonlyTableIndexEntry::new(self, pos).value_offset()
        }
    }

    /// The value bytes for entry `pos`, bounded by the next entry's offset.
    fn segment_value_by_pos(&self, pos: usize) -> &[u8] {
        &self.values
            [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)]
    }
}
137
138impl TableSegment for ReadonlyTable {
139    fn segment_num_entries(&self) -> usize {
140        self.num_local_entries
141    }
142
143    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
144        self.parent_file.as_ref()
145    }
146
147    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
148        let mut low_pos = 0;
149        let mut high_pos = self.num_local_entries;
150        loop {
151            if high_pos == low_pos {
152                return None;
153            }
154            let mid_pos = (low_pos + high_pos) / 2;
155            let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos);
156            match key.cmp(mid_entry.key()) {
157                Ordering::Less => {
158                    high_pos = mid_pos;
159                }
160                Ordering::Equal => {
161                    return Some(self.segment_value_by_pos(mid_pos));
162                }
163                Ordering::Greater => {
164                    low_pos = mid_pos + 1;
165                }
166            }
167        }
168    }
169
170    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
171        for pos in 0..self.num_local_entries {
172            let entry = ReadonlyTableIndexEntry::new(self, pos);
173            mut_table.add_entry(
174                entry.key().to_vec(),
175                self.segment_value_by_pos(pos).to_vec(),
176            );
177        }
178    }
179}
180
/// A borrowed view of one fixed-size index entry in a `ReadonlyTable`:
/// the key bytes followed by a 4-byte little-endian value offset.
struct ReadonlyTableIndexEntry<'table> {
    data: &'table [u8],
}
184
impl<'table> ReadonlyTableIndexEntry<'table> {
    /// Borrows the `pos`'th index entry from `table`'s raw index bytes.
    fn new(table: &'table ReadonlyTable, pos: usize) -> Self {
        let entry_size = ReadonlyTableIndexEntry::size(table.key_size);
        let offset = entry_size * pos;
        let data = &table.index[offset..][..entry_size];
        ReadonlyTableIndexEntry { data }
    }

    /// On-disk size of one entry: the key plus a 4-byte value offset.
    fn size(key_size: usize) -> usize {
        key_size + 4
    }

    /// The key bytes (everything before the trailing 4-byte offset).
    fn key(&self) -> &'table [u8] {
        &self.data[0..self.data.len() - 4]
    }

    /// Byte offset of this entry's value within the table's value area,
    /// decoded from the trailing 4 little-endian bytes.
    fn value_offset(&self) -> usize {
        u32::from_le_bytes(self.data[self.data.len() - 4..].try_into().unwrap()) as usize
    }
}
205
/// An in-memory table of uncommitted entries, optionally stacked on top
/// of an on-disk parent file. Saved to disk via `TableStore::save_table()`.
pub struct MutableTable {
    // Size in bytes of every key; must match the parent file's key size.
    key_size: usize,
    parent_file: Option<Arc<ReadonlyTable>>,
    // BTreeMap keeps entries sorted by key, as the serialized format requires.
    entries: BTreeMap<Vec<u8>, Vec<u8>>,
}
211
impl MutableTable {
    /// Creates an empty table with no parent file.
    fn full(key_size: usize) -> Self {
        Self {
            key_size,
            parent_file: None,
            entries: BTreeMap::new(),
        }
    }

    /// Creates an empty table stacked on top of `parent_file`; the key
    /// size is inherited from the parent.
    fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
        let key_size = parent_file.key_size;
        Self {
            key_size,
            parent_file: Some(parent_file),
            entries: BTreeMap::new(),
        }
    }

    /// Adds (or overwrites) an entry.
    ///
    /// Panics if `key` is not exactly `key_size` bytes long.
    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>) {
        assert_eq!(key.len(), self.key_size);
        self.entries.insert(key, value);
    }

    /// Copies all of `other`'s local entries (not its ancestors') into
    /// this table, overwriting on key conflicts.
    fn add_entries_from(&mut self, other: &dyn TableSegment) {
        other.segment_add_entries_to(self);
    }

    /// Merges the entries of `other` and its ancestors into this table.
    ///
    /// Walks both ancestor chains until a common ancestor (matched by
    /// file name) is reached, collecting the `other`-side files this
    /// table doesn't already include, then replays them oldest-first so
    /// that newer files win on key conflicts.
    fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
        let mut maybe_own_ancestor = self.parent_file.clone();
        let mut maybe_other_ancestor = Some(other.clone());
        let mut files_to_add = vec![];
        loop {
            if maybe_other_ancestor.is_none() {
                break;
            }
            let other_ancestor = maybe_other_ancestor.as_ref().unwrap();
            if maybe_own_ancestor.is_none() {
                // Our chain is exhausted; everything remaining on the
                // other side is new to us.
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
                continue;
            }
            let own_ancestor = maybe_own_ancestor.as_ref().unwrap();
            if own_ancestor.name == other_ancestor.name {
                // Reached a shared ancestor; everything below is common.
                break;
            }
            // Step down whichever chain is "taller" (by cumulative entry
            // count) to bring the two sides level — presumably cumulative
            // counts grow strictly along a chain so this converges on the
            // common ancestor; confirm against the format invariants.
            if own_ancestor.num_entries() < other_ancestor.num_entries() {
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
            } else {
                maybe_own_ancestor = own_ancestor.parent_file.clone();
            }
        }

        // Replay oldest-first so entries from newer files overwrite
        // entries from older ones.
        for file in files_to_add.iter().rev() {
            self.add_entries_from(file.as_ref());
        }
    }

    /// Serializes this table's local entries into the on-disk format read
    /// by `ReadonlyTable::load_from()`: parent-name length (u32 LE) and
    /// name (length 0 if no parent), local entry count (u32 LE), then for
    /// each entry in key order the key followed by its value's offset
    /// (u32 LE), then all values concatenated.
    fn serialize(self) -> Vec<u8> {
        let mut buf = vec![];

        if let Some(parent_file) = &self.parent_file {
            buf.extend(u32::try_from(parent_file.name.len()).unwrap().to_le_bytes());
            buf.extend_from_slice(parent_file.name.as_bytes());
        } else {
            buf.extend(0_u32.to_le_bytes());
        }

        buf.extend(u32::try_from(self.entries.len()).unwrap().to_le_bytes());

        // First pass: write the index (keys + running value offsets);
        // BTreeMap iteration yields keys in sorted order.
        let mut value_offset = 0_u32;
        for (key, value) in &self.entries {
            buf.extend_from_slice(key);
            buf.extend(value_offset.to_le_bytes());
            value_offset += u32::try_from(value.len()).unwrap();
        }
        // Second pass: write the value area in the same order.
        for value in self.entries.values() {
            buf.extend_from_slice(value);
        }
        buf
    }

    /// If the MutableTable has more than half the entries of its parent
    /// ReadonlyTable, return MutableTable with the commits from both. This
    /// is done recursively, so the stack of index files has O(log n) files.
    #[expect(clippy::assigning_clones)]
    fn maybe_squash_with_ancestors(self) -> MutableTable {
        let mut num_new_entries = self.entries.len();
        let mut files_to_squash = vec![];
        let mut maybe_parent_file = self.parent_file.clone();
        let mut squashed;
        loop {
            match maybe_parent_file {
                Some(parent_file) => {
                    // TODO: We should probably also squash if the parent file has less than N
                    // commits, regardless of how many (few) are in `self`.
                    if 2 * num_new_entries < parent_file.num_local_entries {
                        // Parent is big enough to keep as-is; squash onto it.
                        squashed = MutableTable::incremental(parent_file);
                        break;
                    }
                    num_new_entries += parent_file.num_local_entries;
                    files_to_squash.push(parent_file.clone());
                    maybe_parent_file = parent_file.parent_file.clone();
                }
                None => {
                    // Reached the root; squash everything into a full table.
                    squashed = MutableTable::full(self.key_size);
                    break;
                }
            }
        }

        if files_to_squash.is_empty() {
            return self;
        }

        // Replay oldest-first, then our own entries last, so newer
        // entries win on key conflicts.
        for parent_file in files_to_squash.iter().rev() {
            squashed.add_entries_from(parent_file.as_ref());
        }
        squashed.add_entries_from(&self);
        squashed
    }

    /// Writes this table (possibly squashed with ancestors) to a
    /// content-addressed file in `store`'s directory and returns the
    /// saved table. If there are no local entries, returns the parent
    /// file unchanged without writing anything.
    fn save_in(self, store: &TableStore) -> TableStoreResult<Arc<ReadonlyTable>> {
        if self.entries.is_empty() && self.parent_file.is_some() {
            return Ok(self.parent_file.unwrap());
        }

        let buf = self.maybe_squash_with_ancestors().serialize();
        // The file name is the hash of the content, so identical tables
        // map to the same file.
        let mut hasher = Blake2b512::new();
        hasher.update(&buf);
        let file_id_hex = hex_util::encode_hex(&hasher.finalize());
        let file_path = store.dir.join(&file_id_hex);
        let to_save_err = |err| TableStoreError::SaveSegment {
            name: file_id_hex.clone(),
            err,
        };

        // Write via a temp file and rename, so a partially-written table
        // never appears under its final name.
        let mut temp_file = NamedTempFile::new_in(&store.dir).map_err(to_save_err)?;
        let file = temp_file.as_file_mut();
        file.write_all(&buf).map_err(to_save_err)?;
        persist_content_addressed_temp_file(temp_file, file_path).map_err(to_save_err)?;

        // Parse the in-memory buffer instead of re-reading the file.
        ReadonlyTable::load_from(&mut buf.as_slice(), store, file_id_hex, store.key_size)
    }
}
357
358impl TableSegment for MutableTable {
359    fn segment_num_entries(&self) -> usize {
360        self.entries.len()
361    }
362
363    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
364        self.parent_file.as_ref()
365    }
366
367    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
368        self.entries.get(key).map(Vec::as_slice)
369    }
370
371    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
372        for (key, value) in &self.entries {
373            mut_table.add_entry(key.clone(), value.clone());
374        }
375    }
376}
377
/// Errors that can occur while loading, saving, or locking tables.
#[derive(Debug, Error)]
pub enum TableStoreError {
    // Failed to read the "heads" directory.
    #[error("Failed to load table heads")]
    LoadHeads(#[source] io::Error),
    // Failed to write a head marker file.
    #[error("Failed to save table heads")]
    SaveHeads(#[source] io::Error),
    // Failed to read or parse a table segment file.
    #[error("Failed to load table segment '{name}'")]
    LoadSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    // Failed to write a table segment file.
    #[error("Failed to save table segment '{name}'")]
    SaveSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    // Failed to acquire the store's lock file.
    #[error("Failed to lock table store")]
    Lock(#[source] FileLockError),
}
399
/// Shorthand for results of table store operations.
pub type TableStoreResult<T> = Result<T, TableStoreError>;
401
/// An on-disk store of stacked tables. The directory contains the
/// content-addressed segment files, a "heads" subdirectory marking the
/// current head table(s), and a "lock" file.
pub struct TableStore {
    dir: PathBuf,
    // Size in bytes of all keys in all tables in this store.
    key_size: usize,
    // In-memory cache of loaded segments, keyed by file name.
    cached_tables: RwLock<HashMap<String, Arc<ReadonlyTable>>>,
}
407
impl TableStore {
    /// Initializes a new store in `dir`, which must already exist.
    ///
    /// Panics if the "heads" subdirectory cannot be created.
    pub fn init(dir: PathBuf, key_size: usize) -> Self {
        std::fs::create_dir(dir.join("heads")).unwrap();
        TableStore {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Drops all heads (the segment files themselves are left in place)
    /// and re-creates the "heads" directory.
    pub fn reinit(&self) {
        std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
        TableStore::init(self.dir.clone(), self.key_size);
    }

    /// Size in bytes of all keys in this store's tables.
    pub fn key_size(&self) -> usize {
        self.key_size
    }

    /// Opens an existing store in `dir` without touching the file system.
    pub fn load(dir: PathBuf, key_size: usize) -> Self {
        TableStore {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Saves `mut_table` to disk, records it as a head (replacing its
    /// parent as head if the parent was one), caches it, and returns it.
    pub fn save_table(&self, mut_table: MutableTable) -> TableStoreResult<Arc<ReadonlyTable>> {
        let maybe_parent_table = mut_table.parent_file.clone();
        let table = mut_table.save_in(self)?;
        self.add_head(&table)?;
        if let Some(parent_table) = maybe_parent_table {
            // An empty save returns the parent itself; don't remove the
            // head we just added in that case.
            if parent_table.name != table.name {
                self.remove_head(&parent_table);
            }
        }
        {
            let mut locked_cache = self.cached_tables.write().unwrap();
            locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    /// Marks `table` as a head by creating an empty marker file named
    /// after it in the "heads" directory.
    fn add_head(&self, table: &Arc<ReadonlyTable>) -> TableStoreResult<()> {
        std::fs::write(self.dir.join("heads").join(&table.name), "")
            .map_err(TableStoreError::SaveHeads)
    }

    /// Removes `table`'s head marker file (best-effort).
    fn remove_head(&self, table: &Arc<ReadonlyTable>) {
        // It's fine if the old head was not found. It probably means
        // that we're on a distributed file system where the locking
        // doesn't work. We'll probably end up with two current
        // heads. We'll detect that next time we load the table.
        std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
    }

    /// Takes the store-wide file lock.
    fn lock(&self) -> TableStoreResult<FileLock> {
        FileLock::lock(self.dir.join("lock")).map_err(TableStoreError::Lock)
    }

    /// Loads the segment file called `name` (and, transitively, its
    /// ancestors), going through the in-memory cache.
    fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
        {
            let read_locked_cached = self.cached_tables.read().unwrap();
            if let Some(table) = read_locked_cached.get(&name).cloned() {
                return Ok(table);
            }
        }
        let to_load_err = |err| TableStoreError::LoadSegment {
            name: name.clone(),
            err,
        };
        let table_file_path = self.dir.join(&name);
        let mut table_file = File::open(table_file_path).map_err(to_load_err)?;
        let table = ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)?;
        {
            // A concurrent load of the same table may race us here; the
            // insert simply overwrites with an equivalent value.
            let mut write_locked_cache = self.cached_tables.write().unwrap();
            write_locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    /// Loads every table currently marked as a head. There is normally
    /// exactly one, but concurrent writers can leave several.
    fn get_head_tables(&self) -> TableStoreResult<Vec<Arc<ReadonlyTable>>> {
        let mut tables = vec![];
        for head_entry in
            std::fs::read_dir(self.dir.join("heads")).map_err(TableStoreError::LoadHeads)?
        {
            let head_file_name = head_entry.map_err(TableStoreError::LoadHeads)?.file_name();
            let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
            tables.push(table);
        }
        Ok(tables)
    }

    /// Returns the current head table, creating an empty one if the store
    /// has none, and merging divergent heads into one if there are several.
    pub fn get_head(&self) -> TableStoreResult<Arc<ReadonlyTable>> {
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            self.save_table(empty_table)
        } else if tables.len() == 1 {
            Ok(tables.pop().unwrap())
        } else {
            // There are multiple heads. We take a lock, then check if there are still
            // multiple heads (it's likely that another process was in the process of
            // deleting one of them). If there are still multiple heads, we attempt to
            // merge all the tables into one. We then save that table and record the new
            // head. Note that the locking isn't necessary for correctness; we
            // take the lock only to avoid other concurrent processes from doing
            // the same work (and producing another set of divergent heads).
            let (table, _) = self.get_head_locked()?;
            Ok(table)
        }
    }

    /// Like `get_head()`, but returns the store lock along with the head
    /// so the caller can make further changes without racing other
    /// processes.
    pub fn get_head_locked(&self) -> TableStoreResult<(Arc<ReadonlyTable>, FileLock)> {
        let lock = self.lock()?;
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            let table = self.save_table(empty_table)?;
            return Ok((table, lock));
        }

        if tables.len() == 1 {
            // Return early so we don't write a table with no changes compared to its parent
            return Ok((tables.pop().unwrap(), lock));
        }

        // Merge all divergent heads into the first one, save the merged
        // table (which records it as a head), then drop the stale heads.
        let mut merged_table = MutableTable::incremental(tables[0].clone());
        for other in &tables[1..] {
            merged_table.merge_in(other);
        }
        let merged_table = self.save_table(merged_table)?;
        for table in &tables[1..] {
            self.remove_head(table);
        }
        Ok((merged_table, lock))
    }
}
548
#[cfg(test)]
mod tests {
    use test_case::test_case;

    use super::*;
    use crate::tests::new_temp_dir;

    // Each parameterized test runs once against the in-memory MutableTable
    // ("memory") and once against a saved ReadonlyTable ("file").
    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_empty(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut_table = store.get_head().unwrap().start_mutation();
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Cannot find any keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"aaa"), None);
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_single_key(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Can find expected keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_multiple_keys(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        // Inserted out of order; the table keeps entries sorted by key.
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Can find expected keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abb"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(table.get_value(b"abe"), None);
        assert_eq!(table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_multiple_keys_with_parent_file() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        // Build a stack of saved parent files, adding entries each round.
        for round in 0..10 {
            for i in 0..10 {
                mut_table.add_entry(
                    format!("x{i}{round}").into_bytes(),
                    format!("value {i}{round}").into_bytes(),
                );
            }
            let saved_table = store.save_table(mut_table).unwrap();
            mut_table = MutableTable::incremental(saved_table);
        }

        // Can find expected keys
        assert_eq!(mut_table.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table.get_value(b"x.."), None);
        assert_eq!(mut_table.get_value(b"x14"), Some(b"value 14".as_slice()));
        assert_eq!(mut_table.get_value(b"x41"), Some(b"value 41".as_slice()));
        assert_eq!(mut_table.get_value(b"x49"), Some(b"value 49".as_slice()));
        assert_eq!(mut_table.get_value(b"x94"), Some(b"value 94".as_slice()));
        assert_eq!(mut_table.get_value(b"xAA"), None);
        assert_eq!(mut_table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_merge() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        // Two divergent tables sharing `base_table` as common ancestor.
        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        let table1 = store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        mut_table2.merge_in(&table1);

        // Can find expected keys
        assert_eq!(mut_table2.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table2.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(mut_table2.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(mut_table2.get_value(b"abe"), Some(b"value 4".as_slice()));
        // The caller shouldn't write two values for the same key, so it's undefined
        // which wins, but let's test how it currently behaves.
        assert_eq!(mut_table2.get_value(b"mmm"), Some(b"side 1".as_slice()));
        assert_eq!(mut_table2.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_automatic_merge() {
        // Same test as above, but here we let the store do the merging on load
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        let table2 = store.save_table(mut_table2).unwrap();

        // The saved table does not have the keys from table1
        assert_eq!(table2.get_value(b"abd"), None);

        // Can find expected keys in the merged table we get from get_head()
        let merged_table = store.get_head().unwrap();
        assert_eq!(merged_table.get_value(b"\0\0\0"), None);
        assert_eq!(merged_table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(merged_table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(merged_table.get_value(b"abe"), Some(b"value 4".as_slice()));
        // The caller shouldn't write two values for the same key, so it's undefined
        // which wins.
        let value_mmm = merged_table.get_value(b"mmm");
        assert!(value_mmm == Some(b"side 1".as_slice()) || value_mmm == Some(b"side 2".as_slice()));
        assert_eq!(merged_table.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(merged_table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(merged_table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_store_save_empty() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);

        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        store.save_table(mut_table).unwrap();

        // Saving a mutation with no new entries should be a no-op that
        // returns the existing head.
        let mut_table = store.get_head().unwrap().start_mutation();
        store.save_table(mut_table).unwrap();

        // Table head shouldn't be removed on empty save
        let table = store.get_head().unwrap();
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
    }
}