// jj_lib/stacked_table.rs

1// Copyright 2021 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A persistent table of fixed-size keys to variable-size values.
16//!
17//! The keys are stored in sorted order, with each key followed by an
18//! integer offset into the list of values. The values are
19//! concatenated after the keys. A file may have a parent file, and
20//! the parent may have its own parent, and so on. The child file then
21//! represents the union of the entries.
22
23#![allow(missing_docs)]
24
25use std::cmp::Ordering;
26use std::collections::BTreeMap;
27use std::collections::HashMap;
28use std::fs::File;
29use std::io;
30use std::io::Read;
31use std::io::Write;
32use std::path::PathBuf;
33use std::sync::Arc;
34use std::sync::RwLock;
35
36use blake2::Blake2b512;
37use blake2::Digest;
38use tempfile::NamedTempFile;
39use thiserror::Error;
40
41use crate::file_util::persist_content_addressed_temp_file;
42use crate::lock::FileLock;
43use crate::lock::FileLockError;
44
45pub trait TableSegment {
46    fn segment_num_entries(&self) -> usize;
47    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>>;
48    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
49    fn segment_add_entries_to(&self, mut_table: &mut MutableTable);
50
51    fn num_entries(&self) -> usize {
52        if let Some(parent_file) = self.segment_parent_file() {
53            parent_file.num_entries() + self.segment_num_entries()
54        } else {
55            self.segment_num_entries()
56        }
57    }
58
59    fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> {
60        self.segment_get_value(key)
61            .or_else(|| self.segment_parent_file()?.get_value(key))
62    }
63}
64
/// An immutable table file, optionally stacked on a parent file.
pub struct ReadonlyTable {
    // Fixed size in bytes of every key in this table.
    key_size: usize,
    parent_file: Option<Arc<ReadonlyTable>>,
    // File name within the store directory (hex hash of the file content;
    // see `MutableTable::save_in`).
    name: String,
    // Number of entries not counting the parent file
    num_local_entries: usize,
    // The file's entries in the raw format they're stored in on disk.
    index: Vec<u8>,
    values: Vec<u8>,
}
75
76impl ReadonlyTable {
77    fn load_from(
78        file: &mut dyn Read,
79        store: &TableStore,
80        name: String,
81        key_size: usize,
82    ) -> TableStoreResult<Arc<ReadonlyTable>> {
83        let to_load_err = |err| TableStoreError::LoadSegment {
84            name: name.clone(),
85            err,
86        };
87        let read_u32 = |file: &mut dyn Read| -> TableStoreResult<u32> {
88            let mut buf = [0; 4];
89            file.read_exact(&mut buf).map_err(to_load_err)?;
90            Ok(u32::from_le_bytes(buf))
91        };
92        let parent_filename_len = read_u32(file)?;
93        let maybe_parent_file = if parent_filename_len > 0 {
94            let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
95            file.read_exact(&mut parent_filename_bytes)
96                .map_err(to_load_err)?;
97            let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
98            let parent_file = store.load_table(parent_filename)?;
99            Some(parent_file)
100        } else {
101            None
102        };
103        let num_local_entries = read_u32(file)? as usize;
104        let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size);
105        let mut data = vec![];
106        file.read_to_end(&mut data).map_err(to_load_err)?;
107        let values = data.split_off(index_size);
108        let index = data;
109        Ok(Arc::new(ReadonlyTable {
110            key_size,
111            parent_file: maybe_parent_file,
112            name,
113            num_local_entries,
114            index,
115            values,
116        }))
117    }
118
119    pub fn start_mutation(self: &Arc<Self>) -> MutableTable {
120        MutableTable::incremental(self.clone())
121    }
122
123    fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
124        if pos == self.num_local_entries {
125            self.values.len()
126        } else {
127            ReadonlyTableIndexEntry::new(self, pos).value_offset()
128        }
129    }
130
131    fn segment_value_by_pos(&self, pos: usize) -> &[u8] {
132        &self.values
133            [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)]
134    }
135}
136
137impl TableSegment for ReadonlyTable {
138    fn segment_num_entries(&self) -> usize {
139        self.num_local_entries
140    }
141
142    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
143        self.parent_file.as_ref()
144    }
145
146    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
147        let mut low_pos = 0;
148        let mut high_pos = self.num_local_entries;
149        loop {
150            if high_pos == low_pos {
151                return None;
152            }
153            let mid_pos = (low_pos + high_pos) / 2;
154            let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos);
155            match key.cmp(mid_entry.key()) {
156                Ordering::Less => {
157                    high_pos = mid_pos;
158                }
159                Ordering::Equal => {
160                    return Some(self.segment_value_by_pos(mid_pos));
161                }
162                Ordering::Greater => {
163                    low_pos = mid_pos + 1;
164                }
165            }
166        }
167    }
168
169    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
170        for pos in 0..self.num_local_entries {
171            let entry = ReadonlyTableIndexEntry::new(self, pos);
172            mut_table.add_entry(
173                entry.key().to_vec(),
174                self.segment_value_by_pos(pos).to_vec(),
175            );
176        }
177    }
178}
179
/// Borrowed view of a single raw index entry (key bytes followed by a
/// little-endian u32 value offset) inside a `ReadonlyTable`'s index buffer.
struct ReadonlyTableIndexEntry<'table> {
    data: &'table [u8],
}
183
184impl<'table> ReadonlyTableIndexEntry<'table> {
185    fn new(table: &'table ReadonlyTable, pos: usize) -> Self {
186        let entry_size = ReadonlyTableIndexEntry::size(table.key_size);
187        let offset = entry_size * pos;
188        let data = &table.index[offset..][..entry_size];
189        ReadonlyTableIndexEntry { data }
190    }
191
192    fn size(key_size: usize) -> usize {
193        key_size + 4
194    }
195
196    fn key(&self) -> &'table [u8] {
197        &self.data[0..self.data.len() - 4]
198    }
199
200    fn value_offset(&self) -> usize {
201        u32::from_le_bytes(self.data[self.data.len() - 4..].try_into().unwrap()) as usize
202    }
203}
204
/// An in-memory table layer that accumulates entries before being written
/// out as a new `ReadonlyTable` file.
pub struct MutableTable {
    // Fixed size in bytes of every key; must match the parent file's.
    key_size: usize,
    // Table file this layer is stacked on, if any.
    parent_file: Option<Arc<ReadonlyTable>>,
    // Local entries; BTreeMap keeps keys sorted as required by the on-disk
    // index format.
    entries: BTreeMap<Vec<u8>, Vec<u8>>,
}
210
impl MutableTable {
    /// Creates an empty table with no parent file.
    fn full(key_size: usize) -> Self {
        Self {
            key_size,
            parent_file: None,
            entries: BTreeMap::new(),
        }
    }

    /// Creates an empty table stacked on `parent_file`, inheriting its key
    /// size.
    fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
        let key_size = parent_file.key_size;
        Self {
            key_size,
            parent_file: Some(parent_file),
            entries: BTreeMap::new(),
        }
    }

    /// Inserts (or overwrites) an entry.
    ///
    /// Panics if `key` doesn't have the table's fixed key size.
    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>) {
        assert_eq!(key.len(), self.key_size);
        self.entries.insert(key, value);
    }

    /// Copies all of `other`'s local entries into this table.
    fn add_entries_from(&mut self, other: &dyn TableSegment) {
        other.segment_add_entries_to(self);
    }

    /// Merges the entries from `other` (and its ancestors) into this table.
    ///
    /// Walks both ancestor chains until a common ancestor (matched by file
    /// name) or the root is found, collecting the `other`-side files that
    /// are not shared. The walk steps down whichever chain has more total
    /// entries, since a parent always has fewer entries than its child.
    /// Collected files are then replayed oldest-first, so entries from
    /// newer files overwrite those from older ones.
    #[allow(unknown_lints)] // XXX FIXME (aseipp): nightly bogons; re-test this occasionally
    #[allow(clippy::assigning_clones)]
    fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
        let mut maybe_own_ancestor = self.parent_file.clone();
        let mut maybe_other_ancestor = Some(other.clone());
        let mut files_to_add = vec![];
        loop {
            if maybe_other_ancestor.is_none() {
                break;
            }
            let other_ancestor = maybe_other_ancestor.as_ref().unwrap();
            if maybe_own_ancestor.is_none() {
                // Our chain is exhausted: everything remaining on the other
                // side must be added.
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
                continue;
            }
            let own_ancestor = maybe_own_ancestor.as_ref().unwrap();
            if own_ancestor.name == other_ancestor.name {
                // Reached the common ancestor; its entries are already
                // reachable through our own parent chain.
                break;
            }
            if own_ancestor.num_entries() < other_ancestor.num_entries() {
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
            } else {
                maybe_own_ancestor = own_ancestor.parent_file.clone();
            }
        }

        // Replay oldest-first so newer entries win on key collisions.
        for file in files_to_add.iter().rev() {
            self.add_entries_from(file.as_ref());
        }
    }

    /// Encodes this table in the on-disk format read by
    /// `ReadonlyTable::load_from`: parent file name length + name, entry
    /// count, the sorted index (key + little-endian u32 value offset), then
    /// the concatenated values.
    fn serialize(self) -> Vec<u8> {
        let mut buf = vec![];

        if let Some(parent_file) = &self.parent_file {
            buf.extend(u32::try_from(parent_file.name.len()).unwrap().to_le_bytes());
            buf.extend_from_slice(parent_file.name.as_bytes());
        } else {
            // Zero length marks "no parent".
            buf.extend(0_u32.to_le_bytes());
        }

        buf.extend(u32::try_from(self.entries.len()).unwrap().to_le_bytes());

        // BTreeMap iterates in sorted key order, which the binary search in
        // ReadonlyTable::segment_get_value relies on.
        let mut value_offset = 0_u32;
        for (key, value) in &self.entries {
            buf.extend_from_slice(key);
            buf.extend(value_offset.to_le_bytes());
            value_offset += u32::try_from(value.len()).unwrap();
        }
        for value in self.entries.values() {
            buf.extend_from_slice(value);
        }
        buf
    }

    /// If the MutableTable has more than half the entries of its parent
    /// ReadonlyTable, return MutableTable with the commits from both. This
    /// is done recursively, so the stack of index files has O(log n) files.
    #[allow(unknown_lints)] // XXX FIXME (aseipp): nightly bogons; re-test this occasionally
    #[allow(clippy::assigning_clones)]
    fn maybe_squash_with_ancestors(self) -> MutableTable {
        let mut num_new_entries = self.entries.len();
        let mut files_to_squash = vec![];
        let mut maybe_parent_file = self.parent_file.clone();
        let mut squashed;
        loop {
            match maybe_parent_file {
                Some(parent_file) => {
                    // TODO: We should probably also squash if the parent file has less than N
                    // commits, regardless of how many (few) are in `self`.
                    if 2 * num_new_entries < parent_file.num_local_entries {
                        // Parent is at least twice as big as what we'd add:
                        // keep it as the base and stop squashing here.
                        squashed = MutableTable::incremental(parent_file);
                        break;
                    }
                    num_new_entries += parent_file.num_local_entries;
                    files_to_squash.push(parent_file.clone());
                    maybe_parent_file = parent_file.parent_file.clone();
                }
                None => {
                    // Reached the root: squash everything into one full table.
                    squashed = MutableTable::full(self.key_size);
                    break;
                }
            }
        }

        if files_to_squash.is_empty() {
            return self;
        }

        // Replay oldest-first, then our own entries last, so newer entries
        // overwrite older ones.
        for parent_file in files_to_squash.iter().rev() {
            squashed.add_entries_from(parent_file.as_ref());
        }
        squashed.add_entries_from(&self);
        squashed
    }

    /// Writes this table (after possibly squashing with ancestors) to a new
    /// content-addressed file in `store` and returns it reloaded as a
    /// `ReadonlyTable`.
    ///
    /// If the table has no local entries, the parent file is returned
    /// unchanged instead of writing a redundant file.
    fn save_in(self, store: &TableStore) -> TableStoreResult<Arc<ReadonlyTable>> {
        if self.entries.is_empty() && self.parent_file.is_some() {
            return Ok(self.parent_file.unwrap());
        }

        let buf = self.maybe_squash_with_ancestors().serialize();
        // The file is named after the hash of its content, so concurrent
        // writers producing the same table write the same file.
        let mut hasher = Blake2b512::new();
        hasher.update(&buf);
        let file_id_hex = hex::encode(hasher.finalize());
        let file_path = store.dir.join(&file_id_hex);
        let to_save_err = |err| TableStoreError::SaveSegment {
            name: file_id_hex.clone(),
            err,
        };

        // Write to a temp file first, then atomically move into place.
        let mut temp_file = NamedTempFile::new_in(&store.dir).map_err(to_save_err)?;
        let file = temp_file.as_file_mut();
        file.write_all(&buf).map_err(to_save_err)?;
        persist_content_addressed_temp_file(temp_file, file_path).map_err(to_save_err)?;

        // Parse back from the in-memory buffer rather than re-reading the file.
        ReadonlyTable::load_from(&mut buf.as_slice(), store, file_id_hex, store.key_size)
    }
}
359
360impl TableSegment for MutableTable {
361    fn segment_num_entries(&self) -> usize {
362        self.entries.len()
363    }
364
365    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
366        self.parent_file.as_ref()
367    }
368
369    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
370        self.entries.get(key).map(Vec::as_slice)
371    }
372
373    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
374        for (key, value) in &self.entries {
375            mut_table.add_entry(key.clone(), value.clone());
376        }
377    }
378}
379
/// Errors from reading or writing a [`TableStore`] and its table files.
#[derive(Debug, Error)]
pub enum TableStoreError {
    /// Reading the store's "heads" directory failed.
    #[error("Failed to load table heads")]
    LoadHeads(#[source] io::Error),
    /// Writing a head marker file failed.
    #[error("Failed to save table heads")]
    SaveHeads(#[source] io::Error),
    /// Reading or parsing the named table file failed.
    #[error("Failed to load table segment '{name}'")]
    LoadSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    /// Writing the named table file failed.
    #[error("Failed to save table segment '{name}'")]
    SaveSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    /// Acquiring the store's lock file failed.
    #[error("Failed to lock table store")]
    Lock(#[source] FileLockError),
}
401
/// Result alias for [`TableStore`] operations.
pub type TableStoreResult<T> = Result<T, TableStoreError>;
403
/// Disk-backed store of stacked table files, with a "heads" directory
/// tracking the current leaf table(s) and an in-memory cache of loaded
/// files.
pub struct TableStore {
    dir: PathBuf,
    // Fixed key size (in bytes) shared by every table in this store.
    key_size: usize,
    // Cache of already-loaded table files, keyed by file name.
    cached_tables: RwLock<HashMap<String, Arc<ReadonlyTable>>>,
}
409
impl TableStore {
    /// Creates the store's directory layout under `dir`.
    ///
    /// Panics if the "heads" directory cannot be created.
    pub fn init(dir: PathBuf, key_size: usize) -> Self {
        std::fs::create_dir(dir.join("heads")).unwrap();
        TableStore {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Discards all head markers and recreates an empty "heads" directory.
    ///
    /// Panics if the directory cannot be removed or recreated.
    pub fn reinit(&self) {
        std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
        TableStore::init(self.dir.clone(), self.key_size);
    }

    /// The fixed key size (in bytes) used by all tables in this store.
    pub fn key_size(&self) -> usize {
        self.key_size
    }

    /// Opens an existing store at `dir` without touching the filesystem.
    pub fn load(dir: PathBuf, key_size: usize) -> Self {
        TableStore {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Persists `mut_table`, records it as a head (replacing its parent as
    /// head if different), caches it, and returns the saved table.
    pub fn save_table(&self, mut_table: MutableTable) -> TableStoreResult<Arc<ReadonlyTable>> {
        let maybe_parent_table = mut_table.parent_file.clone();
        let table = mut_table.save_in(self)?;
        self.add_head(&table)?;
        if let Some(parent_table) = maybe_parent_table {
            // The parent head is now superseded unless saving was a no-op
            // (empty mutable table returns the parent itself).
            if parent_table.name != table.name {
                self.remove_head(&parent_table);
            }
        }
        {
            let mut locked_cache = self.cached_tables.write().unwrap();
            locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    /// Marks `table` as a head by creating an empty marker file named after
    /// it in the "heads" directory.
    fn add_head(&self, table: &Arc<ReadonlyTable>) -> TableStoreResult<()> {
        std::fs::write(self.dir.join("heads").join(&table.name), "")
            .map_err(TableStoreError::SaveHeads)
    }

    /// Removes `table`'s head marker, best-effort.
    fn remove_head(&self, table: &Arc<ReadonlyTable>) {
        // It's fine if the old head was not found. It probably means
        // that we're on a distributed file system where the locking
        // doesn't work. We'll probably end up with two current
        // heads. We'll detect that next time we load the table.
        std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
    }

    /// Acquires the store-wide lock file.
    fn lock(&self) -> TableStoreResult<FileLock> {
        FileLock::lock(self.dir.join("lock")).map_err(TableStoreError::Lock)
    }

    /// Loads the table file called `name`, going through (and populating)
    /// the in-memory cache.
    fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
        {
            // Fast path: already cached.
            let read_locked_cached = self.cached_tables.read().unwrap();
            if let Some(table) = read_locked_cached.get(&name).cloned() {
                return Ok(table);
            }
        }
        let to_load_err = |err| TableStoreError::LoadSegment {
            name: name.clone(),
            err,
        };
        let table_file_path = self.dir.join(&name);
        let mut table_file = File::open(table_file_path).map_err(to_load_err)?;
        let table = ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)?;
        {
            // A concurrent loader may have inserted the same table already;
            // overwriting with an equivalent value is harmless.
            let mut write_locked_cache = self.cached_tables.write().unwrap();
            write_locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    /// Loads every table currently marked as a head.
    fn get_head_tables(&self) -> TableStoreResult<Vec<Arc<ReadonlyTable>>> {
        let mut tables = vec![];
        for head_entry in
            std::fs::read_dir(self.dir.join("heads")).map_err(TableStoreError::LoadHeads)?
        {
            let head_file_name = head_entry.map_err(TableStoreError::LoadHeads)?.file_name();
            let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
            tables.push(table);
        }
        Ok(tables)
    }

    /// Returns the single current head table, creating an empty one if none
    /// exists and merging divergent heads if there are several.
    pub fn get_head(&self) -> TableStoreResult<Arc<ReadonlyTable>> {
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            self.save_table(empty_table)
        } else if tables.len() == 1 {
            Ok(tables.pop().unwrap())
        } else {
            // There are multiple heads. We take a lock, then check if there are still
            // multiple heads (it's likely that another process was in the process of
            // deleting one of them). If there are still multiple heads, we attempt to
            // merge all the tables into one. We then save that table and record the new
            // head. Note that the locking isn't necessary for correctness; we
            // take the lock only to avoid other concurrent processes from doing
            // the same work (and producing another set of divergent heads).
            let (table, _) = self.get_head_locked()?;
            Ok(table)
        }
    }

    /// Like [`get_head`](Self::get_head), but returns the head together with
    /// the held store lock, merging any divergent heads while locked.
    pub fn get_head_locked(&self) -> TableStoreResult<(Arc<ReadonlyTable>, FileLock)> {
        let lock = self.lock()?;
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            let table = self.save_table(empty_table)?;
            return Ok((table, lock));
        }

        if tables.len() == 1 {
            // Return early so we don't write a table with no changes compared to its parent
            return Ok((tables.pop().unwrap(), lock));
        }

        // Merge all heads into the first one, save the result, and drop the
        // now-superseded head markers.
        let mut merged_table = MutableTable::incremental(tables[0].clone());
        for other in &tables[1..] {
            merged_table.merge_in(other);
        }
        let merged_table = self.save_table(merged_table)?;
        for table in &tables[1..] {
            self.remove_head(table);
        }
        Ok((merged_table, lock))
    }
}
550
#[cfg(test)]
mod tests {
    use test_case::test_case;

    use super::*;
    use crate::tests::new_temp_dir;

    // Each `test_case` variant exercises the table either purely in memory
    // (MutableTable) or after a save/load round trip (ReadonlyTable).
    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_empty(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut_table = store.get_head().unwrap().start_mutation();
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Cannot find any keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"aaa"), None);
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_single_key(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Can find expected keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_multiple_keys(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        // Inserted out of order; lookups verify sorting at serialization time.
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Can find expected keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abb"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(table.get_value(b"abe"), None);
        assert_eq!(table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_multiple_keys_with_parent_file() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        // Build a stack of saved files so lookups traverse parent chains.
        for round in 0..10 {
            for i in 0..10 {
                mut_table.add_entry(
                    format!("x{i}{round}").into_bytes(),
                    format!("value {i}{round}").into_bytes(),
                );
            }
            let saved_table = store.save_table(mut_table).unwrap();
            mut_table = MutableTable::incremental(saved_table);
        }

        // Can find expected keys
        assert_eq!(mut_table.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table.get_value(b"x.."), None);
        assert_eq!(mut_table.get_value(b"x14"), Some(b"value 14".as_slice()));
        assert_eq!(mut_table.get_value(b"x41"), Some(b"value 41".as_slice()));
        assert_eq!(mut_table.get_value(b"x49"), Some(b"value 49".as_slice()));
        assert_eq!(mut_table.get_value(b"x94"), Some(b"value 94".as_slice()));
        assert_eq!(mut_table.get_value(b"xAA"), None);
        assert_eq!(mut_table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_merge() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        // Two divergent tables sharing `base_table` as common ancestor.
        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        let table1 = store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        mut_table2.merge_in(&table1);

        // Can find expected keys
        assert_eq!(mut_table2.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table2.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(mut_table2.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(mut_table2.get_value(b"abe"), Some(b"value 4".as_slice()));
        // The caller shouldn't write two values for the same key, so it's undefined
        // which wins, but let's test how it currently behaves.
        assert_eq!(mut_table2.get_value(b"mmm"), Some(b"side 1".as_slice()));
        assert_eq!(mut_table2.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_automatic_merge() {
        // Same test as above, but here we let the store do the merging on load
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        let table2 = store.save_table(mut_table2).unwrap();

        // The saved table does not have the keys from table1
        assert_eq!(table2.get_value(b"abd"), None);

        // Can find expected keys in the merged table we get from get_head()
        let merged_table = store.get_head().unwrap();
        assert_eq!(merged_table.get_value(b"\0\0\0"), None);
        assert_eq!(merged_table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(merged_table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(merged_table.get_value(b"abe"), Some(b"value 4".as_slice()));
        // The caller shouldn't write two values for the same key, so it's undefined
        // which wins.
        let value_mmm = merged_table.get_value(b"mmm");
        assert!(value_mmm == Some(b"side 1".as_slice()) || value_mmm == Some(b"side 2".as_slice()));
        assert_eq!(merged_table.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(merged_table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(merged_table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_store_save_empty() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);

        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        store.save_table(mut_table).unwrap();

        // Saving an empty incremental table should return the parent, not
        // replace the head.
        let mut_table = store.get_head().unwrap().start_mutation();
        store.save_table(mut_table).unwrap();

        // Table head shouldn't be removed on empty save
        let table = store.get_head().unwrap();
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
    }
}
742}